@hiredman I need it to work in cljs as well. Do you have any particular comments to level at this impl? Why it’s a bad idea? As far as I can tell this should work and be safe, will need to run it through a larger test suite though. the .lock and impl/active and commit are really just incantations for me. But intuitively this seems like it should work.
The following seems to be a safe version
(defn big-promise []
(let [p (a/promise-chan)
c (a/chan)
m (a/mult c)]
(a/go
(let [v (a/<! p)]
(when v
(a/>! c v))
(a/close! c)))
(reify
impl/ReadPort
(take! [this handler]
(a/go
(let [c' (a/chan)
v (do
(a/tap m c')
(let [v (or (a/poll! p)
(a/<! c'))]
(a/untap m c')
(a/close! c')
v))]
#?(:clj
(.lock handler))
(let [good (and (impl/active? handler)
(impl/commit handler))]
#?(:clj
(.unlock handler))
(when good
(dispatch/run #(good v))))))
nil)
impl/WritePort
(put! [this v handler]
(impl/put! p v handler))
impl/Channel
(close! [this]
(impl/close! p))
(closed? [this]
(impl/closed? p)))))
But It doesnt support a/poll! Which it needs to do
(defn- box [val]
#?(:clj
(reify clojure.lang.IDeref
(deref [_] val))
:cljs
(reify IDeref
(-deref [_] val))))
(defn big-promise
([] (big-promise nil))
([max-pending-takes]
(let [max-pending-takes (or max-pending-takes impl/MAX-QUEUE-SIZE)
p (a/promise-chan)
c (a/chan)
m (a/mult c)
takes (atom 0)]
(a/go
(let [v (a/<! p)]
(when v
(a/>! c v))
(a/close! c)))
(reify
impl/ReadPort
(take! [this handler]
(if (not ^boolean (impl/active? handler))
nil
(if-let [v (a/poll! p)]
(box v)
(do
(assert (< @takes max-pending-takes)
(str "No more than " max-pending-takes
" pending takes are allowed on this big-promise."))
(a/go
(let [c' (a/chan)
v (do
(swap! takes inc)
(a/tap m c')
(let [v (or (a/poll! p)
(a/<! c'))]
(a/untap m c')
(a/close! c')
v))]
(swap! takes dec)
#?(:clj (.lock handler))
(let [good (and (impl/active? handler)
(impl/commit handler))]
#?(:clj (.unlock handler))
(when good
(dispatch/run #(good v))))))
nil))))
impl/WritePort
(put! [this v handler]
(impl/put! p v handler))
impl/Channel
(close! [this]
(impl/close! p))
(closed? [this]
(impl/closed? p))))))
but this does