I'm not sure if this is the right place to ask, but is there a preferred lightweight solution for treating remote machines (e.g. those running separate JVM processes and accessible over a network) as core.async channels? What's the idiomatic way to send some long-running computation to another machine on the wire? Most of the stuff I've seen when looking up information (`onyx`, `dragonmark`, etc.) looks unmaintained.
the core.async semantics break down once the network can fail: you don't get the same correctness guarantees or straightforward behaviors
I'd treat the network as an IO source/sink, not as a part of a larger async channel system
because it has failure modes core.async doesn't have
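One way to read the advice above is to keep the network call at the edge and turn its failure modes into plain values before they reach any channel logic. The following is a minimal sketch under that assumption, not an established pattern from a library: `call-remote` and `call-with-timeout` are hypothetical names, and `blocking-fn` stands in for whatever blocking network call is actually used.

```clojure
(require '[clojure.core.async :as async])

(defn call-remote
  "Runs blocking-fn (a hypothetical stand-in for a network call) on its own
  thread. Returns a channel that yields {:ok result} or {:error exception}."
  [blocking-fn]
  (async/thread
    (try
      {:ok (blocking-fn)}
      (catch Exception e
        {:error e}))))

(defn call-with-timeout
  "Waits up to timeout-ms for the call. A timeout is reported as a value,
  {:error :timeout}, instead of leaving the caller parked forever."
  [blocking-fn timeout-ms]
  (let [result   (call-remote blocking-fn)
        [v port] (async/alts!! [result (async/timeout timeout-ms)])]
    (if (= port result)
      v
      {:error :timeout})))
```

Note that on a timeout the underlying thread keeps running until `blocking-fn` returns; the sketch only stops waiting for it. Retries, cancellation, and delivery guarantees are left to the caller, since core.async itself does not provide them.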
;; clj -J-Xmx32m -Sdeps '{:deps {org.jgroups/jgroups {:mvn/version "4.0.12.Final"} org.clojure/core.async {:mvn/version "0.4.490"}}}'
(require '[clojure.core.async :as async]
         '[clojure.core.async.impl.protocols :as p])

(defn cluster-channel [cluster-name]
  (let [in (async/chan)   ; messages received from the cluster
        out (async/chan)  ; messages waiting to be sent to the cluster
        jchannel (org.jgroups.JChannel.)
        _ (.connect jchannel cluster-name)
        _ (.setReceiver
           jchannel
           (proxy [org.jgroups.ReceiverAdapter] []
             (receive [msg-batch]
               (doseq [msg (.array msg-batch)
                       :let [msg (.getObject msg)]]
                 ;; blocking put: the calling thread waits until a consumer takes from `in`
                 (async/>!! in msg)))))]
    ;; forward everything put on `out` to the cluster; close the JChannel once `out` closes
    (async/go-loop []
      (let [msg (async/<! out)]
        (if (some? msg)
          (do
            ;; .send blocks, so run it on a separate thread instead of inside the go block
            (async/<!
             (async/thread
               (.send jchannel (org.jgroups.Message. nil msg))))
            (recur))
          (.close jchannel))))
    ;; expose `in` (reads) and `out` (writes) as one channel-like value
    (reify
      p/ReadPort
      (take! [_ fn1-handler]
        (p/take! in fn1-handler))
      p/WritePort
      (put! [_ val fn1-handler]
        (p/put! out val fn1-handler))
      p/Channel
      (close! [_]
        (p/close! in)
        (p/close! out))
      (closed? [_]
        (or (p/closed? in)
            (p/closed? out))))))
Hello! Given a number of channels, each with a single collection, how do I create a single channel with the individual items concatenated? It is certainly trivial but I cannot see it... I mean, are there functions that can do that? Or do I need to make a low-level solution with go-loop? I guess cat is not usable here?
core.async/map
"Takes a function and a collection of source channels, and returns a
channel which contains the values produced by applying f to the set
of first items taken from each source channel, followed by applying
f to the set of second items from each channel, until any one of the
channels is closed, at which point the output channel will be
closed. The returned channel will be unbuffered by default, or a
buf-or-n can be supplied"
sounds simple, not a trivial impl though @holyjak
(core.async/map vector chs)
if I understand you correctly
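For reference, here is a small sketch of what the docstring above describes: `map` works in lockstep across the source channels and stops as soon as any one of them closes. `chan-of` is a hypothetical helper written only for this example.

```clojure
(require '[clojure.core.async :as a])

(defn chan-of
  "Hypothetical helper: returns a closed, buffered channel holding vals."
  [& vals]
  (let [c (a/chan (count vals))]
    (doseq [v vals]
      (a/put! c v))
    (a/close! c)
    c))

;; Pairs up the first items, then the second items; the leftover 3 is
;; dropped because the second channel closes first.
(def pairs
  (a/<!! (a/into [] (a/map vector [(chan-of 1 2 3) (chan-of :a :b)]))))
;; pairs => [[1 :a] [2 :b]]
```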
Thank you! But this is not what I want, I have ch1 with [1 2 3] and ch2 with [4 5 6] and want a channel with 1 2 3 4 5 6.
core.async/merge
that will give me a single channel with [1 2 3] [4 5 6]
so it is a step in the right direction...
cat transducer
merge -> pipe -> ch with cat transducer
(clojure.core.async/map concat chans)? (ah this doesn't unwrap the collection)
You mean (pipe ch-with-colls (chan 1 (cat *rf*)))? But what should be the *rf* function?
just cat
(chan 1 cat)
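Putting the merge -> pipe -> cat suggestion together, a minimal sketch could look like this. The channel names are made up for the example, and the order in which `merge` delivers the two collections is not guaranteed, so only the items within each collection are certain to stay in order.

```clojure
(require '[clojure.core.async :as a])

;; Each source channel carries a single collection and is then closed.
(def ch1 (doto (a/chan 1) (a/put! [1 2 3]) a/close!))
(def ch2 (doto (a/chan 1) (a/put! [4 5 6]) a/close!))

;; merge yields the collections one by one; the cat transducer on the
;; target channel unwraps each collection into individual items.
(def items (a/pipe (a/merge [ch1 ch2]) (a/chan 1 cat)))

;; Collect everything for inspection. pipe closes the target channel once
;; the merged channel closes, so a/into terminates.
(def result (a/<!! (a/into [] items)))
;; result is [1 2 3 4 5 6] or [4 5 6 1 2 3], depending on merge order
```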
ah, ok, will try, thanks!!!
transducers are still a mystery to me even after having used them. And the docs are famously terse, for people that already know what they are doing 🙂
Thanks @jkr.sw but I am not sure what that would do. You would call (concat 1st-elm-of-1st-ch 1st-elm-of-2nd-ch ..) which would not work I think.
Let's take async out of the question. What you propose is similar to (map concat [[1 2 3] [4 5 6]])
You run (concat chN) for each channel, which does not make sense.
I don't think this is equivalent to what happens with clojure.core.async/map. Try this:
(require '[clojure.core.async :as a])
(def c1 (doto (a/chan 10) (a/put! [1 2 3])))
(def c2 (doto (a/chan 10) (a/put! [4 5 6])))
(a/<!! (a/map concat [c1 c2]))
; => (1 2 3 4 5 6)
if I understood well that would be (concat [1 2 3] [4 5 6]) => (1 2 3 4 5 6)
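Note that the result of `map concat` is still one collection delivered as a single channel item. If individual items are wanted on the channel, one option (a sketch, assuming each source channel carries exactly one collection and is then closed) is to pipe that result through a channel with the `cat` transducer. Unlike the merge variant, the order here follows the order of the source channels.

```clojure
(require '[clojure.core.async :as a])

(def c1 (doto (a/chan 10) (a/put! [1 2 3]) a/close!))
(def c2 (doto (a/chan 10) (a/put! [4 5 6]) a/close!))

;; map concat yields one concatenated seq; cat then unwraps it into items.
(def items (a/pipe (a/map concat [c1 c2]) (a/chan 1 cat)))

(def result (a/<!! (a/into [] items)))
;; result => [1 2 3 4 5 6]
```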