Yes, several PoCs, and the place I'm currently at also kind of combines them, but with a traditional DB in the middle. See https://github.com/gklijs/bob2021 or https://github.com/gklijs/ksqlDB-GraphQL-poc. There is also a blog post in the works, in cooperation with Confluent, on exactly this.
I understand the issue now: I was running the poll inside the streamer, so it was called every time I ran the subscription...
I guess it'd be better to poll and then put the data into a core.async channel
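A rough sketch of that idea, in case it helps: one background thread does the polling and puts each value onto a core.async channel, and the streamer only reads from the channel. Here start-polling! and poll-fn are hypothetical names, and poll-fn is a zero-argument stand-in for the real Kafka poll so the snippet runs without a broker. It assumes org.clojure/core.async is on the classpath.

```clojure
(require '[clojure.core.async :as a])

(defn start-polling!
  "Calls poll-fn in a loop on a dedicated thread and puts every value it
  returns onto a buffered channel. poll-fn is a hypothetical zero-arg
  function standing in for the Kafka poll; it should block briefly and
  return a (possibly empty) collection of values."
  [poll-fn]
  (let [out     (a/chan 100)          ; buffer size is an arbitrary choice
        running (atom true)]
    (a/thread
      (loop []
        (when @running
          (doseq [v (poll-fn)]
            (a/>!! out v))            ; blocks if the buffer is full
          (recur)))
      (a/close! out))                 ; signal readers that polling stopped
    {:chan  out
     :stop! #(reset! running false)}))

;; Usage with a fake poll function that returns one message per call.
(def poller (start-polling! (fn [] (Thread/sleep 100) ["msg"])))
(def first-message (a/<!! (:chan poller)))  ; blocks until a value arrives
((:stop! poller))
```

Keeping the poll on a single thread matters with a real consumer, because KafkaConsumer is not safe to use from several threads at once. Note that the loop stays blocked in >!! if nothing reads from the channel and the buffer fills up.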
Hey @dstephens, I thought it'd be better to poll the messages and then put those messages in a core.async channel and from the streamer I can read that channel and then send a callback... So, I tried polling them inside the mutation like this,
(while true
  (let [records (jc/poll consumer 1000)]
    (doseq [{:keys [value]} records]
      (println value))
    (.commitSync consumer)))
But I'm getting this:
class clojure.lang.PersistentArrayMap cannot be cast to class org.apache.kafka.clients.consumer.Consumer (clojure.lang.PersistentArrayMap and org.apache.kafka.clients.consumer.Consumer are in unnamed module of loader 'app'
What does this mean?
Pretty standard error in Clojure: it means you are calling a function that expects a consumer on a map, and the map can't be cast to one.
It's usually pretty simple to debug these in a REPL, as you can divide and conquer to work out where the error is coming from.
In this case, since it's a method that's expecting a consumer, my guess is that you are calling poll or commitSync on a map by accident. I'd start by checking the value of consumer in your snippet.
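A quick way to do that check at the REPL is to look at the type rather than the printed value. This is only a sketch: suspect is a hypothetical stand-in holding a small config map, so that the snippet runs without Kafka; in the real code you would call the same functions on consumer.

```clojure
;; Hypothetical stand-in: a plain map, like the one the error message
;; says was passed where a Consumer was expected.
(def suspect {:bootstrap.servers "localhost:9092"})

(class suspect)  ; the concrete class of the value
(map? suspect)   ; true means it is a Clojure map, not a Java consumer object

;; With Kafka on the classpath, the direct check would be:
;; (instance? org.apache.kafka.clients.consumer.Consumer consumer)
```

If map? returns true for the real consumer, the Java method call (.commitSync consumer) cannot work on it, which would match the ClassCastException above.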
I printed out consumer and it returned huge stuff