I don't know every case, but certainly aggregate
and reduce
accept an argument, normally in the last position of topic-config
, which is a map (at least) containing topic-name
which names the store they build and the repartition topic if one occurs
That seems to name the store, not the stream.
Ah yeah you're right @markbastian. Can't do that in jackdaw at the moment.
You're basically talking about this line?
KStream<String,String> stream =
builder.stream("input", <http://Consumed.as|Consumed.as>("Customer_transactions_input_topic"));
I haven't been keeping up with the latest from kafka but what's the use-case for naming stateless nodes in the topology? IIRC, what drove the requirement to give custom names to nodes was that if you inserted additional nodes into some topology, the state stores associated with certain nodes would change name (and thus you'd lose state).
The use case is mostly sugar. It's exactly as stated in this post regarding https://docs.confluent.io/current/streams/developer-guide/dsl-topology-naming.html. I think the ability to generate the topology diagrams with labels makes communication (especially externally) about your topologies much easier.
Yeah. Definitely worth adding.
BTW, IDK if you work at funding circle, but the post I use for reference for a lot of my work now gives me a 403. Should this be broken: https://engineering.fundingcircle.com/blog/2019/08/27/kafka-streams-the-clojure-way/
Oh that's no good. I used to work there. Will try to let someone know
Oh wait, @andrea.crotti or @minimal might know someone who can fix that.
Ah congrats on the new job.
Last request (and if the team takes PRs I am down with submitting one): Currently jackdaw.streams/kafka-streams function builds a topology then the streams app and returns it. Seems like it should be split into two functions like so:
(defn ^Topology topology
"Makes a Kafka Topology object."
[builder]
(.build ^StreamsBuilder (streams-builder* builder)))
(defn kafka-streams
"Makes a Kafka Streams object."
([topology opts]
(let [props (java.util.Properties.)]
(.putAll props opts)
(KafkaStreams. topology ^java.util.Properties props))))
That way when you just want to analyze the topology you don't need to build the streams app or do Java interop. Thoughts?I think that makes sense but it probably breaks compatibility for existing callers of kafka-streams
.
Maybe make kafka-streams a protocol that dispatches on a topology or builder or just internally check the type of the first argument to kafka-streams.
I'm not at fc anymore and we don't use kafka so I can't really help unfortunately