aleph

ABeltramo 2021-03-11T15:31:15.033600Z

Hello everyone, @flowthing suggested me to post my question over here, I posted it in the clojure channel too, here it goes:

ABeltramo 2021-03-11T15:32:44.034600Z

I'm using manifold and I can't wrap my head around splitting a stream: here's an example code:

(defn split-stream [condition stream]
  (let [pos-stream (ms/stream)
        neg-stream (ms/stream)]
    (ms/consume (fn [e]
                  (if (condition e)
                    (ms/put! pos-stream e)
                    (ms/put! neg-stream e)))
                stream)

    [(ms/source-only pos-stream)
     (ms/source-only neg-stream)]))

(deftest streamz
  (let [rand-stream (ms/->source (repeatedly #(- (rand) 0.5)))]
    (->> rand-stream
         (split-stream pos?)
         (apply ms/zip)
         ; ... Process pairs of [pos neg] ...
         (ms/stream->seq)
         (take 10)
         (println))))
A simple example that should capture my problem: I have a fn that generates values and put it into a stream. I need to process that stream in pairs based on a condition, can I split a stream into two streams? How can I accomplish that?

ABeltramo 2021-03-11T15:36:08.035500Z

I tried something like

(defn split-stream [condition stream]
  [(ms/filter condition stream)
   (ms/filter (comp not condition) stream)])
but this seems to not work with a single stream. I guess I could have two separate input streams but that will mean to discard valid inputs

ABeltramo 2021-03-11T15:44:07.036600Z

(defn split-stream [condition stream]
  (let [pos-stream (ms/stream)
        neg-stream (ms/stream)]
    (ms/connect-via stream
                    (fn [e]
                      (if (condition e)
                        (ms/put! pos-stream e)
                        (ms/put! neg-stream e)))
                    pos-stream)

    (ms/connect-via stream
                    (fn [e]
                      (if (condition e)
                        (ms/put! pos-stream e)
                        (ms/put! neg-stream e)))
                    neg-stream)

    [(ms/source-only pos-stream)
     (ms/source-only neg-stream)]))
Doing something like this seems to result in an infinite waiting. I guess you can't connect two stream to a single stream?

lilactown 2021-03-11T15:45:46.037200Z

when I limit the repeatedly call to 10 elements above, it seems to work

kingmob 2021-03-12T22:07:43.073Z

Bit late to the party, but consume isn’t what you want in the middle of a stream. It’s more a terminal thing.

lilactown 2021-03-11T15:47:57.037900Z

going to 1000 seems to lock up my REPL

lilactown 2021-03-11T15:48:02.038100Z

so this seems to be an issue trying to chunk the stream of values

ABeltramo 2021-03-11T15:49:10.038200Z

Which split-stream does work with the 10 elements limit for you?

lilactown 2021-03-11T15:49:44.038400Z

the first one you pasted in the channel

lilactown 2021-03-11T15:49:58.038600Z

also just return a vector with [(ms/filter ,,,) (ms/filter ,,,)]

ABeltramo 2021-03-11T15:50:45.038800Z

the first one should work because I'm forcing the consume but it doesn't seem like a good solution (since there's no end to the generator)

ABeltramo 2021-03-11T15:55:19.039700Z

Maybe I should take a different approach and making a transducer to be used with transform, I don't really need two streams, I need pairs.

lilactown 2021-03-11T15:56:50.040500Z

I guess I just don't see where the issue is yet.

ABeltramo 2021-03-11T16:00:32.043800Z

Don't worry, it's probably me, I'm very new to Manifold! Thing is that I would like to split a stream into two separate streams based on a condition. I would like to generate stuff once and put it in the right stream, at the same time, I would like to generate only when someone needs to take downstream from the two child streams.

kingmob 2021-03-12T22:21:03.073200Z

“I would like to generate only when someone needs to take  downstream from the two child streams.” This is a bit antithetical to how Manifold streams work. Manifold streams (and most stream libs) are push-based. A value enters the top of the stream, and a chain of callbacks and/or continuations propagates it automatically. What you describe is closer to standard lazy seqs, or the reactive-streams specification (e.g., java.util.concurrent.Flow), which are pull-based.

kingmob 2021-03-12T22:24:39.073400Z

(That being said, a seq source wrapped around an infinite seq like repeatedly will pull values on demand)

lilactown 2021-03-11T16:02:48.045400Z

so the examples I've been working through, eventually my REPL locks up trying to run them

lilactown 2021-03-11T16:02:55.045700Z

I guess the problem is that the producer that connects the streams together needs to buffer the values

ABeltramo 2021-03-11T16:03:04.046100Z

(deftest streamz
  (let [rand-stream-1 (ms/->source (repeatedly  #(- (rand) 0.5)))
        rand-stream-2 (ms/->source (repeatedly  #(- (rand) 0.5)))]
    (->> (ms/zip (ms/filter pos? rand-stream-1)
                 (ms/filter neg? rand-stream-2))
         ; ... Process pairs of [pos neg] ...
         (ms/stream->seq)
         (take 10)
         (println))))
For example this works fine, but in my use case generating is an expensive operation and I would really prefer to not waste generated stuff using filter.

lilactown 2021-03-11T16:05:14.046600Z

(require '[manifold.deferred :as m]
         '[manifold.stream :as ms])

(defn split-stream [condition stream]
  [(ms/buffer 100 (ms/filter condition stream))
   (ms/buffer 100 (ms/filter (comp not condition) stream))])

(comment
  (let [rand-stream (->> (repeatedly 100 #(- (rand) 0.5))
                         (ms/->source))]
    (->> rand-stream
         (split-stream pos?)
         (apply ms/zip)
         ;; ... Process pairs of [pos neg] ...
         (ms/consume
          (fn [[a b]]
            (prn a b)))))
  )

lilactown 2021-03-11T16:05:24.046800Z

this appears to work for me

ABeltramo 2021-03-11T16:06:40.048600Z

Thanks, I'll try it out. Just by looking at it, is it working because you produce 100 elements and buffers have the same size? Because I don't know how much elements I have to produce in advance, I would like to be something that comes downstream, hence the take at the bottom.

lilactown 2021-03-11T16:12:04.050Z

yes, I think the problem is that the zip will repeatedly take! from each stream, and if you end up with a stream that doesn't alternative between the two conditions, you end up in a deadlock

ABeltramo 2021-03-11T16:14:01.052Z

So, for example, the issue will be when you generate batch-size elements that are all positive, am I right?

lilactown 2021-03-11T16:15:26.052400Z

or just something like:

(let [rand-stream (->> [1 2 3 -4 1 2 3 -4]
                         (ms/->source))]
   ,,,)

ABeltramo 2021-03-11T16:16:22.053Z

I see, but I have an infinite generator of numbers! 😅

ABeltramo 2021-03-11T16:17:16.054400Z

I think I'll go with the two generator streams and filter that discards elements. If anyone have a better solution, please ping me!

lilactown 2021-03-11T16:17:31.054700Z

yeah, the issue is not the total number of positive or negative but the fact that you might end up needing to put! multiple positive or negative numbers in a row

lilactown 2021-03-11T16:19:22.056600Z

in this example, without buffering, the producer blocks trying to put! the 2 value on the filter pos? stream until it's consumed, and the zip consumer won't take! from the filter pos? stream until it can take! a value from the filter not pos? stream

👆 1
ABeltramo 2021-03-11T16:20:20.058200Z

Thank you @lilactown for your help, I got a better understanding of the issue now!

lilactown 2021-03-11T16:23:46.059100Z

perhaps you could do the filtering before creating the stream? I don't really know what this looks like in your application

lilactown 2021-03-11T16:26:27.061300Z

ex. this works:

(let [rand-seq (repeatedly #(- (rand) 0.5))
        pos (ms/->source (filter pos? rand-seq))
        neg (ms/->source (filter neg? rand-seq))]
    (->> (ms/zip pos neg)
         ;; ... Process pairs of [pos neg] ...
         (ms/stream->seq)
         (take 10)
         prn))

lilactown 2021-03-11T16:27:34.062400Z

since you said that the generation is expensive, this wraps it in a single sequence but then creates two different producers on top of it

ABeltramo 2021-03-11T16:28:07.062900Z

Ah! You might be right!

ABeltramo 2021-03-11T16:29:22.063700Z

So instead of doing the filtering inside the stream I generate two lazy lazy seqs from the original sequence.

lilactown 2021-03-11T16:30:16.065Z

I think the key insight is to create two sources based on the original sequence, I don't think it matters whether you do the filtering as a seq or a stream

lilactown 2021-03-11T16:30:22.065300Z

(let [rand-seq (repeatedly #(- (rand) 0.5))
        pos (->> (ms/->source rand-seq)
                 (ms/filter pos?))
        neg (->> (ms/->source rand-seq)
                 (ms/filter neg?))]
    (->> (ms/zip pos neg)
         ;; ... Process pairs of [pos neg] ...
         (ms/stream->seq)
         (take 10)
         prn))

ABeltramo 2021-03-11T16:31:22.066100Z

Right right I see.. And this will not call rand-seq twice?

lilactown 2021-03-11T16:34:29.066900Z

I can't imagine how it would

🎉 1
lilactown 2021-03-11T16:36:14.067700Z

that would break a lot of assumptions I have about lazy seqs

ABeltramo 2021-03-11T17:02:44.068400Z

I'm sorry, I need some hammock time to think about it but it really seems the solution I was looking for. @lilactown thank you so much for the help!

1
lilactown 2021-03-11T17:16:47.068700Z

you're welcome!

lilactown 2021-03-11T17:17:33.069700Z

I learned a bunch as well 😄 tbh I only have a surface knowledge of manifold

lilactown 2021-03-11T17:18:37.070800Z

interestingly this is related to a problem I've been thinking about in asynchronous programming. would it be okay if I took the example we've been talking about here and used it in a blog post?

pablore 2021-03-11T17:19:21.071700Z

@beltramo.ale You tried using transform with (juxt filter remove)?

pablore 2021-03-11T17:19:48.072100Z

(juxt filter remove) solves your problem for sequences

ABeltramo 2021-03-11T17:35:28.072200Z

You sure can! There's nothing confidential in the example I made.

ABeltramo 2021-03-11T17:36:29.072400Z

I tried and failed, but this was in the middle of trying out @lilactown solutions so I might have missed something..

ABeltramo 2021-03-11T17:38:14.072600Z

I saw that usage of juxt when I was looking at a transducer to be used and I stumble upon the separate fn.