core-async

2020-01-06T05:49:40.170900Z

Say I have a go block, and I need to mapcat a collection inside that go block, but for every element inside the collection I also use a go block? So what I did is:

(go (into [] (mapcat (fn [e] (go (something e)))) [1 2 3]))

2020-01-06T05:50:47.172100Z

But now I don't want my outer go block to return a vector of channels, so I'd like to take the value out of them, but I would need another map or for or anything which would have a nested fn with a go block doing a take again in it, and then I'd have the same issue

2020-01-06T06:11:42.173900Z

Hum, my solution for now is to do this:

(go
  (loop [acc []
         coll (into []
                    (map
                     (fn [c]
                       (go
                         (<! (somefn c)))))
                    [1 2 3])]
    (if-let [ch (first coll)]
      (recur (conj acc (<! ch)) (next coll))
      (into [] cat acc))))

2020-01-06T06:26:40.174600Z

You can just map somefn

2020-01-06T06:27:16.175700Z

And there is some combinator in core.async for this, I just don't recall what it is

2020-01-06T06:27:38.176400Z

Takes a collection of channels and returns a channel of a collection

2020-01-06T19:22:34.176700Z

I'm guessing that's merge

2020-01-06T19:41:08.176900Z

into in fact

2020-01-06T19:41:54.177700Z

(async/<! (async/into [] (map some-channel-returning-fn [1 2 3 4])))

2020-01-06T19:42:14.178100Z

but you might want to look at replacing that with one of the pipeline variations
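
A minimal sketch of what the pipeline route could look like, assuming a recent core.async release (older ones name `to-chan!` plain `to-chan`). `pairs-for` and `mapcat-pipeline` are hypothetical names made up for illustration, not anything from the thread or the library. The `cat` transducer on the output channel does the concatenation, and `pipeline-async` keeps outputs in input order.

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical stand-in for the channel-returning mapping fn.
(defn pairs-for [x]
  (async/go [x (* 10 x)]))

(defn mapcat-pipeline
  "Runs f over coll with up to n calls in flight. Returns a channel that
  yields one vector holding all results, in input order."
  [n f coll]
  (let [out (async/chan 1 cat)] ; cat flattens each result collection as it arrives
    (async/pipeline-async n
                          out
                          ;; af receives the input and a result channel it must close;
                          ;; pipe forwards f's single value and then closes that channel.
                          (fn [v result-ch] (async/pipe (f v) result-ch))
                          (async/to-chan! coll))
    (async/into [] out)))

(def piped (async/<!! (mapcat-pipeline 4 pairs-for [1 2 3])))
;; piped => [1 10 2 20 3 30]
```

Inside a go block the same call would be taken with `async/<!` instead of `async/<!!`.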

2020-01-06T19:57:08.179100Z

Oh... interesting.

2020-01-06T19:58:11.179400Z

Too bad that async/into doesn't take a transducer

2020-01-06T20:10:42.179600Z

Hum, I don't think that works

2020-01-06T20:11:07.180200Z

Because the second arg of async/into is supposed to be a channel, not an ISeq of channels

2020-01-06T20:15:26.180500Z

it would work with async/merge in place of map though

2020-01-06T20:16:37.181100Z

err, merge wrapping map, sorry
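
Spelled out, merge wrapping map might look like the sketch below. `some-channel-returning-fn` here is a hypothetical stand-in, and `mapcat-unordered` is an illustrative name. `async/merge` takes the collection of channels and gives back a single channel, which is what `async/into` expects.

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical stand-in for the channel-returning mapping fn.
(defn some-channel-returning-fn [x]
  (async/go [x (* 10 x)]))

(defn mapcat-unordered
  "Merges the per-element channels into one, collects every result into a
  vector, then concatenates. Groups arrive in completion order, which need
  not match the order of coll."
  [f coll]
  (async/go
    (into [] cat
          (async/<! (async/into [] (async/merge (map f coll)))))))

(def merged (async/<!! (mapcat-unordered some-channel-returning-fn [1 2 3])))
```

The result contains 1, 10, 2, 20, 3 and 30, but which group comes first depends on scheduling.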

2020-01-06T20:16:55.181500Z

Also, why does this not work? (doto (async/chan [10]) async/close!)

2020-01-06T20:17:44.182Z

is [10] really a valid buff-or-n ?

2020-01-06T20:18:00.182400Z

So it does seem you can: async/merge, then async/into, then async/<!. At that point, I might prefer my loop variant 😛

2020-01-06T20:18:20.182700Z

Oh, right, I was being dumb, and treating chan like atom

2020-01-06T20:19:30.183Z

This is what I wanted: (doto (async/chan) (async/put! [10]) (async/close!))

2020-01-06T20:20:40.183500Z

That's a lot of contortion just to mapcat with a channel-returning-fn :S

2020-01-06T20:25:23.183800Z

You don't need merge

2020-01-06T20:27:01.186100Z

I don't know what you are doing now, but if you wrap in apply concat, my async/into line completely replaces your earlier loop+into

2020-01-06T20:28:52.186700Z

(apply concat
             (async/into []
                         (map (constantly (async/go [1 2])) [1 2 3])))
Throws java.lang.IllegalArgumentException: No implementation of method: :take! of protocol: #'clojure.core.async.impl.protocols/ReadPort found for class: clojure.lang.LazySeq

2020-01-06T20:29:02.187200Z

Because async/into takes a channel, not a collection

2020-01-06T20:31:19.187700Z

Fundamentally I'm trying to do:

(async/<!!
       (async/go
         (mapcat (constantly (async/go [1 2])) [1 2 3])))

2020-01-06T20:31:57.188300Z

So mapcat inside a go block over a non channel coll where the mapping fn is a channel returning one

2020-01-06T20:32:12.188700Z

Merge is not ordered
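
If input order matters, one option is to skip merge and take from the channels one at a time. A minimal sketch, where `slow-pairs` and `mapcat-ordered` are hypothetical names for illustration: `mapv` starts every call up front so the work still overlaps, and the loop then reads the results in the order of the inputs.

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical mapping fn: later inputs finish sooner, to show that
;; completion order does not affect the result.
(defn slow-pairs [x]
  (async/go
    (async/<! (async/timeout (* 10 (- 4 x))))
    [x (* 10 x)]))

(defn mapcat-ordered
  "Calls f on every element eagerly, then takes each result in input order
  and concatenates it onto the accumulator. Returns a channel yielding the
  final vector."
  [f coll]
  (let [chans (mapv f coll)] ; mapv is eager, so every go block is already running
    (async/go-loop [acc [] remaining (seq chans)]
      (if-let [ch (first remaining)]
        (recur (into acc (async/<! ch)) (next remaining))
        acc))))

(def ordered (async/<!! (mapcat-ordered slow-pairs [1 2 3])))
;; ordered => [1 10 2 20 3 30]
```

Total wait time is roughly that of the slowest call, since nothing starts late; only the takes are sequential.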