jackdaw

https://github.com/FundingCircle/jackdaw
finchharold 2021-04-22T11:16:52.041600Z

Hey I got it, thank you very much @gphilipp, @dstephens, and @ddouglass.

finchharold 2021-04-22T11:17:57.042400Z

Since I'm using the Debezium connector, it creates topics for all the tables in the database, so I can refer to those directly.

finchharold 2021-04-22T11:18:27.043100Z

The main reason I got confused was that I was using Integrant to set up the whole thing and that kinda got messed up!

finchharold 2021-04-22T11:20:24.044200Z

Now jackdaw.client/subscribe just subscribes the given consumer to the specified topics and returns the consumer, right? So how do I print the consumed data?

Daniel Stephens 2021-04-22T11:33:39.044600Z

When using the consumer pattern you usually start a thread that repeatedly polls the consumer. If you want to avoid all the complexities of that, Kafka Streams handles a lot of the thread management for you.
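
A minimal sketch of the polling-thread pattern described above, in plain Clojure. The names `start-poll-loop!`, `fake-poll`, `pending`, and `seen` are hypothetical and exist only for illustration. The poll function is passed in so the example runs without Kafka; with jackdaw it would presumably be something like `#(jc/poll consumer 100)`, which is an assumption about how you wire it up rather than a prescribed setup.

```clojure
(defn start-poll-loop!
  "Runs `poll-fn` repeatedly on one background thread and hands every record
  it returns to `handler`. Returns a zero-argument function that stops the
  loop and waits for the thread to finish."
  [poll-fn handler]
  (let [running? (atom true)
        ;; A Kafka consumer is not thread-safe, so every poll happens on
        ;; this single worker thread.
        worker   (future
                   (while @running?
                     (doseq [record (poll-fn)]
                       (handler record))))]
    (fn stop! []
      (reset! running? false)
      @worker ; block until the loop has exited
      nil)))

;; Stand-in for a consumer: each call hands back at most one queued record.
(def pending (atom [{:topic-name "a" :value 1}
                    {:topic-name "b" :value 2}]))

(defn fake-poll []
  (Thread/sleep 10) ; imitate the poll timeout
  (let [[old _] (swap-vals! pending #(vec (rest %)))]
    (take 1 old)))

;; Collect everything the loop delivers, then shut it down.
(def seen (atom []))
(def stop! (start-poll-loop! fake-poll #(swap! seen conj %)))
(Thread/sleep 200)
(stop!)
```

Returning a stop function keeps the shutdown logic next to the loop, which plays the same role as the `should-poll?` flag but also waits for the worker to finish.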

finchharold 2021-04-22T12:17:16.044800Z

Oh. Basically, I'm trying to use the consumer inside the graphql streamer. Like just if the consumer has some data then just send a callback function, nothing more. So, any suggestions on how to do this? just if the consumer has data then send a callback...

finchharold 2021-04-22T14:02:24.045300Z

Have you guys tried Kafka with graphql?

Daniel Stephens 2021-04-22T14:04:14.045400Z

Is this to serve subscriptions? That's my only understanding of graphql streamers, in which case I would do exactly as my previous comment says: use a thread to poll the consumer and push messages down any open web sockets.

finchharold 2021-04-22T14:16:45.045800Z

Yeah for subscriptions... I wrote a method for polling...

(defmethod ig/init-key ::should-poll? [_ should-poll?]
  (volatile! should-poll?))


(defmethod ig/halt-key! ::should-poll? [_ should-poll?]
  (vreset! should-poll? false))


(defmethod ig/prep-key ::poll-timeout [_ timeout]
  (read-string (or timeout "100")))


(defmethod ig/init-key ::poll-timeout [_ timeout]
  timeout)

(defn start-polling
  [system]
  (let [consumer (::consumer system)
        poll-timeout (::poll-timeout system)
        should-poll? (::should-poll? system)]
    (timbre/info "Starting message polling" poll-timeout @should-poll?)
    (while (true? @should-poll?)
      (doseq [{topic :topic-name :as record} (jc/poll consumer
                                                      poll-timeout)]
        ;; Process each message synchronously, on the polling thread
        (channel/process-event! {:topic (->kebab-case-keyword topic)
                                 :system system
                                 :data record})))))
Now how do I use this for graphql subscriptions? like should I just call this method...?

Daniel Stephens 2021-04-22T15:43:09.046100Z

Pretty much. It's a bit involved as there are lots of moving parts, so you might have to work out the smaller details. There's basically one big issue with what you are doing now: if you have multiple subscribers then they share a consumer, so each of them gets only a share of the messages. The solution I use is to take everything from the consumer and put the messages onto another event bus that you can subscribe multiple consumers to; I use manifold. Then in start-polling, rather than referencing the consumer, you reference the manifold event bus and subscribe to that. An alternative might be to create a new consumer for each subscription with a new group-id, but I expect this would be slow.
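
A rough sketch of that fan-out idea, assuming records are maps with a `:topic-name` key as in the `start-polling` code above. This uses a plain atom as a stand-in for the event bus, not manifold itself, and `make-bus`, `subscribe!`, and `publish!` are hypothetical names. A single polling thread would call `publish!` for every record, and each GraphQL streamer would call `subscribe!` for the one topic it cares about.

```clojure
(defn make-bus
  "Creates an empty bus: a map of topic -> {subscription-id -> callback}."
  []
  (atom {}))

(defn subscribe!
  "Registers `callback` for `topic`. Returns a function that unsubscribes."
  [bus topic callback]
  (let [id (gensym "sub")]
    (swap! bus assoc-in [topic id] callback)
    (fn unsubscribe! []
      (swap! bus update topic dissoc id)
      nil)))

(defn publish!
  "Delivers `record` to every callback registered for its :topic-name."
  [bus record]
  (doseq [callback (vals (get @bus (:topic-name record)))]
    (callback record)))

;; Two subscribers on topic-a, one on topic-b.
(def bus (make-bus))
(def a1 (atom []))
(def a2 (atom []))
(def b1 (atom []))

(def stop-a1 (subscribe! bus "topic-a" #(swap! a1 conj (:value %))))
(subscribe! bus "topic-a" #(swap! a2 conj (:value %)))
(subscribe! bus "topic-b" #(swap! b1 conj (:value %)))

(publish! bus {:topic-name "topic-a" :value 1}) ; reaches a1 and a2
(publish! bus {:topic-name "topic-b" :value 2}) ; reaches b1 only
(stop-a1)                                       ; a1 leaves the bus
(publish! bus {:topic-name "topic-a" :value 3}) ; reaches a2 only
```

Because delivery is keyed on the topic, every subscriber to a topic sees all of its messages and nothing from the other topics, while the Kafka consumer itself is only ever touched by the one polling thread.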

finchharold 2021-04-22T15:56:31.046500Z

Yeah that's what I just thought. I currently have a single consumer and there are 3 topics and 3 streamers so if I just do poll inside the 3 streamers then how can a streamer know when to trigger if any of those 3 topics have changed...?

finchharold 2021-04-22T16:01:40.046700Z

Each streamer has to get triggered the moment the topic it is related to gets updated.

Daniel Stephens 2021-04-22T16:13:47.046900Z

when the streamer function gets called, you begin consuming continuously, so it always gets everything that happens whilst the streamer is running.

finchharold 2021-04-22T16:23:24.047100Z

Yeah, so can't we filter it? Let's say numbers 1-5 belong to topic A and 5-10 belong to topic B. There's a streamer for A and a streamer for B. Now, if we just use the poll method then it returns whatever topic changed last, right? So if both the streamers are running and each has a poll call inside, then when topic A changes both streamers may get triggered instead of only A. Can't we filter it, so that when topic A gets a data change only streamer A triggers, and similarly for B?

finchharold 2021-04-22T16:28:22.047300Z

I just tried calling the above start-polling method inside the streamer function and now when I run the subscription, I'm getting this:

An error occurred in subscription.
Error: Cannot invoke "java.util.concurrent.Future.get()" because "fut" is null