is there a difference in how an avro serde has to be specified when using streams? Setting the key-serde and value-serde in the topic config is throwing the following error:
Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.ByteArraySerializer / value: org.apache.kafka.common.serialization.ByteArraySerializer) is not compatible to the actual key or value type (key type: clojure.lang.PersistentArrayMap / value type: clojure.lang.PersistentArrayMap). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
@dbernal can you paste how you're building the serdes?
(def +registry+
  (merge jav/+base-schema-type-registry+
         jav/+UUID-type-registry+))

(defn key-serde []
  (jav/avro-serde +registry+
                  {:avro.schema-registry/url "http://localhost:8081"}
                  {:key? false :avro/schema key-schema}))

(defn value-serde []
  (jav/avro-serde +registry+
                  {:avro.schema-registry/url "http://localhost:8081"}
                  {:key? false :avro/schema value-schema}))

(defn topic-config-sp []
  {:topic-name "my-topic"
   :key-serde (key-serde)
   :value-serde (value-serde)})
hummmm
lemme dig some, that looks right
This is the complete error:
ERROR org.apache.kafka.streams.processor.internals.AssignedStreamsTasks: stream-thread [store-price-fc3c23ca-5cce-49a3-910b-4ea3d09819d7-StreamThread-1] Failed to process stream task 0_0 due to the following error:
org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=staging.dbo.StorePrice, partition=0, offset=14826
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:304)
at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:94)
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:409)
at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:957)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:832)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:767)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:736)
Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.ByteArraySerializer / value: org.apache.kafka.common.serialization.ByteArraySerializer) is not compatible to the actual key or value type (key type: clojure.lang.PersistentArrayMap / value type: clojure.lang.PersistentArrayMap). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:94)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
at org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:44)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50)
at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:288)
... 6 more
Caused by: java.lang.ClassCastException: clojure.lang.PersistentArrayMap cannot be cast to [B
at org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:21)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:153)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:98)
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:89)
... 18 more
Looks like it might be failing at a specific offset
Maybe - more likely that’s just the latest offset for the consumer and it’s throwing when deserializing anything.
@dbernal you aren't using Avro's fixed at all, are you?
May I be the first to admit that Jackdaw's Avro layer is incomplete and doesn't support fixed atm.
@dbernal This may not be the serde at all. Can you post the code that resulted in this exception?
I would create a round-trip serde test if you haven’t already to rule this out.
Also, in the above snippet, :key? is set to false for both serdes. I think for the key serde fn, this should probably be set to true.
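For reference, a minimal sketch of that suggested change, reusing the jav/avro-serde call from the snippet above (key-schema is assumed to be defined elsewhere, as in the original):

;; Only difference from the earlier snippet: :key? true for the key serde.
(defn key-serde []
  (jav/avro-serde +registry+
                  {:avro.schema-registry/url "http://localhost:8081"}
                  {:key? true :avro/schema key-schema}))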
@arrdem I'm not sure if I am or not. The stream that I'm reading from is populated by Debezium and I'm not sure what type of Avro it's setting up the streams with. However, this only happens when I use KStreams and not with a consumer client using poll.
@creese This is the code I'm using:
(defn build-topology
  [builder]
  (-> (js/kstream builder (topic-config-sp))
      (js/peek (fn [[k v]]
                 (info (str {:key k :value v}))))
      (js/to {:topic-name "es-sink-2"}))
  builder)

(defn start-app
  [app-config]
  (let [builder  (js/streams-builder)
        topology (build-topology builder)
        app      (js/kafka-streams topology app-config)]
    (js/start app)
    (info "pipe is up")
    app))
How would I conduct a round-trip serde test? Would I use the Jackdaw library for that? I switched the :key? value to true but it didn't change the exception being thrown. That this works with the client APIs but not with the streams API is concerning.
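Worth noting: the exception says the sink is using ByteArraySerializer, and the js/to call above only names the output topic, so the default serdes apply at that node. A hedged sketch of one way to address that, assuming jackdaw.streams/to honors :key-serde and :value-serde in a topic config map the same way js/kstream does (topic-config-sink is a hypothetical helper, not from the original code):

;; Hypothetical output-topic config that carries its own serdes, so the sink
;; node doesn't fall back to the default ByteArraySerializer.
(defn topic-config-sink []
  {:topic-name "es-sink-2"
   :key-serde (key-serde)
   :value-serde (value-serde)})

(defn build-topology
  [builder]
  (-> (js/kstream builder (topic-config-sp))
      (js/peek (fn [[k v]]
                 (info (str {:key k :value v}))))
      (js/to (topic-config-sink)))
  builder)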
Using either jackdaw.client or jackdaw.streams, publish to a topic and then read from the same topic.
RE: Debezium: There are different kinds of Avro serdes. Does Debezium put its schema in the Schema Registry?
If so, it’s probably using Confluent Avro which is what Jackdaw uses.
In any case, I would try the RT test to ensure the serde isn’t to blame.
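An even smaller in-process variant of that round-trip check goes straight through the serde's serializer and deserializer; this only relies on the org.apache.kafka.common.serialization.Serde interface, though the Schema Registry at localhost:8081 still has to be reachable for the Avro serde:

;; Sketch of a round-trip serde test: serialize a sample value, deserialize the
;; resulting bytes, and check the result equals the original.
(defn round-trips? [serde topic sample]
  (let [bs     (.serialize (.serializer serde) topic sample)
        result (.deserialize (.deserializer serde) topic bs)]
    (= sample result)))

;; e.g. (round-trips? (value-serde) "my-topic" sample-value)
;; where sample-value is a map conforming to value-schema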
@creese It does publish to the Schema Registry
ok, I'll go ahead and try that
@creese Could you elaborate on the different kinds of Avro serdes? Off topic, but Confluent Avro vs. which others have you found to differ?
I believe that Confluent's Avro serializer attaches an extra schema ID tag to the head of every serialized record.
This lets you dynamically discover the decoding schema required for any given record, so long as your schema registry remains intact.
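Concretely, the Confluent wire format is a magic byte of 0 followed by a 4-byte big-endian schema ID, with the Avro payload after that. A small sketch that reads the schema ID off the front of a serialized record:

;; Sketch: parse the Confluent framing from serialized bytes.
;; Byte 0 is the magic byte (0); bytes 1-4 are the schema ID; the Avro payload follows.
(defn schema-id [^bytes record-bytes]
  (let [bb (java.nio.ByteBuffer/wrap record-bytes)]
    (when (zero? (.get bb))   ; magic byte
      (.getInt bb))))         ; 4-byte big-endian schema ID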
The other kind is standard Avro, which does not use the Schema Registry and does not prepend the magic byte and schema ID.
I don’t have any experience with that. I’ve only used Confluent Avro.
I will need to do more research into that. I have been considering Avro for Kafka more lately.
It’s worked out pretty well for us - but it does mean that you have to be very careful to back up the schema registry’s Kafka topic.
An alternative is managing your own Avro, or using something like CBOR which gives you really good binary coding without needing fixed schemas.
Depending on how you use Kafka (short-lived message bus or durable storage), nippy or something else would be pretty adequate.
Currently we are storing an immutable event stream into Kafka with Protobufs, and let's just say it has been less than ideal, so we have been considering a switch.
There has been some debate on a schema-centric format vs. a self-describing format.
We use Java primarily but I have been writing tooling in Clojure. I was at the conj when @creese gave the talk on Kafka and Jackdaw. It’s been nice to work with so far. :thumbsup:
Yeah - that’s not really a debate I can shed light on 😕 it’s just a hard problem.
I guess my $0.02 at this point is that I’d probably choose a self-describing format like CBOR, just because I think trying to encode hard business constraints in your serialization layer is mostly premature
But my only real experience here has been the same as Charles - using Confluent Avro.