core-async

kenny 2020-02-05T18:49:58.334400Z

Why does to-chan bound the returned channel's buffer size to 100?

2020-02-05T18:55:38.335900Z

it will perform better the larger the buffer is, because more of the collection can just be dumped into the channel without needing go blocks

2020-02-05T18:55:47.336200Z

but you might pass in an infinite seq

2020-02-05T18:56:12.336800Z

so it uses bounded-count: for small collections you will get a buffer size that matches the collection count, for larger collections you will get a buffer of 100
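[That sizing is easy to see at the REPL; `bounded-count` realizes at most the first n elements of a seq, so it is safe even on an infinite one. A quick sketch:]

```clojure
;; bounded-count realizes at most the first n elements of a seq,
;; so it is safe to call on an infinite lazy seq.
(bounded-count 100 (range 10)) ;; => 10  (small collection: buffer matches count)
(bounded-count 100 (range))    ;; => 100 (infinite seq: capped at 100)
```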

kenny 2020-02-05T18:58:36.336900Z

If it performs better with a larger buffer, does that mean it should not bound the buffer size?

2020-02-05T18:59:24.337100Z

no

kenny 2020-02-05T19:00:41.337300Z

What do you mean by this "the more of the collection can just be dumped into the channel you don't need an go blocks"?

2020-02-05T19:00:48.337500Z

unbounded buffers behave very badly (and core.async doesn't let you have them out of the box)

kenny 2020-02-05T19:10:05.340700Z

Perhaps let me ask a more direct question to my problem... I have a collection of maps which describe HTTP requests. The collection typically varies in size from 10ish to a few thousand maps. I need to issue these requests with a maximum parallelism of ~10 to prevent API throttling as much as possible. I was thinking I'd do something like onto-chan with the collection of requests and then call pipeline-async with the parallelism set to 10.

kenny 2020-02-05T19:11:51.341600Z

Should I use onto-chan or to-chan for initializing this pipeline given the expected size of the collection?

kenny 2020-02-05T19:12:38.341800Z

Something like this

(defn foo
  [{:keys [parallelism]} f coll]
  (let [from-ch (async/chan (count coll))
        to-ch (async/chan (count coll))]
    ;; place coll onto the from chan
    (async/onto-chan from-ch coll)

    (async/pipeline-async
      parallelism
      to-ch
      (fn [coll-item result-ch]
        (async/go
          (let [result (async/<! (f coll-item))]
            (async/>! result-ch result)
            (async/close! result-ch))))
      from-ch)

    (async/reduce
      (fn [acc x] (conj acc x))
      [] to-ch)))

2020-02-05T19:13:06.341900Z

channel operations (taking and putting) are synchronization points between processes. the timeline of a process putting to a channel has to sync up at some point with the timeline of a process taking from that channel.

2020-02-05T19:14:08.342100Z

but exactly synchronizing can be expensive in terms of processes waiting around for each other

2020-02-05T19:16:11.342300Z

but if you don't force some level of synchronization between takers and putters, you can easily have putters producing stuff faster than takers take which will result in filling up your memory with stuff

2020-02-05T19:18:02.342500Z

so a bounded buffer lets you operate on a channel without having to wait to perfectly synchronize processes, but the bound limits how out of sync they can get
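[A tiny REPL sketch of that bound, assuming clojure.core.async is on the classpath: `offer!` succeeds without any taker only while the buffer has room.]

```clojure
(require '[clojure.core.async :as async])

;; A buffer of 2 lets a putter get at most 2 items ahead of any taker.
(def c (async/chan 2))
(async/offer! c :a) ;; truthy: buffered immediately, no taker needed
(async/offer! c :b) ;; truthy: buffer still has room
(async/offer! c :c) ;; falsey: buffer full, a put must now wait for a taker
```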

ghadi 2020-02-05T19:18:26.343100Z

buffer size of the source channel doesn't matter

ghadi 2020-02-05T19:18:40.343500Z

you already have all the inputs in hand

kenny 2020-02-05T19:19:00.344100Z

Oh, right.

ghadi 2020-02-05T19:19:08.344300Z

so use to-chan and pass it as the input to pipeline

2020-02-05T19:19:35.344900Z

so for a small collection, the process (created by to-chan) that is feeding the collection into the channel can quickly dump the contents of the collection into the channel and exit, never having to run again, without having to wait to synchronize with some taker from the channel

ghadi 2020-02-05T19:19:53.345500Z

the buffer size of a channel affects how far ahead a producer can get in front of a consumer

ghadi 2020-02-05T19:20:06.346Z

in this case it doesn't matter

ghadi 2020-02-05T19:20:48.347300Z

btw (async/reduce ... conj) === (async/into)
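[That equivalence spelled out, a quick REPL check assuming clojure.core.async is available:]

```clojure
(require '[clojure.core.async :as async])

;; async/into is shorthand for reducing a channel's contents with conj:
(async/<!! (async/into [] (async/to-chan [1 2 3])))
;; => [1 2 3], same result as:
(async/<!! (async/reduce conj [] (async/to-chan [1 2 3])))
;; => [1 2 3]
```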

kenny 2020-02-05T19:21:00.347500Z

Next question... I'm testing out the above code and it appears 4 calls to af are made even though I set parallelism to 1. Are there some caveats with how pipeline-async works?

kenny 2020-02-05T19:21:08.347600Z

Oh, nice!

2020-02-05T19:21:14.347900Z

yes

2020-02-05T19:21:27.348200Z

pipeline-async is almost never what you want

kenny 2020-02-05T19:22:24.349100Z

Why? Is there some doc on this? I tried googling and there doesn't appear to be anything on how to choose between the 3 pipeline functions.

2020-02-05T19:23:58.350600Z

pipeline-async's parallelism limit behaves very weirdly

ghadi 2020-02-05T19:24:28.351500Z

4 calls to af were made, but how many inputs did you pass in?

2020-02-05T19:24:36.351600Z

in general, because of the internals of pipeline, the parallelism number is off by 2

kenny 2020-02-05T19:24:36.351700Z

10

kenny 2020-02-05T19:24:40.351900Z

(let [start-ms (System/currentTimeMillis)
      elapsed-s (fn []
                  (- (System/currentTimeMillis) start-ms))]
  (foo {:parallelism 1}
       (fn [n]
         (async/go
           (print (str (elapsed-s) " - send - " n " \n"))
           (flush)
           (async/<! (async/timeout (* n 1000)))
           n))
       (range 10)))

ghadi 2020-02-05T19:25:17.352500Z

that doesn't show pipeline

kenny 2020-02-05T19:25:31.352700Z

Calls this fn https://clojurians.slack.com/archives/C05423W6H/p1580929958341800

2020-02-05T19:25:48.353500Z

but it is sort of only off by 2 from a certain point of view

kenny 2020-02-05T19:27:01.354900Z

Hmm, ok. I don't think it really matters for my particular case. I'm more interested in how I'm supposed to choose the pipeline function. I'm issuing HTTP requests. The client I'm using returns a core async channel. The underlying operation is blocking but my interface is not.

2020-02-05T19:28:20.355600Z

you may as well just use pipeline and call f and take the result with <!!
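[A hedged sketch of that suggestion, reworking the earlier `foo` (names `parallelism`, `f`, and `coll` as in kenny's snippet; assumes `f` returns a channel as described). Note this version blocks the calling thread and returns the realized vector directly:]

```clojure
(require '[clojure.core.async :as async])

;; pipeline runs its transducer on dedicated threads, so blocking on
;; f's result channel with <!! inside the xf is acceptable here.
(defn foo-blocking
  [{:keys [parallelism]} f coll]
  (let [to-ch (async/chan)]
    (async/pipeline parallelism
                    to-ch
                    (map (fn [item] (async/<!! (f item))))
                    (async/to-chan coll))
    ;; pipeline preserves input order in its output
    (async/<!! (async/into [] to-ch))))
```

[Wrap the final take in a go or thread if you want a channel back instead of a value.]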

2020-02-05T19:28:56.355900Z

do you actually care about the order your http calls are being made in?

kenny 2020-02-05T19:29:24.356100Z

No

2020-02-05T19:29:40.356700Z

because the pipelines preserve order

Ben Sless 2020-02-09T11:23:29.369100Z

I was also bothered by the pipeline being blocked by the slowest task. Your recommendation is to just use a thread pool? The pipeline abstraction is very comfortable and simplifies architecture. Thread pools / executors less so

kenny 2020-02-05T19:30:14.357300Z

If I'm taking with <!!, what thread is that blocking? Perhaps the question is where does xf run?

alexmiller 2020-02-05T19:30:46.357800Z

there are multiple answers

alexmiller 2020-02-05T19:31:06.358200Z

the xf may be run in either the consumer (<!!) or producer

alexmiller 2020-02-05T19:31:16.358400Z

<!! blocks the thread that runs it

kenny 2020-02-05T19:31:27.358500Z

In this case I only care about the entire result set -- not streaming results. If I was "streaming" results, is there another pipeline function that does not preserve order?

alexmiller 2020-02-05T19:31:46.358800Z

no

alexmiller 2020-02-05T19:32:09.359Z

but you could just farm it out to an executor/thread pool/future in that case

2020-02-05T19:33:11.359600Z

but pipeline and pipeline-blocking both run the xf on a dedicated thread

kenny 2020-02-05T19:34:56.360800Z

The docstrings on the pipeline fns always confuse me. > If you have multiple blocking operations to put in flight, use pipeline-blocking instead, If you have multiple asynchronous operations to put in flight, use pipeline-async instead. Naively it sounds like pipeline-blocking is a better fit when using <!! @hiredman. Why do you suggest pipeline?

2020-02-05T19:37:40.361700Z

pipeline-blocking is maybe more future proof, but pipeline currently runs the exact same code as pipeline-blocking

2020-02-05T19:40:37.362700Z

something like

(defn g [n f requests output]
  (let [tokens (async/chan n)
        complete (async/chan n)]
    (dotimes [i n]
      (async/put! tokens true))
    (doseq [r requests]
      (async/go
        (async/<! tokens)
        (async/>! output (async/<! (f r)))
        (async/>! complete true)))
    (async/go-loop [x n]
      (async/<! complete)
      (if (zero? n)
        (async/close! output)
        (do
          (async/>! tokens true)
          (recur (dec n)))))))
will limit the number of requests in flight, but run them out of order in case request 0 is really slow for some reason. (there might be an off by one error in there)

2020-02-05T19:42:58.362900Z

oh, even worse

2020-02-05T19:43:08.363200Z

the references to n in that last loop should be x

kenny 2020-02-05T19:45:12.363300Z

How many of these dedicated threads are there?

kenny 2020-02-05T19:45:54.363500Z

This was for disregarding order?

2020-02-05T19:47:03.363700Z

yes, it will run the requests in whatever order and you will get results in whatever order

kenny 2020-02-05T19:47:21.363900Z

The reason to not farm it out to a executor being to stick within the core.async paradigm?

alexmiller 2020-02-05T19:48:01.364100Z

you could still have your executor tasks throw stuff on a channel
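[For instance, a minimal sketch assuming a plain java.util.concurrent fixed pool; `submit-all` is an illustrative name, not a core.async function:]

```clojure
(require '[clojure.core.async :as async])
(import '(java.util.concurrent Executors ExecutorService))

;; Run f over items on a thread pool, delivering each result onto a
;; core.async channel in completion order (order is NOT preserved).
(defn submit-all [^ExecutorService exec f items out]
  (doseq [item items]
    (.submit exec ^Runnable (fn [] (async/>!! out (f item))))))

;; usage sketch (http-call and requests are hypothetical):
;; (def exec (Executors/newFixedThreadPool 10)) ; caps parallelism at 10
;; (def out  (async/chan 10))
;; (submit-all exec http-call requests out)
```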

2020-02-05T19:48:10.364300Z

the parallelism number passed in

alexmiller 2020-02-05T19:48:25.364500Z

these are compatible things

kenny 2020-02-05T19:52:48.364800Z

Perhaps I'm struggling to figure out when I should throw these operations onto an actual thread versus simply continuing to do everything within go blocks.

kenny 2020-02-05T20:23:27.365Z

Doesn't this kind of defeat the purpose of using go blocks?

2020-02-05T21:06:45.365200Z

it is a whole thing

kenny 2020-02-05T21:13:27.365700Z

I see. This seems very strange to me.