I set this to seek-to-beginning-eager, and still it's returning count=0.
and you know there are messages in that topic?
Yeah, just now I ran this command, bin/kafka-console-consumer.sh --topic app.public.chatrooms --from-beginning --bootstrap-server http://localhost:9092/
It should all the messages from beginning and even showed whenever I update the chatroom data...
showed*
And you set consumer-config and topic-serde ?
So, the consumer is already set and is subscribed to the topic...
Wait wait
@dcj Now it is returning this poll returned, count: 19 poll returned, count: 0 poll returned, count: 0 poll returned, count: 0 poll returned, count: 0
19 is good
you got 19
Yeah so now how can I get just the latest message? like if I changed something now, then it should return only that... is that possible?
seek to end eager
then publish more msgs to topic
Here, I'm running the prt-msgs-now function and it's returning only the count....?
It should return the message too right, along with the count?
the poll msg is in 1 thread what r u doing when you read the msgs out of the channel?
in the dosync u should print value
Yeah, I mean I did it like this
(defn prt-msgs-now
[{consumer :com.app.graphql.kafka/consumer}]
(let [msgs-chan (chan)
consumer-thread (thread (consume-adsb-msgs msgs-chan consumer))]
(while true
(let [adsb-messages (<!! msgs-chan)]
(doseq [{:keys [key value partition timestamp offset] :as message} adsb-messages]
(println value))))))
It is showing the message, thank you so much @dcj.
Just one query though
I tried it this way
(defn poll-kafka
[{consumer :com.app.graphql.kafka/consumer}]
(doseq [{:keys [value]} (jc/poll consumer 100)]
(prn "Got record from Kafka" value)
)
(.commitSync consumer))
Why do you think this didn't work?
Yeah got it, because I missed the seek?
Thank you again @dcj
I don't really get why you would want to send the messages to a channel through. It's hard to get either at least once or are most once guarantees that way imho.
I'm using it for graphql streamer
Actually I'd like to know is there a way to let the streamer know that there's something new in the kafka topic?
Can I do it like this...?
(with-open [consumer (jc/seek-to-end-eager consumer)]
(while true
(doseq [{:keys [value]} (jc/poll consumer 100)]
(send any method here???))))
(.commitSync consumer)
Can't I pole multiple times? Like run the poll-methd multiple times?
If you never commit, but not very efficient. I just check every message from Kafka if it should be send to one of the active subscriptions. https://github.com/gklijs/bkes-demo/blob/7d86b9b2ef2b014c0ef85b045de8559013d9e9ee/graphql-endpoint/src/nl/openweb/graphql_endpoint/transaction_service.clj#L42