jackdaw

https://github.com/FundingCircle/jackdaw
dbernal 2019-10-08T08:26:16.087600Z

Is it possible to configure a stream with Avro? I'm having some difficulty getting it to work. I'm using this example as a starting point:

(def streams-config
  {"application.id" "word-count"
   "bootstrap.servers" (or (System/getenv "BOOTSTRAP_SERVERS") "localhost:9092")
   "default.key.serde" "jackdaw.serdes.EdnSerde"
   "default.value.serde" "jackdaw.serdes.EdnSerde"
   "cache.max.bytes.buffering" "0"})
but I haven't quite figured out what to specify for the default serdes

dharrigan 2019-10-08T08:31:18.087900Z

I've taken a (slightly) different approach

dharrigan 2019-10-08T08:32:06.088100Z

{:topic-name #profile {:default #env FOO_TOPIC
                       :local "foo-bar"}
 :replication-factor #long #profile {:default #or [#env REPLICATION_FACTOR 1]
                                     :local 1}
 :partition-count #long #profile {:default #or [#env PARTITION_COUNT 1]
                                  :local 1}
 :poll-interval-ms 1000
 :seek #profile {:default #or [#env FOO_TOPIC_SEEK :latest]
                 :local :latest}
 :key-serde :string
 :value-serde :json}

dharrigan 2019-10-08T08:32:44.088700Z

then, in my Kafka client, I simply use the serde/serde-map to process it
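A minimal sketch of that approach, assuming the config map above: the `serde-map` and `resolve-serdes` helpers here are hypothetical, not part of Jackdaw itself — they just translate the `:key-serde` / `:value-serde` keywords into concrete Serde instances.

```clojure
(require '[jackdaw.serdes :as serdes]
         '[jackdaw.serdes.json :as json])

;; Hypothetical lookup from config keywords to real Serde objects.
(def serde-map
  {:string (serdes/string-serde)
   :json   (json/serde)})

(defn resolve-serdes
  "Replace keyword serde names in a topic config with Serde instances."
  [topic-config]
  (-> topic-config
      (update :key-serde serde-map)
      (update :value-serde serde-map)))
```

With this, `(resolve-serdes {:key-serde :string :value-serde :json ...})` yields a topic config the streams builder can consume directly.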

dharrigan 2019-10-08T08:33:02.089Z

let the library figure out what to do 🙂

dbernal 2019-10-08T08:41:10.089300Z

oh interesting, I'll give that a try

tzzh 2019-10-08T10:15:01.091500Z

Hey, thanks for Jackdaw, I think it’s great :thumbsup: I was wondering when the next release is going to be? I'm currently using 0.6.8 but need to monkey-patch it to use topics-ready? (which was fixed on master with https://github.com/FundingCircle/jackdaw/commit/9cd9524ae2eab26694eceafab882f4396f6e804e)

2019-10-08T11:21:52.092100Z

@dbernal For Avro, if you define your topic metadata like this: https://github.com/FundingCircle/jackdaw/blob/master/examples/serdes/src/serdes.clj#L132-L134

2019-10-08T11:25:02.093900Z

It may not be possible to specify Avro as the default value serde. We would need to expose a class and I don’t think Jackdaw has one for Avro.
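One hedged sketch of the per-topic alternative, based on the linked serdes example: attach an Avro serde in the topic metadata rather than as a default. The registry URL and the `*-schema-json` vars are placeholders you would supply yourself.

```clojure
(require '[jackdaw.serdes.avro.confluent :as avro])

;; key-schema-json / value-schema-json are placeholder Avro schema strings.
(defn topic-metadata
  [key-schema-json value-schema-json]
  {:topic-name  "word-count"
   :key-serde   (avro/serde "http://localhost:8081" key-schema-json true)
   :value-serde (avro/serde "http://localhost:8081" value-schema-json false)})
```

The third argument to `avro/serde` marks whether the serde is for keys, which matters for how schemas are registered with the registry.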

2019-10-08T11:27:45.095900Z

BTW, I’ve been refactoring some of the examples. In its current form, the Serde example may not work.

2019-10-08T11:28:33.096400Z

@thomas.ormezzano I can do a release later today.

tzzh 2019-10-08T11:29:00.096900Z

oh thanks that’d be great 😻

dharrigan 2019-10-08T11:36:43.097100Z

@creese :thumbsup:

callumcodes 2019-10-08T14:34:00.097700Z

What is the standard way of handling errors when processing a topology with Jackdaw? There doesn’t seem to be an error-handling method

Travis Brown 2019-10-08T15:08:12.097800Z

@creese @ticean thanks for the reading tips! I had consumed most of what I could find from Greg Young, including his half-written book. What I haven't read yet is anything that gets into the weeds enough to prepare me to write anything similar to what you've presented from FundingCircle. If you've read Enterprise Integration Patterns (just reading it now), do these patterns fit into your designs?

dbernal 2019-10-08T15:49:32.097900Z

Ok gotcha. So my use case is having a KTable that I can query with .get. I'm able to create the table with the Avro serde but haven't been able to get it working alongside the KafkaStream. Is the issue that there's no equivalent class to jackdaw.serdes.EdnSerde that uses Avro and the schema registry?

dbernal 2019-10-08T15:50:45.098100Z

(defn avro-serde [registry]
  (avro/serde (merge avro/+base-schema-type-registry+
                     avro/+UUID-type-registry+)
              {:avro.schema-registry/url (str "http://" registry)}
              {:key? true}))

(defn store [] (Stores/inMemoryKeyValueStore "seller-store"))

(defn ktable [sb store]
  (js/ktable sb
             [{:topic-name "test.c.dbo.Seller"
               :key-serde (avro-serde "localhost:8081")
               :value-serde (avro-serde "localhost:8081")}]
             store))

(defn foo
  []
  (let [builder (js/streams-builder)
        store (Stores/inMemoryKeyValueStore "seller-store")
        table (js/ktable builder {:topic-name "test.c.dbo.Seller"}
                         store)
        kafka-stream (js/kafka-streams builder
                                       {"bootstrap.servers" "localhost:9092"
                                        "application.id" "foo-test-now"
                                        "schema.registry.url" "localhost:8081"})]
    (js/start kafka-stream)
    {:stream kafka-stream
     :table table}))

dbernal 2019-10-08T15:50:55.098300Z

code for context

2019-10-08T16:26:55.099400Z

which of course in Clojure looks better than that

2019-10-08T16:27:27.100200Z

but it's probably not really documented anywhere since it's not really part of the library I guess

2019-10-08T17:12:57.100500Z

I usually set "default.deserialization.exception.handler" to "org.apache.kafka.streams.errors.LogAndFailExceptionHandler" in the streams config.

2019-10-08T17:13:38.100700Z

If there is an exception the app can’t handle, it logs the error and crashes.

2019-10-08T17:13:57.100900Z

I believe you can also log and skip but I haven’t tried that.
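Putting the two options side by side as a streams-config fragment: `LogAndFailExceptionHandler` logs the error and crashes the app, while `LogAndContinueExceptionHandler` is the log-and-skip variant mentioned above. Both classes are part of Kafka Streams itself.

```clojure
;; Crash on deserialization errors (log and fail):
(def streams-config
  {"application.id" "word-count"
   "bootstrap.servers" "localhost:9092"
   "default.deserialization.exception.handler"
   "org.apache.kafka.streams.errors.LogAndFailExceptionHandler"})

;; Or skip bad records instead (log and continue):
;; "org.apache.kafka.streams.errors.LogAndContinueExceptionHandler"
```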

2019-10-08T17:15:23.101400Z

Sorry, I haven’t read that.

2019-10-08T17:16:59.101600Z

Nor have I. Added to my reading list. Thanks.

2019-10-08T17:18:58.102600Z

Sorry, my answer was only for deserialization exceptions. For other exceptions, @andrea.crotti is correct.

2019-10-08T17:20:38.103500Z

For the accounting system, we do something like, (.setUncaughtExceptionHandler streams-app the-handler)

2019-10-08T17:21:48.104600Z

We also wait 60 seconds before we shutdown the app.
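A sketch of that pattern combining both messages above (handler and function names are illustrative, and this uses the `Thread$UncaughtExceptionHandler` interop that `KafkaStreams` exposed before Kafka 2.8):

```clojure
(defn install-uncaught-handler!
  "Log uncaught exceptions from the streams app, wait, then shut down."
  [^org.apache.kafka.streams.KafkaStreams streams-app]
  (.setUncaughtExceptionHandler
   streams-app
   (reify Thread$UncaughtExceptionHandler
     (uncaughtException [_ _thread ex]
       (println "Uncaught exception in streams app:" (.getMessage ex))
       ;; wait 60 seconds before shutting down, as described above
       (Thread/sleep 60000)
       (.close streams-app)))))
```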