How do I retrieve items from multiple channels (each containing 0 or 1 items)? What I want to do is something like this:
(let [[err-ch data-ch] (a/split throwable? input-chan)
      result-ch (a/reduce extract-and-combine-data nil data-ch)
      errors-ch (a/into [] err-ch)]
  (a/go {:result (a/<! result-ch) :errors (a/<! errors-ch)}))
The problem is that I have to consume all input-chan items - and therefore all items on both err-ch and data-ch - for all three channels to close (because of back-pressure and buffers of size 0). For example, if input-chan only contains errors, then the code will block forever at (a/<! result-ch) because the (empty) result-ch won't be closed until errors-ch is consumed. Is there a working way to do this?
What I can think of is:
1. Wrap both result-ch and errors-ch in a channel with a buffer of 1 so that they do not need to wait to be read from before consuming the inputs (i.e. add result-ch' (chan 1), _ (pipe result-ch result-ch'))
2. Use alts! to read whatever comes first and then read the other channel:
(a/go (let [[v ch] (a/alts! [result-ch errors-ch])
            res (if (= ch result-ch) v (a/<! result-ch))
            err (if (= ch errors-ch) v (a/<! errors-ch))]
        {:result res :errors err}))
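A minimal runnable sketch of option 2, to make it easy to try at the REPL. Several names here are assumptions, not part of the original code: throwable? is given a plausible definition, + with an initial value of 0 stands in for extract-and-combine-data, and chan-of is a hypothetical helper that builds a closed, pre-filled input channel.

```clojure
(require '[clojure.core.async :as a])

;; Assumed definition; the thread does not show throwable?.
(defn throwable? [x] (instance? Throwable x))

;; Hypothetical helper: a pre-filled, closed channel standing in for input-chan.
(defn chan-of [items]
  (let [c (a/chan (max 1 (count items)))]
    (doseq [x items] (a/>!! c x))
    (a/close! c)
    c))

(defn summarize
  "Splits input-chan into errors and data, then waits for both aggregates."
  [input-chan]
  (let [[err-ch data-ch] (a/split throwable? input-chan)
        ;; + and 0 are placeholders for extract-and-combine-data and its init value.
        result-ch (a/reduce + 0 data-ch)
        errors-ch (a/into [] err-ch)]
    (a/go
      ;; Take whichever aggregate is ready first, then take the other one.
      (let [[v ch] (a/alts! [result-ch errors-ch])
            res (if (= ch result-ch) v (a/<! result-ch))
            err (if (= ch errors-ch) v (a/<! errors-ch))]
        {:result res :errors err}))))

;; Errors and data are interleaved on purpose.
(def summary
  (a/<!! (summarize (chan-of [1 (ex-info "boom" {}) 2]))))
```

Both aggregates only become available once input-chan is closed, so the sketch closes it explicitly. A non-nil initial value is used because a go block that returns nil closes its channel without delivering a value.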
How do you troubleshoot with async? Normally I just put (def *args [arg1 ...]) into a function to capture its arguments and be able to play with them from the REPL, but that is useless for channels as they will already be consumed. So I guess I need to make a copy of the channel using mult. Or is there a better way?
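One way the mult idea could look, as a rough sketch rather than an established debugging recipe. The function name spy and the sliding buffer size of 100 are assumptions made for illustration. The sliding buffer keeps the debug copy from ever blocking the real consumer, at the cost of retaining only the most recent items.

```clojure
(require '[clojure.core.async :as a])

(defn spy
  "Returns [main-ch debug-ch]: two copies of everything put on ch.
   debug-ch keeps only the latest 100 items and never applies back-pressure."
  [ch]
  (let [m        (a/mult ch)
        main-ch  (a/chan)
        debug-ch (a/chan (a/sliding-buffer 100))]
    (a/tap m main-ch)
    (a/tap m debug-ch)
    [main-ch debug-ch]))

;; Usage: the taps must be registered before any items arrive,
;; because a mult drops items while it has no taps.
(def source (a/chan 10))
(def chans (spy source))
(doseq [x [1 2 3]] (a/>!! source x))
(a/close! source)

;; The normal consumer reads the main copy ...
(def consumed (a/<!! (a/into [] (first chans))))
;; ... and the debug copy is still available for inspection at the REPL.
(def captured (a/<!! (a/into [] (second chans))))
```

Note that the original channel must no longer be read directly once it is handed to mult; all consumers go through taps instead.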
@holyjak why do you need to process both the results and errors at the same time? why not have separate functions/go blocks?
1. I assume I must consume both channels so that the writer isn't blocked. 2. I want to report the number of errors vs. the number of ordinary items.
@holyjak, if I understood your problem correctly: have you tried consuming all items into a collection to “unpack” the data from the channels?
(a/into coll input-chan)
then
(split-with throwable? coll)
what about attaching the cat transducer to a chan?
Apparently, all data is already being streamed through input-chan, unless I’m missing something. It would be a matter of consuming everything into a collection and splitting it later, given that the input channel is closed to signal completion.
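A sketch of the collect-then-split idea, under a few assumptions. a/into returns a channel rather than a collection, so the collected vector has to be taken from it first. split-with only cuts a sequence at the first non-matching element, so interleaved errors and data would not be separated; group-by is swapped in here instead. As before, throwable? is an assumed definition, + stands in for extract-and-combine-data, and chan-of is a hypothetical helper.

```clojure
(require '[clojure.core.async :as a])

;; Assumed definition; the thread does not show throwable?.
(defn throwable? [x] (instance? Throwable x))

;; Hypothetical helper: a pre-filled, closed channel standing in for input-chan.
(defn chan-of [items]
  (let [c (a/chan (max 1 (count items)))]
    (doseq [x items] (a/>!! c x))
    (a/close! c)
    c))

(defn collect-and-split
  "Drains input-chan into a vector, then separates errors from data."
  [input-chan]
  (a/go
    (let [items (a/<! (a/into [] input-chan))
          ;; group-by keys are the boolean results of throwable?.
          {errors true data false} (group-by throwable? items)]
      {:result (reduce + 0 data)   ; + is a placeholder combiner
       :errors (vec errors)})))

(def out
  (a/<!! (collect-and-split (chan-of [1 (ex-info "boom" {}) 2]))))
```

This gives up streaming: nothing is reported until input-chan closes and every item is held in memory, which is fine for small inputs and less so for large ones.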