Is there a transducer like partition-all
that flushes a partial batch after a timeout?
for example, I have a core.async pipeline listening to a channel, and I want to process the elements in batches of 10, unless I have some elements queued up and 500ms have passed?
so batching by count / timeout, whichever comes first?
yeah, that would work too
I was thinking timeout between consecutive items on the channel
I can imagine how to write this (using a timeout chan plus fresh collection as loop variable), and I think it would be small / simple, but I don't know of it existing
Cool, I hate re-inventing the wheel for simple utilities like this: I always think there's some library with all the utilities I want
It's a bit tricky, because I want a transducer: so I think you'd have to do some kind of stateful transducer that occasionally fires the reducing function without a new element coming in, or something like that
the natural thing would be to wrap the original channel with an alt / alts over that channel and a timeout, then create a new timeout on each complete chunk
but that would consume and wrap your channel, unlike a transducer which is wrapped by the channel
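A minimal sketch of that wrap-the-channel approach (my own names and code, not from core.async itself): a go-loop that alts! between the input channel and a timeout, emits a batch when it hits `max-count` items, and flushes early when the timer fires with a partial batch pending. As noted, this consumes the input channel rather than acting as a transducer on it.

```clojure
(require '[clojure.core.async :as a])

(defn batch
  "Returns a channel of vectors of up to max-count items from in,
  flushing a partial batch after max-time ms of inactivity."
  [in max-count max-time]
  (let [out (a/chan)]
    (a/go-loop [buf [] t (a/timeout max-time)]
      (let [[v port] (a/alts! [in t])]
        (cond
          ;; timer fired: flush whatever we have, start a new window
          (= port t)
          (do (when (seq buf) (a/>! out buf))
              (recur [] (a/timeout max-time)))

          ;; input closed: flush the remainder and close out
          (nil? v)
          (do (when (seq buf) (a/>! out buf))
              (a/close! out))

          ;; got an item: emit a full batch, else keep accumulating
          :else
          (let [buf (conj buf v)]
            (if (= max-count (count buf))
              (do (a/>! out buf)
                  ;; fresh timeout on each complete chunk
                  (recur [] (a/timeout max-time)))
              (recur buf t))))))
    out))
```

Usage would look like `(batch input-chan 10 500)`, which yields a new channel of batches.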
Yeah, I have an ETL pipeline that basically works by providing a kafka topic, a target channel and a transducer
I want to batch up the messages from kafka in one particular case, without rewriting everything
it isn't possible to do that with a transducer
thanks - I was thinking that would be provable, but was stumped on the proof
a reducing function gets two arguments, and a transducer (a transformation of a reducing function) might cause it to close over something, so call it three arguments: state, accum, value
in a purely functional context the reducing function can only do things with those 3 arguments, and until it is done the thread doing it is blocked
(purely functional isn't exactly what I want here, I mean a context where the return value is important)
for core.async channels the return value isn't really important because the reducing function is mutating something
erm, actually sorry, the reducing function would be four arguments, not three: original reducing function, state, accum, value
so a transducer on a channel could stick those three things in a queue somewhere to be run at another time
but that would be very unsafe because you would be mutating the internals of a channel without the channel's lock being held
so for the pure model it is not possible because the transducer/reducing function has to yield control to the actual reducing code, and practically in core.async where you might use another thread to retain control after returning, it is unsafe because of locks
Ok, so I think that's what I was wondering, it's unsafe to call the reducing function asynchronously?
e.g.
([coll next]
 (future (Thread/sleep 10)
(rf coll (inc next)))
(rf coll next))
In this case, line 3 could potentially cause some issue with the channel's locks?
yes
if you are looking for a pulsed+batching process, it's very straightforward
That's a weird abstraction leak, to me 🙂
that would still be broken whether it was core.async or not
the return value of a reducing function generally needs to be used
Oh yeah, because coll
is wrong
sorry
I know that way 🙂
I already have a transducer and I wanted to add a step to batch the incoming messages to the transducer
as mentioned before, it's not possible to factor timing delays into a transducer
Yeah, I guess it makes sense