onyx

FYI: alternative Onyx :onyx: chat is at <https://gitter.im/onyx-platform/onyx> ; log can be found at <https://clojurians-log.clojureverse.org/onyx/index.html>
twashing 2017-12-10T19:38:19.000059Z

… - Is there an example of multiple calls to submit-job, for separate but overlapping workflows (core.async or kafka)?

twashing 2017-12-10T19:38:23.000141Z

- So that would be i) an input core.async channel (or kafka topic) that goes into an onyx processing function which then ii) outputs to a channel (or topic), then subsequently iii) is the input to a downstream onyx processing function

twashing 2017-12-10T19:38:38.000065Z

- I’m trying and failing (both with kafka and core.async), and don’t see any example here. https://github.com/onyx-platform/learn-onyx https://github.com/onyx-platform/onyx-examples https://github.com/onyx-platform/onyx-commander-example

twashing 2017-12-10T19:38:50.000082Z

- Only 1 output path in the workflow :inputA -&gt; :processB -&gt; :outputC (ex: https://github.com/onyx-platform/onyx-commander-example/blob/master/src/onyx_commander_example/commander.clj#L3-L5).

twashing 2017-12-10T19:38:59.000086Z

- And an example with the node in the middle, is just a processing function (https://github.com/onyx-platform/learn-onyx/blob/master/test/workshop/jobs/challenge_1_3_test.clj#L14-L20).

lucasbradstreet 2017-12-10T19:41:51.000069Z

I don’t have any examples handy, but there shouldn’t be any problem doing so.

twashing 2017-12-10T19:46:30.000059Z

I have these 2 workflows (using, core.async channels).

(def workflow1
  [[:scanner-command :ibgateway]
   [:ibgateway :scanner-command-result]
   [:ibgateway :scanner]])

(def workflow2
  [[:scanner :market-scanner]
   [:market-scanner :filtered-stocks]])

twashing 2017-12-10T19:47:33.000066Z

I loop over both and call (onyx.api/submit-job peer-config job) on each iteration. I have successful job-ids, so I think they both went through.

lucasbradstreet 2017-12-10T19:47:36.000016Z

Are you setting up the channels manually or are you using the convenience functions to generate some?

lucasbradstreet 2017-12-10T19:47:53.000032Z

If it’s the latter, it’s probably generating two channels for :scanner

twashing 2017-12-10T19:49:10.000104Z

I’m manually creating channels, and putting into lifecycles like so.

(defn inject-scanner-command-ch [event lifecycle]
  {:core.async/buffer in-buffer
   :core.async/chan base/chan-scanner-command})
(defn inject-scanner-command-result-ch [event lifecycle] {:core.async/chan base/chan-scanner-command-result})
(defn inject-scanner-ch [event lifecycle] {:core.async/chan base/chan-scanner})

(def in-calls-scanner-command {:lifecycle/before-task-start inject-scanner-command-ch})
(def out-calls-scanner-command-result {:lifecycle/before-task-start inject-scanner-command-result-ch})
(def out-calls-scanner {:lifecycle/before-task-start inject-scanner-ch})

(defn lifecycles [platform-type]
  ({:kafka []
    :onyx [{:lifecycle/task :scanner-command
            :lifecycle/calls :com.interrupt.streaming.platform.scanner-command/in-calls-scanner-command}
           {:lifecycle/task :scanner-command
            :lifecycle/calls :onyx.plugin.core-async/reader-calls}

           {:lifecycle/task :scanner-command-result
            :lifecycle/calls :com.interrupt.streaming.platform.scanner-command/out-calls-scanner-command-result}
           {:lifecycle/task :scanner-command-result
            :lifecycle/calls :onyx.plugin.core-async/writer-calls}

           {:lifecycle/task :scanner
            :lifecycle/calls :com.interrupt.streaming.platform.scanner-command/out-calls-scanner}
           {:lifecycle/task :scanner
            :lifecycle/calls :onyx.plugin.core-async/writer-calls}]}
   platform-type))

twashing 2017-12-10T19:50:23.000030Z

… So the :scanner channel in both workflows, is the same channel (`base/chan-scanner`).

twashing 2017-12-10T19:51:11.000014Z

I can see that messages are process from base/chan-scanner-command to base/chan-scanner.

twashing 2017-12-10T19:52:03.000086Z

But my repl hangs when trying to consume from a downstream channel.

twashing 2017-12-10T19:54:37.000074Z

twashing 2017-12-10T19:55:10.000017Z

Does this make sense? Let me know what other context I should provide.

lucasbradstreet 2017-12-10T20:00:08.000001Z

chan-scanner-command is a var with the channel?

lucasbradstreet 2017-12-10T20:00:30.000106Z

You’ve tested that they get to the output channel on the first job, right?

twashing 2017-12-10T20:01:22.000138Z

Yes. Correct on both points.

lucasbradstreet 2017-12-10T20:02:16.000008Z

Have you looked at your onyx.log for any errors?

lucasbradstreet 2017-12-10T20:02:56.000025Z

Sorry, I’m really busy today so I can’t really help you dig in further. There’s no technical reason that I can think of why it shouldn’t work.

lucasbradstreet 2017-12-10T20:03:12.000053Z

We’ve chained onyx jobs together via kafka topics - core.async should be no different.

lucasbradstreet 2017-12-10T20:03:36.000097Z

I suspect the second job is erroring and killing itself

twashing 2017-12-10T20:06:07.000087Z

No, not at all. Thanks for responding as far as you have.

twashing 2017-12-10T20:06:31.000012Z

Let me have a look at onyx.log.