Just want a sanity check, does this pattern seem safe?
I'm running a worker task on a future, and I'd like to be able to cleanly exit it, so that whatever work is in progress is cleanly finished, and then the future exits out of its loop without sleeping.
I'm using async/alt!!
as a trick for this:
(let [exit-ch (async/chan)
my-task (future (loop []
;;do work here
(async/alt!! (async/timeout 5000) (recur)
exit-ch-ch nil)))
exit-fn (fn [] (async/close! exit-ch))]
;;eventually will run exit-fn to safely cancel the future
)
I'm seeing an issue where it appears this worker task seems to be no longer running... I'm wondering if using alt!!
this way could cause some unexpected problems?I've never seen recur
used inside the body of alt!!
and I'm skeptical that would work given the macro rewriting that happens inside alt!!
.
Can you try wrapping the body of your future
with try
catch
and print if that body is throwing? My guess is you get an exception thrown from alts!!
but the exception is swallowed by the future.
Yeah that's a good point, I did try derefing the future after closing the channel, and all I saw returned was nil
(would have expected an exception if alt!!
had thrown one)
But I will definitely try that
Normally the clauses in alt!!
are values or at least expressions that evaluate to a value, but recur
is different than that
right now I'm just running a stripped-down version of this that runs the future, loops/recurs and prints.. want to see if it will eventually stop or gobble up a ton of memory. So far it's run over 22,000 times with no issues
Other than the recur
thing I don't see anything glaring about your use of alt!!
. I'm guessing you typed up the example above in slack so the mismatch between shutdown-ch
and exit-ch
is not actually a problem in your real code? Also I normally put the shutdown-ch
first and use the {:priority true}
option to deterministically shut down as soon as the shutdown-ch closes.
haha whoops yes, just a typo. Wasn't familiar with the priority
option but that sounds like a good improvement. Thank you!
For more context, I was originally using a future that's running a long-running task that did a 5 second thread/sleep (same loop/recur kind of pattern).
There was no clean way to kill this future on service shutdown; I could try signalling it to shutdown using an atom, but with that Thread/sleep it would still take a long time to exit.
I introduced alt!!
with this timeout and the shutdown-ch as a way to retain the original sleeping behavior, but give it an option to exit immediately by closing the exit channel
alt!! should work fine like that
i wrote something similar to this yesterday/today:
(defn watch-for-cancelling [{job-id :id} cancel-ch stop-ch]
(a/go-loop []
(if-let [cancel? (a/<! (a/thread (-> {:select [:id]
:from [:<http://svc.jobs|svc.jobs>]
:where [:and
[:= :id job-id]
[:= :status "cancelled"]]}
(hdb/fetch)
seq)))]
(do (log/warnf "Cancelling job: %s" job-id)
(a/>! cancel-ch ::cancel))
(when (= :recur (a/alt! (a/timeout 5000) :recur
stop-ch :job-done))
(recur)))))
I always write it like
(async/alt!! (async/timeout 5000)
([_] (recur))
exit-ch-ch
([_] nil))
but I think I recall it working if you elide the binding for the result as wellyou can recur directly from within an alt
i wouldn't have thought to try that. neat
for example https://github.com/hiredman/roundabout/blob/master/src/com/manigfeald/roundabout.cljc
but I never write an alt clause without a binding, so I forget how that works
what do you mean with a binding?
a binding for the result of the alt
there are four in the snippet you provided
(alt! whatever ([x] do-something-with-x))
It seems to be working fine without the binding, I'm just getting some weird issues after the code has been running for two days Not sure at this point if it's something elsewhere in my code or something happening with the alt. Probably need some more logging
x is bound the result of the whatever channel operation
Not sure if that's any help, and I have no time to delve into the details of your problem @bfay, but my first thought here is:
core.async/thread works off a separate unbounded thread pool, distinct from
any other thread pool, while clojure.core/future on the other hand uses the
agent thread pool which is bounded and can be used anywhere in the program.
An important difference however: core.async/thread are daemon threads the
JVM will not wait on before exiting, unlike clojure.core/future threads.
I just looked at the code
alt! assumes if the expr is a seq then it is a binding
which is why your recur isn't working
just always do the binding form
wait really? I mean this example runs and has no binding form
(future (loop []
(println "in loop" (rand-int 500))
(async/alt!! (async/timeout 50) (recur)
shutdown-ch nil)))
(where shutdown-ch is just a barebones (async/chan))
ah, misread the code, it checks if it is a seq, and if the first element of the seq is a vector
my next guess is you are blocking up the core.async threadpool some how which is causing the timeout's callback not to be able to run
and you can test that by running your future, and when it stops looping running (clojure.core.async.impl.dispatch/run #(println "Hello World"))
if nothing prints then it is blocked
Oh nice, that's a good tip. Behavior-wise it does seem a bit like it could be a blocked threadpool
the way the blocking operations are implemented (alt!!, <!!, >!!) they are basically the non-blocking versions + a promise, so even if you don't use any go blocks you still end up with callbacks that deliver to the promise running on the core.async threadpool