A small question: what is the best way to implement a “worker” entity? By this I mean a separate loop/recur running in the background that consumes a stream. Say I have a stream of events and would like to process them with four workers.
```clojure
(require '[manifold.deferred :as d])

(let [stream (get-stream)
      number 4]
  (dotimes [_ number]
    (d/loop []
      (let [message (get-from-stream stream)]
        (process-message message)
        (d/recur)))))
```
Am I right?
Well, I guess you could use manifold.stream/consume or manifold.stream/consume-async, at least. Or, depending on what you're trying to do, you could also look at manifold.bus.
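Here is a minimal sketch of the consume-async route. handle-event and run-single-consumer are hypothetical names, not part of manifold. consume-async takes a callback that must return a deferred yielding true to keep going (false cancels). It returns a deferred that is realized once the source is exhausted. Because it waits for each callback's deferred before taking the next message, this is one worker processing one message at a time, not four.

```clojure
(require '[manifold.deferred :as d]
         '[manifold.stream :as s])

;; Hypothetical handler: runs the (possibly blocking) work in d/future and
;; yields true so that consume-async keeps taking messages.
(defn handle-event [results msg]
  (d/future
    (swap! results conj (* 10 msg))
    true))

(defn run-single-consumer
  "Feeds every message from `source` to handle-event via consume-async and
  returns the collected results once the source is exhausted."
  [source]
  (let [results (atom [])]
    ;; consume-async waits on each returned deferred before the next take,
    ;; so exactly one message is in flight at any time.
    @(s/consume-async #(handle-event results %) source)
    @results))

(comment
  (run-single-consumer (s/->source [1 2 3])))
```

To get more than one message in flight, you need either several independent take! loops on the same stream or a bounded-concurrency map, which is what the rest of the thread discusses.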
The devil is in the details. Do you care about order? Suppose item-a gets popped off the stream by the first loop and, for some reason, takes a long time to process, while item-b gets popped off by the second loop and is processed immediately.
No, the order is not important.
@igrishaev we have map-concurrently, which uses stream buffers to control the concurrency of non-blocking ops (or Futures wrapping blocking ops): https://gist.github.com/mccraigmccraig/b10156ed0b59de6ccc93dbb1115df7c9
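The gist isn't reproduced here. The following is only a rough, hypothetical stand-in for the same idea (bounding concurrency with a stream buffer). map-bounded is an invented name, and this is not the gist's code. It assumes f returns a deferred, with blocking work wrapped in d/future. s/map turns each message into a deferred. s/buffer lets up to n of those accumulate, so roughly n tasks run at once; the exact bound is slightly above n, because one more deferred can sit in the s/map step and one in s/realize-each. Finally, s/realize-each emits each value once it is ready.

```clojure
(require '[manifold.deferred :as d]
         '[manifold.stream :as s])

(defn map-bounded
  "Hypothetical stand-in for map-concurrently (NOT the gist's implementation).
  Applies f (which should return a deferred) to each message, keeps roughly
  n of those deferreds in flight, and emits realized values in input order."
  [n f source]
  (->> source
       ;; each message becomes a deferred; calling f starts the task
       (s/map f)
       ;; up to n unrealized deferreds wait here; once the buffer is full,
       ;; backpressure stops s/map from pulling (and starting) more work
       (s/buffer n)
       ;; wait for each deferred in turn and emit its realized value
       (s/realize-each)))

(comment
  @(s/reduce conj []
             (map-bounded 4 (fn [x] (d/future (* x x))) (s/->source (range 10)))))
```

Because s/realize-each emits results in input order, one slow task holds back the results queued behind it, although the other buffered tasks keep running.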
Thanks, looks interesting; let me read it.
our general pattern is to map-concurrently, then reduce to consume the stream
we've also wrapped all the core manifold.stream functions to propagate errors sensibly: errors during map get captured in a marker record on the output stream and returned (as an errored deferred) by any reduce
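Here is a minimal sketch of that error-propagation idea. It assumes a hypothetical StreamError marker record and two invented helpers, map-capturing-errors and reduce-or-error; it is not their wrapper code. Exceptions thrown synchronously by f become StreamError values on the output stream. The reduce step then turns the first such value it sees into an errored deferred. This version does not capture errors from a deferred returned by f.

```clojure
(require '[manifold.deferred :as d]
         '[manifold.stream :as s])

;; Hypothetical marker record: an error carried as a value on the stream.
(defrecord StreamError [error])

(defn map-capturing-errors
  "Like s/map, but an exception thrown synchronously by f is emitted
  downstream as a StreamError value."
  [f source]
  (s/map (fn [x]
           (try (f x)
                (catch Exception e (->StreamError e))))
         source))

(defn reduce-or-error
  "Like s/reduce, but if any StreamError reaches the reduction, returns an
  errored deferred carrying the first captured error."
  [f init source]
  (d/chain (s/reduce (fn [acc x]
                       (cond (instance? StreamError acc) acc ; keep first error
                             (instance? StreamError x) x
                             :else (f acc x)))
                     init
                     source)
           (fn [result]
             (if (instance? StreamError result)
               (d/error-deferred (:error result))
               result))))
```

The reduction keeps draining the stream after the first error and ignores later messages, so producers are not left waiting on a stream nobody reads.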
Still, I’m looking for something like this:
```clojure
(let [stream (...)
      limit 4]
  (dotimes [_ limit]
    (spawn-worker stream msg-function)))
```
What should be in spawn-worker?
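One possible shape for spawn-worker follows manifold's d/loop + s/take! pattern. spawn-worker and msg-function are the names from the question; run-workers and the ::drained sentinel are hypothetical additions. The key point is that each iteration returns a deferred from s/take!, so d/loop hands control back to the caller instead of spinning. A d/loop body that never returns an unrealized deferred runs synchronously, so the first worker would keep running on the calling thread and the others would not start until it finished. Every s/take! receives a different message, so several workers share the stream's messages rather than each seeing all of them.

```clojure
(require '[manifold.deferred :as d]
         '[manifold.stream :as s])

(defn spawn-worker
  "Sketch of a worker: takes one message at a time from `stream` and passes
  it to `msg-function`, which may return a plain value or a deferred.
  Returns a deferred that yields ::drained once the stream is closed and empty."
  [stream msg-function]
  (d/loop []
    (d/chain (s/take! stream ::drained)
             (fn [msg]
               (if (identical? ::drained msg)
                 ::drained
                 (msg-function msg)))
             ;; d/chain waits for msg-function's deferred (if any) before
             ;; this step runs, so each worker has one message in flight
             (fn [result]
               (if (identical? ::drained result)
                 ::drained
                 (d/recur))))))

(defn run-workers
  "Hypothetical helper: starts n workers on the same stream and returns a
  deferred of all n workers' results."
  [n stream msg-function]
  (apply d/zip (repeatedly n #(spawn-worker stream msg-function))))

;; Usage: four workers, with blocking work wrapped in d/future.
(comment
  (let [stream (s/stream)
        done   (run-workers 4 stream (fn [msg] (d/future (println "processing" msg))))]
    @(s/put-all! stream (range 20))
    (s/close! stream)
    @done))
```

If msg-function throws or returns an errored deferred, that worker's loop stops with an error, which d/zip then propagates to the combined deferred.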
we would do
```clojure
(->> stream
     (map-concurrently 4 #(run-task whatever %))
     (reduce conj []))
```
to run 4 concurrent tasks over a stream. We don't really use "workers" as such; they would probably need a queue to feed them, etc.
most of our tasks are non-blocking, but for the (rare) cases in our platform where run-task is blocking, we wrap it in a deferred/future