Hello again... I'm trying to mix the streams + processor APIs. Specifically I want to add a Processor to the end of my topology which sends messages to multiple topics. The issue I'm hitting is that Streams (/Jackdaw) gives my processor a random name, so I have no easy way to call Topology.addSink(String processor-name, ...)
.
I'd prefer to avoid introspecting the topology to dig out that name, I can't see a way to do that robustly
Are you using the Processor
because you need some other feature not provided by the DSL? I think you can achieve what you've described with something like this...
(fn [builder]
(let [foo (k/stream builder foo-topic)]
(-> foo (k/map identity) (k/to foo-1))
(-> foo (k/map identity) (k/to foo-2))
(-> foo (k/map identity) (k/to foo-3))
builder))
This is related to a wider concern that if 1 message (a command) is broken up into many events and those events are committed separately, that a command may get partially executed in case of downtime. Otherwise I could branch
.
Ah ok. So you want to have control over when the commit happens. :thinking_face:
Yep exactly
(defn dispatch-processor
"Why not `flat-map f` beforehand instead of passing it in here? That would
split one message into many and therefore commit many times instead of once."
[f ctx k v]
(doseq [[topic msg-key msg] (f [k v])]
(.forward ctx msg-key msg (To/child topic)))
(.commit ctx))
that's what I'm trying to attach
Open to better ways to achieve this