I'd like to have a trigger that fires periodically, but only if new segments have arrived. Is that possible?
@dave.dixon Yes, you’ll need to write a new trigger that reuses some of the logic from the timer trigger and also sets a flag when a segment comes through. See https://github.com/onyx-platform/onyx/blob/0.12.x/src/onyx/triggers.cljc#L56
You can build your own triggers and keep them as part of your code, but if you get it all worked out we’d be happy to add it to Onyx.
Thanks, I didn't realize you could write custom triggers. Very cool.
Yep. The different event types that you can choose to act upon are listed here: http://www.onyxplatform.org/docs/cheat-sheet/latest/#state-event/:event-type
The trigger states are checkpointed and may be recovered on a different node, so be careful to reset any timers on recovery.
The current timer trigger just chooses to fire on recovery and reset the timer that way.
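A rough model of the trigger being discussed, written in Python only to show the state transitions; it is not Onyx's Clojure trigger API. The event-type strings are loosely modeled on Onyx's state-event types, and `TriggerState`, `init_state`, and `on_event` are hypothetical names. The state holds the next allowed fire time plus a flag that is set when a segment arrives, and a recovery event restarts the timer from the local clock because the checkpointed time may have come from another node.

```python
from dataclasses import dataclass, replace
from typing import Tuple


@dataclass(frozen=True)
class TriggerState:
    fire_time_ms: int  # earliest local time at which we may fire again
    dirty: bool        # True if a segment arrived since the last fire


def init_state(period_ms: int, now_ms: int) -> TriggerState:
    return TriggerState(fire_time_ms=now_ms + period_ms, dirty=False)


def on_event(state: TriggerState, event_type: str,
             period_ms: int, now_ms: int) -> Tuple[TriggerState, bool]:
    """Return (next_state, should_fire). Hypothetical model, not Onyx's API."""
    if event_type == "recovered":
        # fire_time_ms was checkpointed, possibly on another node with a
        # different clock, so restart the timer from the local clock.
        return TriggerState(now_ms + period_ms, state.dirty), False
    if event_type == "new-segment":
        # Only record that something arrived; firing is left to timer ticks.
        return replace(state, dirty=True), False
    if event_type == "timer-tick" and now_ms >= state.fire_time_ms:
        # Period elapsed: fire only if something new arrived, and start
        # the next period either way.
        return TriggerState(now_ms + period_ms, False), state.dirty
    return state, False
```

Keeping the flag across recovery means segments seen before the checkpoint still cause a fire later; firing on recovery, as the built-in timer trigger does, is another reasonable choice.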
So generally, if a namespace-qualified keyword like :onyx.triggers/timer appears as a value in a job-definition map, is it safe to assume that it refers to a symbol, and thus that the associated attribute is a potential extension point?
Yeah, it might have been nice to support symbols there too, but that’s right.
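Roughly speaking, such a keyword names a namespace and a var, and resolving it means loading the namespace and looking up the var; that is why any namespace on the classpath can supply a trigger. As a language-neutral illustration only (this is not Onyx's resolution code), a hypothetical `resolve_keyword` in Python does the same thing with modules and attributes:

```python
import importlib
from typing import Any


def resolve_keyword(kw: str) -> Any:
    """Resolve a keyword-like string ':some.ns/name' to the object `name`
    defined in module `some.ns` (a Python stand-in for resolving a Clojure
    var from a keyword's namespace and name)."""
    if not kw.startswith(":") or "/" not in kw:
        raise ValueError(f"not a namespace-qualified keyword: {kw!r}")
    ns, name = kw[1:].split("/", 1)
    module = importlib.import_module(ns)  # load the "namespace"
    return getattr(module, name)          # look up the "var"
```

For example, `resolve_keyword(":json/dumps")` returns the standard library's `json.dumps`, in the same way a user-supplied namespace could be named in a job definition.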
Hmm. Interesting. Won’t that only fire when it sees a new segment and enough time has passed? So if not enough time has passed and then a segment comes, it won’t fire in the future if no more segments arrive?
Yes, that's the behavior I was looking for. But what you describe may be more generally useful.
Cool. That way your emits are more likely to be fresh when you do emit.
Ah cool, you’re able to handle both cases then.
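To make the difference concrete, here is a small Python simulation (not Onyx code; the event kinds and function names are hypothetical) of the two policies over the same stream: one that only checks the clock when a segment arrives, and one that checks on periodic ticks and fires if anything arrived since the last fire.

```python
from typing import List, Tuple

Event = Tuple[int, str]  # (time_ms, "segment" or "tick")


def fires_segment_driven(events: List[Event], period_ms: int) -> List[int]:
    """Fire only when a segment arrives and period_ms has passed since the last fire."""
    fired, last = [], 0
    for t, kind in events:
        if kind == "segment" and t - last >= period_ms:
            fired.append(t)
            last = t
    return fired


def fires_tick_driven(events: List[Event], period_ms: int) -> List[int]:
    """On each tick once the period has elapsed, fire if any segment arrived since the last fire."""
    fired, last, dirty = [], 0, False
    for t, kind in events:
        if kind == "segment":
            dirty = True
        elif kind == "tick" and t - last >= period_ms:
            if dirty:
                fired.append(t)
                dirty = False
            last = t  # start the next period whether or not we fired
    return fired


events = [(100, "segment"), (1000, "tick"), (1200, "segment"),
          (1500, "segment"), (2000, "tick"), (3000, "tick")]
```

With a 1000 ms period, the segment-driven policy fires only at 1200, so the segment at 1500 is never flushed because nothing arrives after it; the tick-driven policy fires at 1000 and 2000.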
One quick non-obvious thought: if you are going to be using group-by-key/group-by-fn with a high cardinality of keys, you may want to switch the trigger state from a map to a vector. A map's keyword keys get serialized as strings for every group, so your state will be larger when serialized. Otherwise it's not really a big deal.
That’s the main reason all of the other triggers destructure vectors instead.
As the state gets bigger those keywords will compress better though, so overall it might not hurt so much.
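A quick way to see both effects, sketched in Python with JSON and zlib standing in for Onyx's actual serialization and compression (an assumption; the real byte counts will differ): per-group map state repeats its key names once per group, a vector does not, and compression claws back much of that difference. The field names are hypothetical.

```python
import json
import zlib


def per_group_state(n_groups: int, as_vector: bool) -> dict:
    """Build hypothetical per-group trigger state keyed by group."""
    state = {}
    for i in range(n_groups):
        fire_time, dirty = 1_700_000_000_000 + i, i % 2 == 0
        if as_vector:
            state[f"group-{i}"] = [fire_time, dirty]
        else:
            state[f"group-{i}"] = {"fire-time": fire_time, "new-segment?": dirty}
    return state


def sizes(n_groups: int) -> dict:
    """Serialized sizes in bytes, raw and compressed, for both layouts."""
    m = json.dumps(per_group_state(n_groups, as_vector=False)).encode()
    v = json.dumps(per_group_state(n_groups, as_vector=True)).encode()
    return {"map_raw": len(m), "vec_raw": len(v),
            "map_zlib": len(zlib.compress(m)), "vec_zlib": len(zlib.compress(v))}
```

The vector layout is smaller before compression, and the gap between the two shrinks considerably after compression, which matches the point that repeated keywords compress well.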
Ah. I was wondering why vectors were used. I'll probably make that change, with the optimism we'll have so many customers it will matter.
🙂
It’d be more ready for inclusion in Onyx too.
Assuming you want to create a PR
I'll probably do that.
Great
This is something that I’ve wanted for a while, as it makes it easy to amortise the cost of state updates at the cost of a little latency.
Is it normal for an Onyx job to keep spawning more and more threads? I'm trying to run a job with production-type settings and it keeps running out of memory. I'm wondering why this could be happening, or whether I'm setting something wrong.
It’ll only increasingly spawn threads if your peers are dying and being rebooted, and even then the old threads should be shutting themselves down. Do you have a log of the behaviour? What’s the general usage pattern: which plugins, are you using windows, do you have large windows, etc?
Using the SQL plugin as input and an Elasticsearch plugin as output. I'm using a window for aggregation as soon as the input tasks are done, so is it possible the window is too big?
@dbernal yeah, if you’re building up a lot of intermediate state before flushing / emitting it, then it seems like you could easily run out of memory.
By default, all of that state is kept in the in-memory state store, which is backed by an atom.
If you’re using onyx-peer-http-query, you can query the /metrics endpoint and look at the checkpoint_size metric. That’ll give you a good indication of how much state you’re building up over time.
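A minimal sketch for watching that metric over time, assuming the endpoint serves plain-text lines (as a Prometheus-style exposition would) and that the relevant lines contain the string `checkpoint_size`; the exact metric names, labels, host, and port are assumptions to check against your own deployment.

```python
import urllib.request
from typing import List


def checkpoint_size_lines(metrics_text: str) -> List[str]:
    """Keep non-comment lines that mention checkpoint_size."""
    return [line for line in metrics_text.splitlines()
            if line and not line.startswith("#") and "checkpoint_size" in line]


def fetch_metrics(url: str) -> str:
    """Fetch the raw metrics text from a (hypothetical) peer metrics URL."""
    with urllib.request.urlopen(url, timeout=5) as resp:
        return resp.read().decode("utf-8")
```

Polling `fetch_metrics` on your peer's metrics URL every minute or so and comparing the filtered values should show whether the checkpointed state keeps growing.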
We have an experimental disk backed state store that uses LMDB, but you will likely want to evaluate whether what you’re doing is sensible as there are pretty big costs in managing/checkpointing that much state.
@lucasbradstreet ok thank you. I'll go ahead and see if that happens to be the problem. It seems like I might be using Onyx with windows the wrong way as I'm essentially trying to do a group-by-key on a SQL database and then flushing segments to be saved somewhere else. Is it sensible for me to approach that problem using windows or should I rethink my overall design? If what you say about maintaining state happens to be the issue then using windows for long batch aggregations might be the wrong way to go about it
That strategy makes sense, but maybe you’re building up too much state before flushing, or you’re not evicting after the trigger fires? Hard to tell without seeing the code. You should probably first confirm, via that metric I mentioned, that you’re building up a lot of state.
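To illustrate why evicting after a trigger fires matters, here is a toy Python model of a group-by-key sum (not Onyx's window implementation; the flush interval stands in for a trigger and all names are hypothetical). Without eviction, state grows with every distinct key in the input; with eviction, it is bounded by the keys seen between flushes.

```python
from collections import defaultdict


def aggregate(rows, flush_every, evict):
    """Sum values per key and emit a snapshot every `flush_every` rows.

    Returns (emitted snapshots, peak number of keys held in state).
    """
    state = defaultdict(int)
    emitted, peak = [], 0
    for i, (key, value) in enumerate(rows, start=1):
        state[key] += value
        peak = max(peak, len(state))
        if i % flush_every == 0:  # stand-in for the trigger firing
            emitted.append(dict(state))
            if evict:
                state.clear()  # discard state once it has been emitted
    return emitted, peak
```

One trade-off of evicting: a key that spans several flushes is emitted as several partial sums, so whatever consumes the output (for example, the store being written to) has to combine them.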
ok, I'll go ahead and check that first
@lucasbradstreet It turns out that wasn't the issue after all. Looks like I'm creating a new pooled connection in one of my function tasks every time it's called with a new segment. Would it be possible to maintain an active SQL connection pool for a given peer so that it can be reused for new segments being processed by a task?
Aha! Ok yeah that’s bad too :)
You need the connection in your onyx/fn?
It needs to reach out to SQL to find out more information about the segment that came in.
If so, you can use a lifecycle with lifecycle/before-task-start to inject the connection into :onyx.core/params, which allows you to pass the connection into your onyx/fn.
We provide a datomic lifecycle that does something similar here: https://github.com/onyx-platform/onyx-datomic/blob/0.12.x/src/onyx/plugin/datomic.clj#L408
You will want to also implement after-task-stop to close it when the peer gets stopped.
ah ok, I'll take a look. I had tried to do that but I ran into some issues trying to serialize it. I assumed everything passed as a param needed to be serialized
Ah. You were probably trying to do it through param injection via the task map, which does get serialized.
That’s the easier approach but is not amenable to injecting stuff like connections
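The serialization problem shows up with any live connection object. In this Python stand-in, pickle plays the role of the serializer that handles the task map (Onyx does not use pickle; that part is only an analogy, and the param names are hypothetical): plain data serializes fine, while a connection handle cannot be serialized at all.

```python
import pickle
import sqlite3


def is_serializable(value) -> bool:
    """True if `value` survives pickling, a stand-in for task-map serialization."""
    try:
        pickle.dumps(value)
        return True
    except (TypeError, pickle.PicklingError):
        return False


task_params = {"db/table": "users", "db/batch-size": 100}  # plain data: fine
conn = sqlite3.connect(":memory:")                          # live handle: not fine
```

This is why connections belong in a lifecycle, which runs on the peer itself, rather than in the task map.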
ok gotcha. I'll give it another shot then. Thanks for all the help!
No problem. Glad you figured it out and it was an easy problem to fix :)