jackdaw

https://github.com/FundingCircle/jackdaw
finchharold 2021-04-19T11:09:41.020400Z

I'm trying to use Kafka's producer/consumer model. In my case the producer is the Debezium connector, which also creates the topics, so I only need a consumer to read the messages from those topics.

finchharold 2021-04-19T11:10:20.020900Z

So I configured my consumer via Integrant like this:

(ns myapp.kafka ;; hypothetical namespace name; the requires are what the code needs
  (:require [clojure.string :as str]
            [integrant.core :as ig]
            [jackdaw.client :as jc]
            [taoensso.timbre :as timbre]))

(defmethod ig/prep-key ::consumer
  [_ {:keys [kafka-brokers kafka-group enable-auto-commit
             topics max-poll-records]
      :or {kafka-brokers "localhost:9092" kafka-group "myapp"
           enable-auto-commit false max-poll-records "100"}}]
  (timbre/info "Preparing consumer")
  {"bootstrap.servers" kafka-brokers
   "group.id" kafka-group
   "enable.auto.commit" enable-auto-commit
   "auto.offset.reset" "earliest"
   ;; Environment variables are always strings
   "max.poll.records" (Integer/parseInt max-poll-records)
   "topics" (if topics
              (mapv #(hash-map :topic-name %) (str/split topics #","))
              (throw (IllegalArgumentException.
                      "Kafka topics are required. You need to specify at least one topic.")))
   "key.deserializer" "org.apache.kafka.common.serialization.StringDeserializer"
   "value.deserializer" "org.apache.kafka.common.serialization.StringDeserializer"})


(defmethod ig/init-key ::consumer [_ config]
  (timbre/info "Configuring Kafka consumer" config)
  ;; "topics" is a string key, so it must be dissoc'ed as a string (not :topics):
  ;; it is only used by jc/subscribe, not by the KafkaConsumer itself
  (-> (jc/consumer (dissoc config "topics"))
      (jc/subscribe (get config "topics"))))


(defmethod ig/halt-key! ::consumer [_ consumer]
  (timbre/info "Stopping Kafka consumer")
  (when consumer
    (.close consumer)))
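The prep-key above turns a comma-separated topic string (for example from an environment variable) into the vector of {:topic-name ...} maps handed to jc/subscribe. A slightly more forgiving sketch also trims spaces and drops empty entries, so a value like "a, b," still works. parse-topics is a hypothetical helper, not part of jackdaw, and myapp.public.users is a made-up second topic:

```clojure
(require '[clojure.string :as str])

(defn parse-topics
  "Hypothetical helper: turns a comma-separated string into jackdaw-style
  topic configs, trimming whitespace and dropping empty entries."
  [topics]
  (->> (str/split (or topics "") #",")
       (map str/trim)
       (remove str/blank?)
       (mapv #(hash-map :topic-name %))))

(parse-topics "myapp.public.chatrooms, myapp.public.users")
;; => [{:topic-name "myapp.public.chatrooms"} {:topic-name "myapp.public.users"}]
```

With this helper, prep-key could throw only when (parse-topics topics) comes back empty, which also catches a variable that is set but blank.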

finchharold 2021-04-19T11:10:36.021300Z

Now, any idea how I consume the messages using this consumer?

finchharold 2021-04-19T11:12:16.022600Z

Basically, I got stuck on how to get the topic name: if Debezium is the one creating the topic and producing to it, how do I refer to that topic from the consumer?

finchharold 2021-04-19T11:15:23.025100Z

I can also watch all the updates in Kafka with this command:

docker run -it --network=docker-debezium_default --rm edenhill/kafkacat:1.6.0 kafkacat -C -b kafka:9092 -t myapp.public.chatrooms -o -10

finchharold 2021-04-19T11:15:41.025500Z

where myapp.public.chatrooms is the topic where all the updates are being produced

finchharold 2021-04-19T11:15:48.025800Z

But how do I use it in the code?

gphilipp 2021-04-19T11:16:16.026200Z

Hi @kishorekaranam99, I’ll have a look at your example shortly.

finchharold 2021-04-19T11:16:39.026600Z

Thank you @gphilipp, much appreciated.

finchharold 2021-04-19T12:11:32.027400Z

Hey @gphilipp, did you get a chance to take a look at it? If not, please do whenever you can. Thank you.

gphilipp 2021-04-19T12:16:25.027600Z

Not yet

finchharold 2021-04-19T13:40:10.028Z

Okay.

Daniel Stephens 2021-04-19T15:11:23.030100Z

jc/consumer and jc/subscribe both return the consumer; you then call jc/poll on that consumer to read messages. Each jc/poll call returns only the next batch of available records, so you usually call it repeatedly in a loop:

(while @run
  (doseq [msg (jc/poll consumer 1000)]
    (println msg)))
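The loop above keeps polling while the run atom is true. A minimal sketch of the same idea as a reusable function, with the poll and per-record handling passed in so it can be exercised without a broker; run-poll-loop!, fake-poll and the stub batches are hypothetical, and in real code poll-fn would be something like #(jc/poll consumer 1000):

```clojure
(defn run-poll-loop!
  "Hypothetical helper: calls poll-fn repeatedly while the running? atom
  is true, passing each returned record to handle-fn. Blocks the calling
  thread until running? is reset to false."
  [running? poll-fn handle-fn]
  (while @running?
    (doseq [record (poll-fn)]
      (handle-fn record))))

;; Demo with a stub in place of a real consumer: the stub hands out two
;; batches, then flips running? to false and returns an empty batch.
(def running? (atom true))
(def batches (atom [[{:value "a"} {:value "b"}] [{:value "c"}]]))
(def seen (atom []))

(defn fake-poll []
  (if-let [batch (first @batches)]
    (do (swap! batches rest) batch)
    (do (reset! running? false) [])))

(run-poll-loop! running? fake-poll #(swap! seen conj (:value %)))
@seen
;; => ["a" "b" "c"]
```

In a real system you would typically start the loop on its own thread (for example with future from init-key) and stop it by resetting the atom. KafkaConsumer is not thread-safe (wakeup is the one method meant to be called from another thread), so it is safest to close the consumer on the loop thread once the loop exits rather than from halt-key! while poll may still be running.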

Daniel Stephens 2021-04-19T15:11:30.030300Z

@kishorekaranam99

Daniel Stephens 2021-04-19T15:13:36.031300Z

The topic names are configurable on the connector, but by default Debezium uses serverName.schemaName.tableName (which is why you see myapp.public.chatrooms), so you should know the value in advance.
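A minimal sketch, assuming the connector keeps Debezium's default naming, where the first part is the logical server name (database.server.name in Debezium 1.x, topic.prefix in 2.x). debezium-topic is a hypothetical helper that just joins the three parts, so the consumer can be configured without waiting to see what the connector creates:

```clojure
(require '[clojure.string :as str])

(defn debezium-topic
  "Hypothetical helper: builds a topic name following Debezium's default
  <server>.<schema>.<table> convention."
  [server schema table]
  (str/join "." [server schema table]))

;; With the values from the kafkacat command above:
(debezium-topic "myapp" "public" "chatrooms")
;; => "myapp.public.chatrooms"

;; The result can then go into the topic configs used by jc/subscribe, e.g.
;; (jc/subscribe consumer [{:topic-name (debezium-topic "myapp" "public" "chatrooms")}])
```

If the connector is configured with a custom routing or naming scheme, this helper no longer applies and the configured names should be used instead.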

Daniel Stephens 2021-04-19T15:22:00.033200Z

Jackdaw doesn't seem to wrap it, but you can use Java interop to subscribe the consumer to a topic-name Pattern, which may be more helpful here. For example, subscribing to #"myapp\.public\..*" would pick up all the topics the connector creates for that schema.
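A sketch of the interop approach, assuming consumer is the KafkaConsumer returned by jc/consumer. A Clojure regex literal is already a java.util.regex.Pattern, so it can be passed straight to KafkaConsumer.subscribe(Pattern). subscribe-to-pattern! is a hypothetical helper, and myapp.public.users and myapp.private.audit are made-up topic names used only to show what the pattern matches:

```clojure
(def chatroom-topics
  ;; Clojure regex literals are java.util.regex.Pattern instances
  #"myapp\.public\..*")

(defn subscribe-to-pattern!
  "Hypothetical helper: subscribes a KafkaConsumer (e.g. one returned by
  jc/consumer) to every topic whose name matches pattern, via Java
  interop. Returns the consumer so it can be threaded like jc/subscribe."
  [consumer ^java.util.regex.Pattern pattern]
  (.subscribe consumer pattern)
  consumer)

;; Which topic names the pattern would pick up:
(filter #(re-matches chatroom-topics %)
        ["myapp.public.chatrooms" "myapp.public.users" "myapp.private.audit"])
;; => ("myapp.public.chatrooms" "myapp.public.users")
```

In init-key this would replace the jc/subscribe step, e.g. (subscribe-to-pattern! (jc/consumer (dissoc config "topics")) chatroom-topics). Topics created after subscribing are picked up when the consumer next refreshes its metadata (governed by metadata.max.age.ms), not instantly.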

gphilipp 2021-04-19T15:39:16.035400Z

When using the consumer directly, the pattern to follow is this:

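(import java.time.Duration)

;; my-app-is-running is assumed to be an atom holding a boolean
(while @my-app-is-running
  (let [records (jackdaw.client/poll consumer (Duration/ofMillis 1000))]
    (doseq [{:keys [value]} records]
      ;; do something with the value
      (println value))
    (.commitSync consumer)))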
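This loop commits after handling each polled batch. Together with "enable.auto.commit" false in the consumer config above, that means offsets are only stored once their records have been processed, giving at-least-once delivery: a crash mid-batch leads to redelivery, so handlers should tolerate duplicates. A minimal sketch of one iteration with the consumer calls passed in as functions so it runs without a broker; process-batch!, poll-fn, handle-fn and commit-fn are hypothetical names, and in real code they would be wired to #(jackdaw.client/poll consumer (Duration/ofMillis 1000)) and #(.commitSync consumer):

```clojure
(defn process-batch!
  "Hypothetical helper: polls once, hands each record's value to handle-fn,
  and commits only after the whole batch has been handled. Skips the
  commit when the batch is empty. Returns the number of records handled."
  [poll-fn handle-fn commit-fn]
  (let [records (poll-fn)]
    (doseq [{:keys [value]} records]
      (handle-fn value))
    (when (seq records)
      (commit-fn))
    (count records)))

;; Demo with stubs that record the order of calls:
(def events (atom []))

(process-batch! (fn [] [{:value "a"} {:value "b"}])
                #(swap! events conj [:handle %])
                #(swap! events conj [:commit]))
;; => 2

@events
;; => [[:handle "a"] [:handle "b"] [:commit]]
```

Skipping the commit on an empty batch is a small design choice; committing anyway, as in the loop above, is also harmless.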
  

gphilipp 2021-04-19T15:40:22.036Z

The other alternative is to use the Kafka Streams DSL; have a look at https://github.com/FundingCircle/jackdaw/blob/master/examples/word-count/src/word_count.clj