missionary

2020-12-22T23:15:20.055400Z

I'm a little confused about how the flow protocol is supposed to work; I'm just trying to get a feel for it without the higher-level constructs. The following works but doesn't seem right. Is there a better way to just manually print each item from a flow? Are new values from the flow supposed to be handled in the notifier function? This is intentionally somewhat difficult right, because the flow protocol requires you to "bring your own state/coordination" (which the rest of missionary does for you)?

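(require '[missionary.core :as m])

(do
  (def prod1
    (m/observe
      (fn [event-fn]
        (let [producer
              (Thread.
                (fn []
                  (try
                    (dotimes [n 6]
                      ;;(println "Producing" n)
                      (Thread/sleep 1000)
                      (event-fn n))
                    ;; interrupted by the cleanup fn below: just exit
                    (catch InterruptedException _))))]
          (.start producer)
          ;; cleanup fn, called when the flow is cancelled
          (fn []
            (println "cancelled")
            ;; Thread.stop is deprecated, so interrupt the producer instead
            (.interrupt producer)))))))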


(do
  (declare iter)
  (def iter (prod1
              (fn []
                ;;(println :notified)
                (let [x @iter]
                  (println "new value" x)))
              (fn [] (println :terminated)))))

2020-12-23T10:53:23.056400Z

Bootstrapping a flow is not trivial because it very often requires building circular references between the iterator and the notifier, and that is ridiculously hard to do in Clojure (which is one of the reasons why the JVM part of missionary is written in Java). Your first example is timing-sensitive. A flow is allowed to call the notifier during the constructor call; in that case the var would not be assigned yet and you'll get an NPE on deref. The semaphore approach looks good to me. Also have a look at the blocking iterator implementation for a similar strategy with threads and locks: https://github.com/leonoel/missionary/issues/23#issuecomment-746317145 The implementation of aggregate may be worth reading as well.
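To make the timing issue concrete, here is a rough sketch of one way to avoid it. It is not how missionary does it internally, and it uses a blocking queue where a semaphore would work just as well. The callbacks only enqueue a signal and never touch the iterator, so it doesn't matter if the notifier fires before the constructor has returned. `seq-flow` and `run-flow!` are hypothetical names made up for this example; `seq-flow` is a toy stand-in for a real flow that deliberately notifies during construction.

```clojure
(import 'java.util.concurrent.LinkedBlockingQueue)

;; Hypothetical stand-in for a real flow (not part of missionary): emits the
;; items of coll and calls the notifier synchronously, including from inside
;; the constructor call, which is the case that breaks the var-based version.
(defn seq-flow [coll]
  (fn [notify terminate]
    (let [remaining (atom (seq coll))
          signal!   #(if (seq @remaining) (notify) (terminate))
          it        (reify
                      clojure.lang.IFn
                      (invoke [_] nil)          ; cancel: no-op in this toy flow
                      clojure.lang.IDeref
                      (deref [_]
                        (let [x (first @remaining)]
                          (swap! remaining next)
                          (signal!)             ; next value ready, or done
                          x)))]
      (signal!)                                 ; fires before `it` is returned
      it)))

;; Hypothetical helper: runs the flow on the calling thread, passing each
;; value to on-value, and returns :terminated once the terminator has fired.
(defn run-flow! [flow on-value]
  (let [signals (LinkedBlockingQueue.)
        it      (flow #(.put signals :ready) #(.put signals :done))]
    (loop []
      (case (.take signals)
        :ready (do (on-value @it) (recur))      ; transfer exactly one value
        :done  :terminated))))

(run-flow! (seq-flow [:a :b :c]) #(println "new value" %))
```

The same helper should accept prod1 in place of the toy flow; keep in mind that it blocks the calling thread until the flow's terminator is called.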

2020-12-23T11:18:12.056700Z

> Are new values from the flow supposed to be handled in the notifier function?
The protocol allows this but it's not mandatory.
> This is intentionally somewhat difficult right, because the flow protocol requires you to "bring your own state/coordination" (which the rest of missionary does for you)?
The protocol is intentionally low-level to allow a large variety of composition semantics.

2020-12-23T14:15:13.056900Z

Thanks for the explanation, that helps! And the FlowIterator impl is useful too