hello 👋
I’m trying to use jackdaw to implement a window-by-time, count, suppress, and output program almost identical to this Java example from https://docs.confluent.io/platform/current/streams/developer-guide/dsl-api.html#window-final-results :
KGroupedStream<UserId, Event> grouped = ...;
grouped
    .windowedBy(TimeWindows.of(Duration.ofHours(1)).grace(Duration.ofMinutes(10)))
    .count()
    .suppress(Suppressed.untilWindowCloses(unbounded()))
    .filter((windowedUserId, count) -> count < 3)
    .toStream()
    .foreach((windowedUserId, count) -> sendAlert(windowedUserId.window(), windowedUserId.key(), count));
My code using jackdaw function wrappers around the same kstreams methods:
(defn build-topology!
  [config builder]
  (-> (js/kstream builder (config/topic (:INPUT_TOPIC config)))
      (js/filter moder/moderated?)
      (js/group-by (fn [[_ v]] (:goalId v)))
      (js/window-by-time (.grace (TimeWindows/of (Duration/ofMinutes 5)) (Duration/ofMillis 1)))
      (js/count)
      (js/suppress {})
      (js/to-kstream)
      (js/print!) ...
This works fine except for the suppress function, which, when added, throws:
java.lang.RuntimeException: No reader function for tag object
at clojure.lang.EdnReader$TaggedReader.readTagged(EdnReader.java:801) ...
See https://github.com/henryw374/time-literals for a lib that does this (it provides tagged literals, with print and read support, for java.time objects)
Hey, not sure if you ever solved this, but I ran into something similar. It was due to going from a jackdaw avro serde to an edn serde, where the avro schema had a field of type bytes (decimal logical type). Serialization to edn failed because the value of that field was a BufferBytea class
Has anyone come across this before? It has me totally stumped, as I’m not sure what suppress is trying to serde or why it is going wrong
is this while running the code in a repl?
Could be seeing an issue with the underlying clojure.java-time lib: https://github.com/dm3/clojure.java-time/issues/15
No, not in a repl, when I run with lein
Thanks
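For anyone landing on this thread later: here is a rough, Kafka-free sketch of the "window final results" behaviour that the suppress step in the snippets above is meant to give. It is not jackdaw or Kafka Streams code, and it says nothing about the serde error itself. The class name `WindowFinalSketch`, the `process` method, and the `key@windowStart=count` output format are all made up for illustration. It assumes non-negative timestamps and tumbling windows, and follows the rule from the Confluent docs that a window closes once stream time (the highest timestamp seen so far) reaches window end plus grace.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, Kafka-free model of: windowedBy(size + grace) -> count -> suppress until window closes.
class WindowFinalSketch {
    private final long sizeMs;
    private final long graceMs;
    private long streamTime = Long.MIN_VALUE; // highest record timestamp seen so far
    // window start -> (key -> count); sorted so the oldest window is checked first
    private final TreeMap<Long, Map<String, Long>> open = new TreeMap<>();

    WindowFinalSketch(long sizeMs, long graceMs) {
        this.sizeMs = sizeMs;
        this.graceMs = graceMs;
    }

    // Feeds one record and returns the final counts released by it, as "key@windowStart=count".
    List<String> process(String key, long timestampMs) {
        streamTime = Math.max(streamTime, timestampMs);
        long windowStart = timestampMs - (timestampMs % sizeMs);

        // Count the record only if its window is still open; late records are dropped.
        if (streamTime < windowStart + sizeMs + graceMs) {
            open.computeIfAbsent(windowStart, s -> new TreeMap<>()).merge(key, 1L, Long::sum);
        }

        // Release every window whose end + grace has been reached by stream time.
        List<String> emitted = new ArrayList<>();
        while (!open.isEmpty() && open.firstKey() + sizeMs + graceMs <= streamTime) {
            Map.Entry<Long, Map<String, Long>> closed = open.pollFirstEntry();
            closed.getValue().forEach((k, c) -> emitted.add(k + "@" + closed.getKey() + "=" + c));
        }
        return emitted;
    }

    public static void main(String[] args) {
        // 5 minute windows with 1 ms grace, mirroring the Clojure topology above.
        WindowFinalSketch sketch = new WindowFinalSketch(300_000L, 1L);
        System.out.println(sketch.process("goal-1", 10_000L));  // []
        System.out.println(sketch.process("goal-1", 20_000L));  // []
        System.out.println(sketch.process("goal-1", 300_000L)); // [] (grace not yet elapsed)
        System.out.println(sketch.process("goal-1", 300_001L)); // [goal-1@0=2]
    }
}
```

The point of the sketch is that nothing comes out until a later record pushes stream time past the end of the window plus grace, so intermediate counts have to be held in a buffer in the meantime.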