Hello everyone, @flowthing suggested I post my question over here. I posted it in the clojure channel too; here it goes:
I'm using Manifold and I can't wrap my head around splitting a stream. Here's some example code:
(defn split-stream [condition stream]
  (let [pos-stream (ms/stream)
        neg-stream (ms/stream)]
    (ms/consume (fn [e]
                  (if (condition e)
                    (ms/put! pos-stream e)
                    (ms/put! neg-stream e)))
                stream)
    [(ms/source-only pos-stream)
     (ms/source-only neg-stream)]))

(deftest streamz
  (let [rand-stream (ms/->source (repeatedly #(- (rand) 0.5)))]
    (->> rand-stream
         (split-stream pos?)
         (apply ms/zip)
         ; ... Process pairs of [pos neg] ...
         (ms/stream->seq)
         (take 10)
         (println))))
Here's a simple example that should capture my problem: I have a fn that generates values and puts them into a stream. I need to process that stream in pairs based on a condition. Can I split a stream into two streams? How can I accomplish that? I tried something like
(defn split-stream [condition stream]
  [(ms/filter condition stream)
   (ms/filter (comp not condition) stream)])
but this doesn't seem to work with a single stream. I guess I could have two separate input streams, but that would mean discarding valid inputs.
(defn split-stream [condition stream]
  (let [pos-stream (ms/stream)
        neg-stream (ms/stream)]
    (ms/connect-via stream
                    (fn [e]
                      (if (condition e)
                        (ms/put! pos-stream e)
                        (ms/put! neg-stream e)))
                    pos-stream)
    (ms/connect-via stream
                    (fn [e]
                      (if (condition e)
                        (ms/put! pos-stream e)
                        (ms/put! neg-stream e)))
                    neg-stream)
    [(ms/source-only pos-stream)
     (ms/source-only neg-stream)]))
Doing something like this seems to result in infinite waiting. I guess you can't connect two streams to a single stream?
When I limit the repeatedly call above to 10 elements, it seems to work
Bit late to the party, but consume isn’t what you want in the middle of a stream; it’s more of a terminal operation.
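A minimal sketch of that distinction, assuming Manifold is on the classpath with the ms alias used throughout the thread: ms/filter and ms/map return new sources and can sit mid-pipeline, while ms/consume hands each message to a side-effecting callback and ends the pipeline. The names evens-doubled and seen are illustrative only.

```clojure
(require '[manifold.stream :as ms])

;; Mid-stream steps: ms/filter and ms/map each return a new source,
;; so they can be chained and handed to something further downstream.
(def evens-doubled
  (->> (ms/->source [1 2 3 4 5 6])
       (ms/filter even?)
       (ms/map #(* 2 %))))

;; Terminal step: ms/consume feeds each message to a callback for its
;; side effects and returns a deferred that yields true once the source
;; is drained. Nothing flows any further downstream from here.
(def seen (atom []))
@(ms/consume #(swap! seen conj %) evens-doubled)

@seen
;; => [4 8 12]
```

Dereferencing the deferred returned by consume blocks until the source is drained; on an infinite source like the one in the question, that deref would never return.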
going to 1000 seems to lock up my REPL
so this seems to be an issue with trying to chunk the stream of values
Which split-stream works with the 10-element limit for you?
the first one you pasted in the channel
also just return a vector with [(ms/filter ,,,) (ms/filter ,,,)]
the first one should work because I'm forcing the consume
but it doesn't seem like a good solution (since there's no end to the generator)
Maybe I should take a different approach and make a transducer to be used with transform. I don't really need two streams, I need pairs.
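One way to sketch that transducer idea, assuming the goal is just a sequence of [pos neg] pairs: a stateful transducer that queues whichever kind of value arrives without a partner and emits a pair as soon as one shows up. pair-by is a hypothetical name (not part of Manifold or clojure.core), and any unmatched leftovers are discarded on completion.

```clojure
(defn pair-by
  "Hypothetical transducer: emits [match non-match] pairs, where a match
  satisfies pred. Values that arrive without a partner are queued in
  arrival order until a partner of the other kind arrives."
  [pred]
  (fn [rf]
    (let [matches     (volatile! clojure.lang.PersistentQueue/EMPTY)
          non-matches (volatile! clojure.lang.PersistentQueue/EMPTY)]
      (fn
        ([] (rf))
        ;; Completion: unmatched leftovers are simply dropped.
        ([result] (rf result))
        ([result x]
         (let [match? (pred x)
               mine   (if match? matches non-matches)
               other  (if match? non-matches matches)]
           (if-let [queued (seq @other)]
             ;; A partner is waiting: dequeue it and emit [match non-match].
             (let [partner (first queued)]
               (vswap! other pop)
               (rf result (if match? [x partner] [partner x])))
             ;; No partner yet: remember x and emit nothing.
             (do (vswap! mine conj x)
                 result))))))))

(into [] (pair-by pos?) [1 2 3 -4 1 2 3 -4])
;; => [[1 -4] [2 -4]]

;; Works lazily on an infinite generator too.
(take 3 (sequence (pair-by pos?) (repeatedly #(- (rand) 0.5))))
```

Manifold's ms/transform takes a transducer and a source, so (ms/transform (pair-by pos?) rand-stream) should apply the same logic to a stream; that is an untested assumption. The queues are unbounded, so a long run of values of one kind stays in memory until partners arrive, the same buffering trade-off that comes up with zip.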
I guess I just don't see where the issue is yet.
Don't worry, it's probably me, I'm very new to Manifold! The thing is, I would like to split a stream into two separate streams based on a condition. I would like to generate stuff once and put it in the right stream; at the same time, I would like to generate only when someone needs to take downstream from the two child streams.
“I would like to generate only when someone needs to take downstream from the two child streams.” This is a bit antithetical to how Manifold streams work. Manifold streams (and most stream libs) are push-based. A value enters the top of the stream, and a chain of callbacks and/or continuations propagates it automatically. What you describe is closer to standard lazy seqs, or the reactive-streams specification (e.g., java.util.concurrent.Flow), which are pull-based.
(That being said, a seq source wrapped around an infinite seq like repeatedly will pull values on demand)
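To make the pull-based behaviour of lazy seqs concrete, here is a small sketch that counts how often a stand-in generator runs. generate!, calls and rand-values are hypothetical names; the (- (rand) 0.5) body is borrowed from the examples in the thread.

```clojure
;; Counts how many times the stand-in generator actually runs.
(def calls (atom 0))

(defn generate!
  "Hypothetical stand-in for the expensive generator from the question."
  []
  (swap! calls inc)
  (- (rand) 0.5))

;; Building the seq runs nothing: repeatedly is lazy.
(def rand-values (repeatedly generate!))
@calls
;; => 0

;; The consumer pulls: taking 5 values runs the generator exactly 5 times.
(doall (take 5 rand-values))
@calls
;; => 5
```

Already-realized elements are cached, so taking 8 afterwards would run the generator only 3 more times.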
so with the examples I've been working through, my REPL eventually locks up trying to run them
I guess the problem is that the producer that connects the streams together needs to buffer the values
(deftest streamz
  (let [rand-stream-1 (ms/->source (repeatedly #(- (rand) 0.5)))
        rand-stream-2 (ms/->source (repeatedly #(- (rand) 0.5)))]
    (->> (ms/zip (ms/filter pos? rand-stream-1)
                 (ms/filter neg? rand-stream-2))
         ; ... Process pairs of [pos neg] ...
         (ms/stream->seq)
         (take 10)
         (println))))
For example, this works fine, but in my use case generating is an expensive operation, and I would really prefer not to waste generated stuff by using filter.
(require '[manifold.deferred :as m]
         '[manifold.stream :as ms])

(defn split-stream [condition stream]
  [(ms/buffer 100 (ms/filter condition stream))
   (ms/buffer 100 (ms/filter (comp not condition) stream))])

(comment
  (let [rand-stream (->> (repeatedly 100 #(- (rand) 0.5))
                         (ms/->source))]
    (->> rand-stream
         (split-stream pos?)
         (apply ms/zip)
         ;; ... Process pairs of [pos neg] ...
         (ms/consume
          (fn [[a b]]
            (prn a b)))))
  )
this appears to work for me
Thanks, I'll try it out. Just by looking at it, is it working because you produce 100 elements and the buffers have the same size? I don't know in advance how many elements I have to produce; I'd like that to be driven from downstream, hence the take at the bottom.
yes, I think the problem is that the zip will repeatedly take! from each stream, and if you end up with a stream that doesn't alternate between the two conditions, you end up in a deadlock
So, for example, the issue will be when you generate batch-size elements that are all positive, am I right?
or just something like:
(let [rand-stream (->> [1 2 3 -4 1 2 3 -4]
                       (ms/->source))]
  ,,,)
I see, but I have an infinite generator of numbers! 😅
I think I'll go with the two generator streams and the filter that discards elements. If anyone has a better solution, please ping me!
yeah, the issue is not the total number of positive or negative values but the fact that you might end up needing to put! multiple positive or negative numbers in a row
in this example, without buffering, the producer blocks trying to put! the value 2 onto the filter pos? stream until it's consumed, and the zip consumer won't take! from the filter pos? stream until it can take! a value from the filter not pos? stream
Thank you @lilactown for your help, I got a better understanding of the issue now!
perhaps you could do the filtering before creating the stream? I don't really know what this looks like in your application
ex. this works:
(let [rand-seq (repeatedly #(- (rand) 0.5))
      pos      (ms/->source (filter pos? rand-seq))
      neg      (ms/->source (filter neg? rand-seq))]
  (->> (ms/zip pos neg)
       ;; ... Process pairs of [pos neg] ...
       (ms/stream->seq)
       (take 10)
       prn))
since you said that the generation is expensive, this wraps it in a single sequence but then creates two different producers on top of it
Ah! You might be right!
So instead of doing the filtering inside the stream, I generate two lazy seqs from the original sequence.
I think the key insight is to create two sources based on the original sequence, I don't think it matters whether you do the filtering as a seq or a stream
(let [rand-seq (repeatedly #(- (rand) 0.5))
      pos      (->> (ms/->source rand-seq)
                    (ms/filter pos?))
      neg      (->> (ms/->source rand-seq)
                    (ms/filter neg?))]
  (->> (ms/zip pos neg)
       ;; ... Process pairs of [pos neg] ...
       (ms/stream->seq)
       (take 10)
       prn))
Right, right, I see... And this won't call rand-seq twice?
I can't imagine how it would
that would break a lot of assumptions I have about lazy seqs
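A quick way to check this, sketched with plain seqs rather than Manifold sources and with a hypothetical deterministic generator (generate!, cycling 1 2 3 -4) instead of rand so the numbers are reproducible: count the calls while two filtered views walk the same lazy seq.

```clojure
(def calls (atom 0))

(defn generate!
  "Hypothetical deterministic stand-in for the expensive generator:
  yields 1 2 3 -4 1 2 3 -4 ... and counts its invocations."
  []
  (let [n (swap! calls inc)]
    (if (zero? (mod n 4)) -4 (mod n 4))))

;; Two filtered views over ONE lazy seq, paired up like ms/zip would.
(def first-pairs
  (let [rand-seq (repeatedly generate!)]
    (doall (take 3 (map vector
                        (filter pos? rand-seq)
                        (filter neg? rand-seq))))))

first-pairs
;; => ([1 -4] [2 -4] [3 -4])

@calls
;; => 12: one call per realized element. Separate generators would
;;    have needed 3 + 12 = 15 calls.
```

Each element is generated once because a lazy seq caches its realized cells and both filters walk the same cells. The flip side is memory: elements the slower walker hasn't reached yet stay reachable, so a long run of one sign is effectively buffered without bound.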
I'm sorry, I need some hammock time to think about it but it really seems the solution I was looking for. @lilactown thank you so much for the help!
you're welcome!
I learned a bunch as well 😄 tbh I only have a surface knowledge of manifold
interestingly this is related to a problem I've been thinking about in asynchronous programming. would it be okay if I took the example we've been talking about here and used it in a blog post?
@beltramo.ale Have you tried using transform with (juxt filter remove)?
(juxt filter remove) solves your problem for sequences
You sure can! There's nothing confidential in the example I made.
I tried and failed, but that was in the middle of trying out @lilactown's solutions, so I might have missed something.
I saw that usage of juxt when I was looking for a transducer to use, and I stumbled upon the separate fn.