Hi,
This is sorta my first time trying to use core.async in a real system, so the question may be silly:
I would like to:
1. Move the contents of a lazy sequence into a channel.
2. Do stuff with the channel (run it through a pipeline).
3. Take the output channel, and convert it back to a lazy sequence.
So all the core.async stuff is “black-boxed” away from the rest of the code.
The issue is that the source sequence is blocking: it makes DB calls behind the scenes, so calling `first` or `next` on it could block. For this reason, it doesn’t seem right to use `to-chan`, which, from looking at the code, pulls from the sequence inside a `go-loop`. To me it seems like I want a version of `to-chan` that does the blocking in another thread, one that isn’t part of the core.async thread pool (which AFAIK is small). So the question is:
1. Do I need to implement it myself, or is there some ready-made function I can use for this?
2. How do I convert the output channel into a lazy sequence (that I can consume from without waiting for the whole process to complete)?
3. Am I totally missing something here? Please elaborate 🙂
Thanks!
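For question 1, a hand-rolled version is small. The sketch below is hypothetical (`seq->chan!!` is not a core.async function): it realizes the seq inside `async/thread`, which runs on core.async's separate cached thread pool rather than the fixed-size pool that runs go blocks, so blocking DB calls made by `first`/`next` never occupy go threads. Note that, like the behaviour described later in the thread, an exception here only closes the channel; the consumer is not told about it.

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical helper: copies `coll` onto `ch` from a thread outside the
;; go-block pool, then closes `ch`.
(defn seq->chan!!
  ([coll] (seq->chan!! coll (async/chan)))
  ([coll ch]
   (async/thread
     (try
       (loop [s (seq coll)]
         ;; Realizing `s` (seq/next) may block on a DB call; that is fine here.
         ;; >!! waits for a consumer (back-pressure) and returns false if
         ;; `ch` is already closed, which ends the loop.
         (when (and s (async/>!! ch (first s)))
           (recur (next s))))
       (finally
         ;; Runs on normal exhaustion and on exceptions alike.
         (async/close! ch))))
   ch))
```

Because the channel is unbuffered by default, the producer only realizes about one element ahead of whoever is taking; pass a buffered channel as the second argument to let it run further ahead.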
If you care about chunking, the solution isn't to "dechunk"; it's to not use lazy seqs.
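To make the chunking point concrete, here is a small sketch (assuming Clojure 1.7+, where `range` produces chunked seqs with 32-element chunks; `count-calls` is a hypothetical helper). Mapping over a chunked lazy seq realizes a whole chunk even when only the first element is requested, while a transducer reduction with `take` stops after exactly the elements it needs.

```clojure
;; Hypothetical helper: runs `run` with an instrumented `f` and returns how
;; many times `f` was called.
(defn count-calls [run]
  (let [calls (atom 0)
        f     (fn [x] (swap! calls inc) x)]
    (run f)
    @calls))

;; Lazy seq: asking for one element realizes the whole first chunk.
(def lazy-calls
  (count-calls (fn [f] (first (map f (range 100))))))

;; Reduction with a transducer: `take` returns a reduced value after the
;; first element, so the reduction over `range` stops immediately.
(def reduce-calls
  (count-calls (fn [f] (transduce (comp (map f) (take 1)) conj [] (range 100)))))
```

Here `lazy-calls` is 32 and `reduce-calls` is 1. With a DB call behind `f`, that difference is 31 extra queries.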
claypoole
Looks really useful, I didn’t know about it. I ended up implementing something very similar to its `pmap` that receives the thread pool size (using `pipeline-blocking` inside). I may switch to it instead.
@noisesmith I know, but it's quite handy in this case. I didn't find an easier way to do what I wanted.
Designing around blocking lazy sequences is IMO not a good idea, I think you'll have an easier time if you just use channels or some other kind of queue.
That’s not very helpful, I’m afraid. I’m using Monger, which provides either a lazy sequence or an iterator that I have to wrap somehow if I want to use a channel, and I would like to know how to do that. On the other hand, our system doesn’t use channels, and I would like to use core.async for this specific flow without rewriting the entire codebase, so I would like to know how to convert a channel into what is commonly used in Clojure: a lazy sequence. The reason I want it to be lazy is to allow processing of the output while parts of it are still being generated; otherwise response time would increase.
Well, although it's not ideal, it should be possible. I found a couple of implementations on Google, e.g.: https://gist.github.com/stathissideris/8659706
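A minimal sketch of the channel-to-lazy-seq direction (question 2), written for illustration and not copied from the linked gist; `chan->lazy-seq` is a hypothetical name. Each element is taken with a blocking `<!!` only when the consumer asks for it, so processing can start before the producer has finished.

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical helper: a lazy seq backed by a channel. Nothing is taken
;; from `ch` until the seq is realized.
(defn chan->lazy-seq [ch]
  (lazy-seq
    (let [v (async/<!! ch)]
      ;; <!! returns nil once `ch` is closed and drained: end of the seq.
      (when (some? v)
        (cons v (chan->lazy-seq ch))))))
```

Since it blocks the calling thread, this should be consumed from ordinary threads, not from inside a `go` block. Also, because `nil` marks the end, it cannot carry `nil` values (core.async channels don't accept `nil` anyway).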
Instead of `to-chan` (which is deprecated), you can use `clojure.core.async/to-chan!!` or `onto-chan!!` to avoid blocking the go pool.
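A short usage sketch of both functions (assuming core.async 1.2 or later, where the `!!` variants were added). `to-chan!!` creates and fills a new channel, while `onto-chan!!` fills an existing one and can optionally leave it open. Both realize the collection on a separate thread rather than in a go block.

```clojure
(require '[clojure.core.async :as async])

;; to-chan!!: new channel, filled from the coll, closed when the coll is exhausted.
(def from-to
  (async/<!! (async/into [] (async/to-chan!! (range 5)))))

;; onto-chan!!: existing channel; with close? = false it stays open.
(def from-onto
  (let [ch (async/chan 10)]
    ;; onto-chan!! returns a channel that closes once all items are put,
    ;; so taking from it waits for the copy to finish.
    (async/<!! (async/onto-chan!! ch [:a :b] false))
    (async/>!! ch :c)
    (async/close! ch)
    (async/<!! (async/into [] ch))))
```

`from-to` is `[0 1 2 3 4]` and `from-onto` is `[:a :b :c]`.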
Thanks! A friend sent me the same link and it seems to fit the output part.
Oh, it’s deprecated?
I don’t see that mentioned, nor the `to-chan!!` function, in the docs:
https://clojuredocs.org/clojure.core.async
OK, found it in the probably more official docs: https://clojure.github.io/core.async/#clojure.core.async/to-chan!! This looks like exactly what I wanted (and just implemented myself), will try, thanks.
Yeah, those docs on http://clojuredocs.org seem out of date.
Curious: if it starts as a lazy seq and ends as a lazy seq, why not keep it a lazy seq? What do you gain by putting core.async in the middle?
Each item in the input seq triggers another blocking call, so I’m using `pipeline-blocking` to parallelize the work with a constant bound on the number of concurrent calls.
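A sketch of that middle stage, assuming core.async 1.2+ for `to-chan!!`. `bounded-map-chan` and `fake-db-call` are hypothetical names; the fake call sleeps and records how many calls are in flight, to show that `pipeline-blocking` with `n` workers never runs more than `n` at once and keeps outputs in input order.

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical helper: applies a blocking `f` to each element of `coll`
;; with at most `n` calls in flight. `out` is closed when the input
;; channel closes (pipeline-blocking's default close? = true).
(defn bounded-map-chan [n f coll]
  (let [out (async/chan n)]
    (async/pipeline-blocking n out (map f) (async/to-chan!! coll))
    out))

;; Instrumentation for the demo.
(def in-flight (atom 0))
(def peak (atom 0))

;; Stand-in for a blocking DB call.
(defn fake-db-call [x]
  (let [now (swap! in-flight inc)]
    (swap! peak max now)
    (Thread/sleep 20)
    (swap! in-flight dec)
    (* x 10)))

(def results
  (async/<!! (async/into [] (bounded-map-chan 3 fake-db-call (range 12)))))
```

Collecting with `async/into` is only for the demo. To keep the output lazy, the returned channel would instead be wrapped in a channel-backed lazy seq, as discussed above.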
There is one annoying issue with `to-chan!!`: if the input seq throws an exception (which a DB call might), it would simply close the channel and the error gets “swallowed”, which is not very production-ready…
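One possible workaround, sketched here as an assumption rather than anything core.async provides: treat errors as values. The hypothetical producer `to-chan-with-errors!!` puts the caught `Throwable` on the channel before closing it, and the hypothetical consumer `chan->lazy-seq-rethrowing` rethrows it, so the caller of the lazy seq sees the original DB error. If `pipeline-blocking` sits in between, its transducer would also receive the `Throwable` and would need to pass it through unchanged.

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical producer: like to-chan!!, but a failure while realizing
;; `coll` is put on the channel as the last value instead of only closing it.
(defn to-chan-with-errors!! [coll]
  (let [ch (async/chan)]
    (async/thread
      (try
        (loop [s (seq coll)]
          (when (and s (async/>!! ch (first s)))
            (recur (next s))))
        (catch Throwable t
          (async/>!! ch t))
        (finally
          (async/close! ch))))
    ch))

;; Hypothetical consumer: a lazy seq that rethrows a Throwable taken from
;; the channel on the consumer's thread.
(defn chan->lazy-seq-rethrowing [ch]
  (lazy-seq
    (let [v (async/<!! ch)]
      (cond
        (nil? v)                nil
        (instance? Throwable v) (throw v)
        :else                   (cons v (chan->lazy-seq-rethrowing ch))))))

;; A seq whose third element fails, standing in for a DB error.
(def failing
  (concat [1 2] (lazy-seq (throw (ex-info "db down" {})))))

(def caught
  (try
    (doall (chan->lazy-seq-rethrowing (to-chan-with-errors!! failing)))
    :no-error
    (catch clojure.lang.ExceptionInfo e
      (ex-message e))))
```

The consumer still receives 1 and 2 before the error surfaces, which matches lazy-seq semantics: the exception appears at the point where the failing element would have been realized.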
Another option for pooled parallelization could be pmap from https://github.com/TheClimateCorporation/claypoole