core-async

2019-08-29T10:18:59.047500Z

I’m curious what the reasoning behind this line in the a/chan docstring is: “If a transducer is supplied a buffer must be specified.” What property of transducers requires there to be a buffer? Why can’t I have a channel that coordinates producers and consumers in lock-step, with no buffering, and transforms a value?

2019-08-29T10:36:05.050600Z

Second question: What is the best way to produce a filtered channel of results? I was assuming I’d use a transducer. Perhaps 1:

(defn filter-chan [f in]
  (let [out (a/chan)]
    (a/pipeline 1 out (filter f) in)
    out))

Or 2:

(defn filter-chan [f in out]
  (let [filtered-in (a/chan 1 (filter f))]
    (a/pipe in filtered-in)
    (a/pipe filtered-in out)
    out))
I guess I’m surprised core.async doesn’t appear to have more conveniences for composing channels with xforms… Or am I missing something?

schmee 2019-08-29T10:36:31.051Z

I suppose that the buffer/transducer thing is a consequence of not having arity-overloading in Clojure, if you want no buffering then you can pass (buffer 1) as a buffer

2019-08-29T10:37:55.051800Z

(buffer 1) isn’t no buffering… but good point, perhaps (buffer 0) works… will try. Seems to. In which case I don’t know why n must be positive…

(a/chan 0 (map str)) ;; => Assert failed: fixed buffers must have size > 0

2019-08-29T10:47:15.053200Z

Regarding composing channels and xforms… Something like this chan-xf seems like it might be useful:

2019-08-29T10:47:16.053400Z

(defn chan-xf [in xform]
  (let [out (a/chan 1 xform)]
    (a/pipe in out)))

(defn filter-chan [f chan]
  (chan-xf chan (filter f)))

(def fc (filter-chan odd? in-ch))
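A minimal runnable sketch of how the helper above could be exercised. The input channel `in-ch`, the feeding go block, and the `odds` var are assumptions added for the demo; they are not in the original message.

```clojure
(require '[clojure.core.async :as a])

;; Same helper as above: pipe `in` through a channel carrying `xform`.
(defn chan-xf [in xform]
  (let [out (a/chan 1 xform)]
    (a/pipe in out)))            ; a/pipe returns `out`

(defn filter-chan [f chan]
  (chan-xf chan (filter f)))

;; Hypothetical input channel for the demo.
(def in-ch (a/chan))
(def fc (filter-chan odd? in-ch))

;; Feed ten numbers, then close `in-ch`. a/pipe closes `fc` in turn,
;; which lets the collection below terminate.
(a/go
  (doseq [n (range 10)]
    (a/>! in-ch n))
  (a/close! in-ch))

;; a/into gathers everything from `fc` until it closes.
(def odds (a/<!! (a/into [] fc)))
;; odds is [1 3 5 7 9]
```

The buffer size of 1 is only there because a/chan requires a buffer when an xform is supplied.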

cgrand 2019-08-29T10:55:26.055200Z

@rickmoynihan a transducer may produce many output items for a single input, that’s why there’s the need for a buffer. When the buffer is a fixed size buffer it will grow beyond its purported max size.

cgrand 2019-08-29T10:57:33.055500Z

=> (def buf (a/buffer 1))
#'user/buf
=> (-> buf .-buf .size)
0
=> (def c (a/chan buf cat))
#'user/c
=> (a/go (a/>! c (range 10)))
#object[clojure.core.async.impl.channels.ManyToManyChannel 0x29f6aff5 "clojure.core.async.impl.channels.ManyToManyChannel@29f6aff5"]
=> (-> buf .-buf .size)
10

markmarkmark 2019-08-29T10:59:22.056100Z

Rich explains this here: https://www.youtube.com/watch?v=4KqUvG8HPYo&feature=youtu.be&t=2870

markmarkmark 2019-08-29T10:59:42.056500Z

at 47:55 if the timestamp in the url doesn't work for some reason

2019-08-29T11:57:22.056800Z

@cgrand: ahh of course that makes a lot of sense

2019-08-29T12:02:58.057900Z

@markmarkmark thanks for the talk ref… that’s good to know. Also good to see that though the size isn’t actually fixed, it is a target size

markmarkmark 2019-08-29T12:05:33.058800Z

it would be pretty awkward if someone made a buffer that was actually a fixed size

2019-08-29T12:06:46.060100Z

Well, transducers were retrofitted… I guess fixed-size buffers made much more sense prior to transducers being added.

cgrand 2019-08-29T12:38:50.063600Z

Just to be clear, with a transducing channel (equipped with a fixed buffer):
• if the buffer is already full, the putter is parked
• if the buffer becomes full during the transduction of the put element, the buffer is allowed to grow (because we are in nested calls and, core.async being “just a macro”, we can’t park in the middle of a non-core.async computation)
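Both cases can be observed from the REPL without go blocks. This is only a sketch: it uses the non-blocking `a/offer!` and `a/poll!` in place of a parked putter, so "would park" shows up as `offer!` returning nil. The var names are made up for the demo.

```clojure
(require '[clojure.core.async :as a])

(def buf (a/buffer 1))
(def c (a/chan buf cat))

;; The buffer is empty, so the put is accepted. `cat` then expands the
;; single put into ten items, and the buffer grows past its size of 1.
(def first-put (a/offer! c (range 10)))   ; true
(def size-after (count buf))              ; 10

;; Now the buffer counts as full, so a further put is not accepted.
;; A >! in a go block would park here; offer! just returns nil.
(def second-put (a/offer! c [:more]))     ; nil

;; Items come out in order, one per take.
(def first-taken (a/poll! c))             ; 0
```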

markmarkmark 2019-08-29T12:43:16.065800Z

right, but the issue is if someone is using a custom buffer whose underlying data structure can't grow (e.g. a j.u.c.ArrayBlockingQueue).

markmarkmark 2019-08-29T12:43:36.066200Z

but, that probably doesn't happen often, and would only bite you once

cgrand 2019-08-29T12:54:05.068600Z

Well, it could go unnoticed for a long time, as a blocking queue would cause the go block to block, thus removing the thread from the thread pool. In the end you may have to wait a long time before reaching thread starvation.

markmarkmark 2019-08-29T13:01:32.070300Z

I was imagining something like this, where the assumption is that add!* will only be called when there is room (as the Buffer docstring seems to state) and the non-blocking, throws-an-exception-when-full method add is used.

markmarkmark 2019-08-29T13:02:03.070900Z

I admit, it does seem unlikely that someone would do this.

markmarkmark 2019-08-29T13:02:18.071400Z

it's just the first thing I thought of when I was reminded of the quirk of the expanding fixed buffer

2019-08-29T13:03:51.072500Z

I’m really only just starting to use core.async in anger; and I find it curious that there are effectively two forms of composition available. Composition of channels, and composition of transducers. Is this a fair observation? If so, what are the forces that drive one to prefer one form over the other?

2019-08-29T13:07:15.074Z

I should say I have an understanding of transducers already… so I’m wondering more about the composing channels side of things.

cgrand 2019-08-29T13:25:24.075500Z

My own experience is that you shouldn’t compose channels. Don’t treat them as seqs. Use them to connect processes together.

alexmiller 2019-08-29T13:33:18.078300Z

transducers can create multiple outputs per step (`mapcat` is the canonical example) and those expanding transducers must have someplace to put the values (i.e. the buffer)

alexmiller 2019-08-29T13:34:08.079200Z

you can compose xforms with comp

markmarkmark 2019-08-29T13:34:17.079500Z

perhaps illuminating: https://clojure.org/reference/transducers ctrl+f "compos" -> 7 results for compose/composition etc. https://clojure.org/news/2013/06/28/clojure-clore-async-channels ctrl +f "compos" -> 0 results

markmarkmark 2019-08-29T13:35:01.080Z

that core.async news item is linked from the core.async repo as "Rationale"

alexmiller 2019-08-29T13:35:27.080100Z

so you can have multiple xforms per channel, and you can use pipe or pipeline or tap, etc to decide where to separate channels. it is an architectural choice how you break things up. I go into more depth on this in Clojure Applied
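A small sketch of that choice, assuming a filter step followed by a map step. The helper `feed!` and all var names are hypothetical. The same two steps are placed once on a single channel via comp and once on two channels joined with pipe.

```clojure
(require '[clojure.core.async :as a])

;; Hypothetical helper: put every item of `coll` on `ch`, then close it.
(defn feed! [ch coll]
  (a/go
    (doseq [x coll]
      (a/>! ch x))
    (a/close! ch)))

;; Option A: one channel carrying a composed xform.
(def single (a/chan 1 (comp (filter odd?) (map inc))))
(feed! single (range 10))
(def single-result (a/<!! (a/into [] single)))

;; Option B: the same steps split across two channels joined by a/pipe.
(def stage-1 (a/chan 1 (filter odd?)))
(def stage-2 (a/chan 1 (map inc)))
(a/pipe stage-1 stage-2)
(feed! stage-1 (range 10))
(def split-result (a/<!! (a/into [] stage-2)))

;; Both results are [2 4 6 8 10].
```

The values are the same either way. What changes is where a separate channel (and so a separate buffer and hand-off point) sits in the flow.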

2019-08-29T13:39:43.080300Z

yes, I’m well aware of comp… I think what I’m getting at by channel composition, is the separating channels you mention. i.e. in my case I need to fan out, then apply a filter to each fan. So by channel composition I really mean taking one of those branches as my input and “composing it” into a new input channel that has the filter applied.

2019-08-29T13:41:06.080900Z

That makes some sense to me… but I’m not sure I grok all your nuance. By composition, I really mean taking an input channel and yielding an output channel with some transformation/process applied… https://clojurians.slack.com/archives/C05423W6H/p1567085983080300?thread_ts=1567074965.050600&cid=C05423W6H

alexmiller 2019-08-29T13:43:46.081100Z

things like mult exist for fanning out

alexmiller 2019-08-29T13:44:11.081300Z

and then you can connect a filter transducer to one out chan if needed
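A rough sketch of that arrangement, assuming two branches that each want a different filter. The channel names and predicates are made up for illustration.

```clojure
(require '[clojure.core.async :as a])

(def in (a/chan))
(def m (a/mult in))

;; Each branch is an ordinary channel with its own filter transducer.
(def odds  (a/chan 1 (filter odd?)))
(def evens (a/chan 1 (filter even?)))
(a/tap m odds)
(a/tap m evens)

;; Start draining both branches before feeding: a mult waits for every
;; tap to accept a value before it moves on to the next one.
(def odds-ch  (a/into [] odds))
(def evens-ch (a/into [] evens))

(a/go
  (doseq [n (range 6)]
    (a/>! in n))
  (a/close! in))   ; closing the source closes the taps as well

(def odds-result  (a/<!! odds-ch))    ; [1 3 5]
(def evens-result (a/<!! evens-ch))   ; [0 2 4]
```

One thing to watch: a slow or unread tap holds up the whole mult, so every branch needs a consumer or a buffer policy that suits it.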

2019-08-29T13:44:52.081500Z

I’m not sure it is illuminating. The process of tying channels together with processes in between, is effectively what I mean by composition. i.e. it seems common in core async, for everything to take an in and out channel. By channel composition all I mean is tying the ins and outs together.

2019-08-29T13:45:39.081700Z

building channels out of other channels etc.

2019-08-29T13:46:04.081900Z

yeah that’s exactly what I’ve done.

2019-08-29T13:49:55.082100Z

but thanks a lot… I think what you’ve said about deciding where to split channels up and the architectural choices with that, is really what I’m getting at.

alexmiller 2019-08-29T13:54:01.082300Z

there is no one answer there. the tools are available.

markmarkmark 2019-08-29T14:06:13.082500Z

I see what you're saying. I guess I just don't like the word compose for channels.

markmarkmark 2019-08-29T14:06:56.082700Z

I'm also a very light user of core.async, so I know a bunch of random trivia, but don't know that much about the nitty gritty of making things with core.async that people actually want to use.

2019-08-29T14:08:48.082900Z

Like me a few days ago 🙂