core-async

2020-05-08T19:05:23.162500Z

Just want a sanity check, does this pattern seem safe? I'm running a worker task on a future, and I'd like to be able to cleanly exit it, so that whatever work is in progress is cleanly finished, and then the future exits out of its loop without sleeping. I'm using async/alt!! as a trick for this:

(let [exit-ch (async/chan)
      my-task (future (loop []
                        ;; do work here
                        (async/alt!! (async/timeout 5000) (recur)
                                     exit-ch-ch nil)))
      exit-fn (fn [] (async/close! exit-ch))]
  ;; eventually will run exit-fn to safely cancel the future
  )
I'm seeing an issue where this worker task appears to stop running after a while... I'm wondering if using alt!! this way could cause some unexpected problems?

2020-05-08T19:10:28.163500Z

I've never seen recur used inside the body of alt!! and I'm skeptical that would work given the macro rewriting that happens inside alt!!.

2020-05-08T19:11:27.164600Z

Can you try wrapping the body of your future with try catch and print if that body is throwing? My guess is you get an exception thrown from alts!! but the exception is swallowed by the future.
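
A minimal sketch of this suggestion, not the original poster's code. The names `start-worker` and `do-work` are hypothetical stand-ins for the real task. Wrapping the loop in try/catch prints an exception that the future would otherwise hold silently until someone derefs it.

```clojure
(require '[clojure.core.async :as async])

(defn start-worker
  "Hypothetical helper: calls do-work repeatedly until exit-ch is closed.
  Prints any exception thrown by the loop and returns :failed in that case."
  [do-work exit-ch]
  (future
    (try
      (loop []
        (do-work)
        ;; true  -> the timeout fired, keep looping
        ;; false -> exit-ch was closed, leave the loop
        (when (async/alt!! (async/timeout 100) true
                           exit-ch false)
          (recur)))
      (catch Throwable t
        (println "worker died:" (.getMessage t))
        :failed))))
```

Dereferencing the returned future yields nil after a clean exit and :failed if the body threw.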

2020-05-08T19:12:49.165900Z

Yeah, that's a good point. I did try derefing the future after closing the channel, and all I saw returned was nil (I would have expected an exception if alt!! had thrown one). But I will definitely try that.

2020-05-08T19:13:20.166900Z

Normally the clauses in alt!! are values or at least expressions that evaluate to a value, but recur is different than that

2020-05-08T19:15:32.167700Z

right now I'm just running a stripped-down version of this that runs the future, loops/recurs and prints.. want to see if it will eventually stop or gobble up a ton of memory. So far it's run over 22,000 times with no issues

2020-05-08T19:21:30.169600Z

Other than the recur thing I don't see anything glaring about your use of alt!!. I'm guessing you typed up the example above in slack so the mismatch between shutdown-ch and exit-ch is not actually a problem in your real code? Also I normally put the shutdown-ch first and use the {:priority true} option to deterministically shut down as soon as the shutdown-ch closes.
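
A short sketch of the ordering described here, with a hypothetical helper name `wait-or-exit`. With `:priority true` the ports are tried in the order written, so a closed shutdown-ch wins even when the timeout is also ready.

```clojure
(require '[clojure.core.async :as async])

(defn wait-or-exit
  "Hypothetical helper: blocks for up to ms milliseconds.
  Returns :exit if shutdown-ch is closed, otherwise :continue."
  [shutdown-ch ms]
  (async/alt!!
    shutdown-ch        :exit
    (async/timeout ms) :continue
    :priority true))
```

A worker loop could then recur only while this returns :continue.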

2020-05-08T19:23:56.171800Z

haha whoops yes, just a typo. Wasn't familiar with the priority option but that sounds like a good improvement. Thank you!

2020-05-08T19:31:51.177400Z

For more context, I was originally using a future that's running a long-running task that did a 5 second Thread/sleep (same loop/recur kind of pattern). There was no clean way to kill this future on service shutdown; I could try signalling it to shut down using an atom, but with that Thread/sleep it would still take a long time to exit. I introduced alt!! with this timeout and the shutdown-ch as a way to retain the original sleeping behavior, but give it an option to exit immediately by closing the exit channel.

2020-05-08T19:33:33.177800Z

alt!! should work fine like that

dpsutton 2020-05-08T19:33:35.178Z

i wrote something similar to this yesterday/today:

(defn watch-for-cancelling [{job-id :id} cancel-ch stop-ch]
  (a/go-loop []
    (if-let [cancel? (a/<! (a/thread (-> {:select [:id]
                                          :from [:svc.jobs]
                                          :where [:and
                                                  [:= :id job-id]
                                                  [:= :status "cancelled"]]}
                                         (hdb/fetch)
                                         seq)))]
      (do (log/warnf "Cancelling job: %s" job-id)
          (a/>! cancel-ch ::cancel))
      (when (= :recur (a/alt! (a/timeout 5000) :recur
                              stop-ch :job-done))
        (recur)))))

2020-05-08T19:34:26.178100Z

I always write it like

(async/alt!! (async/timeout 5000)
             ([_] (recur))
             exit-ch-ch
             ([_] nil))
but I think I recall it working if you elide the binding for the result as well

2020-05-08T19:35:23.178500Z

you can recur directly from within an alt

dpsutton 2020-05-08T19:35:38.178800Z

i wouldn't have thought to try that. neat

2020-05-08T19:37:17.179700Z

but I never write an alt clause without a binding, so I forget how that works

dpsutton 2020-05-08T19:38:00.181Z

what do you mean with a binding?

2020-05-08T19:38:27.182Z

a binding for the result of the alt

dpsutton 2020-05-08T19:38:48.183100Z

there are four in the snippet you provided

2020-05-08T19:38:53.183400Z

(alt! whatever ([x] do-something-with-x))

2020-05-08T19:39:07.183900Z

It seems to be working fine without the binding, I'm just getting some weird issues after the code has been running for two days. Not sure at this point if it's something elsewhere in my code or something happening with the alt. Probably need some more logging.

2020-05-08T19:39:11.184100Z

x is bound to the result of the whatever channel operation
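
A small sketch of the binding form, using a hypothetical function `describe-next`. The vector at the head of each clause destructures the `[val port]` pair returned by the underlying alts call, so `[x]` binds just the value.

```clojure
(require '[clojure.core.async :as async])

(defn describe-next
  "Hypothetical helper: reports what happened on ch within 50 ms."
  [ch]
  (async/alt!!
    ;; x is the value taken from ch (nil if ch is closed)
    ch                 ([x] (if (nil? x) :closed [:got x]))
    ;; the value from a timeout channel is always nil, so ignore it
    (async/timeout 50) ([_] :timed-out)))
```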

fmjrey 2020-05-08T19:39:13.184200Z

Not sure if that's any help, and I have no time to delve into the details of your problem @bfay, but my first thought here is:

core.async/thread works off a separate unbounded thread pool, distinct from
any other thread pool, while clojure.core/future uses the agent send-off
thread pool, which is also unbounded and is shared with send-off agents.
An important difference, however: core.async/thread threads are daemon
threads, which the JVM will not wait on before exiting, unlike
clojure.core/future threads.

2020-05-08T19:39:17.184400Z

I just looked at the code

2020-05-08T19:39:41.185Z

alt! assumes if the expr is a seq then it is a binding

2020-05-08T19:39:52.185200Z

which is why your recur isn't working

2020-05-08T19:40:12.185500Z

just always do the binding form

2020-05-08T19:40:49.186100Z

wait really? I mean this example runs and has no binding form

(future (loop []
                   (println "in loop" (rand-int 500))
                   (async/alt!! (async/timeout 50) (recur)
                                shutdown-ch nil)))

2020-05-08T19:41:43.186500Z

(where shutdown-ch is just a barebones (async/chan))

2020-05-08T19:42:10.187100Z

ah, misread the code, it checks if it is a seq, and if the first element of the seq is a vector
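
The check described here can be sketched as a standalone predicate. This is an illustration of the rule as stated in the message, with a hypothetical name `binding-clause?`, not a copy of the library's source.

```clojure
(defn binding-clause?
  "Hypothetical helper: true when an alt result expression would be
  treated as a binding form, i.e. a list whose first element is a vector."
  [expr]
  (and (seq? expr) (vector? (first expr))))
```

Under this rule `([x] (inc x))` is a binding form, while `(recur)` and `nil` are ordinary result expressions, which is consistent with `(recur)` working without a binding.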

2020-05-08T19:44:08.187900Z

my next guess is you are blocking up the core.async threadpool somehow, which is causing the timeout's callback not to be able to run

2020-05-08T19:45:45.188900Z

and you can test that by running your future and, when it stops looping, running (clojure.core.async.impl.dispatch/run #(println "Hello World")). If nothing prints, then it is blocked
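
A variation on this check that returns a value instead of printing, as a rough sketch. The function name `dispatch-pool-responsive?` is hypothetical, and `clojure.core.async.impl.dispatch` is an implementation namespace, so it may change between core.async versions.

```clojure
(require '[clojure.core.async.impl.dispatch :as dispatch])

(defn dispatch-pool-responsive?
  "Hypothetical helper: submits a trivial task to the core.async dispatch
  pool and returns true if it ran within timeout-ms, false otherwise."
  [timeout-ms]
  (let [p (promise)]
    (dispatch/run #(deliver p true))
    (deref p timeout-ms false)))
```

A false result after the worker stops looping would point toward a blocked pool.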

2020-05-08T19:46:32.189600Z

Oh nice, that's a good tip. Behavior-wise it does seem a bit like it could be a blocked threadpool

2020-05-08T19:48:41.191400Z

the way the blocking operations are implemented (alt!!, <!!, >!!) they are basically the non-blocking versions + a promise, so even if you don't use any go blocks you still end up with callbacks that deliver to the promise running on the core.async threadpool
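
To illustrate the idea, here is a rough sketch of a blocking take built from the callback-based `take!` plus a promise. The name `blocking-take` is hypothetical and this is not the library's actual implementation of `<!!`.

```clojure
(require '[clojure.core.async :as async])

(defn blocking-take
  "Hypothetical helper: registers a callback on ch that delivers the taken
  value to a promise, then blocks the calling thread on that promise."
  [ch]
  (let [p (promise)]
    ;; If no value is immediately available, this callback runs later
    ;; on the core.async dispatch pool, not on the calling thread.
    (async/take! ch #(deliver p %))
    @p))
```

If the dispatch pool is blocked, the callback never runs and the deref never returns, which matches the symptom discussed above.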