onyx

FYI: alternative Onyx :onyx: chat is at <https://gitter.im/onyx-platform/onyx> ; log can be found at <https://clojurians-log.clojureverse.org/onyx/index.html>
2018-04-12T12:31:40.000263Z

i'm implementing a plugin for google pubsub on onyx, which works slightly differently from kafka. specifically, rather than using a consumer offset, it wants you to manually ack each individual message. additionally, it has a stateful "subscription" in the backend, which automatically recovers. i'm looking at how to combine this with onyx's barrier snapshotting, and i think the right semantic would be to keep all messages "in flight" until checkpointed! is called, at which point i will commit all of them in a single operation. i think the sqs plugin has a different approach, where it tags each of the messages in flight with the epoch it belongs to, or something like that. is this correct?
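
A minimal sketch of the "keep everything in flight until checkpointed! fires, then ack in one batch" idea described above, in plain Clojure. `pull-message!` and `ack-batch!` are placeholder stubs for whatever the Google Pub/Sub client actually exposes, and the functions here are not wired into the real Onyx plugin protocols:

```clojure
(ns pubsub-sketch.core)

;; Placeholder stand-ins for the real Google Pub/Sub client calls;
;; an actual plugin would go through the Java client instead.
(defn pull-message! [] nil)        ; would pull one message off the subscription
(defn ack-batch! [ack-ids] nil)    ; would ack all of the given ack ids at once

(defn make-reader []
  (atom {:in-flight []}))          ; everything pulled since the last checkpoint

(defn poll-segment!
  "Pull one message, remember it as in-flight, and hand its payload to Onyx."
  [reader]
  (when-let [msg (pull-message!)]
    (swap! reader update :in-flight conj msg)
    (:payload msg)))

(defn on-checkpointed!
  "Invoked when Onyx signals that a checkpoint is durable: ack every
   in-flight message in a single batch and start a fresh buffer."
  [reader]
  (let [{:keys [in-flight]} @reader]
    (when (seq in-flight)
      (ack-batch! (map :ack-id in-flight)))
    (swap! reader assoc :in-flight [])))
```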

Travis 2018-04-12T12:33:27.000353Z

Would be a cool plugin, we also run on GCP

2018-04-12T12:37:22.000083Z

yes it's needed

2018-04-12T12:37:32.000365Z

i'll contribute it back to OnyxProject when i'm done

2018-04-12T12:37:46.000409Z

could use some help with QA

dbernal 2018-04-12T12:43:15.000569Z

@matt.t.grimm I'm currently running a job that does exactly what you're describing. I read from a SQL table in a partitioned fashion and then several tasks in my job form queries against other tables in the database that contain additional information that I need

2018-04-12T15:40:24.000616Z

@dbernal Do you ensure that your I/O is idempotent? My understanding is that a peer may re-run jobs as it replays the event log (from http://lbradstreet.github.io/clojure/onyx/distributed-systems/2015/02/18/onyx-dashboard.html)

lucasbradstreet 2018-04-12T15:43:18.000202Z

We can’t ensure that IO is idempotent, but we do ensure that any windows are rolled back to a consistent checkpoint when failures occur, so you can achieve exactly once processing (as opposed to exactly once side effects).

lucasbradstreet 2018-04-12T15:44:33.000746Z

@lmergen I think the idea should be similar to SQS. You need to tag which epoch a message belongs to so that when an epoch has successfully been checkpointed you can ack the correct results.
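
A rough sketch of that epoch-tagging idea, again with a hypothetical `ack-batch!` stub in place of the real Pub/Sub ack call; this illustrates the bookkeeping only and is not the actual onyx-amazon-sqs implementation:

```clojure
(ns epoch-ack-sketch.core)

(defn ack-batch! [ack-ids] nil)    ; placeholder for the real Pub/Sub ack call

(defn make-state []
  (atom {:epoch 0
         :pending {}}))            ; epoch -> vector of un-acked messages

(defn track-message!
  "Tag each pulled message with the epoch it was read in."
  [state msg]
  (swap! state (fn [{:keys [epoch] :as s}]
                 (update-in s [:pending epoch] (fnil conj []) msg))))

(defn next-epoch!
  "Would be called when the next barrier/epoch begins."
  [state]
  (swap! state update :epoch inc))

(defn on-checkpointed!
  "When epoch `e` is durably checkpointed, ack every message belonging
   to it (and to any earlier epoch) and drop them from the pending map."
  [state e]
  (let [{:keys [pending]} @state
        done (filter (fn [[epoch _]] (<= epoch e)) pending)]
    (doseq [[_ msgs] done]
      (ack-batch! (map :ack-id msgs)))
    (swap! state update :pending #(apply dissoc % (map first done)))))
```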

2018-04-12T15:44:50.000445Z

right

lucasbradstreet 2018-04-12T15:45:14.000270Z

@lmergen it’s impossible to get exactly once behaviour with this scheme, as it’s possible for the checkpoint to finish but for the peer to fail before you’re able to ack the messages.

2018-04-12T15:45:23.000744Z

yes that is what i'm going after

lucasbradstreet 2018-04-12T15:45:24.000174Z

You will get at least once behaviour though.

lucasbradstreet 2018-04-12T15:45:29.000870Z

Great

lucasbradstreet 2018-04-12T15:45:38.000390Z

Seems like a fine approach then 🙂

2018-04-12T15:45:52.000827Z

cool, i think this plugin is long overdue 🙂

2018-04-12T15:46:15.000652Z

apparently the whole clojure community is on AWS

lucasbradstreet 2018-04-12T15:46:51.000170Z

There’s a GCS checkpointing implementation waiting for extraction by others.

2018-04-12T15:47:03.000572Z

oh cool

lucasbradstreet 2018-04-12T15:47:07.000210Z

Really need to get around to freshening up the onyx-plugin template to make life easier for plugin devs.

2018-04-12T16:52:16.000326Z

Does the guarantee of exactly once processing imply that when a new peer comes online, replays the event log, and potentially reruns jobs, it will not process segments/segment trees that have already been fully processed? In other words, if a job previously completed successfully, would its replay on the new peer amount to a no-op?

lucasbradstreet 2018-04-12T17:06:16.000023Z

When a peer initially replays the scheduler log it will not be scheduled for any processing, as its join/scheduling related log entries will not exist in the past

lucasbradstreet 2018-04-12T17:06:36.000104Z

So if a job had completed previously it’s not a problem - the new peer won’t do anything related to that job.

lucasbradstreet 2018-04-12T17:06:57.000129Z

It’s initially trying to get caught up with the current state of the cluster - not take any commands from it.

2018-04-12T17:09:48.000738Z

@lucasbradstreet Ah, OK, makes sense. Thank you!

sparkofreason 2018-04-12T18:12:35.000139Z

Are there reasons it could be better to run lots of small jobs communicating over external channels as opposed to one big one communicating internally via aeron? Currently doing the latter in a single-machine test environment; it uses lots of CPU even when there are no segments, and it crashes under any load.

lucasbradstreet 2018-04-12T18:13:58.000197Z

How big are we talking / how many peers?

lucasbradstreet 2018-04-12T18:14:36.000391Z

I could see it being better to decouple jobs sometimes, but it comes with downsides especially around consistent checkpoints.

sparkofreason 2018-04-12T18:14:52.000649Z

102

lucasbradstreet 2018-04-12T18:15:25.000312Z

OK, yeah, I can see how that would be a problem. Onyx isn’t configured by default to work well with that many peers on one box.

lucasbradstreet 2018-04-12T18:16:06.000549Z

Usually it’s a smell that there are too many tasks doing too little.

sparkofreason 2018-04-12T18:17:24.000148Z

Thanks, I think I can reduce that number; I'm probably hand-wiring something that would be better handled with onyx grouping or something like that.

lucasbradstreet 2018-04-12T18:18:33.000717Z

Yeah, we often see linear pipelines of A->B->C->D->E where each task does too little and would be better off collapsed. If you have trouble rationalising them, let me know and I can give you some advice on tuning it for a single box.
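
A small illustration of collapsing such a pipeline into a single task. The step functions and names here are made up; only the `:onyx/name`, `:onyx/fn`, `:onyx/type`, and `:onyx/batch-size` catalog keys reflect real Onyx catalog entries:

```clojure
(ns collapse-sketch.core)

;; Illustrative per-step functions; names and logic are placeholders.
(defn parse-fn  [segment] segment)
(defn enrich-fn [segment] segment)
(defn clean-fn  [segment] segment)

;; Before: a linear :in -> :parse -> :enrich -> :clean -> :out workflow,
;; which needs peers for every intermediate task.
(def linear-workflow
  [[:in :parse] [:parse :enrich] [:enrich :clean] [:clean :out]])

;; After: compose the small steps into one function and run them as a
;; single :process task, cutting the peer count for the same job.
(defn process [segment]
  (-> segment parse-fn enrich-fn clean-fn))

(def collapsed-workflow
  [[:in :process] [:process :out]])

(def collapsed-catalog-entry
  {:onyx/name :process
   :onyx/fn   ::process          ; fully-qualified keyword naming the fn above
   :onyx/type :function
   :onyx/batch-size 20})
```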