OK, I think I realized this being awkward is mainly due to observe being non-backpressured. I wrapped it in relieve and can now deref iter outside of the notify call. This makes more sense to me now.
Although I'm still interested in ways to use the notifier to keep consuming an entire flow, attempting to fully keep up with the consumer. Is it just a matter of enqueuing notifications on something like a core.async chan (or a LinkedBlockingQueue), then dequeuing them and deref'ing as the notifications come in?
Aha, I guess just a semaphore works for this.
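;; assumed namespace setup: ?, ?? and ap are used unqualified in the snippets below
(require '[missionary.core :as m :refer [? ?? ap]])

(do
  (def prod1
    (m/relieve {} ;; alternative: m/buffer 100
      (m/observe
        (fn [event-fn]
          (let [producer
                (Thread.
                  (fn []
                    (dotimes [n 10]
                      ;;(println "Producing" n)
                      (Thread/sleep (long (rand-int 100)))
                      (event-fn n))))]
            (.start producer)
            (fn []
              (println "cancelled")
              ;; Thread.stop is deprecated and throws on recent JDKs, so interrupt instead
              (.interrupt producer))))))))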
(do
  (def s1 (m/sem))
  (? s1) ;;acquire
  (def iter (prod1
              (fn []
                (println :notified)
                (s1)) ;;release
              (fn [] (println :terminated))))
  (dotimes [x 100]
    (? (m/sp
         (? s1) ;;acquire
         (println "result" @iter)))))
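For comparison, the queue-based variant asked about above could look roughly like this. It is only a sketch: `drain!` and `q` are made-up names rather than anything missionary provides, and it assumes the usual flow protocol, where calling a flow with a notifier and a terminator returns an iterator that is deref'ed once per notification. The notifier and terminator do nothing but enqueue a token, and the consuming thread does all the deref'ing.

```clojure
(require '[missionary.core :as m])
(import 'java.util.concurrent.LinkedBlockingQueue)

;; Hypothetical helper, not part of missionary.
(defn drain!
  "Runs `flow`, blocking the calling thread, and calls `f` on every value
  until the flow terminates."
  [flow f]
  (let [q    (LinkedBlockingQueue.)
        ;; notifier and terminator only enqueue a token; no deref happens here
        iter (flow #(.put q :ready) #(.put q :done))]
    (loop []
      (when (= :ready (.take q))
        (f @iter) ;; transfer exactly one value per notification
        (recur)))))

(drain! (m/enumerate (range 5)) println)
```

Note that this only moves the transfers onto the consuming thread. It does not add backpressure by itself, so a source like observe would presumably still need relieve or buffer in front of it.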
Hope you're OK with these kinds of questions. I've taken a few stabs at learning missionary and think I'm starting to get some momentum in grokking it this time around. I have a flow and a task. I need to start the task while collecting all items from the flow. When the task finishes, I need to grab a snapshot of all items collected from the flow up to that point, and all items the flow produces after that snapshot should be printed [edit: or just provide a flow of all items after the snapshot]. Any tips for how to do this?
(let [f (ap
          (let [n (?? (m/enumerate (repeatedly 10 #(rand-int 200))))]
            (? (m/sleep n n))))
      t (m/sleep 1000 ::awake)
      snapshot (m/aggregate (fn [_ x] x) nil
                 (ap (?? (m/latest identity (m/integrate conj [] f)))))]
  ;; ???
  ;; need to return:
  ;; - result of t
  ;; - latest snapshot when t completes
  ;; - a flow of all values produced after the snapshot was taken
  )
Here's another approach
(defmacro amb= [& forms]
  `(case (m/?= (m/enumerate (range ~(count forms))))
     ~@(interleave (range) forms)))

(defn group-before-token [rf]
  (let [items (volatile! [])]
    (fn
      ([]
       (rf))
      ([r]
       (rf (if-some [xs @items] (rf r xs) r)))
      ([r x]
       (if-some [xs @items]
         (case x
           ::split (do (vreset! items nil) (rf r xs))
           (do (vreset! items (conj xs x)) r))
         (rf r x))))))

(defn group-until [task flow]
  (->> (m/ap (amb= (do (m/? task) ::split) (m/?? flow)))
       (m/transform group-before-token)))
BTW I'm totally OK with these kinds of questions
Very cool, thanks for that!
This is what I came up with:
(defn printer [t]
  (t (fn [res] (println "res" res))
     (fn [err] (println "err" err))))
"
Collect all values from flow f until task t completes; the collected values are the `init` coll. Then print each subsequent value from f along with the init coll.
"
(printer
  (m/reactor
    (let [f (ap
              (let [n (?? (m/enumerate (range 20)))]
                (? (m/sleep 20 n))))
          t (m/sleep 90 ::awake)
          f1 (->> (m/stream! f)
                  (m/integrate conj [])
                  (m/relieve {}))
          f2 (m/stream! f)
          can-take? (m/dfv)
          init (m/dfv)
          iter (f1 (fn []
                     (println :notify)
                     (can-take? true))
                   #(println :terminated))]
      (m/stream!
        (ap
          (println (? init) (?? f2))))
      (m/stream!
        (ap
          (? (m/sp
               (? t)
               (? can-take?)
               (init @iter))))))))
=>
:notify
:notify
[0 1 2 3] 4
[0 1 2 3] 5
[0 1 2 3] 6
[0 1 2 3] 7
[0 1 2 3] 8
[0 1 2 3] 9
[0 1 2 3] 10
[0 1 2 3] 11
[0 1 2 3] 12
[0 1 2 3] 13
[0 1 2 3] 14
[0 1 2 3] 15
[0 1 2 3] 16
[0 1 2 3] 17
[0 1 2 3] 18
[0 1 2 3] 19
res #object[missionary.impl.Reactor$Pub 0x5eab2caa]