Hi, I'm reading the docs and trying the examples. I’m wondering if there’s a way to redirect the output of a window aggregation to a catalog entry that is part of the workflow. It seems that every example dumps the window either to stdout or to a db (with side effects). But what if I’d like to do transformations after the aggregation? Is that possible?
Yes
very common
@guillaume.carbonneau here is an example:
(defn health-check-memory-task
  [task-name peer-config task-opts]
  {:task {:task-map (merge
                     {:onyx/name task-name
                      :onyx/group-by-key :hash
                      :onyx/flux-policy :recover
                      :onyx/n-peers 2
                      :onyx/type :reduce}
                     task-opts)
          ;; ::health-check-memory-aggregation is defined elsewhere (not shown here)
          :windows [{:window/id :health-check-memory
                     :window/task task-name
                     :window/type :global
                     :window/aggregation ::health-check-memory-aggregation}]
          ;; Fire after every segment and emit the result downstream
          :triggers [{:trigger/window-id :health-check-memory
                      :trigger/id :health-check-memory0
                      :trigger/emit ::emit-health-check
                      :trigger/on :onyx.triggers/segment
                      :trigger/threshold [1 :elements]}]}})

(defn emit-health-check
  [event window trigger window-data state]
  ;; Returns nil (nothing to emit) when the trigger fires on job completion
  (when-not (= :job-completed (:event-type window-data))
    (:health-check state)))
Then just connect them in the workflow, and optionally add flow conditions.
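For instance, the wiring could look roughly like the sketch below. Everything except `:health-check-memory` is made up for illustration: the task names `:read-metrics`, `:transform-health-check` and `:write-output`, the predicate `healthy?`, and the `:status` key are assumptions about your job and about the shape of the emitted segment, not something from the example above.

```clojure
;; Hypothetical workflow: the emitted aggregate flows from the windowed
;; :reduce task into an ordinary downstream task, like any other segment.
(def workflow
  [[:read-metrics :health-check-memory]
   [:health-check-memory :transform-health-check]
   [:transform-health-check :write-output]])

;; Optional: only route some emitted segments downstream.
;; Assumes the emitted segment is a map with a :status key (hypothetical).
(defn healthy?
  [event old-segment new-segment all-new-segments]
  (= :ok (:status new-segment)))

(def flow-conditions
  [{:flow/from :health-check-memory
    :flow/to [:transform-health-check]
    :flow/predicate ::healthy?}])
```

The post-aggregation transformation then lives in whatever function `:transform-health-check` points to in the catalog, so the window itself needs no side effects.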