i'm implementing a plugin for google pubsub on onyx, which works slightly differently from kafka. specifically, rather than using a consumer offset, it wants you to manually ack each individual message. additionally, it has a stateful "subscription" in the backend, which recovers automatically.
i'm looking at how to combine this with onyx's barrier snapshotting, and i think the right semantics would be to keep all messages "in flight" until checkpointed! is called, at which point i'll commit all of them in a single operation.
i think the sqs plugin takes a different approach, where it tags each in-flight message with the epoch it belongs to, or something like that. is that correct?
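to make that concrete, this is roughly the bookkeeping i have in mind — a minimal sketch only, where ack-batch! stands in for a batched call to pub/sub's acknowledge endpoint (the names are mine, nothing here is from an existing plugin):

```clojure
;; Sketch only: keep every delivered ackId "in flight", then acknowledge the
;; whole batch once Onyx signals that the checkpoint is durable.
(ns onyx-pubsub.inflight-sketch)

(defn remember-delivery!
  "Record the ackId of a message we've emitted downstream but not yet acked."
  [in-flight ack-id]
  (swap! in-flight conj ack-id))

(defn ack-all!
  "Intended to be called from the plugin's checkpointed! hook: acknowledge
   everything currently in flight in one batched call (ack-batch! is a
   hypothetical wrapper around Pub/Sub's acknowledge RPC)."
  [in-flight ack-batch!]
  (let [[acked _] (swap-vals! in-flight empty)]
    (when (seq acked)
      (ack-batch! acked))))

(comment
  (def in-flight (atom #{}))
  (remember-delivery! in-flight "ack-id-1")
  (remember-delivery! in-flight "ack-id-2")
  ;; checkpointed! fires -> ack both in one call
  (ack-all! in-flight #(println "acking" %)))
```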
Would be a cool plugin, we also run on GCP
yes it's needed
i'll contribute it back to OnyxProject when i'm done
could use some help with QA
@matt.t.grimm I'm currently running a job that does exactly what you're describing. I read from a SQL table in a partitioned fashion and then several tasks in my job form queries against other tables in the database that contain additional information that I need
@dbernal Do you ensure that your I/O is idempotent? My understanding is that a peer may re-run jobs as it replays the event log (from http://lbradstreet.github.io/clojure/onyx/distributed-systems/2015/02/18/onyx-dashboard.html)
We can’t ensure that IO is idempotent, but we do ensure that any windows are rolled back to a consistent checkpoint when failures occur, so you can achieve exactly once processing (as opposed to exactly once side effects).
@lmergen I think the idea should be similar to SQS. You need to tag which epoch a message belongs to so that when an epoch has successfully been checkpointed you can ack the correct results.
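Roughly speaking, the bookkeeping ends up keyed by epoch. Here's a minimal sketch of that idea — my own names, not the actual SQS plugin source, and ack-batch! again stands in for a batched Pub/Sub acknowledge call:

```clojure
;; Sketch of epoch-tagged ack bookkeeping. Messages polled after the barrier
;; for epoch N belong to a later epoch and must stay un-acked when N is
;; checkpointed.
(ns onyx-pubsub.epoch-acks)

(defn tag-delivery
  "Associate an ackId with the epoch it was polled in."
  [pending epoch ack-id]
  (update pending epoch (fnil conj #{}) ack-id))

(defn ack-through-epoch!
  "When checkpointed! fires for `epoch`, ack every message tagged with that
   epoch or an earlier one; later epochs are kept pending."
  [pending-atom epoch ack-batch!]
  (let [[before _] (swap-vals! pending-atom
                               (fn [m] (into {} (filter #(> (key %) epoch)) m)))
        to-ack     (mapcat val (filter #(<= (key %) epoch) before))]
    (when (seq to-ack)
      (ack-batch! to-ack))))

(comment
  (def pending (atom {}))
  (swap! pending tag-delivery 1 "ack-id-a")
  (swap! pending tag-delivery 2 "ack-id-b")
  ;; checkpoint for epoch 1 is durable: ack "ack-id-a" but keep "ack-id-b"
  (ack-through-epoch! pending 1 #(println "acking" %)))
```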
right
@lmergen it’s impossible to get exactly once behaviour with this scheme, as the checkpoint can finish but the peer can fail before you’re able to ack the messages.
yes that is what i'm going after
You will get at least once behaviour though.
Great
Seems like a fine approach then 🙂
cool, i think this plugin is long overdue 🙂
apparently the whole clojure community is on AWS
There’s a GCS checkpointing implementation waiting for extraction by others.
oh cool
Really need to get around to freshening up the onyx-plugin template to make life easier for plugin devs.
Does the guarantee of exactly once processing imply that when a new peer comes online, replays the event log, and potentially reruns jobs, it will not process segments/segment trees that have already been fully processed? In other words, if a job previously completed successfully, would its replay on the new peer amount to a no-op?
When a peer initially replays the scheduler log it will not be scheduled for any processing, as its join/scheduling-related log entries will not exist in the past.
So if a job had completed previously it’s not a problem - the new peer won’t do anything related to that job.
It’s initially trying to get caught up with the current state of the cluster - not take any commands from it.
@lucasbradstreet Ah, OK, makes sense. Thank you!
Are there reasons it could be better to run lots of small jobs communicating over external channels, as opposed to one big one communicating internally via aeron? Currently doing the latter in a single-machine test environment; it uses a lot of CPU even when there are no segments, and it crashes under any load.
How big are we talking / how many peers?
I could see it being better to decouple jobs sometimes, but it comes with downsides especially around consistent checkpoints.
102
OK, yeah, I can see how that would be a problem. Onyx isn’t configured by default to work well with that many peers on one box.
Usually it’s a smell that there are too many tasks doing too little.
Thanks, I think I can reduce that number; I'm probably hand-wiring something that would be better handled with Onyx grouping or something like that.
Yeah, often we see a lot of linear pipelines of A->B->C->D->E doing too little that would be better off collapsed. If you have trouble rationalising them, let me know and I can give you some advice on tuning it for a single box.
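To illustrate the kind of collapse I mean (task and function names below are made up, not from your job): a linear run of tiny tasks can usually become a single task whose :onyx/fn is plain function composition, which drops the number of peers the job needs.

```clojure
;; Before: every small step is its own Onyx task, so each one needs a peer.
(def workflow-before
  [[:in :parse] [:parse :enrich] [:enrich :format] [:format :out]])

;; After: the linear middle collapses into one task whose :onyx/fn is just
;; function composition, cutting the peer count for the job.
(def workflow-after
  [[:in :transform] [:transform :out]])

;; Placeholders standing in for the real per-step functions.
(defn parse-segment  [segment] segment)
(defn enrich-segment [segment] segment)
(defn format-segment [segment] segment)

(defn transform
  "Does the work of :parse -> :enrich -> :format in a single task."
  [segment]
  (-> segment parse-segment enrich-segment format-segment))
```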