…
- Is there an example of multiple calls to submit-job for separate but overlapping workflows (core.async or kafka)?
- That would be i) an input core.async channel (or kafka topic) that goes into an onyx processing function, which ii) outputs to a channel (or topic), which iii) then becomes the input to a downstream onyx processing function.
- I’m trying and failing (with both kafka and core.async), and don’t see any example here: https://github.com/onyx-platform/learn-onyx https://github.com/onyx-platform/onyx-examples https://github.com/onyx-platform/onyx-commander-example
- The examples only have one output path in the workflow, :inputA -> :processB -> :outputC (ex: https://github.com/onyx-platform/onyx-commander-example/blob/master/src/onyx_commander_example/commander.clj#L3-L5).
- And in the examples with a node in the middle, that node is just a processing function (https://github.com/onyx-platform/learn-onyx/blob/master/test/workshop/jobs/challenge_1_3_test.clj#L14-L20).
I don’t have any examples handy, but there shouldn’t be any problem doing so.
I have these 2 workflows (using core.async channels).
(def workflow1
  [[:scanner-command :ibgateway]
   [:ibgateway :scanner-command-result]
   [:ibgateway :scanner]])

(def workflow2
  [[:scanner :market-scanner]
   [:market-scanner :filtered-stocks]])
I loop over both and call (onyx.api/submit-job peer-config job)
on each iteration. I have successful job-ids, so I think they both went through.
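Roughly like this (a sketch, not my exact code; the catalog, lifecycle, and scheduler values shown here are assumed):
(def job1 {:workflow workflow1
           :catalog catalog1
           :lifecycles lifecycles1
           :task-scheduler :onyx.task-scheduler/balanced})
(def job2 {:workflow workflow2
           :catalog catalog2
           :lifecycles lifecycles2
           :task-scheduler :onyx.task-scheduler/balanced})
;; submit both jobs and keep whatever onyx.api/submit-job returns (the job-id info)
(def submitted (mapv #(onyx.api/submit-job peer-config %) [job1 job2]))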
Are you setting up the channels manually or are you using the convenience functions to generate some?
If it’s the latter, it’s probably generating two channels for :scanner
I’m manually creating channels and putting them into lifecycles like so.
(defn inject-scanner-command-ch [event lifecycle]
  {:core.async/buffer in-buffer
   :core.async/chan base/chan-scanner-command})
(defn inject-scanner-command-result-ch [event lifecycle]
  {:core.async/chan base/chan-scanner-command-result})
(defn inject-scanner-ch [event lifecycle]
  {:core.async/chan base/chan-scanner})
(def in-calls-scanner-command {:lifecycle/before-task-start inject-scanner-command-ch})
(def out-calls-scanner-command-result {:lifecycle/before-task-start inject-scanner-command-result-ch})
(def out-calls-scanner {:lifecycle/before-task-start inject-scanner-ch})
(defn lifecycles [platform-type]
  ({:kafka []
    :onyx [{:lifecycle/task :scanner-command
            :lifecycle/calls :com.interrupt.streaming.platform.scanner-command/in-calls-scanner-command}
           {:lifecycle/task :scanner-command
            :lifecycle/calls :onyx.plugin.core-async/reader-calls}
           {:lifecycle/task :scanner-command-result
            :lifecycle/calls :com.interrupt.streaming.platform.scanner-command/out-calls-scanner-command-result}
           {:lifecycle/task :scanner-command-result
            :lifecycle/calls :onyx.plugin.core-async/writer-calls}
           {:lifecycle/task :scanner
            :lifecycle/calls :com.interrupt.streaming.platform.scanner-command/out-calls-scanner}
           {:lifecycle/task :scanner
            :lifecycle/calls :onyx.plugin.core-async/writer-calls}]}
   platform-type))
… So the :scanner channel in both workflows is the same channel (`base/chan-scanner`).
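For reference, the channel vars look roughly like this (the exact namespace name and buffer sizes here are approximate):
(ns com.interrupt.streaming.platform.base
  (:require [clojure.core.async :as async]))
;; one shared set of channels; chan-scanner is the output of job 1
;; and the intended input of job 2
(def chan-scanner-command (async/chan 1000))
(def chan-scanner-command-result (async/chan 1000))
(def chan-scanner (async/chan 1000))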
I can see that messages are processed from base/chan-scanner-command to base/chan-scanner.
But my REPL hangs when trying to consume from a downstream channel.
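A timeout-based take like this returns :timed-out instead of blocking (sketch; `downstream-ch` is a placeholder for whatever channel job 2’s output task writes to):
(require '[clojure.core.async :as async])
;; take from the downstream channel, but give up after 5 seconds
(let [[segment port] (async/alts!! [downstream-ch (async/timeout 5000)])]
  (if (= port downstream-ch)
    segment
    :timed-out))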
…
Does this make sense? Let me know what other context I should provide.
chan-scanner-command is a var with the channel?
You’ve tested that they get to the output channel on the first job, right?
Yes. Correct on both points.
Have you looked at your onyx.log for any errors?
Sorry, I’m really busy today so I can’t really help you dig in further. There’s no technical reason that I can think of why it shouldn’t work.
We’ve chained onyx jobs together via kafka topics - core.async should be no different.
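With kafka the overlap is just a shared topic name between job 1’s output task and job 2’s input task, roughly like this (a sketch from memory of the onyx-kafka catalog keys, which may differ by plugin version; :my.ns/serialize and :my.ns/deserialize are placeholders):
;; job 1's output task writes to the "scanner" topic
{:onyx/name :scanner
 :onyx/plugin :onyx.plugin.kafka/write-messages
 :onyx/type :output
 :onyx/medium :kafka
 :kafka/topic "scanner"
 :kafka/zookeeper "127.0.0.1:2181"
 :kafka/serializer-fn :my.ns/serialize
 :onyx/batch-size 10}
;; job 2's input task reads the same topic
{:onyx/name :scanner
 :onyx/plugin :onyx.plugin.kafka/read-messages
 :onyx/type :input
 :onyx/medium :kafka
 :kafka/topic "scanner"
 :kafka/zookeeper "127.0.0.1:2181"
 :kafka/deserializer-fn :my.ns/deserialize
 :kafka/offset-reset :earliest
 :onyx/batch-size 10
 :onyx/max-peers 1}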
I suspect the second job is erroring and killing itself
No, not at all. Thanks for responding as far as you have.
Let me have a look at onyx.log.