Hi. First, just wanted to say thanks for missionary, I'm using it a lot and it is such a joy to work with. Makes complex concurrent systems much easier to build.
Second, my question is what would be the most idiomatic/best way to consume a BlockingQueue and turn it into a flow?
Currently I have this:
(->> (ms/observe (fn [event]
                   (let [q (d/tx-report-queue (./dev-conn))
                         f (future
                             (loop [u (.take q)]
                               (try
                                 (event [u])
                                 (catch Exception ex
                                   (tap> [::ex ex])))
                               (recur (.take q))))]
                     #(do
                        (future-cancel f)
                        (d/remove-tx-report-queue (./dev-conn))))))
     (ms/relieve #(into %1 %2)))
But I'm not sure if there is a better way to do it that does not involve a future.
@mjmeintjes more idiomatic would be to register/deregister the queue at the edge of your app, then start the pipeline with (m/ap (m/? (m/?> (m/seed (repeat (m/via m/blk (.take q)))))))
`observe`/`relieve` works, but there's no backpressure, so it's effectively an unbounded buffer: it could grow out of control if the consumer is too slow.
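For reference, here is a minimal sketch of the `m/via`-based pipeline suggested above, written out so it can be tried at a REPL. It is an illustration under assumptions, not a definitive implementation: a plain `LinkedBlockingQueue` stands in for the Datomic tx-report queue, and `queue-flow` and `first-three` are hypothetical names introduced only for this example.

```clojure
(require '[missionary.core :as m])
(import '(java.util.concurrent BlockingQueue LinkedBlockingQueue))

;; Hypothetical helper: wraps a BlockingQueue in a discrete flow.
;; Each value comes from a blocking .take run on missionary's
;; blocking executor (m/blk), so the calling thread is never parked.
(defn queue-flow [^BlockingQueue q]
  (m/ap (m/? (m/?> (m/seed (repeat (m/via m/blk (.take q))))))))

;; Stand-in for the real queue. In the app, this is where the queue
;; would be registered at startup and deregistered at shutdown.
(def q (LinkedBlockingQueue.))
(run! #(.put q %) [1 2 3])

;; Consume three items, then stop. Terminating the eduction early
;; cancels the upstream flow, which interrupts any pending .take.
(def first-three
  (m/? (m/reduce conj [] (m/eduction (take 3) (queue-flow q)))))
```

Because the next `.take` only starts once the consumer is ready for another value, unread items stay in the queue itself rather than piling up in an intermediate buffer.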