jackdaw

https://github.com/FundingCircle/jackdaw
abdullahibra 2020-06-16T10:28:14.049400Z

Hi everyone

abdullahibra 2020-06-16T10:28:44.050200Z

How can I use LogAndContinue on Exception to make the stream not shut down if something goes wrong?

2020-06-16T11:21:50.051200Z

Hey @abdullahibra, you would just set the corresponding property in the property map that gets passed to the kafka-streams constructor.

2020-06-16T11:25:19.052300Z

{...
 "default.deserialization.exception.handler" "org.apache.kafka.streams.errors.LogAndContinueExceptionHandler"
 ...}

2020-06-16T11:33:53.055100Z

So... just wanted to announce here that I made a super simple thing for converting EDN into Avro generic records independently of any other interfaces like Serde or KafkaAvroSerializer/Deserializer etc. This is relevant to jackdaw because the Avro serde in jackdaw is... problematic, shall we say (this is my fault, so it's not intended as a criticism of the current maintainers). With this (or something like it, since it's only ~10 lines of code), you could just set the serde to be Confluent's standard "io.confluent.kafka.streams.serdes.avro.GenericAvroSerde" and then, in your app, use as-edn and as-avro as necessary to pack/unpack records. IMO this is simpler and safer (i.e. less prone to bugs) for new projects than the Avro serde bundled into jackdaw. https://github.com/cddr/edn-avro
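[Editor's note: a minimal sketch of the pack/unpack idea described above. The namespace `cddr.edn-avro`, the options map shape, and the schema literal are assumptions based on the library's README, not verified against its current API — check https://github.com/cddr/edn-avro for the exact signatures.]

```clojure
(ns my-app.avro-example
  (:require [cddr.edn-avro :as avro])) ; assumed namespace

;; An Avro schema (as EDN) for the records we want to pack/unpack.
(def schema
  {:type "record"
   :name "Greeting"
   :fields [{:name "message" :type "string"}]})

;; EDN -> org.apache.avro.generic.GenericRecord
;; (assumed arity: value plus an options map carrying the schema)
(def record
  (avro/as-avro {:message "hello"} {:schema schema}))

;; GenericRecord -> EDN
(avro/as-edn record {:schema schema})
```

With this, the stream itself can use Confluent's GenericAvroSerde, and the EDN conversion happens only at the edges of your application code.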

💯 1
abdullahibra 2020-06-16T11:45:45.055300Z

you mean the config of kafka-streams:

{"application.id" "hello"
 "bootstrap.servers" kafka-url
 "cache.max.bytes.buffering" "0"
 "default.deserialization.exception.handler"
 "org.apache.kafka.streams.errors.LogAndContinueExceptionHandler"}
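[Editor's note: for context, a sketch of how a config map like this is typically passed to jackdaw's streams constructor. The exact `jackdaw.streams` function names (`streams-builder`, `kafka-streams`, `start`) are assumed from the library's public API; the topology itself is elided.]

```clojure
(require '[jackdaw.streams :as js])

(def app-config
  {"application.id" "hello"
   "bootstrap.servers" "localhost:9092"
   "cache.max.bytes.buffering" "0"
   "default.deserialization.exception.handler"
   "org.apache.kafka.streams.errors.LogAndContinueExceptionHandler"})

(def builder (js/streams-builder))
;; ... build your topology against `builder` here ...

;; The config map is handed to the constructor, which passes it
;; through to the underlying KafkaStreams instance.
(def app (js/kafka-streams builder app-config))
(js/start app)
```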

abdullahibra 2020-06-16T11:45:46.055500Z

?

abdullahibra 2020-06-16T11:49:07.055700Z

@cddr ?

2020-06-16T11:49:45.055900Z

yep exactly

👍 1
abdullahibra 2020-06-16T11:55:09.056200Z

@cddr I have done that, but the state still transitioned to SHUTDOWN because of errors

abdullahibra 2020-06-16T11:56:08.056400Z

some of the trace:

abdullahibra 2020-06-16T11:56:12.056600Z

-StreamThread-1] Failed to process stream task 0_0 due to the following error:
                      org.apache.kafka.streams.processor.internals.StreamThread.run          StreamThread.java:  788
                  org.apache.kafka.streams.processor.internals.StreamThread.runLoop          StreamThread.java:  819
                  org.apache.kafka.streams.processor.internals.StreamThread.runOnce          StreamThread.java:  912
                   org.apache.kafka.streams.processor.internals.TaskManager.process           TaskManager.java:  425
          org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process  AssignedStreamsTasks.java:  199
                    org.apache.kafka.streams.processor.internals.StreamTask.process            StreamTask.java:  363
                    org.apache.kafka.streams.processor.internals.SourceNode.process            SourceNode.java:   87
          org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward  ProcessorContextImpl.java:  133
          org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward  ProcessorContextImpl.java:  180
          org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward  ProcessorContextImpl.java:  201
                 org.apache.kafka.streams.processor.internals.ProcessorNode.process         ProcessorNode.java:  117
  org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process            KStreamMap.java:   42
          org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward  ProcessorContextImpl.java:  133
          org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward  ProcessorContextImpl.java:  180
          org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward  ProcessorContextImpl.java:  201
                 org.apache.kafka.streams.processor.internals.ProcessorNode.process         ProcessorNode.java:  117
org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process           KStreamPeek.java:   42
                                      jackdaw.streams.lambdas.FnForeachAction/apply                lambdas.clj:   30

abdullahibra 2020-06-16T11:57:21.056800Z

-baafde36-3cd8-43ae-8edf-0680d911545f] All stream threads have died. The instance will be in error state and should be closed.
-baafde36-3cd8-43ae-8edf-0680d911545f-StreamThread-1] Shutdown complete

2020-06-16T12:21:57.057Z

@abdullahibra I think the problem is that the error is happening after the record has been deserialized. Looks like there's a problem in the function you're using in the foreach action.

abdullahibra 2020-06-16T12:25:18.057200Z

that's it

abdullahibra 2020-06-16T12:25:44.057400Z

so if I catch the Exception around that function (returning nil), I won't get stuck again

abdullahibra 2020-06-16T12:25:57.057600Z

I mean the function inside the foreach block
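[Editor's note: the deserialization exception handler only covers failures while deserializing records; exceptions thrown by user code, such as the function passed to a foreach, must be caught in that function itself, as discussed below. A minimal sketch of such a wrapper — the `safe` helper and the logging call are illustrative, not part of jackdaw:]

```clojure
(require '[clojure.tools.logging :as log])

(defn safe
  "Wrap a side-effecting stream function so that any exception is
  logged and swallowed instead of killing the StreamThread."
  [f]
  (fn [kv]
    (try
      (f kv)
      (catch Exception e
        (log/error e "Failed to process record" kv)))))

;; Usage with jackdaw, assuming a KStream `stream`:
;; (js/for-each! stream (safe (fn [[k v]] (do-something! v))))
```

As noted below, swallowing the exception trades stream availability for possibly hiding a real bug, so log enough context to investigate the failing records.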

2020-06-16T12:29:31.057800Z

Yes but is that what you actually want? The exception might indicate a problem that needs to be fixed.

abdullahibra 2020-06-16T12:30:56.058Z

yeah, but that's not a restriction here

abdullahibra 2020-06-16T12:31:00.058200Z

thank you 🙂