jackdaw

https://github.com/FundingCircle/jackdaw
finchharold 2021-04-24T02:28:03.056800Z

Let's say there are two topics A and B, and we want to do something like this: if data is produced to topic A then do one thing, and if data is produced to topic B then do another. Is this achievable? I mean, is there a way to see if a specific topic has been updated when we have multiple topics...?

finchharold 2021-04-24T08:20:16.057200Z

or just read a message from a specific topic?

finchharold 2021-04-24T09:08:25.058300Z

poll
(poll consumer timeout)
Polls kafka for new messages, returning a potentially empty sequence
of datafied messages.
That's for reading the messages, right? But if it only takes the consumer as the argument, then how do I poll from a certain topic when one consumer is subscribed to 2 topics...?

2021-04-24T09:15:36.058400Z

If you want to do something different for messages in each topic, why not have a different consumer for each one rather than consuming both with the same consumer?
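
(A minimal sketch of that suggestion with jackdaw, assuming a local broker, String deserializers, and the chatroom/template topic names used later in this thread; the exact shapes of jc/consumer and jc/subscribe should be checked against the jackdaw version in use:)

(require '[jackdaw.client :as jc])

;; Shared connection settings -- the bootstrap server and deserializers
;; here are placeholders.
(def base-config
  {"bootstrap.servers"  "localhost:9092"
   "key.deserializer"   "org.apache.kafka.common.serialization.StringDeserializer"
   "value.deserializer" "org.apache.kafka.common.serialization.StringDeserializer"})

;; One consumer per topic, each in its own consumer group, so each can be
;; polled independently.
(def chatroom-consumer
  (-> (jc/consumer (assoc base-config "group.id" "chatroom-reader"))
      (jc/subscribe [{:topic-name "chatroom"}])))

(def template-consumer
  (-> (jc/consumer (assoc base-config "group.id" "template-reader"))
      (jc/subscribe [{:topic-name "template"}])))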

finchharold 2021-04-24T09:17:32.059Z

I'm using it with graphql, so I don't know if it's feasible to have a consumer for each streamer...?

finchharold 2021-04-24T09:17:44.059300Z

Can't I read from a specific topic?

2021-04-24T09:19:08.059400Z

Not if you set it up to read from many, no.

finchharold 2021-04-24T09:19:41.060Z

How do I do that? Now I'm subscribing to two topics and I just want to read from each separately...

2021-04-24T09:20:23.060100Z

Subscribe to just one topic and read the messages from it

finchharold 2021-04-24T09:22:29.062300Z

Yeah, that is fine. I want to have all the chatroom-related data in the chatroom topic and all the template-related data in the template topic. So, now I'm producing data into both topics and have made the consumer subscribe to both. Now, in one function I want to read the messages from the chatroom topic, and in another, I want to read from the template topic.

2021-04-24T09:23:09.062400Z

What do you do with the data once you've read it?

finchharold 2021-04-24T09:23:28.062800Z

I'm throwing it onto a core.async channel

2021-04-24T09:24:23.063Z

Ok but after that? Is it going into some database?

finchharold 2021-04-24T09:25:30.064200Z

Now, inside the graphql streamer, I'm using it. Once the channel gets any data, it triggers the streamer, and the streamer sends the callback in real time...

finchharold 2021-04-24T09:27:14.065400Z

Basically, before this I was using an atom: whenever a mutation updated the data in the database, it put that into the atom as well, and inside the streamer I had an add-watch that triggered whenever the atom changed and then sent a callback.

finchharold 2021-04-24T09:27:59.066400Z

Now I have Debezium producing the data to the Kafka topics, but I need something to tell the streamer that a certain topic has been updated so that it can send the callback.

finchharold 2021-04-24T09:28:29.067100Z

So, I thought I'd take the data from a topic, throw it into a channel, and have the streamer act on that...

2021-04-24T09:30:57.067200Z

I'm not sure exactly what a graphql streamer is. Is the scenario something like this? A user submits a request; the request causes an update in the DB; the DB changes cascade into the two Kafka topics; your app reads the topics and needs to respond to the user request? Or otherwise update some state on the client?

finchharold 2021-04-24T09:33:27.068Z

Graphql streamer shows the updates in real-time whenever any data changes in the database.

finchharold 2021-04-24T09:35:22.068300Z

So can I do this: In one function I want to read the messages from the chatroom topic, and in another, I want to read from the template topic.

2021-04-24T09:35:49.068400Z

I'd probably adjust the payload you put on core.async to include the topic name. Then where you're consuming the channel, you can add a filter to exclude messages from the topic you don't want
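
(A rough sketch of that filtering idea, assuming each payload put on the channel is a map carrying the topic it came from; handle-chatroom-update is a hypothetical handler:)

(require '[clojure.core.async :as a])

(defn handle-chatroom-update
  "Hypothetical handler: forward the message to the chatroom streamer."
  [msg]
  (println "chatroom update:" msg))

(def kafka-ch (a/chan 16))

;; Consume only chatroom messages from the shared channel; ignore the rest.
(a/go-loop []
  (when-some [msg (a/<! kafka-ch)]
    (when (= "chatroom" (:topic msg))
      (handle-chatroom-update msg))
    (recur)))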

finchharold 2021-04-24T09:37:48.069900Z

But then again, that doesn't work for the graphql streamers, does it...? The chatroom streamer should trigger when the chatroom topic has some updates, and the template streamer should trigger when the template topic has updates.

2021-04-24T09:38:41.070Z

But the streamer is fed by a core.async channel right?

finchharold 2021-04-24T09:41:25.070600Z

Yeah, I mean but wait, yeah it makes sense!!!

finchharold 2021-04-24T09:41:41.071Z

So, basically how do I read the messages from any topic though?

finchharold 2021-04-24T09:41:48.071200Z

with the poll method?

2021-04-24T09:42:34.071400Z

Again, I'm not super familiar with core.async, but as I understand it, you can create multiple channels and link them through transducers. So one channel receives the data from Kafka (all subscribed topics), and then two channels are created and connected to the first, each with a transducer that adapts the raw Kafka input as necessary for its target streamer.
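
(One way that could look, assuming the poll loop puts maps like {:topic ... :value ...} on the source channel; a mult fans the source out and each streamer taps it through its own filtering transducer:)

(require '[clojure.core.async :as a])

;; Source channel fed by the Kafka poll loop.
(def kafka-ch (a/chan 16))

;; Fan the source out so both streamers see every message.
(def kafka-mult (a/mult kafka-ch))

;; Each streamer gets a channel whose transducer keeps only its topic
;; and strips the payload down to the message value.
(def chatroom-ch
  (a/tap kafka-mult
         (a/chan 16 (comp (filter #(= "chatroom" (:topic %)))
                          (map :value)))))

(def template-ch
  (a/tap kafka-mult
         (a/chan 16 (comp (filter #(= "template" (:topic %)))
                          (map :value)))))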

finchharold 2021-04-24T09:43:04.071800Z

Got that, thank you. How do I read the data from a Kafka topic?

finchharold 2021-04-24T09:43:21.072100Z

This is the method right?

poll
(poll consumer timeout)
Polls kafka for new messages, returning a potentially empty sequence
of datafied messages.

2021-04-24T09:43:25.072200Z

Yep

finchharold 2021-04-24T09:43:57.072900Z

Now it takes only the consumer as the arg, so if topic 1 has produced a message then running this poll func returns what?

finchharold 2021-04-24T09:44:03.073100Z

message from topic 1?

2021-04-24T09:46:03.073200Z

Yep

finchharold 2021-04-24T09:46:53.073900Z

Same for topic 2. So, whichever topic has got the data, the poll method returns from it?

finchharold 2021-04-24T09:46:55.074100Z

?

2021-04-24T09:47:34.074200Z

Yep exactly

2021-04-24T09:49:58.076Z

The messages returned from poll are "records" which include not only the key/value of the message but also additional metadata like offset, partition, and topic, so I think you can use that to augment the payload you feed to the core.async channel
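
(For example, a poll loop that tags each record with its source topic before putting it on the channel -- just a sketch; the exact key jackdaw uses for the topic in its datafied records (:topic-name below) may differ by version, so print one record to confirm:)

(require '[clojure.core.async :as a]
         '[jackdaw.client :as jc])

(defn start-poll-loop!
  "Continuously poll the (already subscribed) consumer and put each
  record on out-ch as {:topic <source topic> :value <message value>}."
  [consumer out-ch]
  (future
    (while true
      (doseq [record (jc/poll consumer 1000)]
        (a/>!! out-ch {:topic (:topic-name record)
                       :value (:value record)})))))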

finchharold 2021-04-24T09:50:36.076800Z

So, basically, it's like consuming from a specific topic now, right? Since poll returns the data from whichever topic was recently updated...?

finchharold 2021-04-24T09:50:42.077Z

This is terrific!!

🚀 1
finchharold 2021-04-24T09:50:54.077300Z

Yeah, in core.async I'll filter it...

finchharold 2021-04-24T09:51:50.077700Z

I tried to poll it like this,

(while true
  (doseq [msg (jc/poll consumer 1000)]
    (println msg)))

finchharold 2021-04-24T09:52:05.078Z

But I'm getting this:

class clojure.lang.PersistentArrayMap cannot be cast to class org.apache.kafka.clients.consumer.Consumer (clojure.lang.PersistentArrayMap and org.apache.kafka.clients.consumer.Consumer are in unnamed module of loader 'app'

2021-04-24T09:54:33.078800Z

May I see the code that constructs that consumer?

2021-04-24T09:58:14.079900Z

Think it's time to read the huge stuff printed out when you printed your consumer: https://clojurians.slack.com/archives/CEA3C7UG0/p1619182766054100

finchharold 2021-04-24T09:59:53.080300Z

Sure...? It's pretty huge.

2021-04-24T10:00:55.081100Z

Well, let's make it smaller by getting just the keys

finchharold 2021-04-24T10:01:43.082Z

Saying it's 25k characters long and I can't send it here!

2021-04-24T10:01:53.082200Z

Even just the keys?

finchharold 2021-04-24T10:01:59.082400Z

No No

finchharold 2021-04-24T10:02:07.082600Z

the entire thing

finchharold 2021-04-24T10:02:13.082800Z

how to get only the keys?

2021-04-24T10:02:25.083200Z

(keys consumer)

finchharold 2021-04-24T10:02:45.083500Z

should I print that? or ??

2021-04-24T10:04:06.085Z

Yes. We're trying to debug your program. There's something which at the moment we think is a consumer object but in reality it is a map. We want to print out the keys to give us an idea of what that map is supposed to be in the hope that perhaps you can extract your consumer out of it.

finchharold 2021-04-24T10:05:34.086Z

I mean, yeah, I got that, but should I do (println (keys consumer)) or ...?

2021-04-24T10:05:52.086300Z

yes

2021-04-24T10:06:58.087200Z

Or just upload the whole "consumer" as a gist (and then provide a link) rather than copying it into slack which has the limit as you saw

2021-04-24T10:08:34.088100Z

If you opt for the latter, you might need to make sure there's no secrets you wouldn't want exposed to the public

finchharold 2021-04-24T10:09:09.088400Z

I did this:

(println "This is consumer" (keys consumer))
Got this:
This is consumer (:request :com.walmartlabs.lacinia.constants/parsed-query :com.walmartlabs.lacinia.constants/schema :com.walmartlabs.lacinia/container-type-name :com.walmartlabs.lacinia/selection)
In a gist it's pasting as a single line...

finchharold 2021-04-24T10:09:41.089Z

is a screenshot good enough?

2021-04-24T10:09:45.089100Z

ok, so clearly that looks more like an http request than a consumer instance.

finchharold 2021-04-24T10:11:08.089300Z

Now...?

finchharold 2021-04-24T10:13:23.090500Z

So, I guess it means I'm passing a map as the consumer, not the actual consumer instance...?

2021-04-24T10:13:27.090700Z

I suppose back to the integrant docs to see how you're supposed to get at the services defined in integrant config

2021-04-24T10:13:36.091100Z

Yes that is what it means

2021-04-24T10:14:10.092Z

I think there must be an ig/init in your code somewhere that sets a "system" var. I think the consumer will be in that.
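
(For illustration, a hypothetical Integrant setup -- the key, config shape, and constructor below are placeholders for however the consumer is actually defined in this project:)

(require '[integrant.core :as ig]
         '[jackdaw.client :as jc])

;; Hypothetical component that builds and subscribes the consumer.
(defmethod ig/init-key ::kafka-consumer
  [_ {:keys [bootstrap-servers topics]}]
  (-> (jc/consumer {"bootstrap.servers"  bootstrap-servers
                    "group.id"           "streamer"
                    "key.deserializer"   "org.apache.kafka.common.serialization.StringDeserializer"
                    "value.deserializer" "org.apache.kafka.common.serialization.StringDeserializer"})
      (jc/subscribe (map (fn [t] {:topic-name t}) topics))))

(def config
  {::kafka-consumer {:bootstrap-servers "localhost:9092"
                     :topics ["chatroom" "template"]}})

;; After init, the real Consumer instance lives in the system map,
;; not in the lacinia/ring context map:
(def system (ig/init config))
(def consumer (::kafka-consumer system))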

2021-04-24T10:18:06.094800Z

I dunno, suspect you'd get better advice about this aspect in #integrant

finchharold 2021-04-24T10:22:58.095100Z

Okay. Thank you for your time.

2021-04-24T10:24:23.095200Z

No worries. Sorry I couldn't help you get over the line with your problem but I think you're close to a solution :)

finchharold 2021-04-24T10:24:41.095500Z

Yeah just need to get the poll working...