What is the best way to read from a channel till a certain condition is met (e.g., 2000 elements taken)?
You can apply a transducer to that channel, in this case (take 2000)
@erwinrooijakkers before it's gone, have a read of this post: https://clojurians.slack.com/archives/C05423W6H/p1576240660002300
also curious to know if there is a definite answer yet
Thanks I saw that one yes
But have had no time to test yet
I really want 2000 messages and only then continue
transducer that does (take 2000)
Would be perfect
I’ll check this evening 🙂
Thanks!
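A minimal sketch of the "get exactly 2000 messages, then continue" idea, assuming core.async is available. Note that it swaps in the channel-level `async/take` function rather than the `(take 2000)` transducer suggested above, and collects the result with `async/into`. The names `source` and `batch` and the 5000-item input are made up for illustration.

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical source channel holding 5000 numbers (illustration only).
;; to-chan is deprecated in newer core.async releases in favor of to-chan!.
(def source (async/to-chan (range 5000)))

;; async/take returns a channel that yields at most 2000 items from source
;; and then closes; async/into collects them into a vector once it closes.
;; The blocking take (<!!) returns only after all 2000 items have arrived.
(def batch (async/<!! (async/into [] (async/take 2000 source))))

(count batch) ;; => 2000
```

Because `async/take` stops taking after the 2000th item, the remaining items should still be available on `source` for whatever runs next.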
The pipe go-loop takes from one channel and puts onto the other until either channel is closed. The first put of 0 closes the second channel, but it is technically successful (the put succeeded with no items), so pipe iterates one more time, takes 1, and then encounters the closed channel and quits, but the 1 is already out of the source channel.
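The behavior described here can be reproduced with a small sketch. This is a reconstruction under assumptions, not the original snippet from the thread: the names `src`, `dst`, `first-out`, `second-out`, and `next-in`, and the four-item input, are hypothetical.

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical source with four items (illustration only).
(def src (async/to-chan [0 1 2 3]))

;; Destination whose transducer completes after one item,
;; which closes the channel.
(def dst (async/chan 1 (take 1)))

(async/pipe src dst)

(def first-out  (async/<!! dst)) ;; 0, the only item the transducer lets through
(def second-out (async/<!! dst)) ;; nil, dst has been closed

;; Reading src directly races with pipe's own go-loop, so this is
;; either 1 (we won the race) or 2 (pipe already took and dropped the 1).
(def next-in (async/<!! src))
```

No expected value is given for `next-in` because it depends on scheduling. In both outcomes, pipe consumes one more item from `src` than `dst` ever delivers.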
Great explanation, thanks! When I tried this variant I got nondeterministic results: https://clojurians.slack.com/archives/C05423W6H/p1576247793020000?thread_ts=1576243305.010900&cid=C05423W6H
So I suppose there are some concurrency aspects to it as well.
It’s a race between your take and the internal pipe take. One of you will get the 1 and become a funk king.
Makes sense, which means that race condition is also present in the original example given by @roklenarcic. Adding a few log/locking/println statements in my version did the trick to reveal the race condition.
playing with core.async:
(def c (async/chan (async/buffer 0) x))
(future (async/>!! c 1))
(async/<!! c) ;; returns 1
I know you're supposed to have an actual non-empty buffer for the xform to work, but I was surprised we didn't get an early throw in that case (at channel creation time)
x is (map inc)
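For comparison, a minimal sketch of the same experiment with a non-empty buffer, where the transducer is applied as expected. The names `buffered` and `result` are hypothetical.

```clojure
(require '[clojure.core.async :as async])

;; Same xform as above, but with a fixed buffer of size 1.
(def buffered (async/chan 1 (map inc)))

;; Does not block: the buffer has room for one value.
(async/>!! buffered 1)

(def result (async/<!! buffered))
result ;; => 2, in contrast with the 1 returned in the (async/buffer 0) case
```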
(async/chan 0) will throw
prob just an oversight that (async/buffer 0) doesn't (far more common to create fixed buffers directly in chan)
yeah, that's just a badly placed assertion, will fix
I mean it sort of works as an unbuffered channel with (buffer 0)
thanks
unbuffered != buffer 0
yes, hence the "sort of"
implementation for unbuffered channel is probably quite smart
moved https://github.com/clojure/core.async/commit/39488d33b4933ef3eb4c4aac485ad21ac3c78825
:thumbsup:
Of course there’s a race condition there; that is intentional. After creating the pipe, I read from the source of that pipe directly. The point of that snippet wasn’t to show working code; it was to show that an additional item was read from the source channel.