Let's say there are two topics, A and B, and we want to do something like this: if data is produced to topic A, do one thing; if data is produced to topic B, do another. Is this achievable? I mean, is there a way to see if a specific topic has been updated when we have multiple topics...?
or just read a message from a specific topic?
poll
(poll consumer timeout)
Polls kafka for new messages, returning a potentially empty sequence
of datafied messages.
That's for reading the messages, right? But if it only takes the consumer as the argument, then how do I poll from a certain topic when one consumer is subscribed to 2 topics...?
If you want to do something different for messages in each topic, why not have a different consumer for each one rather than consuming both with the same consumer?
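A per-topic setup could look something like this (a sketch using jackdaw; the broker address, group ids, and deserializers are assumptions you'd replace with your own config):

```clojure
(require '[jackdaw.client :as jc])

;; Hypothetical base config; adjust to your broker and serdes.
(def base-config
  {"bootstrap.servers"  "localhost:9092"
   "key.deserializer"   "org.apache.kafka.common.serialization.StringDeserializer"
   "value.deserializer" "org.apache.kafka.common.serialization.StringDeserializer"})

;; One consumer per topic, each in its own consumer group,
;; so each function can poll its own topic independently.
(def chatroom-consumer
  (-> (jc/consumer (assoc base-config "group.id" "chatroom-reader"))
      (jc/subscribe [{:topic-name "chatroom"}])))

(def template-consumer
  (-> (jc/consumer (assoc base-config "group.id" "template-reader"))
      (jc/subscribe [{:topic-name "template"}])))
```

Then `(jc/poll chatroom-consumer 1000)` only ever returns chatroom records, and likewise for the template consumer.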
I'm using it with graphql, so I don't know if it's feasible to have a consumer for each streamer...?
Can't I read from a specific topic?
Not if you set it up to read from many no
How do I do that? Right now I'm subscribing to two topics and I just want to read from each separately...
Subscribe to just one topic and read the messages from it
Yeah, that is fine. I want to have all the chatroom-related data in the chatroom topic and all the template-related data in the template topic. So now I'm producing data into both topics and made the consumer subscribe to both. Now, in one function I want to read the messages from the chatroom topic, and in another, I want to read from the template topic.
What do you do with the data once you've read it?
I'm throwing it onto a core.async channel
Ok but after that? Is it going into some database?
Now, inside the graphql streamer, I'm using it. Like once the channel gets any data, it triggers the streamer thereby the streamer sends the callback in real-time...
Basically, before, I was using an atom: whenever a mutation updated the data in the database, it threw that into the atom as well, and inside the streamer I had an add-watch that triggered whenever the atom changed and then sent a callback.
Now I have Debezium producing the data to the Kafka topics, but I need something to trigger the streamer when a certain topic has been updated so that it can send the callback.
So, I thought to take the data from a topic and throw it into a channel and then from that the streamer acts...
I'm not sure exactly what a graphql streamer is. Is the scenario something like this? User submits a request; the request causes an update in the DB; DB changes cascade into the two Kafka topics; your app reads the topics and needs to respond to the user request? Or otherwise update some state on the client?
Graphql streamer shows the updates in real-time whenever any data changes in the database.
So can I do this: In one function I want to read the messages from the chatroom topic, and in another, I want to read from the template topic.
I'd probably adjust the payload you put on core.async to include the topic name. Then where you're consuming the channel, you can add a filter to exclude messages from the topic you don't want
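For example (a sketch; the channel name, the record keys, and the topic names are assumptions), tag each message with its topic before putting it on the channel, then filter where you consume:

```clojure
(require '[clojure.core.async :as a])

(def records-ch (a/chan 100))

;; Producer side: put the topic name alongside the value.
;; `record` is assumed to be a map that carries the topic it came from.
(defn publish-record! [record]
  (a/put! records-ch {:topic (:topic-name record)
                      :value (:value record)}))

;; Consumer side: only react to chatroom messages, ignore the rest.
(a/go-loop []
  (when-let [{:keys [topic value]} (a/<! records-ch)]
    (when (= topic "chatroom")
      (println "chatroom update:" value))
    (recur)))
```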
But then again, it doesn't work for graphql streamers, right...? The chatroom streamer should trigger when the chatroom topic has updates and the template streamer should trigger when the template topic has updates.
But the streamer is fed by a core.async channel right?
Yeah, I mean but wait, yeah it makes sense!!!
So, basically how do I read the messages from any topic though?
with poll method?
Again, not super familiar with core.async but as I understand them, I think you can create multiple channels and link them through transducers. So one channel receives the data from Kafka (all subscribed topics), and then two channels are created and connected to the first each with a transducer that adapts the raw Kafka input as necessary for its target streamer
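In core.async terms, that could look like this (a sketch; it assumes each message is a map carrying a `:topic` key, as suggested above):

```clojure
(require '[clojure.core.async :as a])

;; One source channel fed by the Kafka poll loop.
(def source-ch (a/chan 100))

;; A mult broadcasts every message from the source to all tapped channels.
(def source-mult (a/mult source-ch))

;; Each tap gets a transducer that keeps only its topic's messages.
(def chatroom-ch (a/chan 100 (filter #(= (:topic %) "chatroom"))))
(def template-ch (a/chan 100 (filter #(= (:topic %) "template"))))

(a/tap source-mult chatroom-ch)
(a/tap source-mult template-ch)

;; Feed one message in; only the chatroom channel will deliver it.
(a/put! source-ch {:topic "chatroom" :value "hello"})
```

Each streamer then reads from its own channel and only ever sees its own topic's updates.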
Got that, thank you. How do I read the data from kafka topic?
This is the method right?
poll
(poll consumer timeout)
Polls kafka for new messages, returning a potentially empty sequence
of datafied messages.
Yep
Now it takes only the consumer as the arg, so if topic 1 has produced a message, what does running this poll function return?
message from topic 1?
Yep
Same for topic 2. So, whichever topic has got the data, the poll method returns from it?
Yep exactly
The messages returned from poll are "records" which include not only the key/value of the message but additional metadata like offset, partition, and topic so I think you can use that to augment the payload you feed to the core.async channel
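So the poll loop can forward the topic along with the payload (a sketch; jackdaw datafies each record into a map, and the topic is assumed here to be under `:topic-name`, so check the actual keys your version returns):

```clojure
(require '[jackdaw.client :as jc]
         '[clojure.core.async :as a])

(defn poll-loop!
  "Continuously polls `consumer` and forwards each record onto `ch`,
   keeping the topic name so downstream code can route on it."
  [consumer ch]
  (while true
    (doseq [record (jc/poll consumer 1000)]
      (a/put! ch {:topic (:topic-name record)  ; assumed key; verify
                  :key   (:key record)
                  :value (:value record)}))))
```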
So, basically, it's like consuming from a specific topic now, right? As poll returns the recently updated topic's data...?
This is terrific!!
Yeah, in core.async I'll filter it...
I tried to poll it like this,
(while true
(doseq [msg (jc/poll consumer 1000)]
(println msg)))
But I'm getting this:
class clojure.lang.PersistentArrayMap cannot be cast to class org.apache.kafka.clients.consumer.Consumer (clojure.lang.PersistentArrayMap and org.apache.kafka.clients.consumer.Consumer are in unnamed module of loader 'app'
May I see the code that constructs that consumer?
Think it's time to read the huge stuff printed out when you printed your consumer: https://clojurians.slack.com/archives/CEA3C7UG0/p1619182766054100
Sure...? It's pretty huge
Well, let's make it smaller by getting just the keys
It's saying it's 25k characters long and can't be sent here!
Even just the keys?
No No
the entire thing
how to get only the keys?
(keys consumer)
should I print that? or ??
Yes. We're trying to debug your program. There's something which at the moment we think is a consumer object but in reality it is a map. We want to print out the keys to give us an idea of what that map is supposed to be in the hope that perhaps you can extract your consumer out of it.
I mean, yeah, I got that, but should I do (println (keys consumer)) or ...?
yes
Or just upload the whole "consumer" as a gist (and then provide a link) rather than copying it into Slack, which has the limit you just ran into
If you opt for the latter, you might want to make sure there are no secrets in there you wouldn't want exposed to the public
I did this:
(println "This is consumer" (keys consumer))
Got this:
This is consumer (:request :com.walmartlabs.lacinia.constants/parsed-query :com.walmartlabs.lacinia.constants/schema :com.walmartlabs.lacinia/container-type-name :com.walmartlabs.lacinia/selection)
The gist is pasting it in a single line... is a screenshot good enough?
OK, so clearly that looks more like an HTTP request than a consumer instance.
Now...?
So, I guess it means I'm passing a map as the consumer, not the instance of the consumer...?
I suppose it's back to the integrant docs to see how you're supposed to get at the services defined in the integrant config
Yes that is what it means
I think there must be an ig/init in your code somewhere that sets a "system" var. I think the consumer will be in that.
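As a sketch of what that usually looks like with integrant (the key names here are made up; yours will come from your own config):

```clojure
(require '[integrant.core :as ig])

;; Hypothetical config: the real keys come from your own config map.
(def config
  {:kafka/consumer {:topics ["chatroom" "template"]}})

;; Each init-key method builds the running service for its key.
;; In the real app this would construct and subscribe the jackdaw consumer.
(defmethod ig/init-key :kafka/consumer [_ opts]
  (build-consumer opts))  ; hypothetical helper

;; ig/init builds the running system map from the config.
(def system (ig/init config))

;; The actual Consumer instance lives inside the system map,
;; not in the lacinia request context that was being printed above.
(def consumer (:kafka/consumer system))
```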
I dunno, suspect you'd get better advice about this aspect in #integrant
Okay. Thank you for your time.
No worries. Sorry I couldn't help you get over the line with your problem but I think you're close to a solution :)
Yeah, I just need to get the poll working...