jackdaw

https://github.com/FundingCircle/jackdaw
dbernal 2018-12-05T19:40:56.002800Z

is there a difference in how an Avro serde has to be specified when using streams? Setting the key-serde and value-serde in the topic config is throwing the following error:

Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.ByteArraySerializer / value: org.apache.kafka.common.serialization.ByteArraySerializer) is not compatible to the actual key or value type (key type: clojure.lang.PersistentArrayMap / value type: clojure.lang.PersistentArrayMap). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.

arrdem 2018-12-05T20:07:19.003100Z

@dbernal can you paste me how you're building the serdes?

dbernal 2018-12-05T20:41:51.003300Z

(def +registry+
 (merge jav/+base-schema-type-registry+
        jav/+UUID-type-registry+))

(defn key-serde []
  (jav/avro-serde +registry+
                  {:avro.schema-registry/url "http://localhost:8081"}
                  {:key? false :avro/schema key-schema}))

(defn value-serde []
  (jav/avro-serde +registry+
                  {:avro.schema-registry/url "http://localhost:8081"}
                  {:key? false :avro/schema value-schema}))

(defn topic-config-sp []
  {:topic-name "my-topic"
   :key-serde (key-serde)
   :value-serde (value-serde)})

arrdem 2018-12-05T21:00:05.003500Z

hummmm

arrdem 2018-12-05T21:00:50.003700Z

lemme dig some, that looks right

dbernal 2018-12-05T21:03:08.004Z

This is the complete error:

ERROR org.apache.kafka.streams.processor.internals.AssignedStreamsTasks: stream-thread [store-price-fc3c23ca-5cce-49a3-910b-4ea3d09819d7-StreamThread-1] Failed to process stream task 0_0 due to the following error:
org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=staging.dbo.StorePrice, partition=0, offset=14826
	at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:304)
	at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:94)
	at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:409)
	at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:957)
	at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:832)
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:767)
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:736)
Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.ByteArraySerializer / value: org.apache.kafka.common.serialization.ByteArraySerializer) is not compatible to the actual key or value type (key type: clojure.lang.PersistentArrayMap / value type: clojure.lang.PersistentArrayMap). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
	at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:94)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
	at org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:44)
	at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50)
	at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
	at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
	at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
	at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:288)
	... 6 more
Caused by: java.lang.ClassCastException: clojure.lang.PersistentArrayMap cannot be cast to [B
	at org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:21)
	at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:153)
	at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:98)
	at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:89)
	... 18 more

dbernal 2018-12-05T21:03:38.004500Z

Looks like it might be failing at a specific offset

arrdem 2018-12-05T21:35:35.005Z

Maybe - more likely that’s just the latest offset for the consumer and it’s throwing when deserializing anything.

arrdem 2018-12-05T21:37:33.005400Z

@dbernal you aren’t using Avro’s fixed type at all, are you?

arrdem 2018-12-05T21:38:00.005900Z

May I be the first to admit that Jackdaw’s Avro layer is incomplete and doesn’t support fixed atm.

creese 2018-12-05T21:45:38.006900Z

@dbernal This may not be the serde at all. Can you post the code that resulted in this exception?

creese 2018-12-05T21:46:40.007500Z

I would create a round-trip serde test if you haven’t already to rule this out.
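
A minimal in-memory sketch of such a round-trip check, bypassing Kafka entirely by calling the serde’s serializer and deserializer directly (assumes a reachable Schema Registry at the configured URL; sample-value is a hypothetical map conforming to value-schema):

(let [serde      (value-serde)
      topic      "my-topic"
      ;; sample-value is a hypothetical record matching value-schema
      encoded    (.serialize (.serializer serde) topic sample-value)
      round-trip (.deserialize (.deserializer serde) topic encoded)]
  (= sample-value round-trip))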

creese 2018-12-05T21:47:32.008300Z

Also, in the above snippet, :key? is set to false for both serdes. I think for the key serde fn, this should probably be set to true.
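
For reference, a sketch of the key-serde fn from the earlier snippet with that flag flipped:

(defn key-serde []
  (jav/avro-serde +registry+
                  {:avro.schema-registry/url "http://localhost:8081"}
                  ;; :key? true marks this as the key serde
                  {:key? true :avro/schema key-schema}))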

dbernal 2018-12-05T22:06:24.011200Z

@arrdem I'm not sure if I am or not. The stream that I'm reading from is populated by Debezium and I'm not sure what type of Avro it's setting up the streams with. However, this only happens when I use KStreams and not with a consumer client using poll. @creese This is the code I'm using

(defn build-topology
  [builder]
  (-> (js/kstream builder (topic-config-sp))
      (js/peek (fn [[k v]]
                 (info (str {:key k :value v}))))
      (js/to {:topic-name "es-sink-2"}))
  builder)

(defn start-app
  [app-config]
  (let [builder (js/streams-builder)
        topology (build-topology builder)
        app (js/kafka-streams topology app-config)]
    (js/start app)
    (info "pipe is up")
    app))
How would I conduct a round-trip serde test? Would I use the Jackdaw library for that? I switched :key? to true, but it didn't change the exception being thrown.
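
One detail worth checking, suggested by the SinkNode frames in the stack trace: the topic map passed to js/to has only :topic-name, so the write to "es-sink-2" falls back to the default serdes in StreamsConfig (the ByteArraySerializer named in the exception). A sketch of the same topology with explicit serdes on the sink topic as well, assuming the sink topic can reuse the key-serde/value-serde fns shown earlier:

(defn build-topology
  [builder]
  (-> (js/kstream builder (topic-config-sp))
      (js/peek (fn [[k v]]
                 (info (str {:key k :value v}))))
      ;; supply serdes on the sink topic so Streams does not fall back to
      ;; the configured default (ByteArraySerializer) when producing
      (js/to {:topic-name "es-sink-2"
              :key-serde (key-serde)
              :value-serde (value-serde)}))
  builder)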

arrdem 2018-12-05T22:10:46.012500Z

That this works with the client APIs but not with the streams API is concerning.

creese 2018-12-05T22:11:07.012900Z

Using either jackdaw.client or jackdaw.streams, publish to a topic and then read from the same topic.
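
A sketch of that publish-and-read check with jackdaw.client (required here as jc); the exact arities of producer/produce!/consumer/subscribe/poll should be verified against the Jackdaw version in use, and sample-key/sample-value are hypothetical records matching the schemas:

(with-open [p (jc/producer {"bootstrap.servers" "localhost:9092"}
                           (topic-config-sp))
            c (jc/consumer {"bootstrap.servers" "localhost:9092"
                            "group.id"          "serde-round-trip-test"
                            "auto.offset.reset" "earliest"}
                           (topic-config-sp))]
  ;; write one record through the Avro serdes...
  @(jc/produce! p (topic-config-sp) sample-key sample-value)
  ;; ...then read it back; each returned record map carries :key and :value
  ;; already decoded by the topic's serdes
  (jc/subscribe c [(topic-config-sp)])
  (jc/poll c 5000))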

creese 2018-12-05T22:12:41.014Z

RE: Debezium: There are different kinds of Avro serdes. Does Debezium put its schema in the Schema Registry?

creese 2018-12-05T22:13:13.014600Z

If so, it’s probably using Confluent Avro which is what Jackdaw uses.

creese 2018-12-05T22:13:50.015200Z

In any case, I would try the RT test to ensure the serde isn’t to blame.

dbernal 2018-12-05T22:13:50.015300Z

@creese It does publish to the Schema Registry

dbernal 2018-12-05T22:14:09.015500Z

ok, I'll go ahead and try that

jeremy 2018-12-05T23:10:31.016400Z

@creese Could you elaborate on the different kinds of Avro serdes? Off topic, but Confluent Avro vs. which others have you found to be different?

arrdem 2018-12-05T23:21:21.017500Z

https://docs.confluent.io/current/avro.html

arrdem 2018-12-05T23:21:42.018100Z

I believe that Confluent’s Avro serializer attaches an extra schema ID tag to the head of every serialized record.

arrdem 2018-12-05T23:22:05.018600Z

This lets you dynamically discover the decoding schema required for any given record, so long as your schema registry remains intact.
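
Concretely, the framing described above is one 0x00 magic byte, then a 4-byte big-endian schema ID, then the Avro-encoded payload. A small sketch that pulls the ID back out of serialized bytes (bs is assumed to come from the Confluent Avro serializer):

(defn confluent-schema-id
  "Returns the Schema Registry ID embedded in a Confluent-framed Avro record."
  [^bytes bs]
  (let [bb (java.nio.ByteBuffer/wrap bs)]
    (assert (zero? (.get bb)) "first byte should be the 0x00 magic byte")
    (.getInt bb)))   ; the 4-byte big-endian schema ID follows the magic byte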

creese 2018-12-05T23:23:45.019500Z

There’s also standard Avro, which does not use the Schema Registry and does not encode a schema ID after the magic byte.

creese 2018-12-05T23:24:17.020100Z

I don’t have any experience with that. I’ve only used Confluent Avro.

jeremy 2018-12-05T23:25:13.020500Z

I will need to do more research into that. I have been considering Avro for Kafka more lately.

arrdem 2018-12-05T23:26:41.021Z

It’s worked out pretty well for us - but it does mean that you have to be very careful to back up the schema registry’s Kafka topic.

arrdem 2018-12-05T23:27:25.021700Z

An alternative is managing your own Avro, or using something like CBOR which gives you really good binary coding without needing fixed schemas.

arrdem 2018-12-05T23:29:42.022500Z

Depending on how you use Kafka (short-lived message bus or durable storage), nippy or something else would be pretty adequate

jeremy 2018-12-05T23:34:28.023100Z

Currently we are storing an immutable event stream in Kafka with Protobufs, and let's just say it has been less than ideal, so we have been considering a switch.

jeremy 2018-12-05T23:35:39.023800Z

There has been some debate on a schema-centric format vs. a self-describing format.

jeremy 2018-12-05T23:40:45.025700Z

We use Java primarily but I have been writing tooling in Clojure. I was at the conj when @creese gave the talk on Kafka and Jackdaw. It’s been nice to work with so far. :thumbsup:

👍 2
arrdem 2018-12-05T23:48:24.026400Z

Yeah - that’s not really a debate I can shed light on 😕 it’s just a hard problem.

👍 1
arrdem 2018-12-05T23:55:06.027700Z

I guess my $0.02 at this point is that I’d probably choose a self-describing format like CBOR, just because I think trying to encode hard business constraints in your serialization layer is mostly premature

arrdem 2018-12-05T23:55:48.028100Z

But my only real experience here has been the same as Charles - using Confluent Avro.