Hi! I see that /onyx-savepoints/latest/tenancy-id/job-id in ZooKeeper contains the data used for resume point creation. It’s populated after the job completes, but killing the job prevents this data from being created. Could this data be populated even when the job is killed?
I suspect what is happening is that you are killing the job before a barrier makes it the whole way through the job. It will be populated if given enough time in relation to the barrier period peer-config setting (check the cheat sheet, I’m on my phone)
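A minimal sketch of where that setting goes in the peer config. The tenancy id and ZooKeeper address are placeholders, and the key name :onyx.peer/coordinator-barrier-period-ms is our recollection of the cheat sheet entry, so check it against the cheat sheet for your Onyx version:

```clojure
;; Sketch only: tenancy id and ZooKeeper address are placeholders, and the
;; barrier key name should be verified against the Onyx cheat sheet.
(def peer-config
  {:onyx/tenancy-id "my-tenancy"
   :zookeeper/address "127.0.0.1:2188"
   ;; How often the coordinator starts a new barrier. A resume point is only
   ;; written once a barrier has made it through the whole job, so a job killed
   ;; before that happens may leave nothing under /onyx-savepoints/latest/...
   :onyx.peer/coordinator-barrier-period-ms 500})
```

A shorter period means barriers complete more often, at the cost of more coordination work.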
hello gents! how can I modify the word count example in the onyx-examples repo to emit a map of all counts every trigger instead of printing to stdout? https://github.com/onyx-platform/onyx-examples/tree/0.12.x/aggregation
I’ve managed to find a solution involving a custom aggregation function but it seems like there should be an easier way
the info in the user guide around aggregation seems to have fallen behind the last couple of releases, hopefully I can submit a couple of updates to it once I have figured out what to do 🙂
Hi @schmee, that’d be great if you could help us with the docs! When you say emit, do you mean to a downstream task?
indeed
You can use this in lieu of trigger/sync http://www.onyxplatform.org/docs/cheat-sheet/latest/#trigger-entry/:trigger/emit
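A hedged sketch of a trigger entry that uses :trigger/emit in place of :trigger/sync. We assume the emit function takes the same arguments as a :trigger/sync function and that its return value is the segment sent downstream; the namespace, :word-counter window id and emit-counts! function are hypothetical:

```clojure
(ns word-count.emit) ; hypothetical namespace

(defn emit-counts!
  "Called when the trigger fires. Assumed to take the same arguments as a
  :trigger/sync function; the returned map is the segment emitted downstream."
  [event window trigger {:keys [group-key] :as state-event} state]
  {:word group-key :count state})

(def triggers
  [{:trigger/window-id :word-counter   ; hypothetical window id
    :trigger/id :emit-counts
    :trigger/on :onyx.triggers/segment
    :trigger/threshold [5 :elements]
    ;; :trigger/emit takes the place of :trigger/sync here
    :trigger/emit ::emit-counts!}])
```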
I think I am misunderstanding something, cause I expected the state map received in the dump-window! function to be this map, but it seems like it only contains one word at a time?
An example can be found in a test here: https://github.com/onyx-platform/onyx/blob/0.12.x/test/onyx/windowing/emit_aggregate_test.clj
I’d love another onyx example demonstrating this
That example might be using the discarding refinement
Rather than the accumulating refinement
I’m on my phone so it’s hard to look too hard
ahh, I see, no worries 🙂
unfortunately the info around refinements is one of the parts that are really lacking from the docs at the moment
the changelog says that they have been replaced by pre- and post-evictors but there is no info on how to use these in the guide
I’ll play around a bit and see if I can figure it out 🙂
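A hedged sketch of how the evictors map onto the old refinements, based on our reading of changes.md (verify it for your version): a trigger with no evictor keeps accumulating state, while :trigger/post-evictor [:all] discards the window state after each firing, like the old discarding refinement. The window id, trigger ids and ::dump-window! are hypothetical:

```clojure
;; Assumption: no evictor = old accumulating refinement.
(def accumulating-trigger
  {:trigger/window-id :word-counter    ; hypothetical ids
   :trigger/id :accumulating
   :trigger/on :onyx.triggers/segment
   :trigger/threshold [5 :elements]
   :trigger/sync ::dump-window!})

;; Assumption: post-evictor [:all] = old discarding refinement
;; (window state is evicted after the trigger fires).
(def discarding-trigger
  (assoc accumulating-trigger
         :trigger/id :discarding
         :trigger/post-evictor [:all]))
```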
Hmm, yes that’s true.
Yes, the state in this example is just the count
Because it’s using the count aggregation
So it is not due to the evictor
hmm, I must be misinterpreting the docs: http://www.onyxplatform.org/docs/user-guide/latest/#_grouping
>>> For example, if you had the catalog entry set to :onyx/group-by-key with value :name, and you used a window aggregate of :onyx.windowing.aggregation/count, and you sent through segments [{:name "john"} {:name "tiffany"} {:name "john"}], the aggregate map would look like {"john" 2 "tiffany" 1}.
Ah yes, but the trigger is scoped to the group
So each segment will result in you only getting the state for that group
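A simplified model (not Onyx’s actual internals) of the difference between the aggregate map from the user guide and what a group-scoped trigger is handed. The function names are hypothetical:

```clojure
;; Simplified model of grouped window state, not Onyx internals.
(def segments [{:name "john"} {:name "tiffany"} {:name "john"}])

(defn aggregate-map
  "Per-group counts, i.e. the map the user guide describes."
  [segments]
  (reduce (fn [acc segment] (update acc (:name segment) (fnil inc 0)))
          {}
          segments))

(defn state-seen-by-trigger
  "A trigger scoped to the group only sees the entry for the segment's own group."
  [agg segment]
  (get agg (:name segment)))

(aggregate-map segments)
;; => {"john" 2, "tiffany" 1}

(state-seen-by-trigger (aggregate-map segments) {:name "john"})
;; => 2
```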
I see! Is there any way to get my hands on the mentioned “aggregate map” or is that an internal data structure?
That’s internal unless you build your own aggregation which maintains the whole thing
btw, here is how I rolled my own type counting example, I’m just curious if there is a better solution or if this is the canonical way to do it 🙂
;; initial state: an empty map of type -> count
(defn sum-init-fn [window]
  {})
;; each segment contributes a count of 1 for its :type
(defn sum-aggregation-fn [window segment]
  {(:type segment) 1})
;; fold the update into the running map
(defn sum-application-fn [window state value]
  (merge-with + state value))
(def sum
  {:aggregation/init sum-init-fn
   :aggregation/create-state-update sum-aggregation-fn
   :aggregation/apply-state-update sum-application-fn})
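To see why this state holds every type at once, the three functions can be driven by hand. run-aggregation is a hypothetical helper that follows the order we assume Onyx uses (init once, then create + apply per segment), and it assumes the task is not grouped; the sum map is restated compactly so the block runs on its own:

```clojure
(defn run-aggregation
  "Hypothetical helper: fold segments through an aggregation map."
  [{init :aggregation/init
    create :aggregation/create-state-update
    apply-update :aggregation/apply-state-update}
   window segments]
  (reduce (fn [state segment]
            (apply-update window state (create window segment)))
          (init window)
          segments))

;; Compact copy of the sum aggregation so this block is self-contained.
(def sum
  {:aggregation/init (fn [_window] {})
   :aggregation/create-state-update (fn [_window segment] {(:type segment) 1})
   :aggregation/apply-state-update (fn [_window state value] (merge-with + state value))})

(run-aggregation sum nil [{:type :a} {:type :b} {:type :a}])
;; => {:a 2, :b 1}
```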
@schmee @lucasbradstreet The :aggregation/create-state-update fn signature has changed from (window, state, segment) to (window, segment) (https://github.com/onyx-platform/onyx/blob/0.12.x/changes.md#0110).
But what’s the return type expected by sum-application-fn? The code below gives me an error:
Handling uncaught exception thrown inside task lifecycle :lifecycle/assign-windows. Killing the job. -> Exception type: java.lang.IllegalArgumentException. Exception message: Don't know how to create ISeq from: clojure.lang.Keyword
(defn sum-init-fn [window]
  {})
(defn sum-aggregation-fn [window segment]
  (let [k (second (:window/aggregation window))]
    {:set-value k}))
(defn sum-application-fn [window state value]
  (merge-with + state value))
Can you post the rest of the stack trace?
It might be from the second on (:window/aggregation window)?
What’s that set to?
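Assuming the window’s aggregation was written in the vector form, e.g. [::sum :age], a quick check of what second returns: it is the key itself, a keyword, not the segment’s value, so the state update ends up carrying :age instead of a number. age-update is a hypothetical fix that looks the key up in the segment:

```clojure
;; Hypothetical window entry using the vector form of :window/aggregation.
(def window {:window/aggregation [::sum :age]})

;; `second` returns the key, not the segment's value:
(second (:window/aggregation window))
;; => :age

(defn age-update
  "Hypothetical create-state-update: look the key up in the segment."
  [window segment]
  (let [k (second (:window/aggregation window))]
    {:set-value (get segment k)}))
```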
@lucasbradstreet Yeah, it was my bad. My payloads look like A), and I changed the code to B).
A)
70144dea-cdd1-443d-9e7f-55cc5d0928d7,{:name "John" :age 49}
dbe61f62-257a-4f00-b85a-6d4c0cca44cd,{:name "Madeline" :age 55}
8e5998ab-5424-4ccb-9e54-5f3676aaa0b6,{:name "Geoffrey" :age 14}
;; B)
(defn sum-init-fn [window]
  ;; merge-with in sum-application-fn needs a map, so start from {} rather than 0
  {})
(defn sum-aggregation-fn [window segment]
  (let [k (-> segment :message :age)]
    {:set-value k}))
(defn sum-application-fn [window state value]
  (merge-with + state value))
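init, create-state-update and apply-state-update have to agree on the shape of the state: merge-with only works on maps, so pairing it with an init of 0 fails. If a plain running total is all that is needed, the state can be a number throughout. A sketch with hypothetical names, assuming segments shaped like {:message {:age 49}}:

```clojure
;; Hypothetical aggregation whose state is a number from start to finish.
(defn age-total-init [window] 0)

(defn age-total-update [window segment]
  ;; the state update is just this segment's age
  (-> segment :message :age))

(defn age-total-apply [window state value]
  (+ state value))

(def age-total
  {:aggregation/init age-total-init
   :aggregation/create-state-update age-total-update
   :aggregation/apply-state-update age-total-apply})
```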
So far I’ve been able to implement Aggregation & State Management pretty cleanly.
Great 🙂
If you see anything in the docs that could be clarified, PRs are always welcome!
Triggers are also firing as expected 👍
Yeah, I was following the Aggregation & State Management portion of the user guide. http://www.onyxplatform.org/docs/user-guide/0.12.x/#aggregation-state-management
I think that just needs to be updated.
I’ll try to whip up a PR. I really appreciate all the help!
Cool, I think we already received a PR there, so please branch off master if you decide to make fixes 🙂 https://github.com/onyx-platform/onyx/pull/835
master. Ok, lemme have a look….
I guess this is what you mean by “building your own aggregation”
Exactly, yes
sweet, then I’m on the right track, thanks for taking your time 👍
Good to hear. My reception is getting bad anyway. :)
@lucasbradstreet here’s a PR with a couple of doc fixes, I’ll make more as I go along 🙂 https://github.com/onyx-platform/onyx/pull/835