I’m curious what the reasoning for this (in the a/chan docstring) is:
> If a transducer is supplied a buffer must be specified.
What property of transducers requires there to be a buffer? Why can’t I have a channel that coordinates in lock-step producers and consumers with no buffering and transforms a value?
Second question: What is the best way to produce a filtered channel of results? I was assuming I’d use a transducer: Perhaps 1:
(defn filter-chan [f in out]
  (let [out (a/chan)]
    (a/pipeline 1 out (filter f) in)
    out))
Or 2:
(defn filter-chan [f in out]
  (let [filtered-in (a/chan 1 (filter f))]
    (a/pipe in filtered-in)
    (a/pipe filtered-in out)
    out))
I guess I’m surprised core.async doesn’t appear to have more conveniences for composing channels with xforms… Or am I missing something?
I suppose that the buffer/transducer thing is a consequence of not having arity-overloading in Clojure; if you want no buffering then you can pass (buffer 1) as a buffer
(buffer 1) isn’t no buffering… but good point, perhaps (buffer 0) works… will try. Seems to. In which case I don’t know why n must be positive…
(a/chan 0 (map str)) ;; => Assert failed: fixed buffers must have size > 0
Regarding composing channels and xforms… Something like this chan-xf seems like it might be useful:
(defn chan-xf [in xform]
  (let [out (a/chan 1 xform)]
    (a/pipe in out)))
(defn filter-chan [f chan]
  (chan-xf chan (filter f)))
(def fc (filter-chan odd? in-ch))
@rickmoynihan a transducer may produce many output items for a single input, that’s why there’s the need for a buffer. When the buffer is a fixed size buffer it will grow beyond its purported max size.
=> (def buf (a/buffer 1))
#'user/buf
=> (-> buf .-buf .size)
0
=> (def c (a/chan buf cat))
#'user/c
=> (a/go (a/>! c (range 10)))
#object[clojure.core.async.impl.channels.ManyToManyChannel 0x29f6aff5 "clojure.core.async.impl.channels.ManyToManyChannel@29f6aff5"]
=> (-> buf .-buf .size)
10
Rich explains this here: https://www.youtube.com/watch?v=4KqUvG8HPYo&feature=youtu.be&t=2870
at 47:55 if the timestamp in the url doesn't work for some reason
@cgrand: ahh of course that makes a lot of sense
@markmarkmark thanks for the talk ref… that’s good to know. Also good to see that though the size isn’t actually fixed, it is a target size
it would be pretty awkward if someone made a buffer that was actually a fixed size
Well, transducers were retrofitted… I guess fixed-size buffers made much more sense prior to transducers being added.
Just to be clear, with a transducing channel (equipped with a fixed buffer):
• if the buffer is already full, the putter is parked
• if the buffer becomes full during the transduction of the put element, the buffer is allowed to grow (because we are in nested calls and core.async being “just a macro”, we can’t park in the middle of a non-core.async computation)
right, but the issue is if someone is using a custom buffer whose underlying data structure can't grow (e.g. a j.u.c.ArrayBlockingQueue).
but, that probably doesn't happen often, and would only bite you once
Well it could go unnoticed for a long time as a blocking queue would cause the go block to block thus removing the thread from the thread pool. In the end you may have to wait a long time before reaching thread starvation.
I was imagining something like this, where the assumption is that add*! will only be called when there is room (as the Buffer docstring seems to state) and the non-blocking, throws-an-exception-when-full method add is used.
I admit, it does seem unlikely that someone would do this.
it's just the first thing I thought of when I was reminded of the quirk of the expanding fixed buffer
I’m really only just starting to use core.async in anger; and I find it curious that there are effectively two forms of composition available. Composition of channels, and composition of transducers. Is this a fair observation? If so, what are the forces that drive one to prefer one form over the other?
I should say I have an understanding of transducers already… so I’m wondering more about the composing channels side of things.
My own experience is that you shouldn’t compose channels. Don’t treat them as seqs. Use them to connect processes together.
transducers can create multiple outputs per step (`mapcat` is the canonical example) and those expanding transducers must have someplace to put the values (i.e. the buffer)
you can compose xforms with comp
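For anyone following along, a minimal sketch of what that could look like on a single channel. The names odd-strs, c and result are made up for illustration, and the buffer size of 1 is an arbitrary choice:

```clojure
(require '[clojure.core.async :as a])

;; Hypothetical composed xform: keep odd numbers, then stringify them.
;; With comp, transducers apply left to right.
(def odd-strs (comp (filter odd?) (map str)))

;; A channel with a transducer needs a buffer, so give it a buffer of 1.
(def c (a/chan 1 odd-strs))

;; Put 0..5 onto the channel from a go block, then close it.
(a/go
  (doseq [x (range 6)]
    (a/>! c x))
  (a/close! c))

;; Collect everything that comes out of c into a vector.
(def result (a/<!! (a/into [] c)))
;; result is ["1" "3" "5"]
```

Both steps run inside the one channel, so no intermediate channel or pipe is needed between the filter and the map.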
perhaps illuminating: https://clojure.org/reference/transducers ctrl+f "compos" -> 7 results for compose/composition etc. https://clojure.org/news/2013/06/28/clojure-clore-async-channels ctrl +f "compos" -> 0 results
that core.async news item is linked from the core.async repo as "Rationale"
so you can have multiple xforms per channel, and you can use pipe or pipeline or tap, etc to decide where to separate channels. it is an architectural choice how you break things up. I go into more depth on this in Clojure Applied
yes, I’m well aware of comp…
I think what I’m getting at by channel composition, is the separating channels you mention. i.e. in my case I need to fan out, then apply a filter to each fan.
So by channel composition I really mean taking one of those branches as my input and “composing it” into a new input channel that has the filter applied.
That makes some sense to me… but I’m not sure I grok all your nuance. By composition, I really mean taking an input channel and yielding an output channel with some transformation/process applied… https://clojurians.slack.com/archives/C05423W6H/p1567085983080300?thread_ts=1567074965.050600&cid=C05423W6H
things like mult exist for fanning out
and then you can connect a filter transducer to one out chan if needed
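A rough sketch of that fan-out-then-filter shape, assuming two branches. The names in, m, odds and evens are invented for the example, and the buffer sizes are arbitrary:

```clojure
(require '[clojure.core.async :as a])

(def in (a/chan))
(def m (a/mult in))

;; Each branch is its own channel carrying its own filter transducer.
;; a/tap returns the channel it was given.
(def odds  (a/tap m (a/chan 1 (filter odd?))))
(def evens (a/tap m (a/chan 1 (filter even?))))

;; Start consuming both branches before producing, since a mult waits
;; for every tap to accept an item before moving on to the next one.
(def odds-result  (a/into [] odds))
(def evens-result (a/into [] evens))

;; Produce 0..5 and close; closing in also closes the taps by default.
(a/go
  (doseq [x (range 6)]
    (a/>! in x))
  (a/close! in))

(def odd-values  (a/<!! odds-result))  ; [1 3 5]
(def even-values (a/<!! evens-result)) ; [0 2 4]
```

Where the filter sits (on the tap channel here, versus on a further channel piped from it) is the architectural choice being discussed.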
I’m not sure it is illuminating.
The process of tying channels together with processes in between, is effectively what I mean by composition.
i.e. it seems common in core.async for everything to take an in and out channel. By channel composition all I mean is tying the ins and outs together.
building channels out of other channels etc.
yeah that’s exactly what I’ve done.
but thanks a lot… I think what you’ve said about deciding where to split channels up and the architectural choices with that, is really what I’m getting at.
there is no one answer there. the tools are available.
I see what you're saying. I guess I just don't like the word compose for channels.
I'm also a very light user of core.async, so I know a bunch of random trivia, but don't know that much about the nitty gritty of making things with core.async that people actually want to use.
Like me a few days ago 🙂