jackdaw

https://github.com/FundingCircle/jackdaw
xi 2019-07-23T09:39:52.014500Z

Thanks @cddr! I'm blind, I must have looked at every other file but that one >.< join doesn't take a topic-config? Although, think I'm being dumb (as well as blind! In my defence, I'm sick) - I should use to/through.

2019-07-23T10:00:45.015600Z

> join doesn't take a topic-config?

Ah I guess there's a few other variable names which imply a topic-config

2019-07-23T10:01:03.015900Z

What are you trying to do?

xi 2019-07-23T10:56:47.016800Z

I'm trying to rekey two topics, join them then run a processor

xi 2019-07-23T10:57:58.018Z

So far I have something like (left-join (-> (kstream ...) (select-key ...)) (-> (kstream ...) (group-by ...) (aggregate ...)))

xi 2019-07-23T10:58:52.019100Z

which I plan to pass into process!, but I need to add a state store and I can't see how to in Jackdaw

xi 2019-07-23T10:59:29.020Z

My team picked the processor API initially for simplicity then got rushed along adding features and we've only just remembered to revisit the KStreams API

xi 2019-07-23T10:59:48.020300Z

So I need to make them work together as we transition

xi 2019-07-23T11:00:08.020700Z

I can have a go at the state store with streams-builder* but that's not ideal

2019-07-23T11:08:15.024300Z

That all sounds totally reasonable, and I think it is supported by jackdaw. process! takes a function whose first parameter is a ProcessorContext; that is where you can get access to the state store and perform side effects. If you plan to use a state store, I think you need to add it to the topology at build time using methods on the StreamsBuilder, and as you've identified you can get that from streams-builder*.
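
To make the two steps above concrete, here is a minimal sketch, not a definitive implementation. It assumes `jackdaw.streams` is aliased as `j`, that `process!` takes `(kstream processor-fn state-store-names)` and calls the function with `[ctx key value]`, and that String keys with Long counts are acceptable. The topic map, store name, and function names are hypothetical.

```clojure
(ns example.counting-processor
  (:require [jackdaw.streams :as j])
  (:import [org.apache.kafka.common.serialization Serdes]
           [org.apache.kafka.streams.processor ProcessorContext]
           [org.apache.kafka.streams.state KeyValueStore Stores]))

;; Hypothetical names, for illustration only.
(def store-name "my-store")

(def input-topic
  {:topic-name  "input"
   :key-serde   (Serdes/String)
   :value-serde (Serdes/String)})

(defn count-per-key!
  "Processor fn: receives the ProcessorContext first, then key and value.
  Side effect: increments a per-key counter held in the state store."
  [^ProcessorContext ctx k _v]
  (let [^KeyValueStore store (.getStateStore ctx store-name)
        n (or (.get store k) 0)]
    (.put store k (inc n))))

(defn build-topology!
  "Registers the store, attaches the processor, and returns the builder."
  [builder]
  ;; Add the store to the underlying StreamsBuilder at build time...
  (.addStateStore (j/streams-builder* builder)
                  (Stores/keyValueStoreBuilder
                   (Stores/persistentKeyValueStore store-name)
                   (Serdes/String)
                   (Serdes/Long)))
  ;; ...then name it when attaching the processor, so the context can find it.
  (-> (j/kstream builder input-topic)
      (j/process! count-per-key! [store-name]))
  builder)
```

Passing the store name to `process!` is what connects the store to the processor; without it, the lookup through the context would be expected to fail at runtime.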

xi 2019-07-23T11:09:14.024700Z

Sounds like I'm on the right track then - thanks for the help

2019-07-23T11:18:43.028300Z

I haven't thought too much about how to make it easier to get access to state stores. We do it a bit ourselves and just drop down to the interop but I'm sure we could improve on the "ergonomics" in light of real-world examples. Let me know if you have any thoughts on how you'd like it to work.

xi 2019-07-23T13:09:25.029Z

I got my stream to build. I used this:

(defn kvsb
  [store]
  (Stores/keyValueStoreBuilder
   (Stores/persistentKeyValueStore store)
   store-serde store-serde))

(.addStateStore (j/streams-builder* builder) (k/kvsb "my-store"))

xi 2019-07-23T13:11:58.030200Z

It'd be nice to be able to do something more like: (add-state-store builder (persistent-key-value-store "my-store" serde serde)) (especially if that returned the builder, to maintain threadability)

xi 2019-07-23T13:15:50.030900Z

Or maybe (add-state-store builder (with (persistent-key-value-store "my-store") serde serde))
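
One way the first suggestion could look, as a rough sketch only. Neither `add-state-store` nor `persistent-key-value-store` is confirmed here as part of Jackdaw; both are hypothetical helpers written against the Kafka Streams `Stores` factory and `j/streams-builder*`, with `jackdaw.streams` assumed to be aliased as `j`.

```clojure
(ns example.state-stores
  (:require [jackdaw.streams :as j])
  (:import [org.apache.kafka.streams.state Stores]))

(defn persistent-key-value-store
  "Hypothetical helper: returns a StoreBuilder for a persistent
  key-value store with the given name and serdes."
  [store-name key-serde value-serde]
  (Stores/keyValueStoreBuilder
   (Stores/persistentKeyValueStore store-name)
   key-serde
   value-serde))

(defn add-state-store
  "Hypothetical helper: registers store-builder on the underlying
  StreamsBuilder and returns the Jackdaw builder, so calls can be threaded."
  [builder store-builder]
  (.addStateStore (j/streams-builder* builder) store-builder)
  builder)

(comment
  ;; Usage, assuming `serde` is bound to a suitable Serde:
  (-> (j/streams-builder)
      (add-state-store (persistent-key-value-store "my-store" serde serde))))
```

Returning the Jackdaw builder rather than the raw StreamsBuilder is the design choice that keeps the call usable inside a `->` chain with the other `jackdaw.streams` functions.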