core-async

Itay 2020-10-05T10:38:32.067Z

Hi, this is sorta my first time trying to use core.async in a real system, so the question may be silly. I would like to:
1. Move the contents of a lazy sequence into a channel.
2. Do stuff with the channel (run it through a pipeline).
3. Take the output channel and convert it back to a lazy sequence.

So all the core.async stuff is “black-boxed” away from the rest of the code.

The issue is: the source sequence is blocking - it makes DB calls behind the scenes, so calling first or next on it could block. For this reason, it doesn’t seem right to use to-chan which, from looking at the code, pulls from the sequence inside a go-loop. To me it seems like I want a version of to-chan that does the blocking in another thread, one that isn’t part of the core.async thread pool (which AFAIK is small).

So the questions are:
1. Do I need to implement it myself, or is there some ready-made function I can use for this?
2. How do I convert the output channel into a lazy sequence (that I can consume from without waiting for the whole process to complete)?
3. Am I totally missing something here? Please elaborate 🙂

Thanks!

2020-10-06T15:21:16.072900Z

If you care about chunking, the solution isn't to "dechunk"; it is to not use lazy seqs.

Itay 2020-10-06T15:50:56.073100Z

claypoole looks really useful, I didn’t know about it. I ended up implementing something very similar to its pmap that takes the thread pool size (using pipeline-blocking inside). I may switch to it instead.

jumar 2020-10-07T12:02:03.073300Z

@noisesmith I know but it's quite handy in this case. I didn't find an easier way to do what I wanted.

Jan K 2020-10-05T14:06:56.067500Z

Designing around blocking lazy sequences is IMO not a good idea, I think you'll have an easier time if you just use channels or some other kind of queue.

Itay 2020-10-05T15:07:09.067800Z

That’s not very helpful I’m afraid - I’m using Monger which provides either a lazy sequence or iterator that I have to wrap somehow if I want to use a channel - I would like to know how to do that. On the other hand, our system doesn’t use channels and I would like to use core.async for this specific flow without rewriting the entire codebase - so I would like to know how to convert a channel into what is commonly used in Clojure - a lazy sequence. The reason I want it to be lazy is to allow processing of the output while parts of it are still being generated - otherwise response time would increase.

Jan K 2020-10-05T16:09:14.068Z

Well although it's not ideal it should be possible - I found a couple implementations on google, eg: https://gist.github.com/stathissideris/8659706
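For readers who cannot open the link, here is a minimal sketch of one common way to turn a channel into a lazy sequence. It is not necessarily what the linked gist does, and the name chan->lazy-seq is made up for illustration. Each step of the sequence does a blocking take, so it should be consumed from an ordinary thread, never from inside a go block.

```clojure
(require '[clojure.core.async :as a])

(defn chan->lazy-seq
  "Hypothetical helper: lazily drains ch with blocking takes.
  The sequence ends when ch is closed and empty."
  [ch]
  (lazy-seq
    ;; <!! returns nil only once the channel is closed and drained;
    ;; when-some (unlike when-let) still lets false values through.
    (when-some [v (a/<!! ch)]
      (cons v (chan->lazy-seq ch)))))

;; Usage: fill a small buffered channel, close it, then read it as a seq.
(def demo-ch (a/chan 3))
(a/>!! demo-ch 1)
(a/>!! demo-ch 2)
(a/>!! demo-ch 3)
(a/close! demo-ch)
(def demo-result (doall (chan->lazy-seq demo-ch)))
```

Items become available to the consumer as soon as they are put on the channel, so downstream code can start working before the producer has finished.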

Jan K 2020-10-05T16:12:08.068200Z

Instead of to-chan (which is deprecated) you can use clojure.core.async/to-chan!! or onto-chan!! to avoid blocking the go pool
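A small sketch of the onto-chan!! variant, assuming a core.async version recent enough to include the !! functions. slow-seq is a hypothetical stand-in for the DB-backed sequence; the sleep only simulates a blocking call.

```clojure
(require '[clojure.core.async :as a])

(defn slow-seq
  "Hypothetical blocking source: realizing each element sleeps briefly."
  [n]
  (map (fn [i] (Thread/sleep 10) i) (range n)))

;; onto-chan!! walks the collection on a dedicated thread (via a/thread),
;; so the blocking happens outside the fixed-size go-block pool.
;; The channel is closed once the collection is exhausted.
(def in-ch (a/chan 8))
(a/onto-chan!! in-ch (slow-seq 5))

;; a/into collects everything from the channel into a single value.
(def collected (a/<!! (a/into [] in-ch)))
```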

Itay 2020-10-05T16:14:09.068700Z

Thanks! A friend sent me the same link and it seems to fit the output part.

Itay 2020-10-05T16:14:32.068900Z

Oh, it’s deprecated? I don’t see that mentioned, nor the to-chan!! function in the docs: https://clojuredocs.org/clojure.core.async

Itay 2020-10-05T16:16:56.069100Z

OK found it in the probably more official docs: https://clojure.github.io/core.async/#clojure.core.async/to-chan!! This looks like exactly what I wanted (and just implemented by myself) - will try, thanks.

Jan K 2020-10-05T16:17:48.069300Z

Yeah those docs on http://clojuredocs.org seem out of date.

orestis 2020-10-05T17:55:01.071500Z

Curious - if it starts as a lazy seq and ends a lazy seq, why not keep it a lazy seq? What do you gain by putting core async in the middle?

Itay 2020-10-05T18:43:20.071700Z

Each item in the input seq triggers another blocking call, so I’m using pipeline-blocking to parallelize the work with constant bound on the concurrent calls.
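Put together, the flow described here could look roughly like the sketch below. The function name bounded-pmap and the buffer sizes are assumptions made for illustration, not the code Itay actually wrote.

```clojure
(require '[clojure.core.async :as a])

(defn bounded-pmap
  "Hypothetical sketch: applies the blocking function f to each item of coll
  with at most n calls in flight, returning a lazy seq in input order.
  f should not return nil, since nil cannot travel on a channel."
  [n f coll]
  (let [in  (a/chan n)
        out (a/chan n)]
    ;; Feed the (possibly blocking) input seq from a dedicated thread.
    (a/onto-chan!! in coll)
    ;; Run f on n threads; out is closed once in is closed and drained.
    (a/pipeline-blocking n out (map f) in)
    ;; Expose the output channel as a lazy seq of blocking takes.
    (letfn [(drain []
              (lazy-seq
                (when-some [v (a/<!! out)]
                  (cons v (drain)))))]
      (drain))))

;; Usage with a fake blocking call.
(defn fake-db-call [x] (Thread/sleep 20) (* x 10))
(def pmap-result (doall (bounded-pmap 4 fake-db-call (range 10))))
```

pipeline-blocking keeps outputs in the same order as inputs, which is why the result can be compared directly against a plain map.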

Itay 2020-10-05T18:45:07.071900Z

There is one annoying issue with to-chan!! - if the input seq throws an exception (which a DB call might) it would simply close the channel and the error gets “swallowed” - not very production-ready…
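One possible workaround, sketched under the assumption that it is acceptable to pass the exception downstream as a value: walk the sequence yourself inside a/thread, catch anything it throws, and put the Throwable on the channel before closing it. The name seq->chan-with-errors is hypothetical and not part of core.async.

```clojure
(require '[clojure.core.async :as a])

(defn seq->chan-with-errors
  "Hypothetical helper: like onto-chan!!, but if realizing coll throws,
  the Throwable is put on the channel as the last item instead of being lost."
  [coll]
  (let [ch (a/chan 16)]
    (a/thread
      (try
        (loop [vs (seq coll)]
          ;; >!! returns false if the channel was closed by the consumer.
          (when (and vs (a/>!! ch (first vs)))
            (recur (next vs))))
        (catch Throwable t
          (a/>!! ch t))
        (finally
          (a/close! ch))))
    ch))

;; A seq whose second element fails, standing in for a failing DB call.
(def failing-seq
  (lazy-seq (cons 1 (lazy-seq (throw (ex-info "db down" {}))))))

(def err-ch (seq->chan-with-errors failing-seq))
(def first-item (a/<!! err-ch))
(def second-item (a/<!! err-ch))
(def third-item (a/<!! err-ch))
```

The consumer then has to check each item, for example with (instance? Throwable item), and rethrow or handle it. Any pipeline stage in between has to let Throwable values pass through untouched.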

Jan K 2020-10-05T22:40:53.072100Z

Another option for pooled parallelization could be pmap from https://github.com/TheClimateCorporation/claypoole
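A rough sketch of what that could look like, assuming the com.climate/claypoole dependency is on the classpath. fetch-details is a hypothetical stand-in for the blocking per-item call.

```clojure
(require '[com.climate.claypoole :as cp])

(defn fetch-details
  "Hypothetical blocking call made once per input item."
  [id]
  (Thread/sleep 50)
  {:id id})

;; A fixed pool of 4 threads bounds the number of concurrent calls.
(def pool (cp/threadpool 4))

(def details
  (try
    ;; doall forces all results before the pool is shut down.
    (doall (cp/pmap pool fetch-details (range 10)))
    (finally
      (cp/shutdown pool))))
```

Forcing the results with doall gives up streaming consumption; a long-lived pool that is shut down elsewhere would avoid that.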