missionary

2020-12-24T11:11:10.061400Z

I have revised the documentation of reactor, please let me know if any point needs clarification: https://github.com/leonoel/missionary/blob/17d31949c821215ed85563000c5378270fc96aed/src/missionary/core.cljc#L643-L673

👍 1
2020-12-24T16:23:07.062900Z

Is there a way to make emit-responses here lazy so that it only runs the fake-http-req twice?

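;; names used unqualified in the snippets of this thread
(require '[missionary.core :as m :refer [sp ap ? ?? amb=]])

(def fake-http-req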
  (sp
    (println "making request")
    (? (m/sleep 100 {:body "<p>hi</p>"}))))

(def emit-responses
  (ap
    (?? (m/enumerate (range)))
    (? fake-http-req)))

(->> emit-responses
     (m/transform (comp (take 2) (map #(doto % prn))))
     (m/aggregate conj)
     ?)
making request
{:body "<p>hi</p>"}
making request
{:body "<p>hi</p>"}
making request

2020-12-24T16:52:11.065100Z

Not that way. Each pipeline stage is, conceptually, separated from the next by a 1-item buffer, so as soon as the transform stage pulls the second value, the ap stage sees room in its output buffer and prepares the third value. That is why "making request" is printed a third time.

2020-12-24T16:52:43.065700Z

If you want to be sure no more than 2 calls are made, you'll have to implement that limit inside the ap.
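
A minimal sketch of what "inside the ap" could look like, assuming the operator names used in this thread (`m/enumerate`, `m/aggregate`, `??`); later releases of the library may name them differently. Here `emit-responses` becomes a function of a hypothetical parameter `n`, and the `calls` atom is added only to make the number of requests observable. Because the enumerated range is finite, the ap never has a third value to prepare.

```clojure
(require '[missionary.core :as m :refer [sp ap ? ??]])

;; Hypothetical counter, only here to observe how many requests were started.
(def calls (atom 0))

(def fake-http-req
  (sp
    (swap! calls inc)
    (println "making request")
    (? (m/sleep 100 {:body "<p>hi</p>"}))))

;; The limit lives in the source: only n values are enumerated,
;; so the ap body (and the request in it) runs at most n times.
(defn emit-responses [n]
  (ap
    (?? (m/enumerate (range n)))
    (? fake-http-req)))

;; Blocking call, as in the original snippet (JVM only).
(def result
  (->> (emit-responses 2)
       (m/aggregate conj)
       ?))
```

The `(take 2)` stage is no longer needed downstream, since the flow terminates by itself after `n` responses.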

2020-12-24T16:56:00.066Z

ok gotcha

2020-12-24T17:54:06.071400Z

Another one, slightly different from yesterday's, mainly in that the task can have an invalid result, depending on its value relative to the collected flow values, and then needs to be retried. I have two flows running in parallel, fast and slow, emitting increasing integers at different speeds. I need to wait for the slow flow to emit a number higher than the first number seen from the fast flow; this will be the start int. Ultimately I need:
- the numbers that came from the fast flow before start
- the start number
- a flow of all numbers from the fast flow after start

Currently have this working, but wondering if there's another way to think about this?

(defn nflow [start step delay]
  (ap
    (let [x (?? (m/enumerate (range start 1000 step)))]
      (? (m/sleep delay x)))))

(def fast-flow (nflow 3 1 100)) ;;start at 3, step by 1, emit every 100 ms
(def slow-flow (nflow 0 5 500)) ;;start at 0, step by 5, emit every 500 ms

(->> (ap (amb=
           [:fast (?? fast-flow)]
           [:slow (?? slow-flow)]))
     (m/integrate
       (fn [{:keys [prior start after] :as acc} [k v]]
         (cond
           (and (not start) (= k :slow))
           (if (> v (first prior))
             (assoc acc :start v)
             acc)

           (and (not start) (= k :fast))
           (update acc :prior conj v)

           :else
           ;;(update acc :after conj v)
           (assoc acc :after v)))
       {:prior []})
     (m/transform (comp (take 20) (map #(:after % %))))
     (m/aggregate conj)
     ?)
[{:prior []}
 {:prior [3]}
 {:prior [3 4]}
 {:prior [3 4 5]}
 {:prior [3 4 5 6]}
 {:prior [3 4 5 6]}
 {:prior [3 4 5 6 7]}
 {:prior [3 4 5 6 7 8]}
 {:prior [3 4 5 6 7 8 9]}
 {:prior [3 4 5 6 7 8 9 10]}
 {:prior [3 4 5 6 7 8 9 10 11]}
 {:prior [3 4 5 6 7 8 9 10 11], :start 5}
 12
 13
 14
 15
 16
 10
 17
 18]
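
Note that the `10` in the output above comes from the slow flow: once `:start` is set, the `:else` branch accepts any value regardless of its tag. The sketch below is a hypothetical variant of the reducing function (the names `step`, `events` and `states` are not from the original) that only reports `:fast` values after start and also guards against a slow value arriving while `:prior` is still empty. Since the function is pure, it can be checked with plain `reductions` over a hand-written interleaving, without running any flow.

```clojure
;; Reducing step over tagged events [k v], where k is :fast or :slow.
(defn step [{:keys [prior start] :as acc} [k v]]
  (cond
    ;; Waiting for start: a slow value qualifies only if at least one
    ;; fast value has been seen and it exceeds the first of them.
    (and (not start) (= k :slow))
    (if (and (seq prior) (> v (first prior)))
      (assoc acc :start v)
      acc)

    ;; Waiting for start: remember fast values.
    (and (not start) (= k :fast))
    (update acc :prior conj v)

    ;; Start is known: report fast values only.
    (= k :fast)
    (assoc acc :after v)

    ;; Start is known and the value is slow: clear any stale :after
    ;; so it is not reported twice.
    :else
    (dissoc acc :after)))

;; Hand-written interleaving of the two flows, for illustration only.
(def events
  [[:fast 3] [:fast 4] [:slow 0] [:fast 5]
   [:slow 5] [:fast 6] [:slow 10] [:fast 7]])

(def states (reductions step {:prior []} events))

(select-keys (last states) [:prior :start])
;; => {:prior [3 4 5], :start 5}

(keep :after states)
;; => (6 7)
```

With this variant the downstream stage would use `(keep :after)` rather than `(map #(:after % %))`, because states without an `:after` key carry nothing to report.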

2020-12-25T08:49:14.071500Z

seems like the right way to me

👍 1