aleph

hansen-pansen 2017-06-07T17:05:29.024322Z

Ugh. Bumped into another thing in #aleph I do not understand. While — thanks to @dm3 — I could run manifold.deferred/futures in a custom thread pool, I now struggle to do the same via manifold.stream. I am using stream.onto to “assign” the same thread pool to the stream. But the execution is serial.

hansen-pansen 2017-06-07T17:09:44.120823Z

What I was expecting: the put!s finish immediately, and after one second I get 10 “Received:” lines at once. What I get instead: The put!s finish immediately, and every second I get a “Received:” line, taking 10 seconds in total. From https://github.com/ztellman/manifold/blob/master/docs/execution.md I understand that I do not need to put the sleep function into a future (as in my previous try with manifold.deferred).

dm3 2017-06-07T17:47:45.981762Z

@hansen-pansen you are supposed to respect backpressure when put!ting into a stream, i.e. (dotimes [_ num] @(s/put! x delay)). Due to the stream x having a default buffer of size 1, the items will be put! and processed by the sleep function one at a time.

dm3 2017-06-07T17:47:59.987182Z

there’s also put-all! for putting multiple items

dm3 2017-06-07T17:48:44.003899Z

the stream callbacks will run on the supplied pool executor, but the operations on the stream will not be parallelized automatically

hansen-pansen 2017-06-07T17:54:24.133138Z

@dm3 Well, I tried to use a buffered-stream, too, but same result. If I would have all the inputs from the beginning, I would use ->source or put-all! instead. But I am trying to use manifold for a queue/worker model, were inputs are generated from web requests.

dm3 2017-06-07T17:57:06.195601Z

that’s great

dm3 2017-06-07T17:57:19.200912Z

producer puts

dm3 2017-06-07T17:57:21.201813Z

consumer takes

dm3 2017-06-07T17:57:47.211848Z

buffer size is the amount of requests depending on the max allowed latency

2017-06-07T18:01:14.294205Z

@hansen-pansen I believe what @dm3 is saying is that you will need to initialize x with a buffer, i.e. bind it to (s/stream 10) as opposed to (s/stream) (which only has a buffer for one item)

2017-06-07T18:01:32.301378Z

streams automatically deal with backpressure

2017-06-07T18:01:58.311432Z

so they will not attempt to process additional items when they are blocking on the number of deferreds in the buffer

2017-06-07T18:02:08.314939Z

if the buffer is size 1, it will block on that one item

hansen-pansen 2017-06-07T18:03:44.352345Z

@ehashman This is what I also tried, but it's not depicted in above example. I used x (stream/buffered-stream num), so all put!s should should “get through”.

hansen-pansen 2017-06-07T18:05:38.396554Z

Also: my put!s did not block.

dm3 2017-06-07T18:05:57.403453Z

you have to deref the put!

hansen-pansen 2017-06-07T18:06:39.419057Z

BAAM This could be the thing!