Any Manifold experts here? Is there something idiomatically wrong with this, or is there a better approach? We need to poll a stream for a new value and take an action on timeout. When this runs, we eventually hit 100% CPU and the garbage collector starts going crazy.
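;; aliases used below: d = manifold.deferred, s = manifold.stream
(require '[manifold.deferred :as d]
         '[manifold.stream :as s])

(let [shutdown (d/deferred)
      s        (s/stream)]
  ;; every iteration issues a brand-new take with a 100 ms timeout
  (d/loop []
    (d/chain (s/try-take! s nil 100 ::timeout)
      (fn [_]
        (when-not (d/realized? shutdown)
          (d/recur)))))
  shutdown)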
If you change the timeout from 100 to 1, the problem happens within a minute and you’ll also see this:
WARNING: excessive pending takes (> 16384), closing stream
java.lang.IllegalStateException
at manifold.stream.default.Stream.take(default.clj:234)
at foo.manifold_test$run_BANG_$this__2988__auto____4284$fn__4285.invoke(manifold_test.clj:10)
at foo.manifold_test$run_BANG_$this__2988__auto____4284.invoke(manifold_test.clj:9)
at clojure.lang.AFn.applyToHelper(AFn.java:154)
at clojure.lang.AFn.applyTo(AFn.java:144)
at clojure.core$apply.invokeStatic(core.clj:667)
at clojure.core$apply.invoke(core.clj:660)
at foo.manifold_test$run_BANG_$this__2988__auto____4284$fn__4289.invoke(manifold_test.clj:9)
at manifold.deferred.Listener.onSuccess(deferred.clj:219)
at manifold.deferred.Deferred$fn__2673.invoke(deferred.clj:398)
...
@vincentjames501 can't say I've used try-take! much, but you could try repeating a deferred timeout on the try-take! deferred instead of repeating the try-take! itself.
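One way to read that suggestion, as a rough sketch rather than a tested fix: keep a single outstanding take and race it against a fresh timer on every round, so a timeout never queues another take on the stream. The "excessive pending takes" warning above suggests that timed-out takes stay registered on the stream, which is what this tries to avoid. The names `poll-stream`, `on-value`, `on-timeout`, `timeout-ms` and the `::closed` / `::timeout` sentinels are made up for illustration, and it assumes a Manifold version that ships `d/alt`.

```clojure
(require '[manifold.deferred :as d]
         '[manifold.stream :as s])

(defn poll-stream
  "Hypothetical helper: calls (on-value v) for each message and (on-timeout)
  whenever timeout-ms pass without one. Stops once `shutdown` is realized
  or the stream is closed."
  [stream shutdown timeout-ms on-value on-timeout]
  ;; `pending` is the one take that is currently registered on the stream.
  (d/loop [pending (s/take! stream ::closed)]
    (d/chain
      ;; Race the outstanding take against a fresh timer deferred.
      (d/alt pending (d/timeout! (d/deferred) timeout-ms ::timeout))
      (fn [v]
        (cond
          ;; Same check as the original loop: stop after shutdown.
          (d/realized? shutdown) ::shutdown
          ;; The take resolved to its default value: stream closed and drained.
          (= ::closed v)         ::closed
          ;; Timer won: act on it, then wait again on the SAME take.
          (= ::timeout v)        (do (on-timeout)
                                     (d/recur pending))
          ;; A real message arrived: handle it and issue the next take.
          :else                  (do (on-value v)
                                     (d/recur (s/take! stream ::closed))))))))
```

Usage would look like `(poll-stream s shutdown 100 handle-msg handle-idle)`, where the two handlers are whatever you already do per message and per timeout. Two caveats: a message that arrives in the same round in which `shutdown` is realized is dropped, and as far as I can tell each round still attaches one more callback to the pending take's deferred, so a stream that stays idle for a very long time may grow that callback list instead of the stream's take queue.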