Can someone tell me why I need to call realize-each here?
I suppose it means that all my requests are realized and kept in memory until further processing,
which is something I am trying to avoid.
(require '[manifold.stream :as s]
         '[manifold.deferred :as d]
         '[aleph.http])
(s/stream->seq
  (s/map (fn [res]
           (println (type res)) ;; manifold.deferred.Deferred if not realize-each
           (select-keys res [:request-time :connection-time :status]))
         (s/realize-each ;; why do I need this?
           (s/map (fn [u]
                    (println "HTTP REQ TO " u)
                    (aleph.http/get u {:throw-exceptions? false}))
                  (s/buffer 50
                            (s/->source
                              ["http://google.com/abc"
                               "http://google.com/def"
                               "http://google.com/ghi"]))))))
Also, I note that the following works:
@(d/chain (aleph.http/get "https://google.com")
          #(select-keys % [:request-time :status]))
you have a stream of deferreds,
so you need to block on (or attach a callback to) each one in order to get at its contents;
that's what realize-each does
Is that the blocking approach, then? How do I use a callback instead?
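A minimal sketch of the two options mentioned above, blocking versus attaching a callback. As far as I can tell, realize-each itself does not block (it chains on each deferred); the blocking in the original snippet comes from s/stream->seq. To keep this runnable without network access, `fake-get` is a hypothetical stand-in for `aleph.http/get`, and the example.com URLs and the 50 ms delay are made up. The callback version uses `s/consume-async`, whose callback must return a deferred yielding true to keep consuming.

```clojure
(require '[manifold.deferred :as d]
         '[manifold.stream :as s])

;; Hypothetical stand-in for aleph.http/get: returns a deferred that
;; yields a response-like map from another thread after ~50 ms.
(defn fake-get [url]
  (d/future
    (Thread/sleep 50)
    {:url url :status 200 :request-time 50}))

(def urls ["http://example.com/a" "http://example.com/b"])

;; Blocking: deref ties up the calling thread until each response arrives.
(def blocking-results
  (mapv #(select-keys @(fake-get %) [:url :status]) urls))

;; Callback: s/consume-async hands each message (here, a deferred response)
;; to the callback, which returns a deferred yielding true to keep going.
;; d/chain attaches the function without blocking any thread.
(def seen (atom []))

(def done
  (s/consume-async
    (fn [deferred-res]
      (d/chain deferred-res
               (fn [res]
                 (swap! seen conj (select-keys res [:url :status]))
                 true)))
    (s/map fake-get (s/->source urls))))
```

`done` is a deferred that is realized once the stream has been drained, so further work can be chained onto it instead of dereferencing it.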
you need to decide whether to parallelize a batch
It looks like https://github.com/ztellman/manifold/blob/master/docs/stream.md has a hint:
The value returned by the callback for connect-via provides backpressure - if a deferred value is returned, further messages will not be passed in until the deferred value is realized.
but I don’t understand it.
I actually want to throttle it.
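One way to read the quoted sentence from the manifold docs, as a small sketch: the callback given to `s/connect-via` returns a deferred, and no further message is passed in until that deferred is realized, so a slow step slows down how fast the source is drained. The 50 ms `d/future` is a hypothetical stand-in for slow work such as an HTTP request; the numbers are illustrative only.

```clojure
(require '[manifold.deferred :as d]
         '[manifold.stream :as s])

(def src (s/->source (range 5)))
(def dst (s/stream))

;; connect-via calls the callback for each message from src. Because the
;; callback returns a deferred, the next message is not passed in until
;; that deferred is realized: this is the backpressure from the docs.
(s/connect-via src
               (fn [x]
                 (d/chain (d/future (Thread/sleep 50) (* x 10))
                          ;; put! yields true or false; connect-via uses that
                          ;; to decide whether to keep forwarding messages.
                          #(s/put! dst %)))
               dst)

;; stream->seq blocks until dst is closed (connect-via closes it once src
;; is drained); use it only at the REPL or in tests.
(def results (vec (s/stream->seq dst)))
```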
That looks closer to what I want:
(time (doall (s/stream->seq
               (s/map (fn [res]
                        (select-keys res [:request-time :connection-time :status]))
                      (s/realize-each
                        (s/map (fn [u]
                                 (println "HTTP REQ TO " u)
                                 (aleph.http/get u {:throw-exceptions? false}))
                               (s/throttle 1
                                           (s/buffer 50
                                                     (s/->source
                                                       (map #(str "http://www.google.com/" %) (range 1 10)))))))))))
Actually, you're right: I want a mix of parallelizing and throttling. How do I parallelize it?
@nha you can use deferred/loop to do it kinda manually,
or pipe the batched stream into a (deferred/future-with my-single-thread-loop ..)
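A rough sketch of the "deferred/loop, kinda manually" suggestion, assuming the URLs are first grouped with `s/batch`: every request in a batch is fired at once, `d/zip` waits for all of them, and only then is the next batch taken. `fake-get`, `fetch-in-batches`, the example.com URLs and the batch size of 2 are all hypothetical; replace `fake-get` with `(aleph.http/get url {:throw-exceptions? false})` for real requests.

```clojure
(require '[manifold.deferred :as d]
         '[manifold.stream :as s])

;; Hypothetical stand-in for aleph.http/get.
(defn fake-get [url]
  (d/future (Thread/sleep 50) {:url url :status 200}))

(defn fetch-in-batches
  "Hypothetical helper. Takes one batch (a vector of URLs) at a time from
  batched-src, starts every request in it, and waits for all of them (d/zip)
  before taking the next batch. Returns a deferred yielding all results in order."
  [batched-src]
  (d/loop [acc []]
    (d/chain (s/take! batched-src ::drained)
             (fn [batch]
               (if (identical? ::drained batch)
                 acc
                 (d/chain (apply d/zip (map fake-get batch))
                          (fn [responses]
                            (d/recur (into acc
                                           (map #(select-keys % [:url :status]))
                                           responses)))))))))

(def urls (mapv #(str "http://example.com/" %) (range 1 6)))

;; Batches of 2: at most 2 requests in flight at any time.
(def results @(fetch-in-batches (s/batch 2 (s/->source urls))))
```

To mix this with throttling, the batched stream could itself be throttled, e.g. `(s/throttle 1 (s/batch 10 src))`, which should release at most one batch per second. The batch size then controls parallelism and the throttle rate controls pacing.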