core-async

erwinrooijakkers 2019-12-19T10:13:26.000700Z

What is the best way to read from a channel till a certain condition is met (e.g., 2000 elements taken)?

bertofer 2019-12-19T10:16:32.001200Z

You can apply a transducer to that channel, in this case (take 2000)

fmjrey 2019-12-19T11:49:29.002Z

@erwinrooijakkers before its gone have a read of this post https://clojurians.slack.com/archives/C05423W6H/p1576240660002300

fmjrey 2019-12-19T11:50:08.003Z

also curious to know if there is a definite answer yet

erwinrooijakkers 2019-12-19T12:46:50.003200Z

Thanks I saw that one yes

erwinrooijakkers 2019-12-19T12:47:21.004Z

But have had no time to test yet

erwinrooijakkers 2019-12-19T12:47:30.004300Z

I really want 2000 messages and only then continue

erwinrooijakkers 2019-12-19T12:48:02.004900Z

transducer that does (take 2000)

erwinrooijakkers 2019-12-19T12:48:15.005500Z

Would be perfect

erwinrooijakkers 2019-12-19T12:48:23.005800Z

I’ll check this evening 🙂

erwinrooijakkers 2019-12-19T12:48:25.006Z

Thanks!

2019-12-19T13:13:41.006200Z

The pipe go-loop is taking from one channel and puts on the other until either channel is closed. The first put of 0 closes the second channel, but it is technically successful (the put succeeded with no items), so pipe iterates one more time, takes 1 and then encounters the closed channel and quits, but the 1 is already out of the source channel.

fmjrey 2019-12-19T14:39:02.006500Z

Great explanation, thanks! When I tried this variant I got non deterministic results: https://clojurians.slack.com/archives/C05423W6H/p1576247793020000?thread_ts=1576243305.010900&cid=C05423W6H

fmjrey 2019-12-19T14:40:00.006800Z

So I suppose there are some concurrency aspects to it as well.

2019-12-19T16:51:41.007Z

It’s a race between your take and the internal pipe take. One of you will get the 1 and become a funk king.

fmjrey 2019-12-19T17:03:30.007200Z

Makes sense, which means that race condition is also present in the original example given by @roklenarcic. Adding a few log/locking/println statements in my version did the trick to reveal the race condition.

mpenet 2019-12-19T17:32:30.007700Z

playing with core.async:

(def c (async/chan (async/buffer 0) x))
(future (async/>!! c 1))
(async/<!! c) ;; returns  1

mpenet 2019-12-19T17:33:25.008500Z

I know you're supposed to have an actual non empty buffer for xform to work, but I was suprised we didnt' get an early throw in that case (at channel creation time)

mpenet 2019-12-19T17:34:00.008700Z

x is (map inc)

alexmiller 2019-12-19T17:35:28.009Z

(async/chan 0) will throw

alexmiller 2019-12-19T17:36:01.009700Z

prob just an oversight that (async/buffer 0) doesn't (far more common to create fixed buffers directly in chan)

alexmiller 2019-12-19T17:38:15.011100Z

yeah, that's just badly placed assertion, will fix

mpenet 2019-12-19T17:38:24.011300Z

I mean it sort of work as an unbuffered channel with (buffer 0)

mpenet 2019-12-19T17:38:28.011500Z

thanks

alexmiller 2019-12-19T17:38:48.011800Z

unbuffered != buffer 0

mpenet 2019-12-19T17:39:04.012100Z

yes, hence the "sort of"

mpenet 2019-12-19T17:39:24.012600Z

implementation for unbuffered channel is probably quite smart

mpenet 2019-12-19T17:41:11.013400Z

:thumbsup:

roklenarcic 2019-12-19T19:58:27.013600Z

Of course there’s a race condition there, that is intentional, after creating pipe I read from source of that pipe directly. The point of that snippet wasn’t to show working code, it was to show an additional item was read from the source channel