core-async

jimbob 2019-02-05T02:20:09.094100Z

is this an appropriate usage of pipeline?

(a/pipeline
  (.availableProcessors (Runtime/getRuntime))
  (doto (a/chan) (a/close!))
  (purchase-sender-xf purchase)
  (a/to-chan (range num-to-send)))

jimbob 2019-02-05T02:20:34.094700Z

where purchase-sender-xf is a transducer that transforms maps and lastly sends them to SQS?

jimbob 2019-02-05T02:20:53.095100Z

confused if i should use pipeline-blocking instead, or pipeline-async

2019-02-05T02:47:53.095700Z

if you aren't going to actually do anything with the results, I don't think pipeline is appropriate

2019-02-05T17:02:57.099500Z

that isn't a particularly high quality blog post

2019-02-05T17:07:28.101400Z

pipeline (not pipeline-async or pipeline-blocking) does everything on the go block threadpool, which by default is limited to size 8

2019-02-05T17:08:20.101600Z

doing blocking io on the go block threadpool is a bad idea because it means other go blocks cannot run on the threadpool while you are blocked on io
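as a sketch of the contrast (with a hypothetical `blocking-io-call` standing in for the real IO, and an assumed `out` channel):

```clojure
(require '[clojure.core.async :as a])

;; hypothetical stand-in for a real blocking IO call
(defn blocking-io-call []
  (Thread/sleep 100)
  :done)

(def out (a/chan 1))

;; BAD: ties up one of the (default 8) go-block dispatch threads
;; for the entire duration of the blocking call
(a/go
  (a/>! out (blocking-io-call)))

;; BETTER: run the blocking call on its own thread via a/thread,
;; and only park (not block) inside the go block
(a/go
  (a/>! out (a/<! (a/thread (blocking-io-call)))))
```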

jimbob 2019-02-05T19:20:51.101900Z

ah that makes sense

jimbob 2019-02-05T20:54:05.107300Z

can still have it in a go block if the go block dispatches a thread and does a parking take on the thread that is doing the io?

jimbob 2019-02-05T20:54:29.107500Z

i suppose that makes sense, considering go blocks are basically FSMs; once it's parked, the go block's thread can be reallocated elsewhere

2019-02-05T02:48:23.096200Z

limiting pipelines to .availableProcessors doesn't do what you would expect

2019-02-05T02:48:41.096600Z

doing io as part of a pipeline or an xform on a channel is a bad idea

2019-02-05T02:49:18.097Z

you likely just want an executor

jimbob 2019-02-05T06:14:39.098700Z

ok. thanks for the answers, I really do very much appreciate it, but if there are any explanations or articles, those would be preferable. for example: my job is to load test an application, say. this doesn't need to be extremely robust, just run once every day or so. i just want to be able to send generated transactions to a queue quickly.

jimbob 2019-02-05T06:16:23.098800Z

mostly just following: https://tech.grammarly.com/blog/building-etl-pipelines-with-clojure

2019-02-05T17:04:39.101100Z

assuming you are using core.async go blocks in your app, and you are in one of those when you want to send something to the sqs queue, I would do it like (async/<! (async/thread (do-sqs-io-stuff)))

jimbob 2019-02-05T19:21:17.102800Z

for transducers: i want to lazily have purchase maps in memory and to parallelize sending them to sqs

jimbob 2019-02-05T19:47:16.103700Z

rather: all i want to do is lazily evaluate batches of 10 and then send each batch of 10 to sqs in parallel

jimbob 2019-02-05T19:49:27.104700Z

so something like:

(pmap batch-send-txn (map (comp (map (partial mk-event)) (partition-all 10))) (repeat num-to-send {}))

jimbob 2019-02-05T19:54:28.105400Z

transduce works:

(transduce (comp (map load.test-generation/make-purchase-event) (take 10)) conj [{}])
=> 
[list of objects]

jimbob 2019-02-05T19:54:32.105600Z

but map does not

jimbob 2019-02-05T20:20:43.106300Z

i just want to clean this up. it's doing so many pass-throughs and intermediate steps

jimbob 2019-02-05T20:20:51.106500Z

(->> (repeat 100 {})
     (map make-purchase-event)
     (partition-all 10)
     (pmap batch-send-txn))

2019-02-05T22:02:04.107900Z

why the limit of 10?

2019-02-05T22:05:07.109300Z

if you really must manually limit it to 10 requests in flight at once I would do something like

(def exec (Executors/newFixedThreadPool 10))

(async/go
  (let [ch (async/chan)]
    (.execute exec (fn []
                     (try
                       (do-whatever-io)
                       (finally
                         (async/close! ch)))))
    (async/<! ch)))

2019-02-05T22:05:29.109800Z

I forget the name of the static factory method on Executors, so I likely got it slightly wrong

2019-02-05T22:06:29.110800Z

the other problem is that pmap and pipeline both work strictly in order (they produce outputs in an order that matches inputs), which is limiting if you don't care about outputs

2019-02-05T22:07:30.111800Z

pmap is terrible because it hides a lot of implicit stuff that you almost always want exposed and explicit

2019-02-05T22:11:43.113900Z

but in general, either your app is cpu limited or io limited. if it is cpu limited, then limiting the threads that do io doesn't matter; if it is io limited, why create an artificial io limit as well? you either have threads in the os scheduler's queue, or callbacks in the executor's queue

2019-02-05T22:12:04.114400Z

so just do (async/<! (async/thread (do-blocking-io)))

jimbob 2019-02-05T22:16:24.115400Z

limit 10 is the sqs limit per call to sendMessageBatch. not limiting to 10 in flight at once, just 10 messages in flight for one API call

2019-02-05T22:51:19.117Z

the problem with partition and a transducer is that partition has issues when used concurrently, and it will block until you get the partition number or the channel is closed at the end

2019-02-05T22:51:54.117800Z

typically you would want something that, for example, groups requests 10 at a time, but if no new request comes in some time period, passes through the group regardless of the size
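that "flush on size n or on a quiet period" pattern could be sketched with alts! and a timeout channel; `in`/`out` channels and the arguments here are assumptions, not a drop-in implementation:

```clojure
(require '[clojure.core.async :as a])

(defn batching-chan
  "Reads items from in; writes vectors of up to n items to out.
   Flushes a partial batch if no new item arrives within ms."
  [in out n ms]
  (a/go-loop [batch []]
    (let [t        (a/timeout ms)
          [v port] (a/alts! [in t])]
      (cond
        ;; input channel closed: flush what we have and close out
        (and (= port in) (nil? v))
        (do (when (seq batch) (a/>! out batch))
            (a/close! out))

        ;; got an item: flush if the batch is now full
        (= port in)
        (let [batch (conj batch v)]
          (if (= n (count batch))
            (do (a/>! out batch) (recur []))
            (recur batch)))

        ;; timeout fired: flush a non-empty partial batch
        :else
        (do (when (seq batch) (a/>! out batch))
            (recur []))))))
```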

jimbob 2019-02-05T23:52:35.118700Z

ah i see. yes right now i’m using pmap, which im realizing after talking with some folks is not great for io, but rather excels at lazy ordered cpu intensive parallelization.

jimbob 2019-02-05T23:53:44.120100Z

I’d much prefer a separate thread per io op, so i was considering doing basically exactly what you described, however i'm not sure i need a go block for it. i don't see much downside besides overhead, however

jimbob 2019-02-05T23:55:27.122100Z

i definitely want more fine-tuned control over parallelization than what pmap gives me. i’ll likely instead just dispatch futures: loop and create n futures, one for each io op, and then afterwards aggregate them with a kind of (map deref futures) to wait until they are all realized.
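that futures-and-deref approach might look something like this (make-purchase-event and batch-send-txn are stubbed, since their real versions weren't shown in the thread):

```clojure
;; stubs: real versions would build the event and call SQS sendMessageBatch
(defn make-purchase-event [m] (assoc m :type :purchase))
(defn batch-send-txn [batch] {:sent (count batch)})

(def results
  (let [batches (->> (repeat 100 {})
                     (map make-purchase-event)
                     (partition-all 10))
        ;; mapv (not map) so every future is dispatched eagerly;
        ;; a lazy map would launch them one at a time during deref
        futs    (mapv #(future (batch-send-txn %)) batches)]
    ;; deref blocks until each future is realized, aggregating results
    (mapv deref futs)))
```

note the mapv for dispatch is deliberate: with a lazy map the futures would only be created as deref walks the sequence, serializing the IO instead of parallelizing it.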

jimbob 2019-02-05T23:56:14.122700Z

was thinking either clojure async futures, or claypoole type futures here: https://github.com/TheClimateCorporation/claypoole

jimbob 2019-02-05T23:57:10.124Z

using java api interop for sqs so dont really want to have to add a bunch of interop code to deal with async aws clients and callbacks with java futures

jimbob 2019-02-05T23:57:21.124200Z

though that still might be best.

jimbob 2019-02-05T23:58:27.124300Z

partition-all can include fewer than n items

jimbob 2019-02-05T23:58:45.124500Z

ya that makes sense. i do not care about order at all, but do care about aggregating output