core-async

kirill.salykin 2020-03-24T12:23:19.272800Z

Hi all, please advise: does it make sense to use reduce with pipeline-blocking?

(async/pipeline-blocking concurrent
                         output-chan
                         (reduce
                          (fn [acc hour]
                            (let [acc* (disj acc hour)]
                              (worker hour context)
                              (if (seq acc*)
                                acc*
                                (reduced :done))))
                          hours)
                         (async/to-chan hours))
What I want to achieve is to send :done to the output channel once all hours have been processed.

Ben Sless 2020-03-26T06:06:44.000100Z

why not use async/reduce?

Ben Sless 2020-03-26T06:07:41.000300Z

or something. You're not propagating the hours, you're working on them
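A minimal sketch of the async/reduce suggestion, for illustration only. The `worker`, `context` and `hours` definitions here are hypothetical stand-ins for the names in the question. Note that the reducing function runs one item at a time inside core.async's own machinery, so this is probably only reasonable when `worker` is quick and non-blocking.

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical stand-ins for the names used in the thread.
(def context {:processed (atom #{})})
(defn worker [hour context]
  (swap! (:processed context) conj hour))

(def hours (range 24))

;; async/reduce returns a channel that yields a single value
;; once the source channel is closed and fully consumed.
(def done-chan
  (async/reduce (fn [_ hour] (worker hour context) :done)
                :done
                (async/to-chan hours)))

;; Blocks until every hour has been passed to worker.
(def result (async/<!! done-chan))
```

Here `result` is the value delivered on the channel returned by async/reduce, so the `:done` marker arrives only after the last hour was handled.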

Ben Sless 2020-03-26T06:09:24.000500Z

So a way to use pipeline-blocking would be:
1. Create an input channel with the transducer cat on it
2. Create an output channel
3. Create the pipeline-blocking with the transducer (map #(worker % context))
4. Put hours on the input channel
Then do some sort of take from the output channel (`<!!`/`<!`/`take!`). That take will block until the work is finished.
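The steps above can be sketched roughly as follows. `worker`, `context`, `hours` and `concurrent` are hypothetical stand-ins for the names in the question, and draining the output with async/into is an assumption made here so that the final take waits for all results rather than only the first one.

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical stand-ins for the names used in the thread.
(def context {:offset 100})
(defn worker [hour context]
  (Thread/sleep 10)                  ; pretend to do blocking work
  (+ hour (:offset context)))

(def hours (range 24))
(def concurrent 4)

(def results
  (let [in  (async/chan 1 cat)       ; cat unpacks a collection put on the channel
        out (async/chan)]
    (async/pipeline-blocking concurrent out (map #(worker % context)) in)
    (async/>!! in hours)             ; a single put, expanded into 24 items by cat
    (async/close! in)                ; lets the pipeline finish and close out
    ;; async/into yields its vector only after out is closed,
    ;; so this take blocks until every hour was processed.
    (async/<!! (async/into [] out))))
```

Closing the input channel matters here: the pipeline closes the output channel only once its input is closed and drained, and that close is what signals "all done" to the consumer.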

kirill.salykin 2020-03-26T06:15:28.001600Z

interesting... let me try. thanks!

kirill.salykin 2020-03-26T07:51:54.001800Z

works, thanks!

kirill.salykin 2020-03-24T12:23:22.273100Z

thanks

mavbozo 2020-03-24T14:22:45.273200Z

i haven't tried that before. async/pipeline-blocking requires a transducer.

mavbozo 2020-03-24T14:24:45.273400Z

i've used pipeline-blocking before, but with a transducer.

kirill.salykin 2020-03-24T14:35:34.273600Z

that is my concern, that reduce may not work 😞

kirill.salykin 2020-03-24T14:35:35.273800Z

thanks

serioga 2020-03-24T21:26:27.274Z

I think pipeline-blocking returns a channel which is closed when processing is done.
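A small sketch of that idea, with the caveat that the return value is not described in the pipeline-blocking docstring as far as I know, so relying on it is an assumption about the implementation. The output channel is given a buffer large enough to hold every result; otherwise the pipeline could not finish before something consumes the output.

```clojure
(require '[clojure.core.async :as async])

(def out (async/chan 24))            ; room for all results, see note above

;; Assumption: the returned channel closes once the pipeline has finished.
(def finished
  (async/pipeline-blocking 4 out (map inc) (async/to-chan (range 24))))

;; Taking from a closed, empty channel yields nil.
(def finished-val (async/<!! finished))

;; out has been closed by the pipeline, so this drains it immediately.
(def results (async/<!! (async/into [] out)))
```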