Thanks @cddr! I'm blind, I must have looked at every other file but that one >.<
join
doesn't take a topic-config
? Although, think I'm being dumb (as well as blind! In my defence, I'm sick) - I should use to/through
.
> join
doesn't take a topic-config
?
Ah I guess there's a few other variable names which imply a topic-config
What are you trying to do?
I'm trying to rekey two topics, join them then run a processor
So far I have something like (left-join (-> (kstream ...) (select-key ...)) (-> (kstream ...) (group-by ...) (aggregate ...))
which I plan to pass into process!
, but I need to add a state store and I can't see how to in Jackdaw
My team picked the processor API initially for simplicity then got rushed along adding features and we've only just remembered to revisit the KStreams API
So I need to make them work together as we transition
I can have a go at the state store with kstreamsbuilder*
but that's not ideal
So that all sounds totally reasonable and I think it is supported by jackdaw.
The process! method takes a function which takes as the first parameter a ProcessorContext
. This is where you can get access to the state-store and perform side-effects.
If you plan to use a state-store, I think you need to add them to the topology at build time using methods on the StreamsBuilder
and as you've identified you can get that from the kstreamsbuilder*
method.
Sounds like I'm on the right track then - thanks for the help
I haven't thought too much about how to make it easier to get access to state stores. We do it a bit ourselves and just drop down to the interop but I'm sure we could improve on the "ergonomics" in light of real-world examples. Let me know if you have any thoughts on how you'd like it to work.
I got my stream to build. I used this:
(defn kvsb
[store]
(Stores/keyValueStoreBuilder
(Stores/persistentKeyValueStore store)
store-serde store-serde))
(.addStateStore (j/streams-builder* builder) (k/kvsb "my-store"))
It'd be nice to be able to do more like:
(add-state-store builder (persistent-key-value-store "my-store" serde serde))
(especially if that returned builder, to maintain threadability)
Or maybe (add-state-store builder (with (persistent-key-value-store "my-store") serde serde))