jackdaw

https://github.com/FundingCircle/jackdaw
gklijs 2021-04-29T06:55:04.108Z

Do you use docker-compose? I think the IPs of the advertised listeners might be wrong. I usually set it up like https://github.com/gklijs/bkes-demo/blob/main/docker-cluster.yml so that I can easily connect from other docker-compose files or locally.

finchharold 2021-04-29T16:09:51.109100Z

Hey, I've removed all the Docker containers and set up Kafka and ZooKeeper manually.

finchharold 2021-04-29T16:10:20.109400Z

I can read the events now

bin/kafka-console-consumer.sh --topic public.app.chatrooms --from-beginning --bootstrap-server localhost:9092
This returns the updates whenever I change a chatroom title.

finchharold 2021-04-29T16:11:16.110200Z

Now when I try to poll the topic like this:

(defn poll-kafka
  [{consumer :com.app.graphql.kafka/consumer}]
  (doseq [{:keys [value]} (jc/poll consumer 100)]
    (prn "Got record from Kafka" value))
  (.commitSync consumer))
It returned this:
INFO [org.apache.kafka.clients.Metadata:365] - Cluster ID: AIfhN5dkSsCmMx7QOpcIQQ
INFO [org.apache.kafka.clients.consumer.internals.AbstractCoordinator:675] - [Consumer clientId=consumer-1, groupId=app] Discovered group coordinator 192.168.0.100:9092 (id: 2147483647 rack: null)
INFO [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:459] - [Consumer clientId=consumer-1, groupId=app] Revoking previously assigned partitions []
INFO [org.apache.kafka.clients.consumer.internals.AbstractCoordinator:491] - [Consumer clientId=consumer-1, groupId=app] (Re-)joining group
INFO [org.apache.kafka.clients.consumer.internals.AbstractCoordinator:491] - [Consumer clientId=consumer-1, groupId=app] (Re-)joining group
INFO [org.apache.kafka.clients.consumer.internals.AbstractCoordinator:455] - [Consumer clientId=consumer-1, groupId=app] Successfully joined group with generation 8
INFO [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:290] - [Consumer clientId=consumer-1, groupId=app] Setting newly assigned partitions: public.app.chatrooms-0
nil

finchharold 2021-04-29T16:11:42.110800Z

Shouldn't it be showing the messages instead of nil? I mean, there aren't any errors though.

finchharold 2021-04-29T16:12:15.111200Z

@cddr @dstephens @gphilipp @gklijs

gklijs 2021-04-29T16:17:32.111500Z

Not 100% sure, since I've never used jackdaw. But the first call, or the first few calls, to poll usually come back empty, since the consumer needs to fetch metadata and connect to the partition leaders.
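
A minimal sketch of that idea in plain Clojure: keep polling until something comes back or a retry limit is hit. The names `poll-until-records`, `fake-polls` and `fake-poll` are made up for illustration; the fake poll function only stands in for a real consumer so the snippet runs without a broker.

```clojure
(defn poll-until-records
  "Calls poll-fn (a zero-argument function returning a possibly empty
  collection of records) up to max-attempts times. Returns the first
  non-empty result as a vector, or nil if every attempt came back empty."
  [poll-fn max-attempts]
  (loop [attempt 1]
    (when (<= attempt max-attempts)
      (let [records (poll-fn)]
        (if (seq records)
          (vec records)
          (recur (inc attempt)))))))

;; Hypothetical stand-in for a consumer: the first two polls are empty,
;; the third one returns a single record.
(def fake-polls (atom [[] [] [{:value "chatroom title changed"}]]))

(defn fake-poll
  "Returns the next canned batch and removes it from fake-polls."
  []
  (let [[old _] (swap-vals! fake-polls rest)]
    (first old)))

;; With a real consumer this would presumably look like:
;; (poll-until-records #(jc/poll consumer 1000) 5)
```

Separately, the `nil` at the end of the output above is just the return value of the function: its last form is `(.commitSync consumer)`, which returns nothing, and `doseq` itself also returns nil. So `nil` alone does not say whether any records were polled; what matters is whether the `prn` line appears.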

2021-04-29T17:55:21.112900Z

You might want to try seeking to the beginning of the topic for your initial testing. Here is some rough code I've been using to read a topic:

(defn consume-adsb-msgs
  [msgs-chan]
  (with-open [consumer (-> (jc/consumer consumer-config topic-serde)
                           (jc/subscribe [topic-foo])
                           jc/seek-to-end-eager)]
    (while true
      (let [messages (jc/poll consumer 1000)]
        (println "poll returned, count:" (count messages))
        (when (seq messages)
          (>!! msgs-chan messages)
          (.commitSync consumer))))))

2021-04-29T18:10:02.113700Z

On the other end of that channel, I doseq over the messages and process each message individually.

2021-04-29T18:13:26.114400Z

Something like this:

(defn prt-msgs-now
  []
  (let [msgs-chan   (chan)
        consumer-thread (thread (consume-adsb-msgs msgs-chan))]
    (while true
      (let [adsb-messages (<!! msgs-chan)]
        (doseq [{:keys [key value partition timestamp offset] :as message} adsb-messages]