aleph

2017-07-19T16:26:50.215724Z

Can someone tell me why I need to call realize-each here? I suppose it means that all my requests are realized and kept in memory until further processing, which is something I am trying to avoid.

(s/stream->seq
  (s/map (fn [res]
           (println (type res)) ;;  manifold.deferred.Deferred if not realize-each
           (select-keys res [:request-time :connection-time :status]))
         (s/realize-each ;; why do I need this?
           (s/map (fn [u]
                    (println "HTTP REQ TO " u)
                    (aleph.http/get u {:throw-exceptions? false}))
                  (s/buffer 50
                            (s/->source
                              ["http://google.com/abc"
                               "http://google.com/def"
                               "http://google.com/ghi"]))))))
Also, I note that the following works:
@(d/chain (aleph.http/get "https://google.com")
          #(select-keys % [:request-time :status]))

dm3 2017-07-19T16:31:57.398137Z

you have a stream of deferreds

dm3 2017-07-19T16:32:44.425842Z

so you need to block/attach a callback in order to get to its contents

2017-07-19T16:33:29.452457Z

so realize-each is the blocking approach right? How do I use a callback?
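On the callback question, a minimal sketch, assuming Manifold is on the classpath. fake-get is a hypothetical stand-in for aleph.http/get (it only returns a deferred), and the URLs are placeholders. d/chain attaches the callback to each deferred without blocking; the stream still carries deferreds afterwards, so something downstream has to unwrap them. As far as I can tell, realize-each does that asynchronously, one message at a time, rather than by blocking a thread.

```clojure
(require '[manifold.stream :as s]
         '[manifold.deferred :as d])

;; Hypothetical stand-in for aleph.http/get: returns a deferred response map.
(defn fake-get [u]
  (d/future {:status 200 :url u}))

;; s/map does not unwrap deferreds, so this is a stream of Deferred values.
(def responses
  (s/map fake-get
         (s/->source ["http://example.com/a"
                      "http://example.com/b"
                      "http://example.com/c"])))

;; Attach a callback to each deferred with d/chain.
;; The result is still a stream of deferreds, now yielding just the status.
(def statuses
  (s/map (fn [resp-d] (d/chain resp-d #(:status %)))
         responses))

;; realize-each unwraps each deferred in order before passing it on.
(def result (vec (s/stream->seq (s/realize-each statuses))))
```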

dm3 2017-07-19T16:34:24.484078Z

you need to decide whether to parallelize a batch

2017-07-19T16:34:36.491612Z

It looks like https://github.com/ztellman/manifold/blob/master/docs/stream.md has a hint:

The value returned by the callback for connect-via provides backpressure - if a deferred value is returned, further messages will not be passed in until the deferred value is realized.
but I don’t understand it
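A small sketch of what that sentence seems to describe, assuming Manifold is on the classpath; src, dst and the multiply-by-ten step are made up for illustration. The callback returns the deferred from s/put!, and connect-via should not pass in the next message until that deferred is realized, that is, until dst has accepted the previous one.

```clojure
(require '[manifold.stream :as s])

(def src (s/->source [1 2 3]))
(def dst (s/stream)) ; unbuffered: a put! stays pending until someone takes

;; The callback's return value is the backpressure signal. Returning the
;; put! deferred means "do not send me more until dst has taken this one".
(s/connect-via src
               (fn [msg] (s/put! dst (* 10 msg)))
               dst)

;; Taking from dst is what lets the next message flow through.
;; The 1000 ms timeout only guards against waiting forever in this sketch.
(def results (doall (s/stream->seq dst 1000)))
```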

2017-07-19T16:35:01.506430Z

I actually want to throttle it

2017-07-19T16:42:12.751727Z

That looks closer to what I want:

(time (doall (s/stream->seq
               (s/map (fn [res]
                        (select-keys res [:request-time :connection-time :status]))
                      (s/realize-each
                        (s/map (fn [u]
                                 (println "HTTP REQ TO " u)
                                 (aleph.http/get u {:throw-exceptions? false}))
                               (s/throttle 1
                                           (s/buffer 50
                                                     (s/->source
                                                       (map #(str "http://www.google.com/" %) (range 1 10)))))))))))

2017-07-19T16:48:30.968100Z

Actually you are right, I want a mix of parallelize and throttle. How do I parallelize it?

dm3 2017-07-19T18:00:08.442487Z

@nha you can use deferred/loop to do it kinda manually
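A rough sketch of the deferred/loop route, following the take!/chain/recur pattern from the Manifold docs. fake-get is a hypothetical stand-in for aleph.http/get and fetch-all is a made-up name. This version handles one request at a time; taking several messages per iteration would be one way to parallelize by hand.

```clojure
(require '[manifold.stream :as s]
         '[manifold.deferred :as d])

;; Hypothetical stand-in for aleph.http/get: returns a deferred response map.
(defn fake-get [u]
  (d/future {:status 200 :url u}))

(defn fetch-all
  "Takes URLs from src one by one and returns a deferred yielding
   a vector of statuses once src is drained."
  [src]
  (d/loop [acc []]
    (d/chain (s/take! src ::drained)
             ;; Either pass the sentinel along or start the request.
             (fn [u]
               (if (identical? ::drained u) ::drained (fake-get u)))
             ;; chain has unwrapped the response deferred by this point.
             (fn [resp]
               (if (identical? ::drained resp)
                 acc
                 (d/recur (conj acc (:status resp))))))))

(def statuses
  @(fetch-all (s/->source ["http://example.com/a"
                           "http://example.com/b"
                           "http://example.com/c"])))
```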

dm3 2017-07-19T18:00:47.467101Z

or pipe the batched stream into a (deferred/future-with my-single-thread-loop ..)
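One possible reading of "parallelize a batch", sketched with s/batch and d/zip rather than future-with, so treat it as an assumption and not as what was meant above. fake-get is a hypothetical stand-in for aleph.http/get and fetch-in-batches is a made-up name. Each batch is started at once, d/zip waits for the whole batch, and realize-each keeps the batches in order.

```clojure
(require '[manifold.stream :as s]
         '[manifold.deferred :as d])

;; Hypothetical stand-in for aleph.http/get: a slow-ish deferred response.
(defn fake-get [u]
  (d/future
    (Thread/sleep 50)
    {:status 200 :url u}))

(defn fetch-in-batches
  "Runs up to batch-size requests concurrently, one batch after another."
  [batch-size urls]
  (->> (s/->source urls)
       (s/batch batch-size)                   ; group URLs into batches
       (s/map (fn [batch]                     ; fire the whole batch at once
                (apply d/zip (mapv fake-get batch))))
       (s/realize-each)                       ; wait for each batch in order
       (s/mapcat identity)                    ; flatten batches back to responses
       (s/map #(select-keys % [:status :url]))
       (s/stream->seq)))

(def urls (map #(str "http://example.com/" %) (range 6)))
(def out (doall (fetch-in-batches 3 urls)))
```

Putting s/throttle in front of s/batch, as in the throttled snippet earlier in the thread, should then cap how fast URLs reach the batching step.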