Hi
Thanks for the library. I'm using it in production already replacing my own custom Java interop code
One question I have: I'm trying to create a topology with a time-windowed reduce. But the topology fails with
java.lang.ClassCastException: org.apache.kafka.streams.kstream.Windowed cannot be cast to java.lang.String
org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.StringSerializer / value: jackdaw.serdes.fn_impl.FnSerializer) is not compatible to the actual key or value type (key type: org.apache.kafka.streams.kstream.Windowed / value type: clojure.lang.PersistentArrayMap). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
Now my question: how do I get a windowed serde?
I don't see any of this in the tests so I could obviously try to do this via Java interops
but I don't see any special handling in the streams_test.clj
for the reduce
cases so I'm wondering if I'm doing something wrong
my code more or less:
(-> joining-stream
(j/to-kstream)
(j/group-by-key)
(j/window-by-time (TimeWindows/of 1000))
(j/reduce (fn [_ new-value] new-value) intermediate-topic)
(j/to-kstream)
(j/to (topic-config output-topic))))))
any idea?