Any one tried poll method with integrant?
Integrant doesn’t really care about what you do in your components when they’re started, it just care about how to start and stop them.
Have a look at https://github.com/FundingCircle/jackdaw/blob/b4f67106cc12a78d9dcc64b6cba668ab6a804d04/examples/word-count/dev/user.clj
yeah, but starting the polling with integrant, is stoping the rest of the process.
Like as I start the app, all I can see is the poll method running and I can apply the graphql mutations and subscriptions as it says connection refused(not running) and REPL is hung!
Polling is a blocking operation. Are you spinning up and polling the consumer in the main thread? https://kafka.apache.org/26/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
I mean, I'm running it with integrant, so yeah maybe it is running in the main thread!
this is the method:
(defmethod ig/init-key ::poll [_ {consumer :consumer channel :channel}]
(timbre/info "Starting polling")
(with-open [consumer (jc/seek-to-end-eager consumer)]
(while true
(doseq [{:keys [value]} (jackclient/poll consumer 100)]
(>!! (:channel channel) {:msg-type :update :data {:msg value}}))))
(.commitSync consumer))
Have a look at https://stackoverflow.com/questions/4560594/how-can-i-create-a-constantly-running-background-process-in-clojure
Ya that'll block the main thread. I’m fairly certain integrant won’t wrap states in threads
I'm guessing the other issue is it doesn’t return anything...? like the init-key method needs something to be returning right? so that it can be the key?
It just polls the value and puts it in a channel...
The value can (and likely should) be a thread so you can shut it down on halt
Or a go-loop or whatever concurrency primitive you choose to use
I've tried it two ways:
(defmethod ig/init-key ::poll [_ {consumer :consumer channel :channel}]
(timbre/info "Starting polling")
(with-open [consumer (jc/seek-to-end-eager consumer)]
(future
(while true
(doseq [{:keys [value]} (jc/poll consumer 100)]
(>!! (:channel channel) {:msg-type :update :data {:msg value}}))))
); should future end here?
(.commitSync consumer)))
This way the polling is running in the background and isn't blocking the main thread i.e, the REPL is function but it isn't actually putting anything in the channel, if I add a print statement there it isn't even showing up.
Other one is I replaced future with thread, and got an exception saying Kafka is not safe for multi-threaded access.That is correct, kafka consumers are not thread safe, so you’d want to move the consumer creation into the future
Which means NOT getting the consumer from another init-key
So, how do I get the consumer then?
I use a prep-key method for the consumer config, then pass the config into an init-key method, where I get the consumer to subscribe to the topics and ultimately return it.
What about keeping the config in the init-key, and use that to create the consumer?
No config comes from prep-key
Your init-key code should call something like this (start-consumer!)
fn:
(defn- poll!
[consumer producer running]
(while @running
(let [records (jd/poll consumer (Duration/ofMillis 1000))]
(doseq [{:keys [value]} records]
;; do stuff)
(.commitSync consumer))))
(defn- start-consumer! [consumer producer topic-config running]
(let [consumer-thread (Thread. #(poll! consumer producer running))]
(jd/subscribe consumer [topic-config])
(.start consumer-thread)
consumer-thread))
So integrant with keep this Thread
object under the system key you’ve associated it with.
In case you want to stop the system, you need to get hold of that object via the halt-key!
lifecycle fn and stop the thread.
consumer init-key method?
Yes
How can the consumer init-key method call start-consumer! method? I mean consumer init-key method is the one that is returning the consumer...
Your init-key
function will call the start-consumer!
fn which will start the background thread and return the Thread object. Integrant will just keep that in his system map, ready for you to look it up whenever you want if you want to stop it (or do anything you want with that Thread object).
Yeah but start-consumer! needs the consumer as an argument...? but init-key will return the consumer...
Yeah but they are different, there’s the one that sits at the system (integrant) level, it’s what we call a system component and you should name differently, eg GraphqlExposer
and under the hood it uses one (or maybe many) kafka consumer.
You can read https://github.com/stuartsierra/component to understand a bit more on those components
Integrant is just another take at handling components lifecycle.