Probably a question that's already been solved. We wanted to make use of transducers with core.async channels; the reason for using transducers is that the pipeline recipe can be reused in several situations. Here is the transducer flow:
(comp
 (map pre-fn)
 (filter some-pred)
 (buffer ms N) ;; don't know how to implement this stateful step: it should
               ;; buffer inputs and emit when ms milliseconds have passed or
               ;; N elements have accumulated, whichever comes first
 (map post-fn))
Also, we wanted to make sure each step here continues reading from upstream only after the entire pipe has been processed to the end, i.e. if (map post-fn) is currently executing, then (map pre-fn) should not consume from its upstream channel. Think of this like a pull-based model (reactive streams). While this is not a major blocker, I'd be curious how to do this with core.async.
For now, we couldn't do the entire pipeline using transducers, so the buffer logic is temporarily built as:
;; assumes (:require [clojure.core.async :refer [chan go alt! >! <! timeout close!]])
(defn buffer
  "Batch values from `in`: emit a vector on the returned channel once N values
   have accumulated or ms milliseconds have passed, whichever comes first."
  [in ms N]
  (let [out (chan)]
    (go
      (loop [b [] timer nil]
        (if timer
          (alt!
            ;; time window elapsed: flush whatever we have
            timer (do (>! out b)
                      (recur [] nil))
            in ([v]
                (if (nil? v)
                  ;; upstream closed: flush the partial batch and stop
                  (do (when (seq b) (>! out b))
                      (close! out))
                  (let [new-buffer (conj b v)]
                    (if (= N (count new-buffer))
                      ;; batch is full: flush and reset the window
                      (do (>! out new-buffer)
                          (recur [] nil))
                      (recur new-buffer timer))))))
          ;; nothing buffered yet: wait for the first value, then start the window
          (if-some [v (<! in)]
            (recur [v] (timeout ms))
            (close! out)))))
    out))
and the two ends are connected manually, something like this:
;; `chan` needs a buffer when given a transducer, and `<!` must run inside a go block
(let [source    (chan 1 (comp (map pre-fn) (filter some-pred)))
      buffering (buffer source ms N)]
  (go-loop []
    (when-some [batch (<! buffering)]
      (post-fn batch)
      (recur))))
transducers are a pull-based model, and core.async channels can have a transducer running on them. it's unclear to me what that doesn't satisfy?
I just meant that if you are processing maps that each have a :timestamp field, you can use those to measure time and do the batching
But it's true that you then won't get batches completing until a new item is processed
But I guess that's illustrative of the limitation of transducers discussed earlier in the thread: you can't do anything that isn't directly in response to an input
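A minimal sketch of that :timestamp idea, assuming each message is a map carrying a numeric :timestamp in milliseconds (the window size and map shape here are made up for illustration):

(defn bucket-by-timestamp [window-ms]
  ;; event-time bucketing: consecutive messages whose timestamps fall into the
  ;; same window-ms-sized bucket end up in the same batch
  (partition-by (fn [msg] (quot (:timestamp msg) window-ms))))

;; (into [] (bucket-by-timestamp 1000)
;;       [{:timestamp 10 :v 1} {:timestamp 900 :v 2} {:timestamp 1500 :v 3}])
;; => [[{:timestamp 10 :v 1} {:timestamp 900 :v 2}] [{:timestamp 1500 :v 3}]]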
i was inspired by Flink's checkpointing mechanism, where it periodically sends barriers into the stream. i did the same thing at the source level by emitting flush events periodically (via timeout) alongside the messages. with that, I was able to use partition-by
with a stateful function 🙂 ^ thanks to your suggestion I was able to arrive at this insight. the solution still needs a bit of polishing
and i'm still testing it ^ let me know if this has any gotchas
the time-based batching in the middle
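Roughly what that looks like, as a sketch rather than the exact code (::flush, with-flush-markers, and batch-on-flush are names invented for this illustration, and the count-based cutoff is left out):

(require '[clojure.core.async :refer [chan go-loop alt! >! <! timeout close! pipe]])

(defn with-flush-markers
  "Copy values from `in` to a new channel, injecting a ::flush marker every ms milliseconds."
  [in ms]
  (let [out (chan)]
    (go-loop [t (timeout ms)]
      (alt!
        t (do (>! out ::flush)
              (recur (timeout ms)))
        in ([v] (if (nil? v)
                  (close! out)
                  (do (>! out v)
                      (recur t))))))
    out))

(defn batch-on-flush
  "Transducer that batches everything seen between two ::flush markers.
   Build a fresh one per channel, since the counter below is stateful."
  []
  (comp
   (partition-by (let [n (volatile! 0)]
                   (fn [v] (if (= ::flush v) (vswap! n inc) @n))))
   (map (fn [batch] (vec (remove #{::flush} batch))))
   (filter seq)))

;; wiring: (pipe (with-flush-markers source ms) (chan 1 (batch-on-flush)))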
ok sorry, i was under the assumption that each step in the transducer would run concurrently, i.e. once the map had mapped an element and passed it to the filter step, the map step would read from the input channel while the previous element was still being processed by the filter. should've run some experiments
every element goes through all of the transducer's steps before the next element gets consumed
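A quick way to see this for yourself (purely illustrative):

(require '[clojure.core.async :as a])

(let [c (a/chan 1 (comp (map #(do (println "step 1:" %) %))
                        (map #(do (println "step 2:" %) %))))]
  (a/go (doseq [x [:a :b]] (a/>! c x)) (a/close! c))
  (while (a/<!! c)))
;; prints: step 1: :a, step 2: :a, step 1: :b, step 2: :b
;; i.e. :a runs through both steps before :b is even consumed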
so the only thing that remains is how to implement a buffer transducer
strictly speaking, you cannot
does a transducer with a timer make any sense? coming from the RxJava world, it breaks backpressure and had a few issues
https://clojurians.slack.com/archives/C05423W6H/p1623261874016800?thread_ts=1623261548.015700&cid=C05423W6H hmm, that's fine. akka streams solves this for us. just wondered if it was possible at all to do with core.async before bringing in a new framework
the way you solve it with core.async is something like your buffer fn
the limitation is something inherent to the model of transducers; outside of transducers you can do whatever you want
ya actually I'd rather not go for a new framework just to be able to write a linear pipeline 😛
so anyway, what's the issue with buffer? where does it go wrong? i mean, we have stateful transducers; won't this just be a stateful transducer? or are you saying the timer makes the transducer non-deterministic, and that's the problem?
A transducer is a transformation of a step function (a function like you would pass to reduce)
And the step function's interface/contract is: do something, then yield control back to the caller
It can't hang around and wait and see
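For reference, this is roughly what that contract looks like: a minimal sketch of a count-only batching transducer (essentially partition-all), with illustrative names. The only two places it can emit are the 2-arity step, i.e. in direct response to an input, and the 1-arity completion call; there is nowhere for a timer to fire.

(defn batch
  "Stateful transducer: emit vectors of n inputs (count-based only, no timer)."
  [n]
  (fn [rf]
    (let [buf (volatile! [])]
      (fn
        ([] (rf))
        ([result]
         ;; completion: flush whatever is left, then complete downstream
         (let [b @buf]
           (vreset! buf [])
           (rf (if (seq b) (unreduced (rf result b)) result))))
        ([result input]
         (let [b (vswap! buf conj input)]
           (if (= n (count b))
             (do (vreset! buf [])
                 (rf result b))   ;; emit, but only because an input arrived
             result)))))))        ;; otherwise just hand control back to the caller

;; (into [] (batch 3) (range 7)) => [[0 1 2] [3 4 5] [6]]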