Looking through the Aleph manifold API, is there an easy way to merge two streams? I'm primarily familiar with the Rx model of streams, and I know manifold has different semantics. I'd like to do a merge-map operation: like mapcat, but instead of concatenating the streams, it merges all returned streams together, emitting events concurrently. I tried to create the function myself, but I'm struggling to get a working solution.
what are the semantics of the op you want @me1260? are you wanting to merge maps from each stream or are you wanting the contents of the streams unchanged but mixed?
Ended up implementing via:
(defn merge-streams
  "Takes a stream of streams, and merges them into a single stream."
  [s]
  (let [out (s/stream)]
    (s/consume #(s/consume (fn [value] (s/put! out value)) %) s)
    (s/source-only out)))
Not sure how well this compares to concat, but the semantics are to merge each stream unchanged into the destination stream. Seems like it would be a useful operator to have in the library.
The use case is a websocket server that receives a subscribe message, then starts streaming updates about the resource to the client. Each time a new resource is subscribed to via a websocket message, I merge in a new stream of updates about the resource.
@me1260 ah, ok, you aren't doing anything with the values on the stream - in similar circumstances I've connected the new streams to the merged stream
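For reference, a minimal sketch of the "connect the new streams to the merged stream" approach, assuming manifold.stream is aliased as s. The name merge-streams-connect and the names in the usage lines are hypothetical, made up for illustration. It passes {:downstream? false} to s/connect so that, as I understand the option, one inner stream closing does not close the merged stream. Note that the merged stream is never closed in this sketch.

```clojure
(require '[manifold.stream :as s])

(defn merge-streams-connect
  "Takes a stream of streams and connects each inner stream to one output stream."
  [streams]
  (let [out (s/stream)]
    ;; Connect every inner stream to `out` as it arrives.
    ;; {:downstream? false} keeps `out` open when an inner stream closes.
    (s/consume #(s/connect % out {:downstream? false}) streams)
    (s/source-only out)))

;; Usage (hypothetical names): each new subscription is a stream of updates
;; that gets put onto `subscriptions`.
(def subscriptions (s/stream))
(def merged (merge-streams-connect subscriptions))
(def resource-a (s/stream))

(s/put! subscriptions resource-a)
(s/put! resource-a {:resource :a :update 1})

;; Take the next merged value, giving up after one second.
@(s/try-take! merged ::drained 1000 ::timeout)
```

Compared with calling s/put! inside a nested s/consume, letting s/connect move the values should also let backpressure from the merged stream reach the inner streams, since the put deferred is not discarded.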