hmm... I don't understand something about the interaction between core.async/take and mults
this simplified example works:
(let [input-chan (a/chan 32)
      input-mult (a/mult input-chan)
      plus-chan  (a/reduce + 0 (a/tap input-mult (a/chan 32)))
      all-chan   (a/into [] (a/tap input-mult (a/chan 32)))
      zero-chan  (a/into [] (a/take 1 (a/tap input-mult (a/chan 32))))]
  (a/pipe (a/to-chan (range 33)) input-chan)
  {:all  (a/<!! all-chan)
   :plus (a/<!! plus-chan)
   :zero (a/<!! zero-chan)})
{:all [0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32], :plus 528, :zero [0]}
this hangs forever
(let [input-chan (a/chan 32)
      input-mult (a/mult input-chan)
      plus-chan  (a/reduce + 0 (a/tap input-mult (a/chan 32)))
      all-chan   (a/into [] (a/tap input-mult (a/chan 32)))
      zero-chan  (a/into [] (a/take 1 (a/tap input-mult (a/chan 32))))]
  (a/pipe (a/to-chan (range 34)) input-chan)
  {:all  (a/<!! all-chan)
   :plus (a/<!! plus-chan)
   :zero (a/<!! zero-chan)})
an even simpler example:
(let [input-chan (a/chan 32)
      input-mult (a/mult input-chan)
      plus-chan  (a/reduce + 0 (a/tap input-mult (a/chan 32)))
      zero-chan  (a/into [] (a/take 1 (a/tap input-mult (a/chan 32))))]
  (a/pipe (a/to-chan (range 34)) input-chan)
  {:plus (a/<!! plus-chan)
   :zero (a/<!! zero-chan)})
;;; hangs forever
(let [input-chan (a/chan 32)
      input-mult (a/mult input-chan)
      plus-chan  (a/reduce + 0 (a/tap input-mult (a/chan 32)))
      zero-chan  (a/into [] (a/take 1 (a/tap input-mult (a/chan 32))))]
  (a/pipe (a/to-chan (range 33)) input-chan)
  {:plus (a/<!! plus-chan)
   :zero (a/<!! zero-chan)})
{:plus 528, :zero [0]}
creating multiple things off the same input is a really common pattern for me so I'd love to figure out what I'm doing wrong with the mult
@otfrom I think the problem is with this channel (a/take 1 (a/tap input-mult (a/chan 32)))
the one with the 32 buffer. When the take takes its 1 item and returns a channel, the channel created for the tap is no longer consumed, so once its buffer is full it prevents the mult from distributing any further items. The range 33 version works because it's able to buffer all the items. If you give the take's tapped channel a bigger buffer, then it works
Because the mult can't distribute all the messages, the reduce ch never closes and so never produces a result
(let [input-chan (a/chan 32)
      input-mult (a/mult input-chan)
      plus-chan  (a/reduce + 0 (a/tap input-mult (a/chan 32)))
      zero-chan  (a/into [] (a/take 1 (a/tap input-mult (a/chan 33))))]
  (a/pipe (a/to-chan (range 34)) input-chan)
  {:plus (a/<!! plus-chan)
   :zero (a/<!! zero-chan)})
Just with 33 on the zero-chan tap ch it works OK. I had thought that the take would close its consuming channel when it got its one item, thus not blocking the rest
forcing the channel for the a/take to be the size of the whole input doesn't feel like what I was hoping for with a/take
thx for the answer 🙂
yeah I agree, a dropping or sliding buffer wouldn't need to know the length, but if you want to close the channel so it doesn't keep consuming from the mult after the take, you could implement your own take that closes the input channel
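A minimal sketch of that idea (take-n-and-close is a made-up name, not part of core.async): take up to n items, then close both the source and the output, so the mult drops the tap instead of blocking on it.

(defn take-n-and-close
  "Like a/take, but closes `from` after n items so a mult stops feeding it."
  [n from]
  (let [out (a/chan)]
    (a/go
      (loop [i 0]
        (when (< i n)
          (when-some [v (a/<! from)]
            (a/>! out v)
            (recur (inc i)))))
      (a/close! from)  ;; the mult removes a tap once its channel is closed
      (a/close! out))
    out))

;; usage sketch:
;; (a/into [] (take-n-and-close 1 (a/tap input-mult (a/chan 32))))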
ah, so just use a different kind of buffer. Thx. Hadn't twigged that one
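For reference, a sketch of that buffer variant on the original example: a dropping buffer of size 1 on the take's tap means the mult's puts to that tap always succeed, whatever the input length.

(let [input-chan (a/chan 32)
      input-mult (a/mult input-chan)
      plus-chan  (a/reduce + 0 (a/tap input-mult (a/chan 32)))
      zero-chan  (a/into [] (a/take 1 (a/tap input-mult (a/chan (a/dropping-buffer 1)))))]
  (a/pipe (a/to-chan (range 34)) input-chan)
  {:plus (a/<!! plus-chan)
   :zero (a/<!! zero-chan)})
;; => {:plus 561, :zero [0]}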
so for some of the results above (like the all-chan) what I really want to do is just stream it out to a file. I worry a bit about blocking the threadpool if I do that. Are there any good examples out there of taking something off a channel and streaming each record (or batches) to a file or other IO?
thx for the help @bertofer
np, I enjoyed figuring it out as it was quite puzzling
I’m wondering what advantages the above has compared to:
(let [input (range 34)
      all   (future (into [] input))
      plus  (future (reduce + input))
      zero  (future (take 1 input))]
  {:all  (deref all)
   :plus (deref plus)
   :zero (deref zero)})
for simple topologies I've written that exact code
does future use a pool or a new thread each time?
I've got other bits where there are shared transforms that go to some downstream components
I suppose with your example there is no back pressure either
what should slow down if one of those parallel tasks is taking longer?
(it could be me not really understanding some concepts, so genuine questions)
good use of anomalies too I think
An expanding caching pool — it reuses idle threads and only creates a new one when none are free
@reborg do you mean why would I choose a larger buffer for one task rather than another?
If + was doing real work I'd want that to have a larger buffer
I was thinking about the need for backpressure: it's useful when you don't control the producer upstream, as a way to communicate the need to slow down without crashing the consumer or responding with a 500. But if you have control of the source of the data (such as a file), the backpressure is already built in with sequences or input streams. So I tend to think backpressure matters more for an always-on streaming consumer. I was under the impression you were dealing with one-off processing via files.
hmm... I wonder if back pressure is the right term. For what I'm doing atm I'm not processing files, I'm processing generated simulations. The buffers let me process things multiple times without having to realise the whole of the sequence into memory at the same time or re-create the seq for multiple outputs
yeah, the usual idiom is to use thread, with blocking ops inside. Helpfully, thread returns a channel so it can be used in a go block idiomatically
tho here I think I just want it for the side effects
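A rough sketch of that idiom for the file case, assuming a tap off the same mult; spool-to-file! and the filename are made-up, and each record is just written as one EDN line.

(require '[clojure.java.io :as io])

;; Drain a channel to a file on a dedicated thread. a/thread runs the body
;; on its own thread (not the go-block pool), so the blocking take and the
;; file IO don't tie up the go threadpool. It returns a channel that closes
;; when the body finishes, if you want to wait on completion.
(defn spool-to-file!
  [ch f]
  (a/thread
    (with-open [w (io/writer f)]
      (loop []
        (when-some [record (a/<!! ch)]
          (.write w (str (pr-str record) "\n"))
          (recur))))))

;; usage sketch:
;; (a/<!! (spool-to-file! (a/tap input-mult (a/chan 32)) "all-records.edn"))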