jackdaw

https://github.com/FundingCircle/jackdaw
finchharold 2021-05-11T09:38:42.164600Z

Has anyone tried the poll method with integrant?

gphilipp 2021-05-11T09:40:03.166Z

Integrant doesn’t really care about what you do in your components once they’re started; it only cares about how to start and stop them.

finchharold 2021-05-11T10:28:21.166900Z

yeah, but starting the polling with integrant is stopping the rest of the process.

finchharold 2021-05-11T10:29:09.167900Z

Like, as I start the app, all I can see is the poll method running, and I can't apply the GraphQL mutations and subscriptions because it says connection refused (not running), and the REPL is hung!

Darin Douglass 2021-05-11T10:41:28.169500Z

Polling is a blocking operation. Are you spinning up and polling the consumer in the main thread? https://kafka.apache.org/26/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html

finchharold 2021-05-11T10:42:32.170Z

I mean, I'm running it with integrant, so yeah maybe it is running in the main thread!

finchharold 2021-05-11T10:43:24.170900Z

this is the method:

(defmethod ig/init-key ::poll [_ {consumer :consumer channel :channel}]
  (timbre/info "Starting polling")
  (with-open [consumer (jc/seek-to-end-eager consumer)]
    (while true
      (doseq [{:keys [value]} (jackclient/poll consumer 100)]
        (>!! (:channel channel) {:msg-type :update :data {:msg value}}))))
  (.commitSync consumer))

Darin Douglass 2021-05-11T10:44:14.172400Z

Ya that'll block the main thread. I’m fairly certain integrant won’t wrap states in threads

finchharold 2021-05-11T10:48:43.172500Z

I'm guessing the other issue is it doesn’t return anything...? like the init-key method needs something to be returning right? so that it can be the key?

finchharold 2021-05-11T10:49:10.172900Z

It just polls the value and puts it in a channel...

Darin Douglass 2021-05-11T11:12:04.173700Z

The value can (and likely should) be a thread so you can shut it down on halt

Darin Douglass 2021-05-11T11:14:41.174300Z

Or a go-loop or whatever concurrency primitive you choose to use
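
A minimal sketch of that idea in plain Clojure, assuming nothing about jackdaw or integrant: the loop runs in a future, and the function returns a handle instead of blocking the caller. The names `start-poller!`, `stop-poller!`, `poll-fn` and `handle-fn` are hypothetical and only for illustration; `poll-fn` stands in for a blocking poll with a timeout.

```clojure
(defn start-poller!
  "Starts a background loop that calls `poll-fn` (hypothetical: returns a
  seq of records, like a poll with a timeout) and passes each record to
  `handle-fn`. Returns a handle that can be given to `stop-poller!`."
  [poll-fn handle-fn]
  (let [running (atom true)
        ;; The future runs the loop on another thread, so the caller
        ;; (for example the REPL thread) is not blocked.
        worker  (future
                  (while @running
                    (doseq [record (poll-fn)]
                      (handle-fn record))))]
    {:running running :worker worker}))

(defn stop-poller!
  "Asks the loop to finish and waits up to `timeout-ms` for it to exit.
  Returns :stopped, or :timeout if the loop did not exit in time."
  [{:keys [running worker]} timeout-ms]
  (reset! running false)
  (if (= ::timeout (deref worker timeout-ms ::timeout))
    :timeout
    :stopped))
```

An `ig/init-key` method could return a handle like this, and the matching `ig/halt-key!` method could pass it to `stop-poller!`. The loop only notices the flag between polls, so the poll timeout bounds how long a stop takes.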

finchharold 2021-05-11T11:38:32.177300Z

I've tried it two ways:

(defmethod ig/init-key ::poll [_ {consumer :consumer channel :channel}]
  (timbre/info "Starting polling")
  (with-open [consumer (jc/seek-to-end-eager consumer)]
    (future
      (while true
        (doseq [{:keys [value]} (jc/poll consumer 100)]
          (>!! (:channel channel) {:msg-type :update :data {:msg value}}))))
   ); should future end here?
  (.commitSync consumer)))

This way the polling runs in the background and doesn't block the main thread, i.e. the REPL is functional, but it isn't actually putting anything in the channel; if I add a print statement there, it doesn't even show up. The other way, I replaced future with thread and got an exception saying KafkaConsumer is not safe for multi-threaded access.

Darin Douglass 2021-05-11T12:12:40.178300Z

That is correct, Kafka consumers are not thread-safe, so you’d want to move the consumer creation into the future

Darin Douglass 2021-05-11T12:14:58.180500Z

Which means NOT getting the consumer from another init-key
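
A rough sketch of "create it inside the thread", again in plain Clojure with hypothetical names (`start-worker!`, `stop-worker!`, `make-resource`, `step-fn`). Here `make-resource` stands in for whatever builds and subscribes the consumer, and `step-fn` for one poll-and-handle pass. Note that `with-open` wraps the loop itself: in the attempt above, `with-open` sits outside the `future`, so the consumer is most likely closed as soon as the future is spawned, which would explain why nothing reaches the channel.

```clojure
(defn start-worker!
  "Runs a loop on a new thread. `make-resource` (hypothetical factory) is
  called on that thread, so the resource is created, used and closed by
  the same thread."
  [make-resource step-fn]
  (let [running (atom true)
        thread  (Thread.
                  (fn []
                    ;; with-open wraps the whole loop, so the resource is
                    ;; closed only after the loop has exited.
                    (with-open [resource (make-resource)]
                      (while @running
                        (step-fn resource)))))]
    (.start thread)
    {:running running :thread thread}))

(defn stop-worker!
  "Clears the flag and waits up to two seconds for the thread to finish.
  Returns true if the thread has stopped."
  [{:keys [running thread]}]
  (reset! running false)
  (.join ^Thread thread 2000)
  (not (.isAlive ^Thread thread)))
```

With this shape, the system key would hold the worker handle rather than the consumer, and only the consumer config would need to come from another key.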

finchharold 2021-05-11T12:32:43.180800Z

So, how do I get the consumer then?

finchharold 2021-05-11T12:35:22.181900Z

I use a prep-key method for the consumer config, then pass the config into an init-key method, where I get the consumer to subscribe to the topics and ultimately return it.

gklijs 2021-05-11T12:36:25.182500Z

What about keeping the config in the init-key, and use that to create the consumer?

finchharold 2021-05-11T12:39:34.183Z

No, the config comes from prep-key

gphilipp 2021-05-11T12:44:04.184700Z

Your init-key code should call something like this start-consumer! fn:

(defn- poll!
  [consumer producer running]
  (while @running
    (let [records (jd/poll consumer (Duration/ofMillis 1000))]
      (doseq [{:keys [value]} records]
        ;; do stuff
        )
      ;; commit offsets once per polled batch
      (.commitSync consumer))))


(defn- start-consumer! [consumer producer topic-config running]
  (let [consumer-thread (Thread. #(poll! consumer producer running))]
    (jd/subscribe consumer [topic-config])
    (.start consumer-thread)
    consumer-thread))

gphilipp 2021-05-11T12:45:08.185600Z

So integrant will keep this Thread object under the system key you’ve associated it with.

gphilipp 2021-05-11T12:47:09.186800Z

In case you want to stop the system, you need to get hold of that object via the halt-key! lifecycle fn and stop the thread.

finchharold 2021-05-11T12:49:17.187200Z

consumer init-key method?

gphilipp 2021-05-11T12:49:42.187400Z

Yes

finchharold 2021-05-11T12:50:38.188500Z

How can the consumer init-key method call start-consumer! method? I mean consumer init-key method is the one that is returning the consumer...

gphilipp 2021-05-11T12:54:07.191Z

Your init-key function will call the start-consumer! fn, which will start the background thread and return the Thread object. Integrant will just keep that in its system map, ready for you to look up whenever you want to stop it (or do anything else with that Thread object).

finchharold 2021-05-11T12:55:57.191900Z

Yeah but start-consumer! needs the consumer as an argument...? but init-key will return the consumer...

gphilipp 2021-05-11T13:09:21.195100Z

Yeah, but they are different. There’s the one that sits at the system (integrant) level: it’s what we call a system component, and you should name it differently, e.g. GraphqlExposer. Under the hood it uses one (or maybe many) Kafka consumers.

gphilipp 2021-05-11T13:10:13.195600Z

You can read https://github.com/stuartsierra/component to understand a bit more on those components

gphilipp 2021-05-11T13:10:43.196300Z

Integrant is just another take on handling component lifecycles.