core-async

2020-01-30T09:16:50.312500Z

hmm... I don't understand something about the interaction between core.async/take and mults

2020-01-30T09:17:17.312800Z

this simplified example works:

2020-01-30T09:17:19.313100Z

(let [input-chan (a/chan 32)
      input-mult (a/mult input-chan)

      plus-chan (a/reduce + 0 (a/tap input-mult (a/chan 32)))
      all-chan (a/into [] (a/tap input-mult (a/chan 32)))
      zero-chan (a/into [] (a/take 1 (a/tap input-mult (a/chan 32))))]

  (a/pipe (a/to-chan (range 33)) input-chan)

  {:all (a/<!! all-chan)
   :plus (a/<!! plus-chan)
   :zero (a/<!! zero-chan)})
;; => {:all [0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32], :plus 528, :zero [0]}

2020-01-30T09:17:57.313500Z

this hangs forever

(let [input-chan (a/chan 32)
      input-mult (a/mult input-chan)

      plus-chan (a/reduce + 0 (a/tap input-mult (a/chan 32)))
      all-chan (a/into [] (a/tap input-mult (a/chan 32)))
      zero-chan (a/into [] (a/take 1 (a/tap input-mult (a/chan 32))))]

  (a/pipe (a/to-chan (range 34)) input-chan)

  {:all (a/<!! all-chan)
   :plus (a/<!! plus-chan)
   :zero (a/<!! zero-chan)})

2020-01-30T09:22:07.313800Z

an even simpler example:

(let [input-chan (a/chan 32)
      input-mult (a/mult input-chan)

      plus-chan (a/reduce + 0 (a/tap input-mult (a/chan 32)))
      zero-chan (a/into [] (a/take 1 (a/tap input-mult (a/chan 32))))]

  (a/pipe (a/to-chan (range 34)) input-chan)

  {:plus (a/<!! plus-chan)
   :zero (a/<!! zero-chan)})
;; hangs forever

(let [input-chan (a/chan 32)
      input-mult (a/mult input-chan)

      plus-chan (a/reduce + 0 (a/tap input-mult (a/chan 32)))
      zero-chan (a/into [] (a/take 1 (a/tap input-mult (a/chan 32))))]

  (a/pipe (a/to-chan (range 33)) input-chan)

  {:plus (a/<!! plus-chan)
   :zero (a/<!! zero-chan)})
;; => {:plus 528, :zero [0]}

2020-01-30T09:23:38.314400Z

creating multiple things off the same input is a really common pattern for me so I'd love to figure out what I'm doing wrong with the mult

bertofer 2020-01-30T10:00:13.319700Z

@otfrom I think the problem is with this channel: (a/take 1 (a/tap input-mult (a/chan 32))), specifically the tapped channel with the buffer of 32. Once take has taken its 1 item and closed the channel it returns, nothing consumes the tapped channel any more. When that channel's buffer fills up, it blocks the mult from distributing any further items. The range 33 version works because the buffer can hold all of the remaining items. If you give the tapped channel under the take a bigger buffer, it works.

bertofer 2020-01-30T10:02:59.320700Z

Because the mult can’t distribute all the messages, the channel feeding the reduce never closes, so the reduce never produces a result

bertofer 2020-01-30T10:04:39.321900Z

(let [input-chan (a/chan 32)
      input-mult (a/mult input-chan)
      plus-chan (a/reduce + 0 (a/tap input-mult (a/chan 32)))
      zero-chan (a/into [] (a/take 1 (a/tap input-mult (a/chan 33))))]
  (a/pipe (a/to-chan (range 34)) input-chan)
  {:plus (a/<!! plus-chan)
   :zero (a/<!! zero-chan)})
With a buffer of 33 on the tapped channel behind zero-chan, it works

2020-01-30T10:14:49.322500Z

ok. I had thought that take would close the channel it consumes from once it got its one item, thus not blocking the rest

2020-01-30T10:15:49.323300Z

forcing the channel for the a/take to be the size of the whole input doesn't feel like what I was hoping for with a/take

2020-01-30T10:16:09.323500Z

thx for the answer 🙂

bertofer 2020-01-30T10:36:53.323600Z

yeah I agree. A dropping or sliding buffer wouldn’t need to know the length, but if you want to close the channel so it doesn’t keep consuming from the mult after the take, you could implement your own take that closes the input channel
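Both suggestions can be sketched roughly as follows. This is a minimal sketch, not a tested library: `take-and-close` and `run-with-zero-tap` are hypothetical names made up for illustration. It assumes that a mult drops a tap once a put to it fails because the tap channel is closed, and the helper also drains whatever is left so that a put already parked on the tap cannot stall the mult.

```clojure
(require '[clojure.core.async :as a])

(defn take-and-close
  "Hypothetical helper: like a/take, but once n items have been taken it
  closes the input channel `ch` and drains anything still queued on it."
  [n ch]
  (let [out (a/chan)]
    (a/go
      ;; forward at most n items from ch to out
      (loop [i 0]
        (when (< i n)
          (when-some [v (a/<! ch)]
            (a/>! out v)
            (recur (inc i)))))
      (a/close! out)
      ;; closing ch makes later puts from the mult fail instead of block
      (a/close! ch)
      ;; discard buffered items and any put that was already parked
      (loop []
        (when (some? (a/<! ch))
          (recur))))
    out))

(defn run-with-zero-tap
  "Hypothetical harness: same topology as the examples above, with the
  :zero channel built by `make-zero-chan` from the mult."
  [n make-zero-chan]
  (let [input-chan (a/chan 32)
        input-mult (a/mult input-chan)
        plus-chan  (a/reduce + 0 (a/tap input-mult (a/chan 32)))
        zero-chan  (a/into [] (make-zero-chan input-mult))]
    (a/pipe (a/to-chan (range n)) input-chan)
    {:plus (a/<!! plus-chan)
     :zero (a/<!! zero-chan)}))

;; Option 1: a dropping buffer accepts every put, discarding items once full,
;; so the tap never blocks the mult regardless of the input length.
(run-with-zero-tap
  1000
  (fn [m] (a/take 1 (a/tap m (a/chan (a/dropping-buffer 1))))))

;; Option 2: keep the fixed buffer, but close the tap after the first item.
(run-with-zero-tap
  1000
  (fn [m] (take-and-close 1 (a/tap m (a/chan 32)))))
```

Option 1 needs no custom code but silently discards the items the take does not want. Option 2 removes the tap from the mult, which may matter if the tap would otherwise stay around for a long-running input.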

2020-01-30T10:39:22.323800Z

ah, so just use a different kind of buffer. Thx. Hadn't twigged that one

2020-01-30T11:13:16.326Z

so for some of the results above (like the all-chan) what I really want to do is just stream it out to a file. I worry a bit about blocking the threadpool if I do that. Are there any good examples out there of taking something off a channel and streaming each record (or batches) to a file or other io?

2020-01-30T11:13:42.326300Z

thx for the help @bertofer

bertofer 2020-01-30T11:15:38.326800Z

np, I enjoyed figuring it out as it was quite puzzling

reborg 2020-01-30T11:45:43.327500Z

I’m wondering what advantages the above has compared to:

(let [input (range 34)
      all (future (into [] input))
      plus (future (reduce + input))
      zero (future (take 1 input))]
  {:all (deref all) 
   :plus (deref plus) 
   :zero (deref zero)})

2020-01-30T11:51:49.327900Z

for simple topologies I've written that exact code

2020-01-30T11:52:02.328200Z

does future use a pool or a new thread each time?

2020-01-30T11:52:29.328700Z

I've got other bits where there are shared transforms that go to some downstream components

2020-01-30T11:53:12.329Z

I suppose with your example there is no back pressure either

reborg 2020-01-30T11:54:54.329700Z

what should slow down if one of those parallel tasks is taking longer?

reborg 2020-01-30T11:55:16.330200Z

(it could be me not really understanding some concepts, so genuine questions)

2020-01-30T13:35:17.330700Z

good use of anomalies too I think

alexmiller 2020-01-30T13:55:46.331100Z

Expanding caching pool
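On the earlier question of whether future uses a pool: a small sketch to check this at the REPL. It assumes the default executors are in place (nobody has called `set-agent-send-off-executor!`); the var names here are made up for illustration.

```clojure
;; future submits its body to clojure.lang.Agent/soloExecutor, the same
;; cached thread pool that backs send-off, so threads are reused when idle
;; and new ones are created when none is free.
(def future-thread-name
  @(future (.getName (Thread/currentThread))))

(def solo-executor-is-pool?
  (instance? java.util.concurrent.ExecutorService
             clojure.lang.Agent/soloExecutor))

;; With the default executor, the thread name shows which pool ran the body.
(println future-thread-name)
```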

2020-01-30T15:25:01.331800Z

@reborg do you mean why would I choose a larger buffer for one task rather than another?

2020-01-30T15:25:36.332200Z

If + was doing real work I'd want that to have a larger buffer

reborg 2020-01-30T15:32:38.332300Z

I was thinking about the need for backpressure: it could be useful when you don’t control the upstream producer, as a way to communicate the need for a slowdown without crashing the consumer or responding with a 500. But if you control the source of the data (such as a file), the backpressure is already built in with sequences or input streams. So I tend to think backpressure matters more with a streaming (always-on) consumer. I was under the impression you were dealing with one-off processing via files.

2020-01-30T17:12:15.332500Z

hmm... I wonder if back pressure is the right term. For what I'm doing atm I'm not processing files, I'm processing generated simulations. The buffers let me process things multiple times without having to realise the whole of the sequence into memory at the same time or re-create the seq for multiple outputs

2020-01-30T17:55:48.332700Z

yeah, the usual idiom is to use thread, with blocking ops inside. helpfully thread returns a channel so it can be used in a go block idiomatically
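A minimal sketch of that idiom for the file-writing question above: the blocking IO runs on its own thread via `a/thread`, so it never occupies the go-block pool. `write-lines!` and `demo-write` are hypothetical names, and writing one `pr-str` line per item to a temp file is just an assumption about the output format.

```clojure
(require '[clojure.core.async :as a]
         '[clojure.java.io :as io])

(defn write-lines!
  "Hypothetical helper: drains `ch` on a dedicated thread, writing one line
  per item to `dest`. Returns a channel that yields the number of lines
  written once `ch` has closed."
  [ch dest]
  (a/thread
    (with-open [^java.io.Writer w (io/writer dest)]
      (loop [n 0]
        ;; <!! blocks this thread, which is fine inside a/thread
        (if-some [v (a/<!! ch)]
          (do (.write w (str (pr-str v) "\n"))
              (recur (inc n)))
          n)))))

(defn demo-write
  "Hypothetical demo: one tap is reduced, another is streamed to a temp file."
  []
  (let [out-file   (java.io.File/createTempFile "all" ".edn")
        input-chan (a/chan 32)
        input-mult (a/mult input-chan)
        plus-chan  (a/reduce + 0 (a/tap input-mult (a/chan 32)))
        written    (write-lines! (a/tap input-mult (a/chan 32)) out-file)]
    (a/pipe (a/to-chan (range 34)) input-chan)
    {:plus    (a/<!! plus-chan)
     :written (a/<!! written)
     :file    out-file}))
```

Because the writer only takes from its tap as fast as it can write, a slow disk slows the mult (and therefore the producer) once the tap's buffer is full, rather than letting items pile up in memory.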

2020-01-30T17:56:56.332900Z

tho here I think I just want it for the side effects