@dm3 @ehashman Thank you very much for the hints and your continuous help!
I read the mentioned link again (which, together with the codox documentation and the repository doc folder, is basically the only documentation I know of) and adapted my example with `(s/stream 10)`. But it doesn’t change the result, and I believe I don’t understand when and where callbacks are performed. I suspect that my `sleep` function does not get assigned to the thread pool via `stream/map`, or that `stream/consume` plays a role.
can you paste your whole program now?
Sure, thank you.
and what do you expect should happen in the above?
1. `Sending …` and `… finished.` appear immediately (i.e., a batch of `put!`s does not block)
2. After one second, I see 10 `Slept 1000 ms` messages
I suspect `stream/consume` blocks the pool.
hm, it works as I’d expect locally (with 0.1.7-alpha)
puts return immediately while `x` is buffered - `(s/stream 10)` - then `sleep` gets called once a second, 10 times
Well, expectation 1 was met 🙂
I’ll try with the alpha right now…
Nope, not for me. Clojure 1.8 or 1.9 alpha?
```clojure
;; `sleep` is not defined in this paste; judging by the expected
;; "Slept 1000 ms" output, it is presumably something like:
;;   (defn sleep [ms] (Thread/sleep ms) (println "Slept" ms "ms") ms)

boot.user=> (def a (s/stream 10))
#'boot.user/a
boot.user=> (def pool (manifold.executor/fixed-thread-executor 10 {:initial-thread-count 10}))
#'boot.user/pool
boot.user=> (s/consume #(println "Got" %) (s/map sleep (s/onto pool a)))
<< … >>
boot.user=> (do (println "Started" (java.util.Date.)) (dotimes [_ 10] @(s/put! a 1000)) (println "Done" (java.util.Date.)))
```
I mean manifold 0.1.7-alpha5
I know. I tried manifold 0.1.7-alpha but see “my” behaviour. That’s why I also asked for the Clojure version. I will try with your version. Right now I am a Leiningen user, not a Boot user. Just a sec.
clojure/lein/boot don’t matter
I know / hope. But I want to try your exact snippet.
also the behaviour should be consistent between Manifold 0.1.5-0.1.7
I’ll try it out, but I need to head out for lunch first. Sorry, that’s not very respectful of me.
no problem 🙂 it’s asynchronous
😂 Yes, that’s truly asynchronous.
I think I should re-phrase expectation 2: I expect that after 1 second I see all 10 `Got`/`Slept …` messages at once, as those 10 sleeps get executed in parallel.
Instead, I see a `Got` and `Slept …` every second, which takes 10 seconds in total, as they run sequentially.
yes, that’s how the stream is consumed - sequentially; there’s no parallel-consume in manifold
My assumption was that when the stream is consumed, the `sleep` gets pushed to the worker pool, immediately fetching the next item, putting it in the pool, etc.
no, although I can see how that would be a valid assumption
Okay, then I should build a loop that `take!`s from the stream and puts the work on a `manifold.deferred` pool (which I tried a while ago, where you helped me, too).
I’m not sure how you want to signal backpressure to the source
you can have a token system where you `take!` as many times as you have free workers
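A minimal sketch of that token idea, assuming a fixed worker count (the name `consume-with-tokens` and the semaphore-based bookkeeping are mine, not part of manifold):

```clojure
(require '[manifold.stream :as s]
         '[manifold.deferred :as d])

(defn consume-with-tokens
  "Calls (f x) for every x on src, at most n at a time.
  take! only happens once a token (a free worker) is available,
  so backpressure reaches the source."
  [n f src]
  (let [tokens (java.util.concurrent.Semaphore. n)]
    (d/loop []
      (.acquire tokens)                         ; wait for a free worker
      (d/chain (s/take! src ::drained)
               (fn [x]
                 (if (identical? x ::drained)
                   (.release tokens)
                   (do (future
                         (try (f x)
                              (finally (.release tokens))))
                       (d/recur))))))))
```

The semaphore caps in-flight work: no `take!` is issued until a worker frees up, so a full pool stops consumption instead of buffering unboundedly.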
Yes, this is why I initially went to manifold. I assumed the stream would block when the thread pool is full, and therefore couldn’t `consume` immediately.
So I completely misunderstood the idea of the executor on the stream. I must admit, I don’t see a reason for it at all right now.
it’s there to control where the execution happens
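That point - `onto` controls *where* callbacks run, not how many run at once - can be seen in a small sketch (the pool size and the printing callback are illustrative):

```clojure
(require '[manifold.stream :as s]
         '[manifold.executor :as ex])

(def pool (ex/fixed-thread-executor 4))
(def src  (s/stream 10))

;; the consume callback now runs on pool's threads, but still one
;; message at a time - onto moves execution, it adds no parallelism
(s/consume
 #(println (.getName (Thread/currentThread)) "got" %)
 (s/onto pool src))
```

Printing the thread name shows the callback hopping onto the pool’s threads, while the messages are still handled strictly one after another.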
Should I put a deferred (like `(future sleep)`) onto the stream?
one sec
Sure.
consume-parallel is trickier than I thought 🙂
I think I will just move back to `core.async/pipeline-blocking`.
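For reference, `core.async/pipeline-blocking` does exactly this kind of parallel, blocking per-item work; a minimal sketch (`slow-op` is a stand-in for the real processing function):

```clojure
(require '[clojure.core.async :as a])

(defn slow-op [x]            ; stand-in for the blocking work
  (Thread/sleep 100)
  (* x x))

(def in  (a/chan 10))
(def out (a/chan 10))

;; run slow-op on up to 8 threads; results arrive on out in input order
(a/pipeline-blocking 8 out (map slow-op) in)
```

It also propagates backpressure: when `out` is full, the pipeline stops taking from `in`.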
what’s your end goal? process items from the stream in parallel?
or partition the stream into N streams?
Exactly. I have a function `a` that gets called irregularly via a web service and returns a value. This value has to be processed with another function `b` that triggers an HTTP request and some computation. I want to decouple `a` from `b`. My idea is that `a` feeds that value into a stream, and at the sink a worker pool performs `b` in parallel and puts the result onto another stream.
So your first suggestion.
Interesting. `(s/consume)` is definitely the “blocker” here. Even if I map a future onto the stream, `deref`ing the future blocks the rest. If I consume but do not `deref`, everything runs in parallel.
I couldn’t figure out how to make the parallelization work on the `onto` thread pool
actually I have something 🙂
```clojure
(require '[manifold.stream :as s])
(require '[manifold.deferred :as d])
(require '[manifold.executor :as ex])

(defn round-robin-fork
  [src n]
  (let [dsts (take n (repeatedly s/stream))
        ^java.util.concurrent.BlockingQueue ready
        (doto (java.util.concurrent.ArrayBlockingQueue. n)
          (.addAll dsts))
        free-up! #(.offer ready %)
        next!    #(.take ready)
        send!    #(-> (s/put! %1 %2)
                      (d/chain
                        (fn [result]
                          (if result
                            (free-up! %1)
                            (s/close! %1)))))]
    (d/loop [dst (.take ready)]
      (-> (s/take! src ::none)
          (d/chain
            (fn [result]
              (if (= result ::none)
                (doseq [d dsts]
                  (s/close! d))
                (do (future (send! dst result))
                    (d/chain (next!)
                             #(d/recur %))))))))
    dsts))

;; Test
;; (the source is named src to avoid confusion with the s alias)
(def src (s/stream))
(def ss (round-robin-fork src 10))

(doseq [[idx s] (map-indexed vector ss)]
  (s/consume
    #(let [sleep (rand-int 1000)]
       (Thread/sleep sleep)
       (println (Thread/currentThread) "DST [" idx "] slept for " sleep " ms, got: " %))
    s))

(dotimes [i 20]
  (println "putting " i)
  @(s/put! src i))
```
However, the work here is performed on the default `future` executor; you could pass a `pool` as a third argument
You rock!
and use it in `future-with`
the name is from the previous iteration, it should probably be just `fork`
also if you do `onto` and use the same executor to run the tasks inside `fork` - the thing will deadlock somewhere, not sure where
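For completeness, `manifold.deferred/future-with` is the piece that would make that third argument work; a tiny sketch (the three-argument variant of `round-robin-fork` itself is not shown above, so this only illustrates the call):

```clojure
(require '[manifold.deferred :as d]
         '[manifold.executor :as ex])

(def pool (ex/fixed-thread-executor 4))

;; like future, but the body runs on the supplied executor;
;; inside round-robin-fork, (future (send! dst result)) would
;; become (d/future-with pool (send! dst result))
@(d/future-with pool (+ 1 2))
```

As noted above, that `pool` must be different from any executor the destination streams are `onto`, or the whole thing can deadlock.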