jackdaw

https://github.com/FundingCircle/jackdaw
Laura Wyglendacz 2021-06-04T09:02:54.007Z

hello 👋

Laura Wyglendacz 2021-06-04T09:04:59.008200Z

I’m trying to use jackdaw to implement a window-by-time, count, suppress, and output program almost identical to this java https://docs.confluent.io/platform/current/streams/developer-guide/dsl-api.html#window-final-results :

KGroupedStream<UserId, Event> grouped = ...;
grouped
    .windowedBy(TimeWindows.of(Duration.ofHours(1)).grace(Duration.ofMinutes(10)))
    .count()
    .suppress(Suppressed.untilWindowCloses(unbounded()))
    .filter((windowedUserId, count) -> count < 3)
    .toStream()
    .foreach((windowedUserId, count) -> sendAlert(windowedUserId.window(), windowedUserId.key(), count));

Laura Wyglendacz 2021-06-04T09:08:12.009600Z

My code using jackdaw function wrappers around the same kstreams methods:

(defn build-topology!
         [config builder]
         (-> (js/kstream builder (config/topic (:INPUT_TOPIC config)))
          (js/filter moder/moderated?)
          (js/group-by (fn [[_ v]] (:goalId v)))
          (js/window-by-time (.grace (TimeWindows/of (Duration/ofMinutes 5)) (Duration/ofMillis 1)))
          (js/count)
          (js/suppress {})
          (js/to-kstream)
          (js/print!) ...
works fine, with the exception of the suppress function, which, when added in throws a
java.lang.RuntimeException: No reader function for tag object
	at clojure.lang.EdnReader$TaggedReader.readTagged(EdnReader.java:801) ...

2021-06-06T10:54:20.011400Z

See https://github.com/henryw374/time-literals for a lib that does this

2021-06-15T09:31:38.015400Z

Hey, not sure if you ever solved this but I ran into something similar. It was due to going from a jackdaw avro serde to and edn serde but the avro had a field with a type of bytes (decimal logical type). This caused the serializtion to edn to fail due to it having a BufferBytea class for that field value

Laura Wyglendacz 2021-06-04T09:09:01.010400Z

Has anyone come across this before? It has me totally stumped, as I’m not sure what suppress is trying to serde or why it is going wrong

2021-06-04T12:36:47.010500Z

is this while running the code in a repl? Could be seeing an issue with the underlying Clojure.java-time lib: https://github.com/dm3/clojure.java-time/issues/15

Laura Wyglendacz 2021-06-04T16:55:58.010800Z

No, not in a repl, when I run with lein

Laura Wyglendacz 2021-06-04T16:56:02.011Z

Thanks