Ugh. Bumped into another thing in #aleph I do not understand. While, thanks to @dm3, I could run `manifold.deferred/future`s in a custom thread pool, I now struggle to do the same via `manifold.stream`. I am using `stream.onto` to “assign” the same thread pool to the stream. But the execution is serial.
What I was expecting: the `put!`s finish immediately, and after one second I get 10 “Received:” lines at once. What I get instead: the `put!`s finish immediately, and every second I get a “Received:” line, taking 10 seconds in total.
From https://github.com/ztellman/manifold/blob/master/docs/execution.md I understand that I do not need to put the `sleep` function into a future (as in my previous try with `manifold.deferred`).
@hansen-pansen you are supposed to respect backpressure when `put!`ting into a stream, i.e. `(dotimes [_ num] @(s/put! x delay))`. Due to the stream `x` having a default buffer of size 1, the items will be `put!` and processed by the `sleep` function one at a time.
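A minimal sketch of the dereferenced `put!` pattern suggested above, assuming Manifold is on the classpath. The `received` atom, the 100 ms delay, and the count of 3 are hypothetical values chosen only to keep the example short; they are not from the original code.

```clojure
(require '[manifold.stream :as s])

(def x (s/stream))

;; Hypothetical helper: records what the consumer has seen so far.
(def received (atom []))

;; Consumer: sleeps for the number of milliseconds it receives.
(s/consume
  (fn [ms]
    (Thread/sleep ms)
    (swap! received conj ms)
    (println "Received:" ms))
  x)

;; Producer: dereferencing each put! blocks until the stream has
;; accepted the message, so the producer cannot outrun the consumer.
(dotimes [_ 3]
  @(s/put! x 100))
```

Without the `@`, each `put!` only returns a deferred and the loop finishes right away, regardless of whether the messages have been accepted.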
there’s also `put-all!` for putting multiple items
the stream callbacks will run on the supplied `pool` executor, but the operations on the stream will not be parallelized automatically
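One possible way to get the parallelism asked about is to hand each message to the pool explicitly inside the callback. This is only a sketch under assumptions, not something proposed in the thread: the names `pool`, `jobs`, and `done`, the pool size of 10, and the buffer size of 10 are all made up for illustration. It relies on `manifold.deferred/future-with`, the same kind of deferred future mentioned at the start of the conversation.

```clojure
(require '[manifold.deferred :as d]
         '[manifold.executor :as ex]
         '[manifold.stream :as s])

;; Hypothetical pool and stream, sized for illustration only.
(def pool (ex/fixed-thread-executor 10))
(def jobs (s/stream 10))

;; Hypothetical counter of finished jobs.
(def done (atom 0))

;; The callback returns as soon as the work is submitted; the sleep
;; itself runs on a pool thread, so several messages can be in flight.
(s/consume
  (fn [ms]
    (d/future-with pool
      (Thread/sleep ms)
      (println "Received:" ms)
      (swap! done inc)))
  jobs)

;; Producer still respects backpressure by dereferencing each put!.
(dotimes [_ 10]
  @(s/put! jobs 1000))
```

The trade-off is that the stream no longer limits how much work is running at once; in this sketch that limit comes from the size of the pool instead.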
@dm3 Well, I tried to use a `buffered-stream`, too, but same result.
If I had all the inputs from the beginning, I would use `->source` or `put-all!` instead. But I am trying to use `manifold` for a queue/worker model, where inputs are generated from web requests.
that’s great
producer puts
consumer takes
buffer size is the amount of requests depending on the max allowed latency
@hansen-pansen I believe what @dm3 is saying is that you will need to initialize `x` with a buffer, i.e. bind it to `(s/stream 10)` as opposed to `(s/stream)` (which only has a buffer for one item)
streams automatically deal with backpressure
so they will not attempt to process additional items when they are blocking on the number of deferreds in the buffer
if the buffer is size 1, it will block on that one item
see also https://github.com/ztellman/manifold/blob/master/docs/stream.md#buffers-and-backpressure
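A small sketch of how a buffer interacts with `put!`, assuming Manifold is on the classpath. The stream name `buffered`, the buffer size of 2, and the keyword messages are hypothetical and chosen only to make the behaviour easy to observe.

```clojure
(require '[manifold.deferred :as d]
         '[manifold.stream :as s])

;; Hypothetical stream with room for two messages and no consumer yet.
(def buffered (s/stream 2))

;; The first two puts fit into the buffer, so their deferreds are
;; realized immediately.
(def p1 (s/put! buffered :a))
(def p2 (s/put! buffered :b))

;; The buffer is now full, so the third put stays pending.
(def p3 (s/put! buffered :c))

(println (d/realized? p1) (d/realized? p2) (d/realized? p3))

;; Taking one message frees a slot, which lets the third put complete.
(def first-msg @(s/take! buffered))
(println first-msg (d/realized? p3))
```

A pending deferred like `p3` is the backpressure signal: a producer that dereferences it will wait until there is room again.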
@ehashman This is what I also tried, but it's not depicted in the above example. I used `x (stream/buffered-stream num)`, so all `put!`s should “get through”.
Also: my `put!`s did not block.
you have to deref the put!
BAAM This could be the thing!