core-async

Jakub Holý 2020-07-10T17:28:31.043600Z

Hello, what happens when the transducer inside pipeline-blocking throws? Is it possible the output chan will never be closed? How to fix? With this:

(let [output-chan (a/chan concurrency)]
  (pipeline-blocking
    10
    output-chan
    (map #(do-something-that-can-throw %))
    (a/to-chan accid->invnrs)
    :close
    ;; Put the error onto the channel for later reading
    (fn error-handler [throwable] throwable))
  output-chan)
The channel should be closed when it throws, no?

alexmiller 2020-07-10T17:32:58.044200Z

there is an additional arg on chan for error handling

Jakub Holý 2020-07-10T17:34:21.045300Z

Hm, from my experiments it seems ☝️ works fine and returns a channel with three throwables, which then gets closed.
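
A minimal sketch that is consistent with this observation, assuming core.async is on the classpath. The input values and the throwing function are made up for illustration; the ex-handler returns the throwable, so it is placed on the output channel in place of a result.

```clojure
(require '[clojure.core.async :as a])

(def results
  (let [out (a/chan 10)]
    (a/pipeline-blocking
      2
      out
      ;; Illustrative transformation: throws for odd inputs
      (map #(if (odd? %) (throw (ex-info "boom" {:n %})) %))
      (a/to-chan [1 2 3])
      true
      ;; Any non-nil return value of the handler is put onto the channel
      (fn error-handler [throwable] throwable))
    ;; a/into only delivers once `out` is closed, so this returning at all
    ;; shows that the output channel does get closed
    (a/<!! (a/into [] out))))
```

Here `results` holds three items in input order: a throwable, 2, and another throwable.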

Jakub Holý 2020-07-10T18:03:52.050100Z

If somebody is willing to spend a little time, I would very much appreciate any help figuring out why my catching-transduce can freeze indefinitely on line 26, when trying to read the collection from errors-ch: https://gist.github.com/holyjak/cd744395c8630266fcde92632f01b9b8 Briefly: it reads from a channel, splits it into two (one for data, one for errors), runs transduce on the data, then checks both its results and the error channel for exceptions, and either returns the data result or throws. According to the tests (included), the function works correctly. So the only way I can see this happening is if the input channel never closes. But the input is produced by the pipeline-blocking above, which also seems to work correctly even if there is an exception. So I have no idea how to troubleshoot this further 😞 🙏

2020-07-10T18:07:45.051600Z

one escape-hatch for debugging is to use tap> which sends data to all functions registered by add-tap, and you can have a tapping function like #(swap! events conj %) to see the touchpoints in order as data

2020-07-10T18:08:22.052400Z

since that doesn't use channels (but is still async), you can see traces of what happened and in what order, a bit more elegantly than prints
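
A small sketch of that tapping setup, using only clojure.core (tap> and add-tap need Clojure 1.10 or later). The names events and record-event and the tapped maps are illustrative.

```clojure
;; Collects every tapped value, in the order it was delivered
(def events (atom []))

(defn record-event [x] (swap! events conj x))

(add-tap record-event)

;; Place tap> calls at the touchpoints you want to trace
(tap> {:step :read-input, :value 1})
(tap> {:step :transduce-done})

;; Taps are delivered on a separate thread, so wait briefly before looking
(Thread/sleep 200)
@events

;; Unregister the tap once debugging is done
(remove-tap record-event)
```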

Jakub Holý 2020-07-10T19:26:36.054600Z

I was lucky 🙂 I discovered that I can replicate the problem by changing one test slightly, so that the transformation fn throws for an input that is followed by at least two more inputs (whether or not those also cause an exception). Now on to the solution... I should really set up some generative tests for this...

Jakub Holý 2020-07-10T20:10:09.056600Z

FYI The problem seems to be that when there is an exception during the transduction, I drain the input channel but it does not do what I expect (does not close the derived errors channel). Here is drain:

(defn drain
  [ch]
  (a/close! ch)
  (a/go-loop []
    (when (a/<! ch) (recur)))
  nil)

Jakub Holý 2020-07-10T20:12:34.057500Z

Here is a replication of the issue:

(let [in-ch            (a/to-chan [2 4 6])
      [odd-ch even-ch] (a/split odd? in-ch)]
  (drain even-ch)
  {:even-ch (chan-status!! even-ch)
   :odd-ch  (chan-status!! odd-ch)
   :in-ch   (chan-status!! in-ch)})
; => {:even-ch :closed, :odd-ch :ch-open-no-value-ready, :in-ch :ch-open-no-value-ready}
(chan-status!! is defined as (or (first (a/alts!! [ch] :default :ch-open-no-value-ready)) :closed).)

Jakub Holý 2020-07-10T20:17:30.058400Z

It seems I have to drain both the input channel and the branch with any data (here even-ch).
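
One plausible reading, assuming a/split simply stops forwarding once a put onto one of its output channels fails (because that channel is closed) and does not close the other output: closing only even-ch leaves odd-ch open forever. A hedged sketch of a workaround follows; drain-all is a hypothetical helper, not part of core.async, that closes and drains every channel involved.

```clojure
(require '[clojure.core.async :as a])

(defn drain-all
  "Hypothetical helper: closes each channel and discards anything
  still pending on it."
  [chs]
  (doseq [ch chs]
    (a/close! ch)
    (a/go-loop []
      (when (some? (a/<! ch)) (recur))))
  nil)

(def in-ch (a/to-chan [2 4 6]))
(def split-chs (a/split odd? in-ch)) ; [odd-ch even-ch]

(drain-all (cons in-ch split-chs))

;; odd-ch has been closed explicitly and never receives a value here,
;; so a blocking take returns nil instead of hanging
(a/<!! (first split-chs))
```

Closing the derived channels explicitly avoids depending on how far the split loop had progressed when the drain started.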

dominicm 2020-07-10T21:52:15.059700Z

What do I need to consider when using core async for chunking? Can I be notified on channel close so I can abort requests?

2020-07-10T21:55:07.060400Z

sounds like a good use case for a shutdown-channel that you can poll! - if it has data ready, you are done

2020-07-10T21:55:52.061Z

you can also use a promise plus realized? as a cheap one-way latch indicating shutdown
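
A minimal sketch of the promise-as-latch idea, using only clojure.core. The names shutdown and process-chunks, and inc standing in for the per-chunk work, are illustrative.

```clojure
(defn process-chunks
  "Applies `handle` to each chunk, stopping as soon as `latch` is realized."
  [latch handle chunks]
  (reduce (fn [acc chunk]
            (if (realized? latch)
              (reduced acc)              ; shutdown requested: stop early
              (conj acc (handle chunk))))
          []
          chunks))

(def shutdown (promise))

(process-chunks shutdown inc [1 2 3]) ; => [2 3 4], latch not yet delivered

;; Delivering any value flips the latch; it cannot be reset afterwards
(deliver shutdown :stop)

(process-chunks shutdown inc [1 2 3]) ; => [], latch already delivered
```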

2020-07-10T21:59:32.062500Z

if the requests are long-running, you can have a construction like

(alt!
   done-ch :wrap-up
   (async/thread (get-batch ...)) ([result] ...)
  ...)

2020-07-10T22:00:00.063100Z

where done-ch can be a chan that is either closed or a promise-chan with a value pushed to indicate you are wrapping up

2020-07-10T22:00:17.063400Z

you shouldn't need to poll anything
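
A runnable sketch of that construction, assuming core.async is on the classpath. fetch-or-abort and the fetch argument are hypothetical stand-ins for the real request code, and the blocking alt!! variant is used so it can run outside a go block.

```clojure
(require '[clojure.core.async :as a])

(defn fetch-or-abort
  "Returns the result of calling `fetch` on another thread, or :wrap-up
  if `done-ch` is closed (or has a value) first."
  [done-ch fetch]
  (a/alt!!
    done-ch              :wrap-up
    (a/thread (fetch))   ([result] result)
    ;; Check done-ch first when both are ready
    :priority true))

;; A closed channel works as the shutdown signal
(def done-ch (a/chan))
(a/close! done-ch)

(fetch-or-abort done-ch (fn [] (Thread/sleep 50) :batch)) ; => :wrap-up
(fetch-or-abort (a/chan) (fn [] :batch))                  ; => :batch
```

Note that this only stops waiting for the request. The thread running fetch is not interrupted, so actually aborting an in-flight request needs support from the HTTP client.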