is this an appropriate usage of pipeline?
(a/pipeline
(.availableProcessors (Runtime/getRuntime))
(doto (a/chan) (a/close!))
(purchase-sender-xf purchase)
(a/to-chan (range num-to-send)))
where purchase-sender-xf is a transducer that transforms maps and finally sends them to SQS?
confused if i should use pipeline-blocking or pipeline-async instead
if you aren't going to actually do anything with the results, I don't think pipeline is appropriate
that isn't a particularly high quality blog post
pipeline (not pipeline-async or pipeline-blocking) does everything on the go block threadpool, which by default is limited to size 8
doing blocking io on the go block threadpool is a bad idea because it means other go blocks cannot run on the threadpool while you are blocked on io
ah that makes sense
can you still have it in a go block if the go block dispatches a thread and does a parking take on the channel from the thread that is doing the io?
i suppose that makes sense, considering go blocks are basically FSMs, and the parked go block frees up that thread so it can be reallocated elsewhere
limiting pipelines to .availableProcessors doesn't do what you would expect
doing io as part of a pipeline or an xform on a channel is a bad idea
you likely just want an executor
ok. thanks for the answers, I really do very much appreciate it, but if there are any explanations or articles those would be preferable. for example: my job is to load test an application, say. this doesn’t need to be extremely robust, it just runs once every day or so. i just want to be able to send generated transactions to a queue quickly.
mostly just following; https://tech.grammarly.com/blog/building-etl-pipelines-with-clojure
assuming you are using core.async go blocks in your app, and you are in one of those when you want to send something to the sqs queue, I would do it like (async/<! (async/thread (do-sqs-io-stuff)))
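a minimal sketch of that pattern, assuming a hypothetical blocking `send-to-sqs!` call (the name is illustrative, not from an actual client library):

```clojure
(require '[clojure.core.async :as async])

;; send-to-sqs! is a hypothetical blocking call into an SQS client
(defn send-event! [event]
  (async/go
    ;; async/thread runs the blocking io on its own (cached) threadpool
    ;; and returns a channel that yields the result; <! parks this go
    ;; block instead of tying up a go-threadpool thread
    (async/<! (async/thread (send-to-sqs! event)))))
```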
for transducers: i want to lazily have purchase maps in memory and to parallelize sending them to sqs
rather: all i want to do is lazily evaluate batches of 10 and then send each batch of 10 to sqs in parallel
so something like:
(pmap batch-send-txn (map (comp (map (partial mk-event)) (partition-all 10))) (repeat num-to-send {}))
transduce works:
(transduce (comp (map load.test-generation/make-purchase-event) (take 10)) conj [{}])
=>
[list of objects]
but map does not
i just want to clean this up. it's doing so many pass-throughs and intermediate steps
(->> (repeat 100 {})
(map make-purchase-event)
(partition-all 10)
(pmap batch-send-txn))
why the limit of 10?
if you really must manually limit it to 10 requests in flight at once I would do something like
(import '(java.util.concurrent Executors))
(def exec (Executors/newFixedThreadPool 10))
(async/go
  (let [ch (async/chan)]
    (.execute exec (fn []
                     (try
                       (do-whatever-io)
                       (finally
                         (async/close! ch)))))
    (async/<! ch)))
I forget the name of the static factory method on Executors, so I likely got it slightly wrong
the other problem is that pmap and pipeline both work strictly in order: they produce outputs in an order that matches the inputs, which is limiting if you don't care about outputs
pmap is terrible because it hides a lot of implicit stuff that you almost always want exposed and explicit
but in general, either your app is cpu limited or io limited. if it is cpu limited, then limiting the threads that do io doesn't matter; if it is io limited, why create an artificial io limit as well? you either have threads in the os scheduler's queue, or callbacks in the executor's queue
so just do (async/<! (async/thread (do-blocking-io)))
the limit of 10 is the sqs limit per call to sendMessageBatch. i'm not limiting to 10 in flight at once, just 10 messages per API call
the problem with partition and a transducer is that partition has issues when used concurrently, and it will block until you get the partition number of items or the channel is closed at the end
typically you would want something that, for example, groups requests 10 at a time, but if no new request comes in some time period, passes through the group regardless of the size
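a rough sketch of that group-with-timeout idea (names like `batching-chan` and `flush-ms` are illustrative, not from any library):

```clojure
(require '[clojure.core.async :as async])

;; collect up to batch-size items from `in`, but flush a partial batch
;; if flush-ms elapses with no new item arriving
(defn batching-chan [in batch-size flush-ms out]
  (async/go-loop [batch []]
    (let [t (async/timeout flush-ms)
          [v port] (async/alts! [in t])]
      (cond
        ;; input channel closed: flush whatever we have and stop
        (and (= port in) (nil? v))
        (do (when (seq batch) (async/>! out batch))
            (async/close! out))

        ;; timeout fired: flush a partial batch, keep going
        (= port t)
        (do (when (seq batch) (async/>! out batch))
            (recur []))

        ;; got an item: add it, flush when the batch is full
        :else
        (let [batch (conj batch v)]
          (if (= (count batch) batch-size)
            (do (async/>! out batch)
                (recur []))
            (recur batch)))))))
```

note the timeout here resets on every take, so a slow trickle of items could delay a flush; a per-batch deadline is a small variation on the same loop.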
ah i see. yes right now i’m using pmap, which i'm realizing, after talking with some folks, is not great for io, but rather excels at lazy, ordered, cpu-intensive parallelization.
I’d much prefer a separate thread per io op, so i was considering doing basically exactly what you described, however i'm not sure i need a go block for it. i don't see much downside besides the overhead though
i definitely want more fine-tuned control over parallelization than what pmap gives me. i’ll likely instead just dispatch futures: loop and create n futures, one for each io op, and then aggregate them with something like (map deref futures) to wait until they are all realized.
was thinking either clojure async futures, or claypoole type futures here: https://github.com/TheClimateCorporation/claypoole
using java api interop for sqs so i don't really want to add a bunch of interop code to deal with async aws clients and callbacks with java futures
though that still might be best.
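a sketch of that futures approach, reusing names from earlier in the thread (`batch-send-txn` and `make-purchase-event` are assumed to exist in the app):

```clojure
;; dispatch one future per batch of 10 (the sqs per-call limit),
;; then deref them all to wait for and aggregate the results
(defn send-all! [num-to-send]
  (let [futs (->> (repeat num-to-send {})
                  (map make-purchase-event)
                  (partition-all 10)
                  (mapv #(future (batch-send-txn %))))]
    ;; deref blocks until each future is realized; sends run in
    ;; parallel, only the aggregation here is ordered
    (mapv deref futs)))
```

note that plain clojure futures run on an unbounded pool, so this puts every batch in flight at once; a fixed executor (or claypoole) is the way to cap it.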
partition-all can include fewer than n items
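for example:

```clojure
(partition-all 3 (range 7))
;; => ((0 1 2) (3 4 5) (6))
```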
ya that makes sense. i do not care about order at all, but i do care about aggregating output