Hello, what happens when the transducer inside pipeline-blocking throws? Is it possible that the output chan will never be closed? How can I fix that? With something like this?
(let [output-chan (a/chan concurrency)]
  (pipeline-blocking
    10
    output-chan
    (map #(do-something-that-can-throw %))
    (a/to-chan accid->invnrs)
    :close
    ;; Put the error onto the channel for later reading
    (fn error-handler [throwable] throwable))
  output-chan)
The channel should be closed when it throws, no?
There is an additional arg on chan (ex-handler) for error handling:
http://clojure.github.io/core.async/#clojure.core.async/chan
Hm, from my experiments it seems ☝️ works fine and returns a channel with three throwables, which then gets closed.
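A minimal sketch for checking this at the REPL, assuming core.async is on the classpath. risky-inc is a made-up stand-in for do-something-that-can-throw, and the result names are only for illustration. It exercises the ex-handler argument of both chan and pipeline-blocking: in each case the handler's non-nil return value ends up on the channel and the channel still closes.

```clojure
(require '[clojure.core.async :as a])

;; Hypothetical stand-in for do-something-that-can-throw: fails on 2.
(defn risky-inc [x]
  (if (= x 2)
    (throw (ex-info "boom" {:x x}))
    (inc x)))

;; 1. ex-handler on chan: a non-nil return value is put on the channel.
(def chan-result
  (let [ch (a/chan 10 (map risky-inc) (fn [throwable] throwable))]
    (doseq [x [1 2 3]] (a/>!! ch x))
    (a/close! ch)
    (a/<!! (a/into [] ch))))

;; 2. ex-handler on pipeline-blocking, with close? set to true.
;; a/to-chan is used as in the thread; newer releases prefer a/to-chan!.
(def pipeline-result
  (let [out (a/chan 10)]
    (a/pipeline-blocking 2 out (map risky-inc) (a/to-chan [1 2 3])
                         true (fn [throwable] throwable))
    ;; a/into only delivers once `out` is closed, so getting a result
    ;; here means the output channel was closed despite the exception.
    (a/<!! (a/into [] out))))
```

Both results should hold three items, with the throwable in the middle position.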
If somebody is willing to spend a little time, I would very much appreciate any help figuring out why my catching-transduce can freeze indefinitely on line 26, when trying to read the collection from errors-ch:
https://gist.github.com/holyjak/cd744395c8630266fcde92632f01b9b8
Briefly: It reads from a channel, splits into two (one for data, one for errors), runs transduce on the data, then checks its results for exceptions and the error channel for exceptions, returning either the data result or throwing.
According to the tests (included), the function works correctly. So the only way I can see how this could happen is if the input channel never closes. But the input is produced by the pipeline-blocking above, which also seems to work correctly even if there is an exception. So I have no idea how to troubleshoot this further 😞 🙏
One escape hatch for debugging is to use tap>, which sends data to all functions registered by add-tap. You can have a tapping function like #(swap! events conj %) to see the touchpoints in order, as data.
Since that doesn't use channels (but is still async), you can see traces of what happened and in what order, a bit more elegantly than with prints.
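A small sketch of that tap setup, assuming Clojure 1.10 or later. The events atom, record-event and the :step keys are made-up names for illustration.

```clojure
;; Requires Clojure 1.10+, where tap>, add-tap and remove-tap were added.
(def events (atom []))

;; Hypothetical tap function: append every tapped value to `events`.
(defn record-event [x]
  (swap! events conj x))

(add-tap record-event)

;; Sprinkle calls like these at the touchpoints you want to trace.
(tap> {:step :transduce-started})
(tap> {:step :error-seen :value 2})

;; Taps are delivered on a separate thread, so wait briefly before reading.
(Thread/sleep 200)
(def seen @events)

;; Unregister the tap once you are done debugging.
(remove-tap record-event)
```

Keeping the tap function in a named var (rather than an anonymous fn) makes it possible to remove it again with remove-tap.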
I was lucky 🙂 I discovered that I can replicate the problem by changing one test slightly, so that the transformation fn throws for an input that is followed by at least two more inputs (whether or not those cause an exception). Now on to the solution... I should really set up some generative tests for this...
FYI The problem seems to be that when there is an exception during the transduction, I drain the input channel but it does not do what I expect (it does not close the derived errors channel). Here is drain:
(defn drain
  [ch]
  (a/close! ch)
  (a/go-loop []
    (when (a/<! ch) (recur)))
  nil)
Here is a replication of the issue:
(let [in-ch (a/to-chan [2 4 6])
      [odd-ch even-ch] (a/split odd? in-ch)]
  (drain even-ch)
  {:even-ch (chan-status!! even-ch)
   :odd-ch (chan-status!! odd-ch)
   :in-ch (chan-status!! in-ch)})
; => {:even-ch :closed, :odd-ch :ch-open-no-value-ready, :in-ch :ch-open-no-value-ready}
(chan-status!! does (or (first (a/alts!! [ch] :default :ch-open-no-value-ready)) :closed).)
It seems I have to drain both the input channel and the branch with any data (here even-ch).
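For what it's worth, here is one possible way to get both branches closed. It is only a sketch, and it rests on the assumption that a/split closes its two output channels only once the input channel yields nil (that is how its source reads to me). So instead of closing a branch, it closes the input and keeps consuming both branches. consume!, closed? and branch-status are made-up names for this example.

```clojure
(require '[clojure.core.async :as a])

;; Hypothetical helper: take and discard values until ch is closed.
;; Returns the go-loop's channel, which closes when the loop ends.
(defn consume! [ch]
  (a/go-loop []
    (when (some? (a/<! ch))
      (recur))))

;; Hypothetical helper: true if a take from ch yields nil within 1s.
;; The timeout only guards against blocking on a still-open channel.
(defn closed? [ch]
  (let [[v port] (a/alts!! [ch (a/timeout 1000)])]
    (and (nil? v) (= port ch))))

(def branch-status
  (let [in-ch (a/to-chan [2 4 6])
        [odd-ch even-ch] (a/split odd? in-ch)
        odd-done (consume! odd-ch)
        even-done (consume! even-ch)]
    ;; Close the input rather than a branch; split then sees nil on
    ;; in-ch (after any buffered values) and closes both outputs itself.
    (a/close! in-ch)
    ;; Wait until both consumers have seen their branch close.
    (a/<!! odd-done)
    (a/<!! even-done)
    {:odd-ch (closed? odd-ch)
     :even-ch (closed? even-ch)}))
```

The difference from drain is that no branch is closed by hand, so split never hits a failed put and always reaches the point where it closes both outputs.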
What do I need to consider when using core async for chunking? Can I be notified on channel close so I can abort requests?
Sounds like a good use case for a shutdown channel that you can poll: if it has data ready, you are done.
You can also use a promise plus realized? as a cheap one-way latch indicating shutdown.
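A minimal sketch of the promise-as-latch idea, using only clojure.core. process-until-shutdown, handle-chunk and the sample data are hypothetical names chosen for illustration; a real handler would issue the request for each chunk.

```clojure
;; Hypothetical helper: handle chunks one by one, but stop as soon as
;; the shutdown promise has been delivered. Returns the number handled.
(defn process-until-shutdown [shutdown chunks handle-chunk]
  (reduce (fn [n chunk]
            (if (realized? shutdown)
              (reduced n)
              (do (handle-chunk chunk)
                  (inc n))))
          0
          chunks))

(def shutdown (promise))

;; Simulate a shutdown request arriving while the second chunk is handled.
(def processed
  (process-until-shutdown
    shutdown
    (partition-all 2 (range 10))
    (fn [chunk]
      (when (= chunk [2 3])
        (deliver shutdown :stop)))))
;; processed is 2: chunks (0 1) and (2 3) were handled, the rest skipped.
```

A promise can only be delivered once and never resets, which is what makes it work as a one-way latch.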
if the requests are long-running, you can have a construction like
(alt!
  done-ch :wrap-up
  (async/thread (get-batch ...)) ([result] ...)
  ...)
where done-ch can be a chan that is either closed or a promise-chan with a value pushed to indicate you are wrapping up
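A runnable sketch of that construction, assuming core.async is on the classpath. It uses the blocking alt!! so it can run outside a go block; next-batch-or-stop, the get-batch functions and the result names are made up for illustration.

```clojure
(require '[clojure.core.async :as a])

;; Hypothetical helper: wait for one batch unless shutdown was signalled.
(defn next-batch-or-stop [done-ch get-batch]
  (a/alt!!
    done-ch :wrap-up
    (a/thread (get-batch)) ([result] result)
    ;; Try done-ch first when both channels are ready.
    :priority true))

;; No shutdown signalled: the batch result comes back.
(def running
  (next-batch-or-stop (a/promise-chan) (fn [] [1 2 3])))

;; Shutdown signalled via a promise-chan: we stop waiting for the batch.
(def stopped
  (let [done-ch (a/promise-chan)]
    (a/>!! done-ch :stop)
    (next-batch-or-stop done-ch (fn [] (Thread/sleep 100) [4 5 6]))))
```

Note that the thread running get-batch is started either way and is not cancelled; the alt only stops waiting for it. Actually aborting an in-flight request would need support from whatever client get-batch uses.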
you shouldn't need to poll anything