core-async

Franco Gasperino 2021-06-25T20:40:53.097300Z

I'm looking to express the following:
• Each channel in a sequence is expected to carry a single keyword value indicating whether the producer attached to that channel has started (:started) or encountered an error (:error).
• All channels should be read.
• A timeout should be applied to the read of all channels as a whole, to prevent deadlocks from an unresponsive producer.
I've come up with the following, and am looking for a quick yea/nay on the idiomatic style.

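(require '[clojure.core.async :as async])

;; One buffered channel per producer; each carries a single status keyword.
(def c1 (async/chan 1))
(def c2 (async/chan 1))
(def c3 (async/chan 1))

;; Simulate every producer reporting a successful start.
(doseq [c [c1 c2 c3]]
  (async/put! c :started))

(defn started? [& responses]
  (every? #(= :started %) responses))

;; c4 yields one boolean once every input channel has produced a value.
(def c4 (async/map started? [c1 c2 c3]))

(let [t (async/timeout 5000)
      [v p] (async/alts!! [c4 t])]
  (cond
    (= p t) "Timed out!"
    (true? v) "All channels started!"
    (false? v) "One or more channels failed to start!"
    :else "Unknown case"))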

2021-06-25T21:08:30.097500Z

nah

Franco Gasperino 2021-06-26T17:22:17.099800Z

Odd that map would act that way, as map returns a channel which (I would think) could be composed as such. If I execute my example without the doseq put! operation on c1 c2 c3, it does in fact time out at the REPL.

Franco Gasperino 2021-06-26T17:23:28.100Z

the same timeout behavior occurs if I put! on c1 and c2, but not c3

Franco Gasperino 2021-06-26T17:24:06.100200Z

a put! on all 3 channels results in "all channels started!"

2021-06-26T18:46:04.100400Z

The thing to understand is that map starts a go block which is basically copying from the inputs to the output, and your timeout has no effect on that process

2021-06-26T18:46:23.100600Z

It sits there trying to copy until the cows come home
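To make that concrete, here is a minimal sketch (the function name late-put-demo and the 100 ms timeout are made up for illustration). It assumes two buffered channels where only the first producer reports in time. The read from the map output times out, but the map process has already taken the value from c1 and keeps waiting on c2, so a late put still completes it:

```clojure
(require '[clojure.core.async :as async])

(defn late-put-demo
  "Hypothetical demo: shows that a timeout on the consumer side does not
  stop the process started by async/map."
  []
  (let [c1  (async/chan 1)
        c2  (async/chan 1)
        out (async/map vector [c1 c2])]
    ;; Only the first producer reports in.
    (async/put! c1 :started)
    (let [[_ port]   (async/alts!! [out (async/timeout 100)])
          timed-out? (not= port out)]
      ;; The map process is still parked waiting on c2; a late put lets it
      ;; finish and deliver a value nobody may be waiting for any more.
      (async/put! c2 :started)
      {:timed-out? timed-out?
       :late-value (async/<!! out)})))
```

Calling (late-put-demo) should report a timeout and still hand back the combined value afterwards, which is the "hanging around" behavior described above.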

2021-06-26T18:49:38.100800Z

In particular for map, leaving the map process sitting around doing that copying likely is not a big deal

2021-06-26T18:50:23.101Z

If I recall, the output channel is unbuffered, and the map process will just wedge writing to it until it gets gc'ed

2021-06-26T18:53:18.101200Z

But in general leaving that kind of process sitting around can cause things to behave unexpectedly (if the channel is buffered, or channels aren't being used as one shot value conveyors)

2021-06-26T18:56:23.101400Z

At the very least I think you'd be better off using something like merge over map, because mapping is ordered and you don't care about order
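A rough sketch of what the merge variant could look like (await-all-started and its return keywords are hypothetical names, not part of core.async). It assumes each channel delivers exactly one keyword, reads from the merged channel in whatever order values arrive, and shares one timeout across all reads. Note that merge also starts its own go block, so this only removes the ordering constraint; it does not stop a background process from lingering after a timeout:

```clojure
(require '[clojure.core.async :as async])

(defn await-all-started
  "Hypothetical helper: returns :all-started, :failed, or :timed-out.
  Assumes every channel in chans delivers exactly one keyword."
  [chans timeout-ms]
  (let [merged (async/merge chans)           ; values arrive in any order
        t      (async/timeout timeout-ms)]   ; one deadline for all reads
    (loop [remaining (count chans)]
      (if (zero? remaining)
        :all-started
        (let [[v p] (async/alts!! [merged t])]
          (cond
            (= p t)        :timed-out
            (= v :started) (recur (dec remaining))
            ;; Any other value, including nil from a closed channel.
            :else          :failed))))))
```

With the channels from the original example this would be called as (await-all-started [c1 c2 c3] 5000).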

Franco Gasperino 2021-06-28T15:22:57.104200Z

acknowledged. thanks

2021-06-25T21:09:04.098200Z

if you use map like that, it will hang around reading from the channels regardless of the timeout

2021-06-25T21:09:26.098600Z

you need to pass all the channels together to alts!!

2021-06-25T21:14:51.099Z

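(let [t (async/timeout 5000)]          ; one deadline shared by every read
  (loop [channels [c1 c2 c3]]
    (if (seq channels)
      ;; alts!! is documented to take a vector of ports, so keep channels a vector
      (let [[v p] (async/alts!! (conj channels t))]
        (cond (= p t)
              "Timed out!"
              (= v :started)
              ;; drop the channel that just reported and wait on the rest
              (recur (vec (remove #{p} channels)))
              :else
              "One or more channels failed to start!"))
      "All channels started!")))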