I'm looking to create something that models a branching flow like this
in
+
|
+--->error?+--->error
|
v
xf
+
|
+--->error?+--->error
|
v
out
Data comes in on the `in` channel; any elements for which `error?` returns true go to the `error` channel. Then apply `xf` to all non-error elements from `in`. Any transformed elements where `error?` is true also go to the `error` channel. Any non-error elements go to `out`.
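To pin down the intended routing before adding channels, here is a minimal synchronous sketch of the same flow over a plain collection. The name `route` and the result map keys are hypothetical, chosen only for illustration; nothing here is core.async.

```clojure
;; Hypothetical, channel-free model of the flow described above.
;; error? is a predicate, xf is a transducer, coll is the input data.
(defn route
  [error? xf coll]
  (let [rejected    (filter error? coll)                ; errors before xf
        transformed (into [] xf (remove error? coll))]  ; xf on the rest
    {:out   (vec (remove error? transformed))
     :error (vec (concat rejected (filter error? transformed)))}))

;; Negative numbers count as errors; xf subtracts 3.
(route neg? (map #(- % 3)) [1 -2 5 2])
;; => {:out [2], :error [-2 -2 -1]}
```

The channel versions should deliver the same elements to `out` and `error`, although the order of errors may differ because the two error sources interleave.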
I was thinking something like this
(defn pipeline-transform
  [in xf error-pred out error]
  (let [[errors in'] (async/split error-pred in)
        _ (async/pipe errors error false)
        ;; a channel with a transducer must be given a buffer
        apply-xf (async/chan 1 xf)
        _ (async/pipe in' apply-xf)
        [errors apply-xf'] (async/split error-pred apply-xf)
        _ (async/pipe errors error)]
    (async/pipe apply-xf' out)
    nil))
That seems a bit hard to follow though. Is there something built-in that I'm missing that would allow me to write this cleaner?
How about this?
(defn pipeline-transform
  [in xf error? out errors]
  (let [;; chan takes the buffer first; a transducer requires one
        applied-ch (async/chan 1 xf)
        [invalid-ch valid-ch] (async/split error? in)
        [fail-ch success-ch] (async/split error? applied-ch)
        errors' (async/merge [invalid-ch fail-ch])]
    (async/pipe valid-ch applied-ch)
    (async/pipe success-ch out)
    (async/pipe errors' errors)))
@serioga I think pub is for distributing messages to multiple subscribers. Maybe it'd work here, where the 2 subscribers are out & error. Need to think on that a bit more.
@ben.sless Like the original code, that doesn't apply back pressure to the producer. Specifying buffers of 1 would probably prevent that. Since they're essentially the same code, I'm not sure which code layout I'd prefer.
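For the "buffers of 1" idea, a rough sketch follows. It assumes the four-argument arity of `async/split` and the two-argument arity of `async/merge`, which accept buffer sizes for the channels they create. The name `pipeline-transform-buffered` and the `buf-size` parameter are hypothetical. Whether this gives the back pressure you want depends on how the producer writes to `in`; with fixed-size buffers, a put parks once the buffers ahead of it are full.

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical variant: every intermediate channel gets an explicit
;; fixed buffer of buf-size (must be >= 1 because of the transducer).
(defn pipeline-transform-buffered
  [in xf error? out errors buf-size]
  (let [applied-ch (async/chan buf-size xf)
        [invalid-ch valid-ch] (async/split error? in buf-size buf-size)
        [fail-ch success-ch] (async/split error? applied-ch buf-size buf-size)
        errors' (async/merge [invalid-ch fail-ch] buf-size)]
    (async/pipe valid-ch applied-ch)
    (async/pipe success-ch out)
    (async/pipe errors' errors)))

;; Drive it with four values; negative numbers count as errors.
(def in (async/chan))
(def out (async/chan 10))
(def errors (async/chan 10))

(pipeline-transform-buffered in (map #(- % 3)) neg? out errors 1)

(async/go
  (doseq [x [1 -2 5 2]]
    (async/>! in x))
  (async/close! in))

;; Closing in propagates through split, pipe and merge, so both
;; output channels close and into can complete.
(def out-vals (async/<!! (async/into [] out)))
(def error-vals (async/<!! (async/into [] errors)))
```

The errors arrive from two sources, so their order is not guaranteed; compare them sorted if you check them in a test.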
I stuck to the original semantics; I see applying backpressure as orthogonal to the design problem 🙂 Maybe build an idiom on top of that, with a building block like this:
(def errors (async/mix errors-out))
(defn either
  [in out errors pred]
  (let [[ch-t ch-f] (async/split pred in)]
    (async/admix errors ch-f)
    (async/pipe ch-t out)))
That code also doesn't apply back pressure on the producer which could be quite problematic.
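One way the `either` building block might be composed into the full flow is sketched below. It assumes `pred` means "element is OK", so the predicate passed in is `(complement error?)`. The `pipeline-transform` wiring and the sample values are illustrative, not something from the thread.

```clojure
(require '[clojure.core.async :as async])

;; Elements satisfying pred go to out; the rest are fed into the errors mix.
(defn either
  [in out errors pred]
  (let [[ch-t ch-f] (async/split pred in)]
    (async/admix errors ch-f)
    (async/pipe ch-t out)))

;; Hypothetical composition: two either stages around a transducer channel.
(defn pipeline-transform
  [in xf error? out errors-mix]
  (let [ok?     (complement error?)
        applied (async/chan 1 xf)]   ; transducer channels need a buffer
    (either in applied errors-mix ok?)
    (either applied out errors-mix ok?)))

(def errors-out (async/chan 10))
(def errors-mix (async/mix errors-out))
(def in (async/chan))
(def out (async/chan 10))

(pipeline-transform in (map #(- % 3)) neg? out errors-mix)

(async/go
  (doseq [x [1 -2 5 2]]
    (async/>! in x))
  (async/close! in))

;; out is closed by pipe once the input is exhausted.
(def out-vals (async/<!! (async/into [] out)))
;; As far as I know a mix does not close its output channel when its
;; inputs close, so read a known number of errors instead of waiting.
(def error-vals (async/<!! (async/into [] (async/take 3 errors-out))))
```

Because the mix keeps `errors-out` open, a consumer needs some other signal (a count, a timeout, or an explicit `close!`) to know the error stream is finished.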