core-async

fiddlerwoaroof 2020-07-23T20:27:07.068500Z

Is there a transducer like partition-all that also flushes on a timeout when a batch has elements in it?

fiddlerwoaroof 2020-07-23T20:27:51.069500Z

for example, I have a core-async pipeline listening to a channel, and I want to process the elements in batches of 10, unless I have some elements queued up and 500ms have passed?

2020-07-23T20:28:02.069800Z

so batching by count / timeout, whichever comes first?

fiddlerwoaroof 2020-07-23T20:28:14.070Z

yeah, that would work too

fiddlerwoaroof 2020-07-23T20:28:24.070500Z

I was thinking timeout between consecutive items on the channel

2020-07-23T20:29:46.071500Z

I can imagine how to write this (using a timeout chan plus fresh collection as loop variable), and I think it would be small / simple, but I don't know of it existing
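The loop described here might look like the sketch below. It is not a library function: `batch` and its parameters are hypothetical names chosen for illustration. The timer starts when the first item of a batch arrives, and the batch is flushed on count or timeout, whichever comes first.

```clojure
(require '[clojure.core.async :as a])

(defn batch
  "Hypothetical helper: reads from `in` and emits vectors of up to `n` items
  on the returned channel. A batch is flushed when it holds `n` items or when
  `ms` milliseconds have passed since its first item arrived. When `in`
  closes, any partial batch is flushed and the output channel is closed."
  [in n ms]
  (let [out (a/chan)]
    (a/go-loop [buf [] t nil]
      ;; with no pending batch there is no timer, so just take from `in`
      (let [[v port] (if t
                       (a/alts! [in t])
                       [(a/<! in) in])]
        (cond
          ;; deadline for the current batch expired (t only exists while buf is non-empty)
          (= port t)
          (do (a/>! out buf)
              (recur [] nil))

          ;; input closed: flush any partial batch, then close the output
          (nil? v)
          (do (when (seq buf) (a/>! out buf))
              (a/close! out))

          :else
          (let [buf (conj buf v)]
            (if (= (count buf) n)
              (do (a/>! out buf)
                  (recur [] nil))
              ;; start the timer on the first item of a batch, keep it otherwise
              (recur buf (or t (a/timeout ms))))))))
    out))
```

For the "timeout between consecutive items" variant, replace `(or t (a/timeout ms))` with `(a/timeout ms)` so the timer restarts after every item. Because this consumes the source and returns a new channel, an existing transducer can still be applied downstream, e.g. `(a/pipe (batch in 10 500) (a/chan 1 xf))`, assuming `xf` is adapted to receive vectors.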

fiddlerwoaroof 2020-07-23T20:30:57.072300Z

Cool, I hate re-inventing the wheel for simple utilities like this: I always think there's some library with all the utilities I want

fiddlerwoaroof 2020-07-23T20:31:42.073100Z

It's a bit tricky, because I want a transducer: so I think you'd have to do some kind of stateful transducer that occasionally fires the reducing function without a new element coming in, or something like that

2020-07-23T20:33:00.073900Z

the natural thing would be to wrap the original channel with an alt / alts over that channel and a timeout, creating a new timeout for each completed chunk

2020-07-23T20:33:31.074400Z

but that would consume and wrap your channel, unlike a transducer which is wrapped by the channel

fiddlerwoaroof 2020-07-23T20:40:10.075Z

Yeah, I have an ETL pipeline that basically works by providing a kafka topic, a target channel and a transducer

fiddlerwoaroof 2020-07-23T20:40:32.075500Z

I want to batch up the messages from kafka in one particular case, without rewriting everything

2020-07-23T20:53:47.076Z

it isn't possible to do that with a transducer

2020-07-23T21:03:30.076400Z

thanks - I was thinking that would be provable, but was stumped on the proof

2020-07-23T21:18:10.078100Z

a reducing function gets two arguments, and a transducer (a transformation of a reducing function) might cause it to close over something, so call it three arguments: state accum value

2020-07-23T21:19:23.079600Z

in a purely functional context the reducing function can only do things with those 3 arguments, and until it is done the thread doing it is blocked

2020-07-23T21:19:50.080200Z

(purely functional isn't exactly what I want here, I mean a context where the return value is important)

2020-07-23T21:20:27.081Z

for core.async channels the return value isn't really important because the reducing function is mutating something

2020-07-23T21:22:17.082800Z

erm, actually sorry, the reducing function would be four arguments, not three: original reducing function, state, accum, value

2020-07-23T21:22:54.083500Z

so a transducer on a channel could stick those three things in a queue somewhere to be run at another time

2020-07-23T21:23:46.084600Z

but that would be very unsafe because you would be mutating the internals of a channel without holding the channel's lock

2020-07-23T21:26:21.086900Z

so for the pure model it is not possible because the transducer/reducing function has to yield control to the actual reducing code, and practically in core.async where you might use another thread to retain control after returning, it is unsafe because of locks
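To make the argument concrete, here is a simplified, hypothetical re-implementation of what clojure.core's partition-all transducer does (`partition-by-count` is an illustrative name, not a real function). Its only entry points are the 0-, 1- and 2-arity functions, and each one runs only when whoever drives the reduction calls it: the step arity when an input arrives, the completion arity when the source ends (on a core.async channel, when the channel is closed). There is no arity a timer could invoke, so a "flush after 500ms" rule has nowhere to live.

```clojure
(defn partition-by-count
  "Hypothetical, simplified partition-all: groups inputs into vectors of n.
  Batches can only be emitted from the step or completion arity."
  [n]
  (fn [rf]
    (let [buf (volatile! [])]
      (fn
        ([] (rf))
        ([result]
         ;; completion: flush any partial batch, then complete downstream
         (let [b @buf
               result (if (seq b)
                        (do (vreset! buf [])
                            (unreduced (rf result b)))
                        result)]
           (rf result)))
        ([result input]
         ;; step: the only place a batch can be emitted mid-stream,
         ;; and it runs only because a new input arrived
         (let [b (vswap! buf conj input)]
           (if (= (count b) n)
             (do (vreset! buf [])
                 (rf result b))
             result)))))))
```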

fiddlerwoaroof 2020-07-23T21:46:38.087800Z

Ok, so I think that's what I was wondering, it's unsafe to call the reducing function asynchronously?

fiddlerwoaroof 2020-07-23T21:48:10.088900Z

e.g.

([coll next]
  (future (Thread/sleep 10)
          (rf coll (inc next)))
  (rf coll next))

fiddlerwoaroof 2020-07-23T21:48:39.089400Z

In this case, line 3 could potentially cause some issue with the channel's locks?

ghadi 2020-07-23T21:49:35.089600Z

yes

ghadi 2020-07-23T21:50:29.090500Z

if you are looking for a pulsed+batching process, it's very straightforward
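This log does not preserve any code ghadi may have shared, so the following is only one possible reading of a "pulsed+batching process", with hypothetical names: a go-loop that flushes whatever it has accumulated on a fixed clock pulse, or earlier once the batch is full. Unlike a per-batch deadline, the pulse ticks regardless of when items arrive.

```clojure
(require '[clojure.core.async :as a])

(defn pulsed-batch
  "Hypothetical: collects items from `in` and emits the accumulated batch
  every `ms` milliseconds (if non-empty), or earlier once it holds `n` items."
  [in n ms]
  (let [out (a/chan)]
    (a/go-loop [buf [] pulse (a/timeout ms)]
      (let [[v port] (a/alts! [in pulse])]
        (cond
          ;; clock pulse: flush if anything is pending, then start the next pulse
          (= port pulse)
          (do (when (seq buf) (a/>! out buf))
              (recur [] (a/timeout ms)))

          ;; input closed: flush the remainder and close the output
          (nil? v)
          (do (when (seq buf) (a/>! out buf))
              (a/close! out))

          :else
          (let [buf (conj buf v)]
            (if (>= (count buf) n)
              ;; full batch: flush early but keep the current pulse
              (do (a/>! out buf)
                  (recur [] pulse))
              (recur buf pulse))))))
    out))
```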

fiddlerwoaroof 2020-07-23T21:50:32.090900Z

That's a weird abstraction leak, to me 🙂

ghadi 2020-07-23T21:50:52.091300Z

that would still be broken whether it was core.async or not

ghadi 2020-07-23T21:51:13.092Z

the return value of a reducing function generally needs to be used
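A small illustration of this point, independent of core.async. The two transducers below are hypothetical examples that try to emit each input twice. The broken one discards the result of the first `rf` call, as the snippet above does with `coll`; the correct one threads it into the second call and respects early termination via `reduced?`.

```clojure
(defn dup-broken
  "Hypothetical: tries to emit each input twice, but drops the first result."
  []
  (fn [rf]
    (fn
      ([] (rf))
      ([result] (rf result))
      ([result input]
       (rf result input)     ; return value discarded: this step is lost
       (rf result input)))))

(defn dup
  "Hypothetical: emits each input twice, threading the accumulator."
  []
  (fn [rf]
    (fn
      ([] (rf))
      ([result] (rf result))
      ([result input]
       (let [result (rf result input)]
         ;; stop if downstream asked for early termination
         (if (reduced? result)
           result
           (rf result input)))))))

(transduce (dup-broken) conj [] [1 2 3]) ; only one copy of each input survives
(transduce (dup) conj [] [1 2 3])        ; both copies survive
```

With a persistent vector the lost step is visible. With a mutable accumulator (a transient, or a channel's buffer as discussed above) the broken version may appear to work by accident, which is why relying on that is unsound.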

fiddlerwoaroof 2020-07-23T21:51:14.092200Z

Oh yeah, because coll is wrong

fiddlerwoaroof 2020-07-23T21:51:19.092400Z

sorry

ghadi 2020-07-23T21:52:18.092600Z

fiddlerwoaroof 2020-07-23T21:53:03.093Z

I know that way 🙂

fiddlerwoaroof 2020-07-23T21:53:31.093600Z

I already have a transducer and I wanted to add a step to batch the incoming messages to the transducer

ghadi 2020-07-23T21:54:10.094400Z

as mentioned before, not possible to factor in timing delays in a transducer

fiddlerwoaroof 2020-07-23T21:54:54.095600Z

Yeah, I guess it makes sense