I have a question; maybe someone can point me in the right direction. Background: I’ve had some success implementing Onyx in a Java/native DSL with a small number of data segments passed through a workflow, and am now trying to scale it up to do some end-to-end tests on slightly larger data before moving on to test some of our production streams. This is for a scientific data application. The workflow has five linear tasks (no workflow branching):
input task -> task 2 (produces map) -> task 3 (produces vector) -> task 4 (produces vector) -> output task.
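For concreteness, that linear chain is just a vector of edges in Onyx; a minimal sketch with hypothetical task names standing in for my real catalog entries:
```clojure
;; Minimal sketch of the linear workflow described above; the task names
;; are hypothetical placeholders for the real catalog entries.
(def workflow
  [[:input-task :task-2]
   [:task-2 :task-3]
   [:task-3 :task-4]
   [:task-4 :output-task]])
```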
The system works up to a point, i.e., on a small subset of my entire data. It seems that when I reach a certain number of total segments in the system, the running tasks start reading null instead of the expected segment. For example, task 2 might pass 10 maps out in a vector to task 3, each of which might then produce 1000 maps in the output vector to task 4. At that point I get a lot of null pointer errors, and there seems to be nothing in the segment.
What’s odd is that this number, and the associated processed segments, seems to be deterministic for whatever workflow configuration I have: it always fails at the same point for a given configuration. (I’ve changed it around a bunch to see if I can spot any patterns in when it bombs.) I wanted to rule out race conditions because of the cross-language aspect.
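To make the fan-out concrete: each task function returns a vector of maps, which Onyx unrolls into individual downstream segments. Roughly this shape (the function name and keys are placeholders):
```clojure
;; Rough shape of the fan-out: a task fn takes one segment (a map) and
;; returns a vector of derived segments, which Onyx unrolls and routes
;; downstream individually. `expand-record` and the keys are placeholders.
(defn expand-record [segment]
  (mapv (fn [v] {:parent-id (:id segment) :value v})
        (:values segment)))
```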
So, for the above workflow,
If I set the system to produce 2012+ segments from task 3, I get Java null pointer exceptions for the last segment(s) and the job fails.
If I set the system to produce 2011 segments from task 3, the job finishes, but other jobs running in the same environment stall during the ZooKeeper connection.
If I set the system to produce 2010 segments from task 3, this job finishes, and 7 subsequent jobs finish successfully as well.
I believe I’ve narrowed it down to either a ZooKeeper, Aeron, or core.async plugin issue (although I might be completely wrong). So far I’ve updated the system to use a real ZooKeeper deployment, and it appears to have the same issues as the in-memory version. I’ve also traced through the core.async lifecycle logic for my input/output channels, and I can’t find anything obvious there that goes against the core.async or Onyx documentation.
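For reference, the lifecycle shape I’m comparing against is the standard core.async channel injection, sketched here in Clojure with placeholder names (exact injected keys and any extra plugin-provided lifecycle entries can vary by plugin version):
```clojure
;; Simplified shape of the core.async input-channel lifecycle (placeholder
;; names; the exact injected keys and any additional plugin-provided
;; lifecycle entries depend on the plugin version).
(require '[clojure.core.async :refer [chan]])

(def in-chan (chan 1000))

(defn inject-in-ch [event lifecycle]
  {:core.async/chan in-chan})

(def in-calls
  {:lifecycle/before-task-start inject-in-ch})

(def lifecycles
  [{:lifecycle/task :input-task
    :lifecycle/calls ::in-calls}])
```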
So, given all this, is there something that jumps out at anyone? Something I’m implementing incorrectly? Could it be related to the ZooKeeper max node size, or to unrolling segments from passed-out vectors, i.e., is that an anti-pattern? Or something else?
I’d really appreciate any help or direction anyone might offer.
@rberkheimer Can you pass us a stacktrace? It sounds like an issue with the data that you're passing through if you're getting that reliably.
I am running Onyx locally (in the REPL) and the onyx-kafka consumers seem to consistently ignore the first segment that I place onto a topic. Is there a config setting somewhere that could be causing this?
Using the latest Onyx (0.12.7) for core and all plugins.
Setting :kafka/offset-reset to :earliest seems to do it. I would've thought that when there is no initial offset in Kafka, :earliest and :latest would do the same thing.
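For anyone who hits this later, the relevant part of the catalog entry looks roughly like this (topic name and batch size are placeholders, and the other required connection keys are elided):
```clojure
;; Sketch of a kafka input catalog entry with the offset-reset setting
;; discussed above. Topic name and batch size are placeholders, and the
;; other required connection keys are elided. :kafka/offset-reset is only
;; consulted when no checkpointed offset exists for the job.
{:onyx/name :read-messages
 :onyx/plugin :onyx.plugin.kafka/read-messages
 :onyx/type :input
 :onyx/medium :kafka
 :kafka/topic "my-topic"
 :kafka/offset-reset :earliest
 :onyx/batch-size 100}
```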
earliest/latest is only used to seek when no checkpointed offset is found
offset-default might have been a better name
But it matches the auto.offset.reset logic in the Kafka consumer
There shouldn't be any checkpointed offset when starting from a fresh instance of ZK and Kafka, correct?
Correct, since it would be pretty hard to build a resume point to use a checkpoint from S3 without ZooKeeper maintaining state
And that’s assuming you supplied a resume point in the first place.
Hi, I am also facing a problem. It looks like the Kafka read-messages task is not saving the checkpoint. I am using :earliest, and all of the topic's messages are being read again and again.
Sounds like your job is probably throwing an exception, and handle-exception chose to restart? Anything showing up in onyx.log?
Is this for a single job or multiple jobs?
a single job
Gotta run, but I hope you figure it out via onyx.log. The most likely explanation is that your job is restarting itself for some reason (maybe unsuccessful checkpoints).
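If it helps, one way to make that kind of failure surface instead of looping is a handle-exception lifecycle that logs and kills the job; a rough sketch with placeholder names:
```clojure
;; Hedged sketch: a handle-exception lifecycle that logs the throwable and
;; kills the job instead of restarting it, so something like an NPE fails
;; fast rather than causing the topic to be re-read over and over.
;; Namespace and task selector are placeholders.
(defn log-and-kill [event lifecycle lifecycle-name throwable]
  (println "Task threw:" (.getMessage ^Throwable throwable))
  :kill)

(def exception-calls
  {:lifecycle/handle-exception log-and-kill})

(def lifecycles
  [{:lifecycle/task :all
    :lifecycle/calls ::exception-calls}])
```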
thank you
i will verify this
@lucasbradstreet you are right. It is an NPE. ty