core-async

respatialized 2020-11-04T15:10:04.192600Z

I'm not sure if this is the right place to ask, but is there a preferred lightweight solution for treating remote machines (e.g. those running separate JVM processes and accessible over a network) as core.async channels? What's the idiomatic way to send some long-running computation to another machine on the wire? Most of the stuff I've seen when looking up information (`onyx`, dragonmark, etc.) looks unmaintained.

2020-11-04T16:04:25.193300Z

the core.async semantics break with network fallibility, you don't get the same correctness guarantees or straightforward behaviors

2020-11-04T16:04:56.193800Z

I'd treat the network as an IO source/sink, not as a part of a larger async channel system

2020-11-04T16:05:16.194100Z

because it has failure modes core.async doesn't have

👍 4
2020-11-04T18:46:07.194800Z

;; clj -J-Xmx32m -Sdeps '{:deps {org.jgroups/jgroups {:mvn/version "4.0.12.Final"} org.clojure/core.async {:mvn/version "0.4.490"}}}'

(require '[clojure.core.async :as async]
         '[clojure.core.async.impl.protocols :as p])

(defn cluster-channel [cluster-name]
  (let [in (async/chan)
        out (async/chan)
        jchannel (org.jgroups.JChannel.)
        _ (.connect jchannel cluster-name)
        _ (.setReceiver
           jchannel
           (proxy [org.jgroups.ReceiverAdapter] []
             (receive [msg-batch]
               (doseq [msg (.array msg-batch)
                       :let [msg (.getObject msg)]]
                 (async/>!! in msg)))))]
    (async/go-loop []
      (let [msg (async/<! out)]
        (if (some? msg)
          (do
            (async/<!
             (async/thread
               (.send jchannel (org.jgroups.Message. nil msg))))
            (recur))
          (.close jchannel))))
    (reify
      p/ReadPort
      (take! [_ fn1-handler]
        (p/take! in fn1-handler))
      p/WritePort
      (put! [_ val fn1-handler]
        (p/put! out val fn1-handler))
      p/Channel
      (close! [_]
        (p/close! in)
        (p/close! out))
      (closed? [_]
        (or (p/closed? in)
            (p/closed? out))))))

😮 1
Jakub Holý 2020-11-04T19:10:16.197300Z

Hello! Given a number of channels, each with a single collection, how do I create a single channel with the individual items concatenated? It is certainly trivial but I cannot see it... I mean, are there functions that I can do that? Or do I need to make a low-level solution with go-loop ? I guess cat is not usable here?

ghadi 2020-11-04T19:14:01.197800Z

core.async/map "Takes a function and a collection of source channels, and returns a channel which contains the values produced by applying f to the set of first items taken from each source channel, followed by applying f to the set of second items from each channel, until any one of the channels is closed, at which point the output channel will be closed. The returned channel will be unbuffered by default, or a buf-or-n can be supplied"

ghadi 2020-11-04T19:14:21.198200Z

sounds simple, not a trivial impl though @holyjak

ghadi 2020-11-04T19:15:01.198700Z

(core.async/map vector chs)

ghadi 2020-11-04T19:15:28.199500Z

if I understand you correctly

Jakub Holý 2020-11-04T19:15:48.200300Z

Thank you! But this is not what I want, I have ch1 with [1 2 3] and ch2 with [4 5 6] and want a channel with 1 2 3 4 5 6.

ghadi 2020-11-04T19:15:58.200600Z

core.async/merge

Jakub Holý 2020-11-04T19:16:21.201Z

that will give me a single channel with [1 2 3] [4 5 6]

Jakub Holý 2020-11-04T19:16:51.201700Z

so it is a step in the right direction...

ghadi 2020-11-04T19:16:53.201800Z

cat transducer

ghadi 2020-11-04T19:17:05.202500Z

merge -> pipe -> ch with cat transducer

Jan K 2020-11-04T19:17:08.202600Z

(clojure.core.async/map concat chans) ? (ah this doesn't unwrap the collection)

Jakub Holý 2020-11-04T19:18:57.204Z

You mean (pipe ch-with-colls (chan 1 (cat *rf*))) ? But what should be the *rf* function?

ghadi 2020-11-04T19:19:25.204700Z

just cat

ghadi 2020-11-04T19:19:31.205Z

(chan 1 cat)

Jakub Holý 2020-11-04T19:19:39.205300Z

ah, ok, will try, thanks!!!

Jakub Holý 2020-11-04T19:20:12.206200Z

transducers are still a mystery to me even after having used them. And the docs are famously terse, for people that already know what they are doing 🙂

Jakub Holý 2020-11-04T19:21:22.207700Z

Thanks @jkr.sw but I am not sure what that would do. You would call (concat 1st-elm-of-1st-ch 1st-elm-of-2nd-ch ..) which would not work I think.

Jakub Holý 2020-11-05T15:14:07.208400Z

Lets take async out of the question. What you propose is similar to (map concat [[1 2 3] [4 5 6]]) You run (concat chN) for each channel, which does not make sense.

Jan K 2020-11-05T17:43:00.217600Z

I don't think this is equivalent to what happens with clojure.core.async/map. Try this:

(def c1 (doto (a/chan 10) (a/put! [1 2 3])))
(def c2 (doto (a/chan 10) (a/put! [4 5 6])))
(a/<!! (a/map concat [c1 c2]))
; => (1 2 3 4 5 6)

👍 1
Jan K 2020-11-04T19:22:49.208100Z

if I understood well that would be (concat [1 2 3] [4 5 6]) => (1 2 3 4 5 6)