Say I have a go block, and I need to mapcat a collection inside that go block, but for every element inside the collection I also use a go block? So what I did is:
(go (into [] (mapcat (fn [e] (go (something e)))) [1 2 3]))
But now I don't want my outer go block to return a vector of channels, so I'd like to take the values out of them. To do that I'd need another map or for, or anything else with a nested fn containing a go block that does the take, and then I'd have the same issue
Hum, my solution for now is to do this:
(go
  (loop [acc []
         coll (into []
                    (map
                      (fn [c]
                        (go
                          (<! (somefn c))))
                      [1 2 3]))]
    (if-let [ch (first coll)]
      (recur (conj acc (<! ch)) (next coll))
      (into [] cat acc))))
You can just map somefn
And there is some combinator in core.async for this, I just don't recall what it is
Takes a collection of channels and returns a channel of a collection
I'm guessing that's merge
into in fact
(async/<! (async/into [] (map some-channel-returning-fn [1 2 3 4])))
but you might want to look at replacing that with one of the pipeline variations
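For reference, a minimal sketch of what the pipeline route might look like, assuming pipeline-async is the variation meant here. fetch-pair-async and mapcat-pipeline are made-up names standing in for the real work, and the parallelism of 4 and buffer size of 8 are arbitrary. The cat transducer on the output channel does the flattening.

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical async fn in the shape pipeline-async expects:
;; it receives the input value and a result channel, puts its
;; result there, then closes that channel.
(defn fetch-pair-async [e out]
  (async/go
    (async/>! out [e e])
    (async/close! out)))

(defn mapcat-pipeline
  "Sketch: returns a channel yielding one vector of the flattened results."
  [coll]
  (let [from (async/chan (max 1 (count coll)))
        to   (async/chan 8 cat)]          ; cat flattens each [e e] as it is put
    (doseq [e coll] (async/put! from e))  ; buffer is large enough for every input
    (async/close! from)
    (async/pipeline-async 4 to fetch-pair-async from)
    (async/into [] to)))                  ; completes once the pipeline closes `to`

(def pipeline-result (async/<!! (mapcat-pipeline [1 2 3])))
```

Pipelines return outputs in input order, so pipeline-result here should be [1 1 2 2 3 3].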
Oh... interesting.
Too bad that async/into doesn't take a transducer
Hum, I don't think that works
Because the second arg of async/into is supposed to be a channel, not an ISeq of channels
it would work with async/merge in place of map though
err, merge wrapping map, sorry
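A rough sketch of the "merge wrapping map" version, in case it helps. fetch-pair is a hypothetical stand-in for the channel-returning fn. Note that the results come back in arrival order, not input order.

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical channel-returning fn: yields [e e] on a channel.
(defn fetch-pair [e]
  (async/go [e e]))

(def merged-result
  (async/<!!
    (async/go
      (apply concat
             (async/<!
               ;; async/into needs a single channel, so merge the seq of
               ;; channels into one before collecting into a vector.
               (async/into []
                           (async/merge (map fetch-pair [1 2 3]))))))))
```

merged-result holds the six values 1 1 2 2 3 3, grouped in pairs, but which pair comes first depends on which channel delivers first.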
Also, why does this not work? (doto (async/chan [10]) async/close!)
is [10] really a valid buff-or-n ?
So it does seem you can: async/merge, then async/into, then async/<!. At that point, I might prefer my loop variant 😛
Oh, right, I was being dumb, and treating chan like atom
This is what I wanted: (doto (async/chan) (async/put! [10]) (async/close!))
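To spell out the difference, a small sketch: the argument to chan is a buffer size (or a buffer object), not an initial value, so the value has to be put separately. A buffer of 1 is used here only so the put completes immediately; the var names are made up.

```clojure
(require '[clojure.core.async :as async])

(def c (doto (async/chan 1)      ; 1 is the buffer size, not a value
         (async/put! [10])       ; the value goes in with put!
         async/close!))

(def first-take  (async/<!! c))  ; the buffered [10] is still delivered after close!
(def second-take (async/<!! c))  ; nil, since the channel is closed and drained
```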
That's a lot of contortion just to mapcat with a channel-returning-fn :S
You don't need merge
I don't know what you are doing now, but if you wrap in apply concat, my async/into line completely replaces your earlier loop+into
(apply concat
(async/into []
(map (constantly (async/go [1 2])) [1 2 3])))
Throws java.lang.IllegalArgumentException: No implementation of method: :take! of protocol: #'clojure.core.async.impl.protocols/ReadPort found for class: clojure.lang.LazySeq
Because async/into takes a channel, not a collection
Fundamentally I'm trying to do:
(async/<!!
(async/go
(mapcat (constantly (async/go [1 2])) [1 2 3])))
So mapcat inside a go block over a non channel coll where the mapping fn is a channel returning one
Merge is not ordered
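If order matters, one possible sketch is to keep the takes in a plain loop directly inside the go block, which is a trimmed-down version of the loop posted earlier. mapcat-chan is a hypothetical helper name, not something core.async provides.

```clojure
(require '[clojure.core.async :as async])

(defn mapcat-chan
  "Hypothetical helper: applies channel-returning f to each element of coll
  and returns a channel yielding one vector of the concatenated results,
  in the order of coll."
  [f coll]
  (async/go
    ;; mapv is eager, so every channel is created up front and the
    ;; underlying work can proceed concurrently.
    (loop [chs (mapv f coll)
           acc []]
      (if-let [ch (first chs)]
        ;; <! sits directly in the go body (loop/recur is fine),
        ;; not inside a nested fn, so the go macro can see it.
        (recur (rest chs) (into acc (async/<! ch)))
        acc))))

(def ordered-result
  (async/<!! (mapcat-chan (fn [e] (async/go [e (* 10 e)])) [1 2 3])))
;; ordered-result => [1 10 2 20 3 30]
```

The channels are taken from one at a time in input order, so a slow first element delays the rest of the collection step, but the work behind the later channels has already started.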