I've spent the last day and a half puzzling over some of my core.async code and ultimately found an annoying gotcha in core.async/pipe. Pipe says on the tin that it takes elements from the from channel and puts them onto the to channel, and that it stops consuming the from channel when the to channel closes. What it fails to mention is that when the to channel closes, the pipe worker will consume ONE MORE ELEMENT from the from channel, which then gets dropped on the floor when the worker notices that the to channel is closed. If you expect to keep consuming elements from the from channel in some other way, you will never see that dropped element.
Is this the expected behavior? Am I using pipe in an unusual way, in that the from channel continues to be read in a second context after the first consumer (the to channel) closes? Should this behavior be called out in the docs?
From reading the code I don’t see how that is possible... do you have a repro case?
Just a moment, I'll put one together.
Clj or cljs?
For now, imagine the pipe worker is parked at (let [v (<! from)] and then the to channel closes. What happens to v? Does it get delivered to anything?
CLJ for my use case
heh I see what you are saying
There’s no transactional conveyance from the from channel to the to channel
that's a good way to put it
The in-flight v is dropped on the floor
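To make the in-flight v concrete, here is a minimal sketch of a copy loop modeled on the shape of pipe's worker: take from from, put to to, stop when the put returns false. simple-pipe and lost-element-demo are hypothetical names for illustration, not core.async API, and the real pipe also takes a close? flag that is left out here. The value taken just before the put fails is simply discarded:

```clojure
(require '[clojure.core.async :as async])

(defn simple-pipe
  "Hypothetical, simplified copy loop modeled on async/pipe (no close? flag).
  Returns the worker's go channel, which closes when the worker exits."
  [from to]
  (async/go-loop []
    (let [v (async/<! from)]        ; once taken, v is no longer in `from`
      (if (nil? v)
        (async/close! to)           ; `from` closed: propagate the close
        (when (async/>! to v)       ; >! returns false if `to` is closed...
          (recur))))))              ; ...in which case v is silently discarded

(defn lost-element-demo
  "Hypothetical demo: returns [value-delivered-to-to what-remains-in-from]."
  []
  (let [from   (async/chan)
        to     (async/chan)
        worker (simple-pipe from to)]
    (async/>!! from 1)
    (let [delivered (async/<!! to)]  ; normal case: 1 arrives on `to`
      (async/close! to)              ; the consumer goes away
      (async/>!! from :x)            ; the parked worker takes :x off `from`...
      (async/<!! worker)             ; ...fails to put it on `to`, and exits
      [delivered (async/poll! from)])))

;; (lost-element-demo) => [1 nil]
```

Because the worker has already taken :x off from, no later reader of from can ever see it.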
Calling @hiredman
In general, you may always end up producing n+1 things in response to a demand for n
(require '[clojure.core.async :as async])

(let [from (async/chan)
      to-a (async/chan)
      to-b (async/chan)]
  (async/pipe from to-a)
  (async/close! to-a)
  (async/pipe from to-b)
  (async/go (async/>! from :1)
            (async/>! from :2))
  (async/<!! to-b))
Interestingly, the first time I ran this I got :1, but I got :2 on every subsequent run.
Is close! asynchronous? I can't think of why I would get :1 otherwise.
pipe is
Oh, I suppose the second pipe might make progress before the first pipe does.
yes
Then :1 would be earmarked for delivery to to-b instead of to-a.
This should be pretty reliable then:
(let [from (async/chan)
      to-a (async/chan)
      to-b (async/chan)]
  (async/pipe from to-a)
  (Thread/sleep 10)
  (async/close! to-a)
  (async/pipe from to-b)
  (async/go (async/>! from :1)
            (async/>! from :2))
  (async/<!! to-b))
it is complicated
in general, it is better to impose ordering by communicating with >! and <! ops than to try to do it via close!
I'm afraid I don't see the relevance of that advice.
If I use pipe to hook up a producer and a consumer, and the consumer expires but I want to make sure the producer gets rerouted to a new consumer, I don't see how I can make the changeover reliably without leaking an element on the floor.
Maybe pipe is just not intended to be used this way, which is fine; it's just not what I expected when I read the docstring.
you are trying to reason about how the timelines of the go blocks spun up by the pipes, the go block you spin up, and the main thread interact
Oh, sure in my repro example it's just a kludge to get it working "most of the time"
I'm not sure how I can make it a watertight example though since it's always nondeterministic how much progress the first pipe worker will make before I hook up the second pipe worker.
in general, in core.async anything that copies from one channel and adds to another effectively acts as an extra buffer of size at least 1 in the whole system
and there isn't really a way (in core.async) to make it transactional so that you don't need that extra buffer of at least 1
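One way to act on the advice about ordering with >! and <! rather than close! is to make that buffer of 1 explicit and recoverable. The sketch below is a hypothetical stoppable copy loop (handoff-pipe and switchover-demo are illustrative names, not core.async functions): the caller signals a dedicated stop channel instead of closing the destination, and the worker's go channel hands back whatever value it was holding, so the next worker can deliver it first. This is still not transactional; it is only a way, under these assumptions, to avoid losing the in-flight element:

```clojure
(require '[clojure.core.async :as async])

(defn handoff-pipe
  "Hypothetical stoppable copy loop, not part of core.async. Copies from `from`
  to `to` until `stop` yields a value or closes. The returned go channel yields
  the value the worker was holding but did not deliver, or just closes if
  nothing was in flight. An optional `pending` value is delivered first."
  ([from to stop] (handoff-pipe from to stop nil))
  ([from to stop pending]
   (async/go-loop [v pending]
     (if (nil? v)
       ;; Nothing in hand: wait for a stop signal or the next value.
       ;; :priority true makes alts! try `stop` first when both are ready.
       (let [[x port] (async/alts! [stop from] :priority true)]
         (when (and (not= port stop) (some? x)) ; exit on stop or when `from` closes
           (recur x)))
       ;; Holding v: deliver it unless told to stop first.
       (let [[ok? port] (async/alts! [[to v] stop] :priority true)]
         (cond
           (= port stop) v             ; stopped while holding v: hand it back
           ok?           (recur nil)   ; delivered: fetch the next value
           :else         v))))))       ; `to` was closed: hand v back, don't drop it

(defn switchover-demo
  "Hypothetical demo: moves the flow from consumer A to consumer B without
  losing the element A's worker may be holding."
  []
  (let [from     (async/chan)
        to-a     (async/chan)            ; consumer A never reads in this demo
        to-b     (async/chan)
        stop-a   (async/chan)
        worker-a (handoff-pipe from to-a stop-a)]
    (async/go (async/>! from :1)
              (async/>! from :2))
    (async/close! stop-a)                  ; signal A's worker instead of closing to-a
    (let [leftover (async/<!! worker-a)]   ; :1 if A's worker was holding it, else nil
      (handoff-pipe from to-b (async/chan) leftover)
      [(async/<!! to-b) (async/<!! to-b)])))

;; (switchover-demo) => [:1 :2]
```

Whichever way the race between the stop signal and the first take goes, :1 ends up either handed back as leftover or still waiting in from, so the new worker delivers both values in order.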
I think my surprise mostly comes from the docstring, which says "Will stop consuming the from channel if the to channel closes". Now that I've noticed the nuance, it makes sense why it's not transactional, and I'm fine with that behavior now that I'm aware of it, but I wonder if the docstring could be improved to call out this edge case.
https://aturon.github.io/academic/turon-thesis.pdf is a doctoral thesis (beautifully typeset) aimed at making it possible to do the transactional thing, it is neat
pipeline has the same verbiage in its docstring and the same behavior. I haven't had time to look through all the other cases that might be similar. I feel like I'm a relatively experienced user of core.async, and this mystified me and my coworkers.
pipeline is even more of a buffer than pipe
pipe introduces a buffer of 1, pipeline introduces a buffer of at least n (the parallelism number you pass in)
Yeah, the fact that there's a buffer doesn't surprise me at all. The fact that the contents of the buffer could be dropped when the destination changes is what surprised me.
yeah, it is important to keep in mind that you don't have a network of channels; you have channels connecting a network of things that do computation. Once data is out of a channel and in the computation, it is gone from the channel, so even if pipe wanted to, it can't put it back at the front of the channel
Yep that makes sense.
I don't have a question so much as a request: can the docstring of pipe (and probably pipeline too?) be improved to call attention to this behavior? I want to save other people the trouble of figuring this out for themselves.
Well I suppose I do have a question after all. My use case is basically this:
(let [producer (async/chan)
      consumer-a (async/chan)
      consumer-b (async/chan)]
  (connect-with-magic producer consumer-a)
  (disconnect-with-magic consumer-a)
  (connect-with-magic producer consumer-b)
  (async/go (async/>! producer :val))
  (async/<!! consumer-b)) ;=> :val
Is there something I can put in connect-with-magic and disconnect-with-magic such that :val is guaranteed to be delivered to consumer-b? I'm perfectly okay with produced vals being ignored if they are produced before consumer-b is connected, but once the consumer is connected I need all the produced values to be delivered.
the best way to do that may be to post something on http://ask.clojure.org
you can write your own copying construct that looks at the return value of the put to determine what to do
(let [producer (async/chan)
      consumer-a (async/chan)
      consumer-b (async/chan)]
  (async/go-loop []
    (when-some [msg (async/<! producer)]
      (when-not (async/>! consumer-a msg)
        (async/>! consumer-b msg))))
  (async/close! consumer-a)
  (async/go-loop []
    (when-some [msg (async/<! producer)]
      (when-not (async/>! consumer-b msg)
        (async/>! consumer-a msg))))
  (async/go (async/>! producer :val))
  (async/<!! consumer-b))
oh, and I guess those loops need recurs in there somewhere
basically write to one consumer, and if that fails write to the other
but you have no ordering guarantees there
I have essentially a seq of consumers. I want to produce to the first consumer until that consumer is closed, then move to the next consumer. If no consumers are connected I'm fine dropping values. So my problem is a little more general than these two consumers.
I'll do that, thanks.
I'm trying out mult and tap right now, and it looks like it might work for me?
you will race on tapping with mult
I'm fine with the race on tap as long as the old tap doesn't cause an element to disappear
The problem with the pipe solution is that once the first pipe worker parks on take, it's impossible for me to recover the next element to be delivered. With mult/tap I at least have a chance of reading the next value.
a race may drop
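The mult docstring states that items received when there are no taps get dropped, that each item must be accepted by every tap before the next one is distributed, and that a tap which puts to a closed channel is removed from the mult. A minimal sketch of an overlap approach, assuming you can tap the replacement consumer before retiring the old one (overlap-tap-demo is a hypothetical name): once the old tap's channel is closed, the mult's put to it fails, the mult drops that tap, and the item still reaches the new tap.

```clojure
(require '[clojure.core.async :as async])

(defn overlap-tap-demo
  "Hypothetical demo: retire an old tap only after its replacement is tapped."
  []
  (let [src   (async/chan)
        m     (async/mult src)
        old-c (async/chan)
        new-c (async/chan)]
    (async/tap m old-c)
    (async/tap m new-c)     ; attach the replacement first...
    (async/close! old-c)    ; ...then retire the old consumer
    (async/>!! src :v)      ; the mult's put to the closed old-c fails, so old-c is untapped
    (async/<!! new-c)))     ; :v still reaches new-c

;; (overlap-tap-demo) => :v
```

The trade-off: while both taps are open, every item goes to both, and an old tap that stays open but stops reading will stall the mult for everyone. The race remains if the new tap is attached only after the old one is gone, because items arriving in between are dropped.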
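(defn keep-piping [source consumers]
  (async/go-loop [consumer nil
                  msg nil]
    (cond (nil? consumer)               ; no consumer yet: wait for the next one
          (when-let [c (async/<! consumers)]
            (recur c msg))
          (nil? msg)                    ; nothing in hand: take from the source
          (when-let [msg (async/<! source)]
            (recur consumer msg))
          (async/>! consumer msg)       ; delivered: go fetch the next message
          (recur consumer nil)
          :else                         ; consumer closed: keep msg for the next consumer
          (recur nil msg))))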
takes a source channel and a channel to read consumer channels off of, and pipes from the source to the first consumer, then the next, etc.
hmm, interesting!
I think that would work well for me if the component that was connecting consumer to producer knew about all the consumers. Right now my consumers are ignorant of each other, and my producer is ignorant of the consumers, and I have no code that is aware of everyone and responsible for doing the connecting.
if mult or pipe would work for you, then you do have some central place; you just don't think of it like that
yep I think you're right
well, my producer is creating the mult
and my consumers are creating taps
the consumer is initiating the action when a tap is created, but the tap IS centrally managed in the mult, like you say