core-async

souenzzo 2019-06-06T01:06:09.045700Z

when I put a msg in a channel, I want that all listeners receive it. It's possible in core-async?

2019-06-06T01:07:28.046Z

you can use pub and sub

2019-06-06T01:09:47.047100Z

this is done by making a new sub channel per client, because messages still only go to one consumer, so the workaround is defining a channel that gets all the messages that another channel has

souenzzo 2019-06-06T01:10:54.048Z

I dont need the "topic" stuff, but should work. tnks

2019-06-06T01:12:52.048400Z

I've used pub/sub in production, but I never needed the topic

2019-06-06T01:13:50.049200Z

oh wait - @souenzzo I think you actually want mult and tap rather than pub and sub

2019-06-06T01:14:23.049500Z

they are simpler and directly do the thing you want

jplaza 2019-06-06T01:20:20.050400Z

Is this equivalent?

(go (>! out-ch (<! ch)))
to
(go (pipe ch out-ch))

ghadi 2019-06-06T01:32:02.050900Z

not quite, pipe will fully consume the input channel

ghadi 2019-06-06T01:32:21.051300Z

the first example only pulls a single value

ghadi 2019-06-06T01:33:29.051800Z

(defn pipe
  "Takes elements from the from channel and supplies them to the to
  channel. By default, the to channel will be closed when the from
  channel closes, but can be determined by the close?  parameter. Will
  stop consuming the from channel if the to channel closes"
  ([from to] (pipe from to true))
  ([from to close?]
     (go-loop []
      (let [v (<! from)]
        (if (nil? v)
          (when close? (close! to))
          (when (>! to v)
            (recur)))))
     to))

jplaza 2019-06-06T01:40:16.053600Z

Thanks @ghadi! I’ve bee playing with core.async to ingest large CSV files and queue rows to SQS for later processing

2019-06-06T15:13:54.055300Z

@hiredman Sorry for the delayed response, but I wasn't surprised by the fact that alts! chose a channel with a ready value rather than an unexpired timeout ... what I was surprised by was that if that happens in a loop, it stops any timeouts in the system from ever triggering or making progress, for all eternity, until you stop alts!-ing on the promise chan. That is down to an implementation detail, and even in a single-threaded system, it doesn't need to be implemented or behave that way.

2019-06-06T16:10:36.056Z

ah yeah, that is a bummer

2019-06-06T16:32:15.057Z

Anyway thanks a lot for your help and insight!