I have reified the documentation of reactor
, please let me know if some point deserves clarification https://github.com/leonoel/missionary/blob/17d31949c821215ed85563000c5378270fc96aed/src/missionary/core.cljc#L643-L673
Is there a way to make emit-responses
here lazy so that it only runs the fake-http-req twice?
(def fake-http-req
(sp
(println "making request")
(? (m/sleep 100 {:body "<p>hi</p>"}))))
(def emit-responses
(ap
(?? (m/enumerate (range)))
(? fake-http-req)))
(->> emit-responses
(m/transform (comp (take 2) (map #(doto % prn))))
(m/aggregate conj)
?)
making request
{:body "<p>hi</p>"}
making request
{:body "<p>hi</p>"}
making request
not that way. each pipeline stage is, conceptually, separated by a 1-item buffer so as soon as the transform
stage pulls the second value, the ap
stage sees room in its output buffer and prepares the third value.
If you want to be sure no more than 2 calls are made you'll have to implement that inside the ap
ok gotcha
Another one, slightly different than yesterday, essentially in that the task can have an invalid result based on its value relative to the collected flow values, and need to be retried:
I have two flows running in parallel, emitting increasing integers at different speeds fast and slow. I need to wait for the slow flow to emit a number higher than the first seen number from the fast flow, this will be the start
int. Ultimately I need:
- the numbers that came from the fast flow before start
- the start
number
- a flow of all numbers from the fast flow after start
Currently have this working, but wondering if there's another way to think about this?
(defn nflow [start step delay]
(ap
(let [x (?? (m/enumerate (range start 1000 step)))]
(? (m/sleep delay x)))))
(def fast-flow (nflow 3 1 100)) ;;start at 3, step by 1, emit every 100 ms
(def slow-flow (nflow 0 5 500)) ;;start at 0, step by 5, emit every 500 ms
(->> (ap (amb=
[:fast (?? fast-flow)]
[:slow (?? slow-flow)]))
(m/integrate
(fn [{:keys [prior start after] :as acc} [k v]]
(cond
(and (not start) (= k :slow))
(if (> v (first prior))
(assoc acc :start v)
acc)
(and (not start) (= k :fast))
(update acc :prior conj v)
:else
;;(update acc :after conj v)
(assoc acc :after v)
))
{:prior []})
(m/transform (comp (take 20) (map #(:after % %))))
(m/aggregate conj)
?)
[{:prior []}
{:prior [3]}
{:prior [3 4]}
{:prior [3 4 5]}
{:prior [3 4 5 6]}
{:prior [3 4 5 6]}
{:prior [3 4 5 6 7]}
{:prior [3 4 5 6 7 8]}
{:prior [3 4 5 6 7 8 9]}
{:prior [3 4 5 6 7 8 9 10]}
{:prior [3 4 5 6 7 8 9 10 11]}
{:prior [3 4 5 6 7 8 9 10 11], :start 5}
12
13
14
15
16
10
17
18]
seems like the right way to me