missionary

mjmeintjes 2021-06-03T00:32:06.001Z

Hi. First, just wanted to say thanks for missionary, I'm using it a lot and it is such a joy to work with. Makes complex concurrent systems much easier to build.

mjmeintjes 2021-06-03T00:32:27.001500Z

Second, my question is what would be the most idiomatic/best way to consume a BlockingQueue and turn it into a flow?

mjmeintjes 2021-06-03T00:33:09.002Z

Currently I have this: (->> (ms/observe (fn [event] (let [q (d/tx-report-queue (./dev-conn)) f (future (loop [u (.take q)] (try (event [u]) (catch Exception ex (tap> [::ex ex]))) (recur (.take q))))] #(do (future-cancel f) (d/remove-tx-report-queue (./dev-conn)))))) (ms/relieve #(into %1 %2)))

mjmeintjes 2021-06-03T00:33:38.002500Z

But I'm not sure if there is a better way to do it that does not involve future

2021-06-03T06:10:00.007700Z

@mjmeintjes more idiomatic would be to register/deregister the queue at the edge of your app, then start the pipeline with (m/ap (m/? (m/?> (m/seed (repeat (m/via m/blk (.take q))))))) observe/`relieve` works but there's no backpressure so it's effectively an unbounded buffer, it could grow out of control if the consumer is too slow