aleph

Sam DeSota 2020-05-03T13:49:45.004300Z

Looking through the Aleph manifold API, is there an easy way to merge 2 streams? I've primarily familiar with the Rx model of streams, I know manifold has different semantics. I'd like to do a merge-map operation, like mapcat but instead of concating the streams, it merges all returned streams together, emitting events concurrenty. Tried to create the method myself, but struggling to get a working solution.

mccraigmccraig 2020-05-03T15:17:05.007700Z

what are the semantics of the op you want @me1260? are you wanting to merge maps from each stream or are you wanting the contents of the streams unchanged but mixed?

Sam DeSota 2020-05-03T15:30:29.008100Z

Ended up implementing via:

(defn merge-streams
  "Takes a stream of streams, and merges them into a single stream."
  [s]
  (let [out (s/stream)]
    (s/consume #(s/consume (fn [value] (s/put! out value)) %) s)
    (s/source-only out)))

Sam DeSota 2020-05-03T15:31:39.009300Z

Not sure how well this compares to concat , but the semantics are to merge each stream unchanged into the destination stream. Seems like it would be a useful operator to have in the library.

Sam DeSota 2020-05-03T15:33:07.010600Z

The use case is a websocket server that recieves a subscribe message, then starts streaming updates about the resource to the client. Each time a new resource is subscribed via a websocket message, I merge in a new stream of updates about the resource.

mccraigmccraig 2020-05-03T20:14:42.014900Z

@me1260 ah, ok, you aren't doing anything with the values on the stream - in similar circumstances I've connected the new streams to the merged stream