Skip to content

Commit

Permalink
adopted version; not ready for normal use yet
Browse files Browse the repository at this point in the history
  • Loading branch information
source-c committed Oct 15, 2018
1 parent 20de27b commit e2127d2
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 32 deletions.
2 changes: 1 addition & 1 deletion README.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ Add the following to your http://github.com/technomancy/leiningen[Leiningen's]

[source,clojure]
----
[net.tbt-post/clj-kafka-x "0.3.0"]
[net.tbt-post/clj-kafka-x "0.3.1"]
----

== Usage
Expand Down
2 changes: 1 addition & 1 deletion project.clj
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
(defproject net.tbt-post/clj-kafka-x "0.3.0"
(defproject net.tbt-post/clj-kafka-x "0.3.1"
:description "A Clojure wrapper for Apache Kafka v2.0.0 client"
:url "https://github.com/source-c/clj-kafka-x"
:license {:name "Apache License 2.0"
Expand Down
16 changes: 8 additions & 8 deletions src/clj_kafka_x/admin.clj
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
(ns clj-kafka-x.admin
(:require [clj-kafka-x.data :refer [map->properties]])
(:import kafka.admin.AdminUtils
kafka.admin.RackAwareMode.Enforced
(kafka.admin RackAwareMode RackAwareMode$Enforced$)
kafka.utils.ZkUtils))

(defn zk-utils
""
[zk-url & {:keys [session-timeout connection-timeout security-enabled]
:or {session-timeout 1000
connection-timeout 1000
security-enabled false}}]
:or {session-timeout 1000
connection-timeout 1000
security-enabled false}}]
(ZkUtils/apply zk-url session-timeout connection-timeout (Boolean/valueOf security-enabled)))


Expand All @@ -30,15 +30,15 @@
(create-topic z-utils \"topic-b\" :topic-config {\"cleanup.policy\" \"compact\"})
"
[z-utils topic & {:keys [partitions replication-factor topic-config]
:or {partitions 1
replication-factor 1
topic-config nil}}]
:or {partitions 1
replication-factor 1
topic-config nil}}]
(AdminUtils/createTopic z-utils
topic
(int partitions)
(int replication-factor)
(map->properties topic-config)
(Enforced)))
RackAwareMode$Enforced$))

(defn topic-exists?
"Returns true or false dependant on the existance of the given topic"
Expand Down
17 changes: 7 additions & 10 deletions src/clj_kafka_x/consumers/ballanced.clj
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
(ns clj-kafka-x.consumers.ballanced
(:require [clojure.string :refer [join]])
(:import (java.util.concurrent Executors)
(kafka.consumer Consumer ConsumerConfig)
(org.apache.kafka.clients.producer KafkaProducer)
(org.apache.kafka.clients.consumer KafkaConsumer Consumer ConsumerConfig ConsumerRecords)
(java.util Properties)
(kafka.consumer KafkaStream)
(clojure.lang PersistentArrayMap)
(org.apache.kafka.clients.producer ProducerRecord)))
(clojure.lang PersistentArrayMap)))

(defrecord KafkaMessage [topic partition offset key message])

Expand All @@ -18,15 +15,15 @@
([stream thread-num spec]
(consume-messages stream thread-num spec (:id spec)))
([stream thread-num spec id]
(let [it (.iterator ^KafkaStream stream)]
(let [it (.iterator ^ConsumerRecords stream)]
(while (.hasNext it)
(let [msg (.next it)
kmsg (KafkaMessage.
(.topic msg)
(.partition msg)
(.offset msg)
(.key msg)
(.message msg))
(.value msg))
prc @(resolve (:processor spec))]

(process-msg prc kmsg id)))
Expand Down Expand Up @@ -84,7 +81,7 @@
([config blist prefix]
(create-consumer blist prefix (:group (:consumer config))))
([config blist prefix grname]
(Consumer/createJavaConsumerConnector
(KafkaConsumer.
(create-consumer-config config (->zklist blist prefix) grname))))

(defn- shutdown-topic [topic obj]
Expand All @@ -110,10 +107,10 @@
tpool (Executors/newFixedThreadPool psize)
consumer (create-consumer zkpool zkpref group)]
(try (swap! storage merge
{T {:instance (hk/consume-topic
{T {:instance (consume-topic
consumer T
tpool psize
(get topics-list T))
:pool tpool}})
(catch Exception e (do (shutdown-topic T {:pool tpool}))))))
storage)
storage)
13 changes: 1 addition & 12 deletions src/clj_kafka_x/data.clj
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,7 @@
(:import [java.util HashMap Map Properties]
[org.apache.kafka.clients.consumer ConsumerRecord ConsumerRecords OffsetAndMetadata]
org.apache.kafka.clients.producer.RecordMetadata
[org.apache.kafka.common Metric MetricName Node PartitionInfo TopicPartition]

(kafka.consumer Consumer ConsumerConfig KafkaStream)
(kafka.message MessageAndMetadata)))
[org.apache.kafka.common Metric MetricName Node PartitionInfo TopicPartition]))

(defprotocol ToClojure
""
Expand All @@ -30,14 +27,6 @@
:key (.key x)
:value (.value x)})

MessageAndMetadata
(to-clojure [x]
{:topic (.topic x)
:partition (.partition x)
:offset (.offset x)
:key (.key x)
:value (.message x)}) ;; FIME: :value/:message

ConsumerRecords
(to-clojure [x]
(mapv to-clojure x))
Expand Down

0 comments on commit e2127d2

Please sign in to comment.