I think I understand what you mean. You'd like to do something like...
(defn foo
[builder dispatch-fn]
(-> (k/stream builder foo)
(k/map transform-foo)
(k/filter foo-valid?)
(k/process! (partial dispatch-processor dispatch-fn))))
...but in order for your dispatch processor to work, you'd need to (.addSink sink-id processor-id key-serializer value-serializer processor-id)
for each sink you want to write to in your dispatch-processor, and the processor-id assigned by the DSL is pseudo-random, making it difficult to add a sink with the correct "parent processor".
I think the solution is to use .addProcessor
rather than k/process!
. This allows you to assign a name to the processor.