Why does to-chan
bound the returned channel's buffer size to 100?
it will perform better the larger the buffer, because more of the collection can just be dumped into the channel without needing go blocks
but you might pass in an infinite seq
so it uses bounded-count: if the collection is counted? you get a buffer matching its count; otherwise it realizes at most 100 elements, so a lazy (or infinite) seq gets a buffer of at most 100
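A quick sketch of how bounded-count behaves (the buffer to-chan allocates is just this number):

(bounded-count 100 [1 2 3])                 ;=> 3   (counted? -> exact count)
(bounded-count 100 (map inc (range 5000)))  ;=> 100 (lazy seq, stops counting at 100)
(bounded-count 100 (repeat :x))             ;=> 100 (safe even on an infinite seq)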
Would performing better with a larger buffer mean it shouldn't bound the buffer size?
no
What do you mean by this "the more of the collection can just be dumped into the channel you don't need an go blocks"?
unbounded buffers behave very badly (and core.async doesn't let you have them out of the box)
Perhaps let me ask a more direct question to my problem... I have a collection of maps which describe HTTP requests. The collection typically varies in size from 10ish to a few thousand maps. I need to issue these requests with a maximum parallelism of ~10 to prevent API throttling as much as possible. I was thinking I'd do something like onto-chan
to collection of requests and then call pipeline-async
with the parallelism set to 10.
Should I use onto-chan
or to-chan
for initializing this pipeline given the expected size of the collection?
Something like this
(defn foo
  [{:keys [parallelism]} f coll]
  (let [from-ch (async/chan (count coll))
        to-ch   (async/chan (count coll))]
    ;; place coll onto the from chan
    (async/onto-chan from-ch coll)
    (async/pipeline-async
     parallelism
     to-ch
     (fn [coll-item result-ch]
       (async/go
         (let [result (async/<! (f coll-item))]
           (async/>! result-ch result)
           (async/close! result-ch))))
     from-ch)
    (async/reduce
     (fn [acc x] (conj acc x))
     [] to-ch)))
channel operations (taking and putting) are synchronization points between processes. the timeline of a process putting to a channel has to sync up at some point with the timeline of a process taking from that channel.
but exactly synchronizing can be expensive in terms of processes waiting around for each other
but if you don't force some level of synchronization between takers and putters, you can easily have putters producing stuff faster than takers take which will result in filling up your memory with stuff
so a bounded buffer lets you operate on a channel without having to wait to perfectly synchronize processes, but the bound limits how out of sync they can get
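A small sketch of the bound in action, using offer! (the non-blocking put) so the effect is immediately visible:

(require '[clojure.core.async :as async])

(let [ch (async/chan 2)]   ; bound of 2: a putter can get at most 2 items ahead
  (async/offer! ch :a)     ; succeeds immediately, no taker needed
  (async/offer! ch :b)     ; succeeds, buffer now full
  (async/offer! ch :c))    ; fails (returns nil): the bound forces the putter to wait for a taker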
buffer size of the source channel doesn't matter
you already have all the inputs in hand
Oh, right.
so use to-chan
and pass it as the input to pipeline
so for a small collection, the process (created by to-chan) that is feeding the collection into the channel can quickly dump the contents into the channel and exit, never having to run again, without having to wait to synchronize with some taker from the channel
the buffer size of a channel affects how far ahead a producer can get in front of a consumer
in this case it doesn't matter
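Concretely, creating the source channel and calling onto-chan separately collapses into one call; a sketch:

;; instead of (async/chan (count coll)) + (async/onto-chan from-ch coll):
(def from-ch (async/to-chan [1 2 3]))  ; channel is filled and closed for you
(async/<!! from-ch)                    ;=> 1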
btw (async/reduce conj [] ch) === (async/into [] ch)
Next question... I'm testing out the above code and it appears 4 calls to af
are made even though I set parallelism to 1. Is there some caveats with how pipeline-async works?
Oh, nice!
yes
pipeline-async is almost never what you want
Why? Is there some doc on this? I tried googling and there doesn't appear to be anything on how to choose between the 3 pipeline functions.
pipeline-async's parallelism limit behaves very weirdly
4 calls to af
were made, but how many inputs did you pass in?
in general, because of the internals of pipeline, the parallelism number is off by 2
10
(let [start-ms (System/currentTimeMillis)
      elapsed-s (fn []
                  ;; divide by 1000 so this actually reports seconds
                  (quot (- (System/currentTimeMillis) start-ms) 1000))]
  (foo {:parallelism 1}
       (fn [n]
         (async/go
           (print (str (elapsed-s) " - send - " n " \n"))
           (flush)
           (async/<! (async/timeout (* n 1000)))
           n))
       (range 10)))
that doesn't show pipeline
Calls this fn https://clojurians.slack.com/archives/C05423W6H/p1580929958341800
but it is sort of only off by 2 from a certain point of view
Hmm, ok. I don't think it really matters for my particular case. I'm more interested in how I'm supposed to choose the pipeline function. I'm issuing HTTP requests. The client I'm using returns a core async channel. The underlying operation is blocking but my interface is not.
you may as well just use pipeline and call f and take the result with <!!
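A sketch of that shape (assuming, as in foo above, that f takes a request map and returns a channel):

(async/pipeline
 10                                    ; parallelism
 to-ch
 (map (fn [req] (async/<!! (f req))))  ; block this worker thread until the result arrives
 from-ch)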
do you actually care about the order your http calls are being made in?
No
because the pipelines preserve order
I was also bothered by the pipeline being blocked by the slowest task. Your recommendation is to just use a thread pool? The pipeline abstraction is very comfortable and simplifies architecture. Thread pools / executors less so
If I'm taking using <!!, what thread is that blocking? Perhaps the question is where does xf run?
there are multiple answers
the xf may be run in either the consumer (<!!) or producer
<!! blocks the thread that runs it
In this case I only care about the entire result set -- not streaming results. If I was "streaming" results, is there another pipeline function that does not preserve order?
no
but you could just farm it out to an executor/thread pool/future in that case
but pipeline and pipeline-blocking both run the xf on a dedicated thread
The docstrings on the pipeline fns always confuse me. > If you have multiple blocking operations to put in flight, use pipeline-blocking instead. If you have multiple asynchronous operations to put in flight, use pipeline-async instead. Naively it sounds like pipeline-blocking is a better fit when using <!! @hiredman. Why do you suggest pipeline?
pipeline-blocking is maybe more future proof, but pipeline currently runs the exact same code as pipeline-blocking
something like
(defn g [n f requests output]
  (let [tokens   (async/chan n)
        complete (async/chan n)]
    (dotimes [i n]
      (async/put! tokens true))
    (doseq [r requests]
      (async/go
        (async/<! tokens)
        (async/>! output (async/<! (f r)))
        (async/>! complete true)))
    ;; close output once every request has completed
    (async/go-loop [x (count requests)]
      (async/<! complete)
      (if (= x 1)
        (async/close! output)
        (do
          (async/>! tokens true)
          (recur (dec x)))))))
will limit the number of requests in flight, but run them out of order in case request 0 is really slow for some reason
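A usage sketch of g (request-fn is an illustrative stand-in for an f that returns a channel):

(let [requests   (range 5)
      request-fn (fn [r] (async/go (* r r)))  ; pretend HTTP call
      out        (async/chan)]
  (g 2 request-fn requests out)               ; at most 2 in flight
  (async/<!! (async/into [] out)))            ; results arrive in completion order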
How many of these dedicated threads are there?
This was for disregarding order?
yes, it will run the requests in whatever order and you will get results in whatever order
The reason to not farm it out to a executor being to stick within the core.async paradigm?
you could still have your executor tasks throw stuff on a channel
the parallelism number passed in
these are compatible things
Perhaps I'm struggling to figure out when I should throw these operations onto an actual thread versus simply continuing to do everything within go blocks.
Doesn't this kind of defeat the purpose of using go blocks?
it is a whole thing
https://ask.clojure.org/index.php/8805/where-is-the-blocking-operation
I see. This seems very strange to me.