Do you use docker-compose? I think the ip's of the advertised listeners might be wrong. I usually do it like https://github.com/gklijs/bkes-demo/blob/main/docker-cluster.yml so that I can easily connect from other docker compose files, or locally.
Hey, I've removed all the docker containers and set the kafka and zookeeper everything manually.
I can read the events now
bin/kafka-console-consumer.sh --topic public.app.chatrooms --from-beginning --bootstrap-server localhost:9092
this is returning the updates whenever I change a chatroom titleNow when I try to poll the topic like this:
(defn poll-kafka
[{consumer :com.app.graphql.kafka/consumer}]
(doseq [{:keys [value]} (jc/poll consumer 100)]
(prn "Got record from Kafka" value)
)
(.commitSync consumer))
It returned this:
INFO [org.apache.kafka.clients.Metadata:365] - Cluster ID: AIfhN5dkSsCmMx7QOpcIQQ
INFO [org.apache.kafka.clients.consumer.internals.AbstractCoordinator:675] - [Consumer clientId=consumer-1, groupId=app] Discovered group coordinator 192.168.0.100:9092 (id: 2147483647 rack: null)
INFO [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:459] - [Consumer clientId=consumer-1, groupId=app] Revoking previously assigned partitions []
INFO [org.apache.kafka.clients.consumer.internals.AbstractCoordinator:491] - [Consumer clientId=consumer-1, groupId=app] (Re-)joining group
INFO [org.apache.kafka.clients.consumer.internals.AbstractCoordinator:491] - [Consumer clientId=consumer-1, groupId=app] (Re-)joining group
INFO [org.apache.kafka.clients.consumer.internals.AbstractCoordinator:455] - [Consumer clientId=consumer-1, groupId=app] Successfully joined group with generation 8
INFO [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:290] - [Consumer clientId=consumer-1, groupId=app] Setting newly assigned partitions: public.app.chatrooms-0
nil
Shouldn't it be showing the messages? instead of nil? I mean there aren't any errors though.
Not 100% sure, since I never used jackdaw. But the first or first few times calling poll usually end up empty, since it needs to fetch meta data + connect to the leaders.
You might want to try seeking to the beginning of the topic for your initial testing. Here is some rough code I've been using to read a topic:
(defn consume-adsb-msgs
[msgs-chan]
(with-open [consumer (-> (jc/consumer consumer-config topic-serde)
(jc/subscribe [topic-foo])
jc/seek-to-end-eager)]
(while true
(let [messages (jc/poll consumer 1000)]
(println "poll returned, count:" (count messages))
(when (and messages (not (empty? messages)))
(>!! msgs-chan messages)
(.commitSync consumer))))))
On the other end of that channel, I doseq
on the messages, and process each message indivdually
something like this:
(defn prt-msgs-now
[]
(let [msgs-chan (chan)
consumer-thread (thread (consume-adsb-msgs msgs-chan))]
(while true
(let [adsb-messages (<!! msgs-chan)]
(doseq [{:keys [key value partition timestamp offset] :as message} adsb-messages]