aleph

hansen-pansen 2017-06-08T08:52:04.048025Z

@dm3 @ehashman Thank you very much for the hints and your continuous help! I read the mentioned link again (which, together with the codox documentation and the repository’s doc folder, is basically the only documentation I know of) and adapted my example to use (s/stream 10). But it doesn’t change the result, and I believe I don’t understand when and where callbacks are executed. I suspect that my sleep function is not handed off to the thread pool via stream/map, or that stream/consume plays a role.

dm3 2017-06-08T09:03:44.228286Z

can you paste your whole program now?

hansen-pansen 2017-06-08T09:28:38.600896Z

Sure, thank you.

dm3 2017-06-08T09:31:18.641050Z

and what do you expect should happen in the above?

hansen-pansen 2017-06-08T09:32:46.663077Z

1. Sending … and … finished. appear immediately (i.e., a batch of put!s does not block)
2. After one second, I see 10 Slept 1000 ms messages

hansen-pansen 2017-06-08T09:35:02.695894Z

I suspect the stream/consume blocks the pool.

dm3 2017-06-08T09:37:26.730418Z

hm, it works as I’d expect locally (with 0.1.7-alpha)

dm3 2017-06-08T09:38:10.741267Z

puts return immediately when x is buffered - (s/stream 10)

dm3 2017-06-08T09:38:23.744546Z

then sleep gets called every second 10 times

hansen-pansen 2017-06-08T09:38:42.749009Z

Well, expectation 1 was met 🙂

hansen-pansen 2017-06-08T09:38:49.750754Z

I’ll try with the alpha right now…

hansen-pansen 2017-06-08T09:39:56.766602Z

Nope, not for me. Clojure 1.8 or 1.9 alpha?

dm3 2017-06-08T09:40:09.769806Z

boot.user=> (def a (s/stream 10))
#'boot.user/a
boot.user=> (def pool (manifold.executor/fixed-thread-executor 10 {:initial-thread-count 10}))
#'boot.user/pool
boot.user=> (s/consume #(println "Got" %) (s/map sleep (s/onto pool a)))
<< … >>
boot.user=> (do (println "Started" (java.util.Date.)) (dotimes [_ 10] @(s/put! a 1000)) (println "Done" (java.util.Date.)))
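
A minimal sketch of the setup the snippet above assumes; the sleep definition from the originally pasted program isn’t shown in this log, so this is a guess based on the “Slept 1000 ms” output described earlier:

(require '[manifold.stream :as s])
(require '[manifold.executor])

;; hypothetical stand-in for the user's sleep function
(defn sleep [ms]
  (Thread/sleep ms)
  (println "Slept" ms "ms")
  ms)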

dm3 2017-06-08T09:40:24.773380Z

I mean manifold 0.1.7-alpha5

hansen-pansen 2017-06-08T09:41:32.789264Z

I know. I tried Manifold 0.1.7 alpha, but I see “my” behaviour. That’s why I also asked for the Clojure version. I will try with your version. Right now I’m a Leiningen user, not a Boot user. Just a sec.

dm3 2017-06-08T09:41:46.792490Z

clojure/lein/boot don’t matter

hansen-pansen 2017-06-08T09:42:11.798198Z

I know / hope. But I want to try your exact snippet.

dm3 2017-06-08T09:42:11.798297Z

also the behaviour should be consistent between Manifold 0.1.5-0.1.7

hansen-pansen 2017-06-08T09:58:32.035303Z

I’ll try it out, but I need to head out for lunch. Sorry, that’s not very respectful of me.

dm3 2017-06-08T10:00:45.069191Z

no problem 🙂 it’s asynchronous

hansen-pansen 2017-06-08T12:17:29.775962Z

😂 Yes, that’s truly asynchronous.

hansen-pansen 2017-06-08T12:19:10.799322Z

I think I should re-phrase expectation 2: I expect that after 1 second I see all 10 Got or Slept … at once, as those 10 sleeps get executed in parallel.

hansen-pansen 2017-06-08T12:20:23.816175Z

Instead, I see one Got and one Slept … per second, which takes 10 seconds in total, as they run sequentially.

dm3 2017-06-08T12:20:39.820064Z

yes, that’s how the stream is consumed - sequentially

dm3 2017-06-08T12:21:07.826761Z

there’s no parallel-consume in manifold

hansen-pansen 2017-06-08T12:21:45.835637Z

My assumption was that when the stream is consumed, each sleep gets pushed to the worker pool, the next value is fetched immediately and pushed to the pool, and so on.

dm3 2017-06-08T12:22:16.843020Z

no, although I can see how that would be a valid assumption

hansen-pansen 2017-06-08T12:23:48.864428Z

Okay, then I should build a loop that take!s from the stream and creates a manifold.deferred pool (which I tried a while ago, where you helped me, too).

dm3 2017-06-08T12:24:59.881721Z

I’m not sure how you want to signal backpressure to the source

dm3 2017-06-08T12:25:25.887633Z

you can have a token system where you take! as many times as you have free workers
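
A minimal sketch of that token idea, assuming Manifold 0.1.x; consume-parallel and process are hypothetical names, not Manifold API:

(require '[manifold.stream :as s]
         '[manifold.deferred :as d])

;; n independent take!-loops act as tokens: each takes one value, processes it
;; on a worker thread (d/future uses Manifold's default executor), then takes
;; the next, so at most n items are in flight and backpressure is preserved.
(defn consume-parallel [process n src]
  (dotimes [_ n]
    (d/loop []
      (d/chain (s/take! src ::drained)
        (fn [v]
          (when-not (= v ::drained)
            (d/chain (d/future (process v))
              (fn [_] (d/recur)))))))))

;; e.g. (consume-parallel sleep 10 a) should finish 10 one-second sleeps
;; in roughly one second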

hansen-pansen 2017-06-08T12:26:03.896658Z

Yes, this is why I initially went to Manifold. I assumed the stream would block when the thread pool is full, and therefore couldn’t be consumed immediately.

hansen-pansen 2017-06-08T12:27:09.912340Z

So I completely misunderstood the idea of the executor on the stream. I must admit I don’t see a reason for it at all right now.

dm3 2017-06-08T12:27:49.921947Z

it’s there to control where the execution happens

hansen-pansen 2017-06-08T12:28:08.926607Z

Should I put a deferred (like (future sleep)) onto the stream?

dm3 2017-06-08T12:37:10.063588Z

one sec

hansen-pansen 2017-06-08T12:53:03.317542Z

Sure.

dm3 2017-06-08T13:16:30.751045Z

consume-parallel is trickier than I thought 🙂

hansen-pansen 2017-06-08T13:22:43.874452Z

I think I will just move back to core.async/pipeline-blocking.
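
For comparison, a minimal sketch of the core.async route mentioned here; the channel sizes and the process function are placeholders:

(require '[clojure.core.async :as async])

(defn process [v]
  (Thread/sleep 1000)   ;; stand-in for the blocking work
  v)

(def in  (async/chan 10))
(def out (async/chan 10))

;; runs process on up to 10 dedicated threads; puts onto `in` park/block once
;; the buffers fill up, so backpressure reaches the producer
(async/pipeline-blocking 10 out (map process) in)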

dm3 2017-06-08T13:23:28.889903Z

what’s your end goal? process items from the stream in parallel?

dm3 2017-06-08T13:23:44.895741Z

or partition the stream into N streams?

hansen-pansen 2017-06-08T13:28:10.989832Z

Exactly. I have a function a that gets called irregularly via a web service and returns a value. This value has to be processed by another function b that triggers an HTTP request and some computation. I want to decouple a from b. My idea is that a feeds the value into a stream, and at the sink a worker pool performs b in parallel, putting the results onto another stream.

hansen-pansen 2017-06-08T13:28:30.997254Z

So your first suggestion.

hansen-pansen 2017-06-08T14:07:00.882645Z

Interesting. (s/consume) is definitely the “blocker” here. Even if I map a future onto the stream, dereferencing the future blocks the rest. If I consume but do not deref, everything runs in parallel.
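
A minimal sketch of the two variants described above, assuming the same sleep function and source stream a as in the earlier snippet:

;; dereffing inside consume blocks the next take, so this is sequential again
(s/consume #(println "Got" @%)
           (s/map #(future (sleep %)) a))

;; without the deref the futures all start immediately and run concurrently
;; on Clojure's future pool, but the stream no longer exerts backpressure
(s/consume (fn [_] nil)
           (s/map #(future (sleep %)) a))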

dm3 2017-06-08T14:18:56.173944Z

I couldn’t figure out how to make the parallelization work on the onto thread pool

dm3 2017-06-08T14:24:25.309902Z

actually I have something 🙂

dm3 2017-06-08T14:28:14.403765Z

(require '[manifold.stream :as s])
(require '[manifold.deferred :as d])
(require '[manifold.executor :as ex])

;; forks src into n destination streams, sending each value to whichever
;; destination is currently free, so the forks can be consumed in parallel
(defn round-robin-fork
  [src n]
  (let [dsts (take n (repeatedly s/stream))
        ;; queue of destinations that are ready to receive the next value
        ^java.util.concurrent.BlockingQueue ready
        (doto (java.util.concurrent.ArrayBlockingQueue. n)
          (.addAll dsts))

        free-up! #(.offer ready %)
        next! #(.take ready)
        ;; put the value onto a destination; return it to the ready queue on
        ;; success, close it if the put failed
        send! #(-> (s/put! %1 %2)
                   (d/chain
                     (fn [result]
                       (if result
                         (free-up! %1)
                         (s/close! %1)))))]
    (d/loop [dst (.take ready)]
      (-> (s/take! src ::none)
          (d/chain
            (fn [result]
              (if (= result ::none)
                ;; source drained - close all destinations
                (doseq [d dsts]
                  (s/close! d))
                ;; send asynchronously, then block until a destination frees up
                (do (future (send! dst result))
                    (d/chain (next!)
                      #(d/recur %))))))))
    dsts))

;; Test
(def s (s/stream))
(def ss (round-robin-fork s 10))

;; each forked stream gets its own (sequential) consumer
(doseq [[idx s] (map-indexed vector ss)]
  (s/consume
    #(let [sleep (rand-int 1000)]
       (Thread/sleep sleep)
       (println (Thread/currentThread) "DST [" idx "] slept for " sleep " ms, got: " %))
    s))

(dotimes [i 20]
  (println  "putting " i)
  @(s/put! s i))

dm3 2017-06-08T14:28:24.407635Z

However the work here is performed on the default future executor

dm3 2017-06-08T14:28:52.419146Z

however you could pass a pool as a third argument

hansen-pansen 2017-06-08T14:28:57.421429Z

You rock!

dm3 2017-06-08T14:28:58.421555Z

and use it in future-with
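
A minimal sketch of that variant; the extra pool and the changed line are assumptions, not part of the snippet above:

(require '[manifold.deferred :as d]
         '[manifold.executor :as ex])

;; a dedicated pool for the forwarding work (hypothetical name)
(def fork-pool (ex/fixed-thread-executor 10))

;; inside round-robin-fork's loop, replace
;;   (future (send! dst result))
;; with
;;   (d/future-with fork-pool (send! dst result))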

dm3 2017-06-08T14:30:59.474014Z

the name is from the previous iteration, should probably be just fork

dm3 2017-06-08T14:32:15.505506Z

also if you do onto and use the same executor to run the tasks inside fork - the thing will deadlock somewhere - not sure where