missionary

2020-12-23T03:10:29.055800Z

Ok I think I realized this being awkard is mainly due to observe being non-backpressured. I wrapped it in relieve now can deref iter outside of the notify call. This makes more sense to me now

2020-12-23T04:41:36.056Z

Although I'm still interested in ways to use the notifier to keep consuming an entire flow, attempting to fully keep up with the consumer. Is it just a matter of enqueuing notifications on something like a core async chan (or linkedblockingqueue) and then just dequeuing them and deref'ing as the notifications come in?

2020-12-23T05:02:48.056200Z

aha, I guess just a semaphor works for this.

(do 
  (def prod1
    (m/relieve {} ;;;;m/buffer 100
      (m/observe
       (fn [event-fn]
         (let [producer
               (Thread.
                 (fn []
                   (dotimes [n 10]
                     ;;(println "Producing" n)
                     (Thread/sleep (long (rand-int 100)))
                     (event-fn n))))]
           (.start producer)
           (fn []
             (println "cancelled")
             (.stop producer))))))))


(do
  (def s1 (m/sem))
  (? s1) ;;acquire
  (def iter (prod1
              (fn []
                (println :notified)
                (s1) ;;release
                )
              (fn [] (println :terminated))))

  (dotimes  [x 100]
    (? (m/sp
         (? s1) ;;acquire
         (println "result" @iter)))))

2020-12-23T17:54:29.059100Z

Hope you're ok with these kinds of questions. I've taken a few stabs at learning missionary and think I'm starting to get some momentum in grokking it this time around. I have a flow and a task. I need to start the task while collecting all items from he flow. When the task finishes I need to grab the snapshot of all collected items from the flow at that point in time, and all items from the flow after that snapshot should be printed [edit: or just provide a flow of all items after the snapshot]. Any tips for how to do this?

(let [f (ap
          (let [n (?? (m/enumerate (repeatedly 10 #(rand-int 200))))]
            (? (m/sleep n n))))
      t (m/sleep 1000 ::awake)

      snapshot (m/aggregate (fn [_ x] x) nil
                 (ap (?? (m/latest identity (m/integrate conj [] f)))))
      
      ]
 ;;???  

  ;;need to return:
  ;;-result of t
  ;;-latest snapshot when t completes
  ;;-a flow of all values produced after the snapshot was taken
  
  )

2020-12-24T08:46:59.059900Z

Here's another approach

(defmacro amb= [& forms]
  `(case (m/?= (m/enumerate (range ~(count forms))))
     ~@(interleave (range) forms)))

(defn group-before-token [rf]
  (let [items (volatile! [])]
    (fn
      ([]
       (rf))
      ([r]
       (rf (if-some [xs @items] (rf r xs) r)))
      ([r x]
       (if-some [xs @items]
         (case x
           ::split (do (vreset! items nil) (rf r xs))
           (do (vreset! items (conj xs x)) r))
         (rf r x))))))

(defn group-until [task flow]
  (->> (m/ap (amb= (do (m/? task) ::split) (m/?? flow)))
       (m/transform group-before-token)))

2020-12-24T08:50:15.060100Z

BTW I'm totally OK with these kinds of questions

2020-12-24T14:44:55.061600Z

Very cool, thanks for that!

2020-12-23T23:59:11.059600Z

This is what I came up with:

(defn printer [t]
  (t (fn [res] (println "res" res))
    (fn [err] (println "err" err))))

"
collect all values from flow f until task t is complete, the collected values are the `init` coll. then print out all subsequent values from f along with the init coll.
"
(printer
  (m/reactor
    (let [f (ap
              (let [n (?? (m/enumerate (range 20)))]
                (? (m/sleep 20 n))))
          
          t         (m/sleep 90 ::awake)
          f1        (->> (m/stream! f)
                         (m/integrate conj [])
                         (m/relieve {}))
          f2        (m/stream! f)
          can-take? (m/dfv)
          init      (m/dfv)
          iter      (f1 (fn []
                     (println :notify)
                     (can-take? true))
                 #(println :terminated))]
      
      (m/stream!
        (ap
          (println (? init) (?? f2))))
      
      (m/stream!
        (ap
          (? (m/sp
               (? t)
               (? can-take?)
               (init @iter))))))))
=>
:notify
:notify
[0 1 2 3] 4
[0 1 2 3] 5
[0 1 2 3] 6
[0 1 2 3] 7
[0 1 2 3] 8
[0 1 2 3] 9
[0 1 2 3] 10
[0 1 2 3] 11
[0 1 2 3] 12
[0 1 2 3] 13
[0 1 2 3] 14
[0 1 2 3] 15
[0 1 2 3] 16
[0 1 2 3] 17
[0 1 2 3] 18
[0 1 2 3] 19
res #object[missionary.impl.Reactor$Pub 0x5eab2caa]