aleph

igrishaev 2020-01-23T10:15:02.001700Z

A small question: what is the best way to implement a “worker” entity? By this I mean a separate loop/recur running in the background that consumes a stream. Say I have a stream of events and would like to process them with four workers.

igrishaev 2020-01-23T10:16:40.002Z

(let [stream (get-stream)
      number 4]
  ;; dotimes, not doseq: doseq needs a seq, not a number
  (dotimes [n number]
    (d/loop []
      (let [message (get-from-stream stream)]
        (process-message message)
        (d/recur)))))

igrishaev 2020-01-23T10:17:09.002400Z

Am I right?

flowthing 2020-01-23T10:29:03.002600Z

Well, I guess you could use manifold.stream/consume or manifold.stream/consume-async, at least. Or, depending on what you're trying to do, you could also look at manifold.bus.
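A minimal sketch of the `manifold.stream/consume` suggestion (the event data here is made up for illustration): the callback is invoked for each message, and `consume` returns a deferred that resolves once the stream is drained.

```clojure
(require '[manifold.stream :as s])

;; A finite source stream of example events
(def events (s/->source [:a :b :c]))

;; consume calls the handler for each message and returns a
;; deferred that yields true when the stream is exhausted
(s/consume (fn [event] (println "got" event)) events)
```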

2020-01-23T11:41:56.004400Z

The devil is in the details. Do you care about order? What if item-a gets popped off the stream by the first loop and for some reason takes a long time to process, while item-b gets popped off by the second loop and processed immediately?

igrishaev 2020-01-23T12:15:39.004800Z

No, the order is not important.

mccraigmccraig 2020-01-23T12:38:27.004900Z

@igrishaev we have map-concurrently which uses stream-buffers to control concurrency of non-blocking ops (or Futures wrapping blocking ops) - https://gist.github.com/mccraigmccraig/b10156ed0b59de6ccc93dbb1115df7c9

igrishaev 2020-01-23T12:39:34.005100Z

Thanks, looks interesting, let me read.

mccraigmccraig 2020-01-23T12:46:13.005300Z

our general pattern is to map-concurrently then reduce to consume the stream

mccraigmccraig 2020-01-23T12:47:18.005500Z

we've also wrapped all the core manifold.stream functions to propagate errors sensibly - errors during map get captured in a marker record on the output stream, and returned (as an errored deferred) by any reduce
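The wrappers themselves are internal to their platform, but the marker-record idea could look roughly like this (a hypothetical sketch, not their actual code; `StreamError`, `map-capturing-errors`, and `reduce-propagating-errors` are invented names):

```clojure
(require '[manifold.deferred :as d]
         '[manifold.stream :as s])

;; Marker record carrying an exception through the stream
(defrecord StreamError [error])

(defn map-capturing-errors
  "Like s/map, but exceptions thrown by f are captured in a
  StreamError marker on the output stream instead of escaping."
  [f stream]
  (s/map (fn [v]
           (try (f v)
                (catch Exception e (->StreamError e))))
         stream))

(defn reduce-propagating-errors
  "Like s/reduce, but if a StreamError marker is seen, the result
  is an errored deferred wrapping the original exception."
  [f init stream]
  (d/chain
   (s/reduce (fn [acc v]
               (cond
                 (instance? StreamError acc) acc
                 (instance? StreamError v)   v
                 :else                       (f acc v)))
             init stream)
   (fn [result]
     (if (instance? StreamError result)
       (d/error-deferred (:error result))
       result))))
```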

igrishaev 2020-01-23T12:49:38.005700Z

Still, I’m looking for something like that:

(let [stream (...)
      limit 4]
  (dotimes [n limit]
    (spawn-worker stream msg-function)))

igrishaev 2020-01-23T12:49:59.005900Z

what should be in spawn-worker?
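One possible `spawn-worker` sketch (an editorial suggestion, not from the thread) uses the standard Manifold `d/loop` + `s/take!` + `d/chain` pattern; by default `s/take!` yields `nil` once the stream is drained, which ends the loop:

```clojure
(require '[manifold.deferred :as d]
         '[manifold.stream :as s])

(defn spawn-worker
  "Repeatedly take a message from stream and process it with
  msg-function; the loop ends when take! yields nil (drained)."
  [stream msg-function]
  (d/loop []
    (d/chain (s/take! stream)
             (fn [message]
               (when message
                 (msg-function message)
                 (d/recur))))))
```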

mccraigmccraig 2020-01-23T13:08:37.006100Z

we would do (->> stream (map-concurrently 4 #(run-task whatever %)) (reduce conj [])) to run 4 concurrent tasks over a stream - we don't really use "workers" as such, which would probably need a queue to feed them etc.
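`map-concurrently` lives in the linked gist; a rough stand-in using only core Manifold functions (the `run-task` body here is a made-up placeholder) is to map each item to a deferred, buffer to bound how many are in flight, then realize and reduce:

```clojure
(require '[manifold.deferred :as d]
         '[manifold.stream :as s])

(defn run-task
  "Hypothetical task: run some work off-thread, returning a deferred."
  [x]
  (d/future (* x x)))

(->> (s/->source (range 10))
     (s/map run-task)   ; stream of unrealized deferreds
     (s/buffer 4)       ; bound the number of tasks in flight
     s/realize-each     ; realize results, preserving order
     (s/reduce conj []))
;; returns a deferred yielding the vector of results
```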

mccraigmccraig 2020-01-23T13:13:08.006300Z

most of our tasks are non-blocking, but for the (rare) cases in our platform where run-task is a blocking thing we wrap it in a deferred/future