core-async

2020-05-07T22:08:45.132500Z

I've spent the last day and a half puzzling over some of my core.async code and ultimately found an annoying gotcha in core.async/pipe. Pipe says on the tin that it takes elements from the from channel and puts them onto the to channel, and stops consuming from the from channel when the to channel closes. What it fails to mention is that when to closes, the pipe worker will consume ONE MORE ELEMENT from the from channel, which then gets dropped on the floor when the worker notices that to is closed. If you expect to keep consuming elements from from in some other way, you will never see that dropped element.

2020-05-07T22:09:49.132600Z

Is this the expected behavior? Am I using pipe in an unusual way where the from channel continues to be read from in a second context after the first to consumer closes? Should this behavior be called out in the docs?

ghadi 2020-05-07T22:14:17.134Z

From reading the code i don’t see how that is possible... do you have a repro case?

2020-05-07T22:14:34.134200Z

Just a moment, I'll put one together.

ghadi 2020-05-07T22:15:12.134600Z

Clj or cljs

2020-05-07T22:15:33.134800Z

For now, imagine the pipe worker is parked at (let [v (<! from)] ...) when the to channel closes. What happens to v? Does it get delivered to anything?

2020-05-07T22:15:46.135Z

CLJ for my use case

ghadi 2020-05-07T22:17:15.135500Z

heh I see what you are saying

ghadi 2020-05-07T22:17:46.136300Z

There’s no transactional conveyance from from to to

2020-05-07T22:17:57.136800Z

that's a good way to put it

ghadi 2020-05-07T22:18:05.137100Z

The in flight v is dropped on the floor
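To see exactly where v goes, here is a simplified sketch of the loop inside pipe, modeled on my reading of the core.async source. The real function also takes a close? flag and returns to; pipe-sketch is a hypothetical name, and it returns the worker's go channel only so the example can wait for the worker to finish.

```clojure
(require '[clojure.core.async :as async])

(defn pipe-sketch
  "Simplified sketch modeled on core.async's pipe (no close? option).
  Unlike pipe, it returns the worker's go channel so a caller can wait
  for the worker to exit."
  [from to]
  (async/go-loop []
    (let [v (async/<! from)]      ; v has now left `from` for good
      (if (nil? v)
        (async/close! to)         ; `from` closed: propagate the close
        (when (async/>! to v)     ; >! returns false if `to` is closed...
          (recur))))))            ; ...in which case v is simply discarded

;; `to` is already closed, yet the worker still takes :1 out of `from`.
(let [from (async/chan 10)
      to   (async/chan)]
  (async/>!! from :1)
  (async/>!! from :2)
  (async/close! to)
  (async/<!! (pipe-sketch from to))  ; wait for the worker to exit
  (async/<!! from))
;; => :2
```

The take commits :1 out of from before the worker learns that to is closed, so no later reader of from ever sees it.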

ghadi 2020-05-07T22:18:18.137500Z

Calling @hiredman

ghadi 2020-05-07T22:21:33.138600Z

In general you might always produce n+1 things in response to n demand

2020-05-07T22:22:00.138800Z

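(require '[clojure.core.async :as async])

(let [from (async/chan)
      to-a (async/chan)
      to-b (async/chan)]
  (async/pipe from to-a)       ; first worker parks on (<! from)
  (async/close! to-a)          ; first consumer goes away
  (async/pipe from to-b)       ; second worker also parks on (<! from)
  (async/go (async/>! from :1)
            (async/>! from :2))
  (async/<!! to-b))            ; :1 or :2, depending on which worker took :1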

2020-05-07T22:22:14.139Z

Interestingly the first time I ran this I got :1 but got :2 on every subsequent run

2020-05-07T22:22:33.139200Z

Is close! asynchronous? I can't think of why I would get :1 otherwise.

2020-05-07T22:22:53.139400Z

pipe is

2020-05-07T22:23:59.139600Z

Oh, I suppose the second pipe might make progress before the first pipe does.

2020-05-07T22:24:16.139800Z

yes

2020-05-07T22:24:25.140Z

Then :1 would be earmarked for delivery to to-b instead of to-a

2020-05-07T22:25:03.140200Z

This should be pretty reliable then:

(let [from (async/chan)
      to-a (async/chan)
      to-b (async/chan)]
  (async/pipe from to-a)
  (Thread/sleep 10)        ; kludge: give the first pipe's go block time to park on (<! from)
  (async/close! to-a)
  (async/pipe from to-b)
  (async/go (async/>! from :1)
            (async/>! from :2))
  (async/<!! to-b))

2020-05-07T22:25:59.140400Z

it is complicated

2020-05-07T22:27:44.140600Z

in general it is better to impose ordering by communicating with >! and <! than by trying to do it via close!
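One way to follow that advice, sketched under my own assumptions: signal the worker with a value on a separate stop channel instead of closing its destination, and use alts! so the worker hands back whatever it is holding. stoppable-pipe and its [status in-flight] result are hypothetical, not part of core.async; alts! and its :priority option are real.

```clojure
(require '[clojure.core.async :as async])

(defn stoppable-pipe
  "Hypothetical helper, not part of core.async. Copies from `from` to `to`
  until a value arrives on `stop`. Returns a go channel that yields
  [status in-flight], where in-flight is the value the worker was holding
  when it stopped (or nil), so the caller can re-route it."
  [from to stop]
  (async/go-loop []
    ;; alts! completes at most one of its operations: the worker either takes
    ;; from `from` or sees the stop request, never both.
    (let [[v port] (async/alts! [stop from] :priority true)]
      (cond
        (= port stop) [:stopped nil]
        (nil? v)      (do (async/close! to) [:source-closed nil])
        :else
        ;; Holding v: offer it to `to` while staying responsive to `stop`,
        ;; and hand v back (rather than dropping it) if it cannot be delivered.
        (let [[ok? put-port] (async/alts! [[to v] stop] :priority true)]
          (cond
            (= put-port stop) [:stopped v]
            ok?               (recur)
            :else             [:to-closed v]))))))

(let [from (async/chan 10)
      to-a (async/chan)            ; nothing ever reads to-a
      stop (async/chan)
      done (stoppable-pipe from to-a stop)]
  (async/>!! from :1)
  (async/>!! stop :stop)
  (let [[status in-flight] (async/<!! done)]
    ;; :1 was either handed back or never left `from`
    [status (or in-flight (async/poll! from))]))
;; => [:stopped :1]
```

Because alts! completes at most one of its operations, :1 ends up either returned as in-flight or still sitting in from; it is never silently consumed. The close! docstring also notes that parked puts stay parked until a taker releases them, so a worker parked on a put to a closed to would otherwise just wait; a message on stop gives it a definite point to exit.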

2020-05-07T22:29:19.140800Z

I'm afraid I don't see the relevance of that advice.

2020-05-07T22:30:56.141Z

If I use pipe to hook up a producer and consumer, and the consumer expires but I want to make sure the producer gets rerouted to a new consumer, I don't see how I can make the changeover reliably without leaking an element on the floor.

2020-05-07T22:31:29.141200Z

Maybe pipe is just not intended to be used in this way, which is fine, just not what I expected when I read the docstring.

2020-05-07T22:31:44.141400Z

you are trying to reason about how the timelines of the go blocks spun up by the pipes, the go block you spin up, and the main thread interact

2020-05-07T22:32:13.141600Z

Oh, sure in my repro example it's just a kludge to get it working "most of the time"

2020-05-07T22:33:16.141800Z

I'm not sure how I can make it a watertight example though since it's always nondeterministic how much progress the first pipe worker will make before I hook up the second pipe worker.

2020-05-07T22:34:26.142Z

in general, in core.async anything that copies from one channel and adds to another effectively acts as an extra buffer of size at least 1 in the whole system

2020-05-07T22:35:02.142200Z

and there isn't really a way (in core.async) to make it transactional so you don't need that extra buffer of at least 1

2020-05-07T22:38:39.142500Z

I think my surprise is mostly from the docstring, which says "Will stop consuming the from channel if the to channel closes". Now that I've noticed the nuance here, it makes sense why it's not transactional, and I'm fine with that behavior now that I'm aware of it, but I wonder if the docstring could be improved to call out this edge case.

2020-05-07T22:39:25.142700Z

https://aturon.github.io/academic/turon-thesis.pdf is a doctoral thesis (beautifully typeset) aimed at making it possible to do the transactional thing, it is neat

2020-05-07T22:39:57.142900Z

pipeline has the same verbiage in the docstring and the same behavior. I haven't had time to look through all the other cases that might be similar. I feel like I'm a relatively experienced user of core.async and this mystified me and my coworkers.

2020-05-07T22:40:43.143100Z

pipeline is even more of a buffer than pipe

2020-05-07T22:41:29.143300Z

pipe introduces a buffer of 1, pipeline introduces a buffer of at least n (the parallelism number you pass in)
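The extra buffer can be measured directly: keep offering values to from with a short timeout while nothing reads to, and count how many puts succeed. count-absorbed is a hypothetical helper. The exact pipeline count depends on internals (on my reading of the source, its jobs and results channels are each buffered to n), so the sketch only assumes it is at least n.

```clojure
(require '[clojure.core.async :as async])

(defn count-absorbed
  "Hypothetical helper: counts how many values can be put onto `from` while
  nothing reads the output channel. Each put gets `wait-ms` to complete; the
  count stops at the first put that times out, or at `limit`."
  [from wait-ms limit]
  (loop [n 0]
    (if (= n limit)
      n
      (let [[ok? port] (async/alts!! [[from n] (async/timeout wait-ms)])]
        (if (and ok? (= port from))
          (recur (inc n))
          n)))))

;; pipe: the worker takes one value and then parks trying to deliver it.
(let [from (async/chan) to (async/chan)]
  (async/pipe from to)
  (count-absorbed from 200 50))
;; => 1

;; pipeline with n = 4: expect at least 4.
(let [from (async/chan) to (async/chan)]
  (async/pipeline 4 to (map identity) from)
  (count-absorbed from 200 50))
```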

2020-05-07T22:42:07.143500Z

Yeah, the fact that there's a buffer doesn't surprise me at all. The fact that the contents of the buffer could be dropped when the destination changes is what surprised me.

2020-05-07T22:46:32.143700Z

yeah, it is important to keep in mind that you don't have a network of channels, you have channels connecting a network of things that do computation. Once data is out of a channel and in the computation, it is gone from the channel, so even if pipe wanted to, it couldn't put it back at the front of the channel.
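A tiny illustration of why a taken value cannot simply be "put back": a plain put appends to the end of the queue, so the returned value comes out after everything that was already waiting.

```clojure
(require '[clojure.core.async :as async])

(let [ch (async/chan 10)]
  (async/>!! ch :1)
  (async/>!! ch :2)
  (let [v (async/<!! ch)]   ; v is :1, and it has now left the channel
    (async/>!! ch v))       ; "putting it back" appends it behind :2
  [(async/<!! ch) (async/<!! ch)])
;; => [:2 :1]
```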

2020-05-07T22:47:38.144Z

Yep that makes sense.

2020-05-07T22:48:05.144200Z

I don't have a question as much as a request: Can the docstring of pipe (and probably pipeline too?) be improved to call attention to this behavior?

2020-05-07T22:48:44.144400Z

I want to save other people the trouble of figuring this out for themselves.

2020-05-07T22:55:10.146500Z

Well I suppose I do have a question after all. My use case is basically this:

(let [producer (async/chan)
      consumer-a (async/chan)
      consumer-b (async/chan)]
  (connect-with-magic producer consumer-a)
  (disconnect-with-magic consumer-a)
  (connect-with-magic producer consumer-b)
  (async/go (async/>! producer :val))
  (async/<!! consumer-b)) ;=> :val

Is there something I can put in connect-with-magic and disconnect-with-magic such that :val is guaranteed to be delivered to consumer-b? I'm perfectly okay with produced values being ignored if they are produced before consumer-b is connected, but once the consumer is connected I need all the produced values to be delivered.

2020-05-07T23:02:17.146600Z

the best way to do that may be to post something on http://ask.clojure.org

2020-05-07T23:03:17.147300Z

you can write your own copying construct that looks at the return value of the put to determine what to do

2020-05-07T23:05:15.147600Z

(let [producer (async/chan)
      consumer-a (async/chan)
      consumer-b (async/chan)]
  (async/go-loop []
    (when-some [msg (async/<! producer)]
      (when-not (async/>! consumer-a msg)
        (async/>! consumer-b msg))))
  (async/close! consumer-a)
  (async/go-loop []
    (when-some [msg (async/<! producer)]
      (when-not (async/>! consumer-b msg)
        (async/>! consumer-a msg))))
  (async/go (async/>! producer :val))
  (async/<!! consumer-b))

2020-05-07T23:05:45.148Z

oh, and I guess those loops need recurs in there somewhere

2020-05-07T23:06:20.148500Z

basically write to one consumer, and if that fails write to the other

2020-05-07T23:06:32.148800Z

but you have no ordering guarantees there

2020-05-07T23:08:08.150Z

I have essentially a seq of consumers. I want to produce to the first consumer until that consumer is closed, then move to the next consumer. If no consumers are connected I'm fine dropping values. So my problem is a little more general than these two consumers.
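A minimal sketch of that requirement, assuming the consumers are known up front as a sequence (pipe-through-seq is a hypothetical name, not a core.async API): the worker keeps the value whose put failed and retries it on the next consumer, and once the sequence is exhausted it drains and drops values.

```clojure
(require '[clojure.core.async :as async])

(defn pipe-through-seq
  "Hypothetical helper, not a core.async API. Copies values from `source` to
  the first channel in `consumers` until a put to it fails (it is closed),
  then moves on to the next one. Returns the worker's go channel."
  [source consumers]
  (async/go-loop [cs  (seq consumers)
                  msg nil]
    (cond
      ;; Nothing in hand: take the next value, or stop when source closes.
      (nil? msg)
      (when-some [m (async/<! source)]
        (recur cs m))
      ;; No consumers left: drop the value and keep draining source.
      (empty? cs)
      (recur cs nil)
      ;; Delivered: go back for the next value.
      (async/>! (first cs) msg)
      (recur cs nil)
      ;; Current consumer is closed: keep msg and retry it on the next one.
      :else
      (recur (next cs) msg))))

;; consumer-a is already closed, so everything should end up in consumer-b.
(let [source     (async/chan 10)
      consumer-a (async/chan 10)
      consumer-b (async/chan 10)]
  (async/close! consumer-a)
  (doseq [v [:1 :2 :3]] (async/>!! source v))
  (async/close! source)
  (async/<!! (pipe-through-seq source [consumer-a consumer-b]))
  [(async/<!! consumer-b) (async/<!! consumer-b) (async/<!! consumer-b)])
;; => [:1 :2 :3]
```

The key design choice is that the in-flight value lives in the loop state instead of being discarded when >! returns false. One caveat, going by the close! docstring: a put already parked on a consumer when that consumer closes stays parked until something takes from it, so the hand-off happens on the first put attempted after the close.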

2020-05-07T23:08:16.150100Z

I'll do that, thanks.

2020-05-07T23:08:50.150600Z

I'm trying out mult and tap right now and it looks like it might work for me?

2020-05-07T23:12:11.151400Z

you will race on tapping with mult

2020-05-07T23:14:27.151900Z

I'm fine with the race on tap as long as the old tap doesn't cause an element to disappear

2020-05-07T23:16:00.153100Z

The problem with the pipe solution is that once the first pipe worker parks on take, it's impossible for me to recover the next element to be delivered. With mult/tap I at least have a chance of reading the next value.

2020-05-07T23:16:03.153300Z

a race may drop

2020-05-07T23:16:17.153600Z

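(defn keep-piping [source consumers]
  (async/go-loop [consumer nil
                  msg nil]
    (cond (nil? consumer)            ; need a consumer: take the next one
          (when-let [c (async/<! consumers)]
            (recur c msg))
          (nil? msg)                 ; need a value: take it from source
          (when-some [msg (async/<! source)]
            (recur consumer msg))
          (async/>! consumer msg)    ; delivered: go get the next value
          (recur consumer nil)
          :else                      ; consumer closed: keep msg for the next one
          (recur nil msg))))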

2020-05-07T23:16:43.154100Z

takes a source channel and a channel to read consumer channels off of, and pipes from the source to the first consumer, then the next, etc.

2020-05-07T23:19:03.154600Z

hmm, interesting!

2020-05-07T23:20:23.155800Z

I think that would work well for me if the component that was connecting consumer to producer knew about all the consumers. Right now my consumers are ignorant of each other, and my producer is ignorant of the consumers, and I have no code that is aware of everyone and responsible for doing the connecting.

2020-05-07T23:21:34.156500Z

if mult or pipe worked, then you do have some central place, you just don't think of it like that

2020-05-07T23:21:41.156700Z

yep I think you're right

2020-05-07T23:21:53.156900Z

well, my producer is creating the mult

2020-05-07T23:21:58.157100Z

and my consumers are creating taps

2020-05-07T23:22:45.157500Z

the consumer is initiating the action when a tap is created but the tap IS centrally managed in the mult like you say