core-async

2021-06-09T17:51:00.015Z

Probably a question that’s already been answered. We want to use transducers with core.async channels, because the same pipeline recipe can then be reused in several situations. Here is the transducer flow:

(comp
 (map pre-fn)
 (filter some-pred)
 (buffer ms N)
 ;; don't know how to implement this stateful buffer
 ;; the transducer should buffer inputs and emit when ms milliseconds have passed or N elems are accumulated, whichever comes first
 (map post-fn))
Also, we want to make sure each step here continues reading from upstream only after the entire pipe has been processed to the end, i.e. if (map post-fn) is currently executing, then (map pre-fn) should not consume from its upstream channel. Think of this as a pull-based model (reactive streams). While this is not a major blocker, I’d be curious how to do this with core.async. For now, we couldn’t do the entire pipeline using transducers, so the buffer logic is temporarily built using
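(require '[clojure.core.async :refer [chan go alt! <! >! timeout close!]])

(defn buffer
  "Batches values from `in` into vectors on the returned channel. A batch is
  emitted when N values have accumulated or ms milliseconds have passed since
  its first value, whichever comes first."
  [in ms N]
  (let [out (chan)]
    (go
      (loop [b [] timer nil]
        (if timer
          (alt!
            ;; time is up: emit the partial batch
            timer (do (>! out b)
                      (recur [] nil))
            in ([v]
                (if (nil? v)
                  ;; `in` was closed: flush what we have and close `out`
                  (do (>! out b)
                      (close! out))
                  (let [new-buffer (conj b v)]
                    (if (= N (count new-buffer))
                      (do
                        (>! out new-buffer)
                        (recur [] nil))
                      (recur new-buffer timer))))))
          ;; no batch in progress: wait for the first value, then start the timer
          (if-some [v (<! in)]
            (recur [v] (timeout ms))
            (close! out)))))
    out))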
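and the two ends are connected manually, something like this:
;; a channel with a transducer needs an explicit buffer, hence the 1
(let [source (chan 1 (comp (map pre-fn) (filter some-pred)))
      buffering (buffer source ms N)]
  ;; <! is only valid inside a go block
  (go
    (loop []
      (when-some [batch (<! buffering)]
        (post-fn batch)
        (recur)))))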

alexmiller 2021-06-09T17:59:08.015700Z

transducers are a pull-based model, and core.async channels can have a transducer running on them. unclear to me what that doesn't satisfy?

2021-06-10T10:51:04.020100Z

I just meant if you are processing maps that each have a :timestamp field you can use those to measure time and do the batching
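
A minimal sketch of that timestamp idea, assuming every item is a map with a numeric :timestamp in milliseconds. The name batch-by-time and the exact cut-off rules are assumptions made for illustration, not anything from core.async or from the thread:

```clojure
(defn batch-by-time
  "Stateful transducer (hypothetical helper). Groups maps into vectors and
  closes a batch when it holds n items, or when an item arrives whose
  :timestamp is ms or more past the first item of the current batch."
  [ms n]
  (fn [rf]
    (let [batch (volatile! [])]
      (fn
        ([] (rf))
        ([result]
         ;; completion: flush whatever is still buffered
         (let [b @batch
               result (if (seq b)
                        (do (vreset! batch [])
                            (unreduced (rf result b)))
                        result)]
           (rf result)))
        ([result item]
         (let [b @batch
               expired? (and (seq b)
                             (>= (- (:timestamp item)
                                    (:timestamp (first b)))
                                 ms))]
           (cond
             ;; window elapsed: emit the old batch, start a new one with item
             expired? (let [ret (rf result b)]
                        (vreset! batch (if (reduced? ret) [] [item]))
                        ret)
             ;; size limit reached: emit the batch including item
             (= (inc (count b)) n) (do (vreset! batch [])
                                       (rf result (conj b item)))
             ;; otherwise just keep buffering
             :else (do (vswap! batch conj item)
                       result))))))))

(mapv #(mapv :timestamp %)
      (into [] (batch-by-time 100 3)
            (map (fn [t] {:timestamp t}) [0 10 20 30 200 210])))
;; => [[0 10 20] [30] [200 210]]
```

Note that the [30] batch is only emitted once the item stamped 200 arrives: the step function runs only when there is an input, so an elapsed window goes unnoticed until then.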

2021-06-10T10:53:01.020300Z

But that's true you then won't get batches completing until a new thing is processed

2021-06-10T10:56:15.020500Z

But I guess that's illustrative of the limitations of transducers talked about earlier in the thread, you can't do anything that's not directly in response to an input

2021-06-11T06:31:00.020700Z

i was inspired by flink’s checkpointing mechanism, where it periodically sends barriers into the stream. i did the same thing at the source level by emitting flush events periodically (timeout) alongside messages. with that, I was able to use partition-by with a stateful function 🙂 ^ thanks to your suggestion I was able to arrive at this insight. the solution still needs a bit of polishing
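
One way this flush-marker approach could look, as a rough sketch rather than the poster's actual code. It assumes some other process puts a :batch/flush marker onto the source on a timer; the marker keyword and the helper names batch-key and batch-on-flush are made up for illustration:

```clojure
(defn batch-key
  "Returns a stateful key function for partition-by (hypothetical helper).
  Flush markers get their own key, and the numeric key changes after n
  items, so partition-by closes the current batch at those points."
  [n]
  (let [id   (volatile! 0)
        seen (volatile! 0)]
    (fn [item]
      (if (= :batch/flush item)
        (do (vreset! seen 0)
            :batch/flush)
        (do (when (= @seen n)        ; current batch is full: start a new one
              (vswap! id inc)
              (vreset! seen 0))
            (vswap! seen inc)
            @id)))))

(defn batch-on-flush
  "Transducer: batches of at most n items, cut early at flush markers.
  Only meant for the transducer arity of partition-by, which calls the
  key function once per input."
  [n]
  (comp (partition-by (batch-key n))
        ;; drop the partitions that contain only flush markers
        (remove #(= :batch/flush (first %)))))

(into [] (batch-on-flush 3)
      [1 2 :batch/flush 3 4 5 6 :batch/flush :batch/flush 7])
;; => [[1 2] [3 4 5] [6] [7]]
```

One gotcha with this shape: partition-by only learns that a batch has ended when the next input arrives, so a full batch of n items is held back until another message or the next flush marker comes through. Also, because batch-key holds state, each channel or reduction needs a fresh (batch-on-flush n).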

2021-06-11T06:32:18.020900Z

and i’m still testing it ^ let me know if this has any gotchas

2021-06-09T18:01:07.016100Z

the time based batching in the middle

2021-06-09T18:02:04.016200Z

ok sorry, i was under the assumption that each step in the transducer would run concurrently. i.e. once the map mapped and sent it to the filter step, the map step would read from the input channel whilst the previous element is processed by the filter. should've run some experiments

2021-06-09T18:03:02.016400Z

every element goes through the entire transducer steps before the next element gets consumed
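
A small experiment that shows this ordering, using plain collections instead of a channel. The logging functions stand in for pre-fn, some-pred and post-fn from the question and are assumptions for illustration:

```clojure
;; records the order in which the steps run
(def log (atom []))

(def xf
  (comp (map    (fn [x] (swap! log conj [:pre x]) x))
        (filter (fn [x] (swap! log conj [:pred x]) (odd? x)))
        (map    (fn [x] (swap! log conj [:post x]) x))))

(into [] xf [1 2 3])
;; => [1 3]

@log
;; => [[:pre 1] [:pred 1] [:post 1]
;;     [:pre 2] [:pred 2]
;;     [:pre 3] [:pred 3] [:post 3]]
```

Each element runs through :pre, :pred and :post before the next element is touched. The same xf can be attached to a channel with (chan 1 xf).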

2021-06-09T18:03:39.016600Z

so the only thing that remains is how to implement a buffer transducer

2021-06-09T18:04:34.016800Z

strictly speaking, you cannot

2021-06-09T18:04:38.017Z

does a transducer with a timer make any sense? coming from the rxjava world, it breaks back pressure there and had a few issues

2021-06-09T18:14:38.017300Z

https://clojurians.slack.com/archives/C05423W6H/p1623261874016800?thread_ts=1623261548.015700&cid=C05423W6H hmm. that's fine. akka streams solves this for us. just wondered if at all it was possible to do so with core async before bringing in a new framework

2021-06-09T18:55:55.017600Z

the way you solve it with core.async is something like your buffer fn

2021-06-09T18:56:49.017800Z

the limitation is with something inherent to the model of transducers, outside of transducers you can do whatever

2021-06-09T19:01:44.018Z

ya actually I'd rather not go for a new framework just to be able to write a linear pipeline 😛

2021-06-09T19:02:43.018200Z

so anyway what's the issue with buffer? where does it go wrong? i mean we have stateful transducers. won't this be a stateful transducer? or are you saying the timer makes the transducer non-deterministic and that's the problem

2021-06-09T19:28:07.018500Z

A transducer is a transformation of a step function (a function like you would pass to reduce)

2021-06-09T19:29:32.018700Z

And the step function interface/contract is that it does something when called, then yields control back to the caller

2021-06-09T19:30:00.018900Z

It can't hang around and wait and see
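
To make that contract concrete, here is a hand-written equivalent of (map f), shown only as an illustrative sketch (my-map is a made-up name):

```clojure
(defn my-map
  "Hand-rolled version of the (map f) transducer, for illustration."
  [f]
  (fn [rf]                        ; rf is the downstream step function
    (fn
      ([] (rf))                   ; init
      ([result] (rf result))      ; completion: called once at the end
      ([result input]             ; step: called once per input ...
       (rf result (f input))))))  ; ... and it has to return right away

(into [] (my-map inc) [1 2 3])
;; => [2 3 4]

(transduce (my-map inc) + 0 [1 2 3])
;; => 9
```

There are only three entry points: init, step and completion. None of them is invoked because time has passed, so a timeout has nowhere to fire from inside the transducer.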
