Is it possible to configure a stream with Avro? I'm having some difficulty getting it to work. I'm using this example as a starting point
(def streams-config
{"application.id" "word-count"
"bootstrap.servers" (or (System/getenv "BOOTSTRAP_SERVERS") "localhost:9092")
"default.key.serde" "jackdaw.serdes.EdnSerde"
"default.value.serde" "jackdaw.serdes.EdnSerde"
"cache.max.bytes.buffering" "0"})
but haven't quite figured out what to specify for the default serdesI've taken a (slightly) different approach
{:topic-name #profile {:default #env FOO_TOPIC
:local "foo-bar"}
:replication-factor #long #profile {:default #or [#env REPLICATION_FACTOR 1]
:local 1}
:partition-count #long #profile {:default #or [#env PARTITION_COUNT 1]
:local 1}
:poll-interval-ms 1000
:seek #profile {:default #or [#env FOO_TOPIC_SEEK :latest]
:local :latest}
:key-serde :string
:value-serde :json}]
then, in my kafka client, I simply use the serde/serde-map
to process it
let the library figure out what to do 🙂
oh interesting, I'll give that a try
Hey, thanks for Jackdaw I think it’s great :thumbsup: I was wondering when the next release going to be ? currently using 0.6.8
but need to monkey patch it to use topics-ready?
(which was fixed on master with https://github.com/FundingCircle/jackdaw/commit/9cd9524ae2eab26694eceafab882f4396f6e804e)
@dbernal For Avro, if you define your topic metadata like this: https://github.com/FundingCircle/jackdaw/blob/master/examples/serdes/src/serdes.clj#L132-L134
You can use the serde resolver: https://github.com/FundingCircle/jackdaw/blob/master/examples/serdes/src/serdes.clj#L147-L154
It may not be possible to specify Avro as the default value serde. We would need to expose a class and I don’t think Jackdaw has one for Avro.
BTW, I’ve been refactoring some of the examples. In its current form, the Serde example may not work.
@thomas.ormezzano I can do a release later today.
oh thanks that’d be great 😻
@creese :thumbsup:
What is the standard way of handling errors when processing a topology with jackdaw? There doesn’t seem to be an error handling method
@creese @ticean thanks for the reading tips! I had consumed most of what I could find from Greg Young, including his half-written book. What I haven't read yet is anything that gets into the weeds enough to prepare me to write anything similar to what you've presented from FundingCircle. If you've read Enterprise Integration Patterns (just reading it now), do these patterns fit into your designs?
Ok gotcha. So my use case is having a KTable that I can query with .get. I'm able to create the table with the Avro serde but haven't been able to get it working alongside the KafkaStream. Is the issue that there's no equivalent class to jackdaw.serdes.EdnSerde that uses Avro and the schema registry?
(defn avro-serde [registry]
(avro/serde (merge avro/+base-schema-type-registry+
avro/+UUID-type-registry+)
{:avro.schema-registry/url (str "http://" registry)}
{:key? true}))
(defn store [] (Stores/inMemoryKeyValueStore "seller-store"))
(defn ktable [sb store] (js/ktable sb [{:topic-name "test.c.dbo.Seller" :key-serde (avro-serde "localhost:8081") :value-serde (avro-serde "localhost:8081")}] store))
(defn foo
[]
(let [builder (js/streams-builder)
store (Stores/inMemoryKeyValueStore "seller-store")
table (js/ktable builder {:topic-name "test.c.dbo.Seller"}
store)
kafka-stream (js/kafka-streams builder
{"bootstrap.servers" "localhost:9092"
"application.id" "foo-test-now"
"schema.registry.url" "localhost:8081"})]
(js/start kafka-stream)
{:stream kafka-stream
:table table}))
code for context
@callum838 we just how shutdown hooks https://kafka.apache.org/10/documentation/streams/developer-guide/write-streams
which of course in Clojure looks better than that
but it's probably not really documented anywhere since it's not really part of the library I guess
I usually set "default.deserialization.exception.handler"
to "org.apache.kafka.streams.errors.LogAndFailExceptionHandler"
in the streams config.
If there is an exception the app can’t handle, it logs the error and crashes.
I believe you can also log and skip but I haven’t tried that.
Sorry, I haven’t read that.
Nor have I. Added to my reading list. Thanks.
Sorry, my answer was only for deserialization exceptions. For other exceptions, @andrea.crotti is correct.
For the accounting system, we do something like, (.setUncaughtExceptionHandler streams-app the-handler)
We also wait 60 seconds before we shutdown the app.