
igrishaev 2020-01-23T10:15:02.001700Z

A small question: what is the best way to implement a “worker” entity? By this I mean a separate loop/recur in the background that consumes a stream. Say I have a stream of events and would like to process them with four workers.

igrishaev 2020-01-23T10:16:40.002Z

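;; assumes manifold.deferred is aliased as d and manifold.stream as s
(let [stream (get-stream)
      number 4]
  (dotimes [_ number]
    (d/loop []
      (d/chain (s/take! stream ::drained)
               (fn [message]
                 (when-not (identical? ::drained message)
                   (d/chain (process-message message)
                            (fn [_] (d/recur)))))))))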

igrishaev 2020-01-23T10:17:09.002400Z

Am I right?

flowthing 2020-01-23T10:29:03.002600Z

Well, I guess you could use or, at least. Or, depending on what you're trying to do, you could also look at manifold.bus.


The devil is in the detail. Do you care about order? What if item-a gets popped off the stream by the first loop and for some reason takes a long time to process, while item-b gets popped off by the second loop and is processed immediately?

igrishaev 2020-01-23T12:15:39.004800Z

No, the order is not important.

mccraigmccraig 2020-01-23T12:38:27.004900Z

@igrishaev we have map-concurrently which uses stream-buffers to control concurrency of non-blocking ops (or Futures wrapping blocking ops) -

igrishaev 2020-01-23T12:39:34.005100Z

Thanks, looks interesting, let me read.

mccraigmccraig 2020-01-23T12:46:13.005300Z

our general pattern is to map-concurrently then reduce to consume the stream

mccraigmccraig 2020-01-23T12:47:18.005500Z

we've also wrapped all the core functions to propagate errors sensibly - errors during map get captured in a marker record on the output stream, and returned (as an errored deferred) by any reduce

igrishaev 2020-01-23T12:49:38.005700Z

Still, I’m looking for something like this:

(let [stream (...)
      limit 4]
  (dotimes [_ limit]
    (spawn-worker stream msg-function)))

igrishaev 2020-01-23T12:49:59.005900Z

what should be in spawn-worker?
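
One possible shape for such a spawn-worker, sketched with stock Manifold calls only (d/loop, d/chain, s/take!). The helper name and its arguments come from the question above. The :done return value, the ::drained marker, and the usage code are illustrative assumptions, not something from this thread. Note that if msg-function throws, the worker's deferred becomes errored and that worker stops.

```clojure
(require '[manifold.deferred :as d]
         '[manifold.stream :as s])

(defn spawn-worker
  "Hypothetical helper: takes messages from `stream` one at a time and
  calls `msg-function` on each until the stream is drained. Returns a
  deferred that yields :done once the stream is drained."
  [stream msg-function]
  (d/loop []
    (d/chain
      (s/take! stream ::drained)
      (fn [msg]
        (if (identical? ::drained msg)
          :done
          ;; wait for msg-function (plain value or deferred), then loop
          (d/chain (msg-function msg)
                   (fn [_] (d/recur))))))))

;; usage: four workers sharing one stream
(let [stream  (s/stream 16)
      seen    (atom [])
      workers (doall (repeatedly 4 #(spawn-worker stream (fn [m] (swap! seen conj m)))))]
  (doseq [m (range 10)] @(s/put! stream m))
  (s/close! stream)
  @(apply d/zip workers)   ; wait until every worker has seen the drain
  (sort @seen))
```

Each message is taken by exactly one worker, so ordering across workers is not guaranteed, which matches the "order is not important" requirement.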

mccraigmccraig 2020-01-23T13:08:37.006100Z

we would do (->> stream (map-concurrently 4 #(run-task whatever %)) (reduce conj [])) to run 4 concurrent tasks over a stream - we don't really use "workers" as such, which would probably need a queue to feed them etc.

mccraigmccraig 2020-01-23T13:13:08.006300Z

most of our tasks are non-blocking, but for the (rare) cases in our platform where run-task is a blocking thing we wrap it in a deferred/future
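
The map-concurrently function mentioned above is not part of Manifold itself, and its implementation is not shown in this thread. A rough approximation of the same "map with bounded concurrency, then reduce" pattern can be sketched with stock Manifold functions, wrapping a blocking task in d/future. Here run-task is a hypothetical stand-in, and process-all is an illustrative name; the buffer size bounds the number of in-flight tasks only approximately.

```clojure
(require '[manifold.deferred :as d]
         '[manifold.stream :as s])

(defn run-task
  "Hypothetical blocking task, standing in for e.g. a database call."
  [x]
  (Thread/sleep 10)
  (* x x))

(defn process-all
  "Runs `run-task` over every message in `source`, with roughly `n` tasks
  in flight at once, and returns a deferred holding a vector of results."
  [n source]
  (->> source
       (s/map #(d/future (run-task %)))  ; each message becomes a deferred
       (s/buffer n)                      ; roughly n unrealized deferreds may queue up
       (s/realize-each)                  ; deferred -> value, in input order
       (s/reduce conj [])))

;; usage: deref blocks until the whole source has been processed
@(process-all 4 (s/->source (range 10)))
```

Unlike the worker loops, this variant keeps results in input order, because s/realize-each resolves the deferreds in the order they were put on the stream.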