core-async

2019-08-28T19:24:50.004600Z

I need to use a chan that expects n values put on it and then closes. n will usually be small, less than 10, but in certain scenarios it will be a few thousand messages that come in within a few milliseconds, which I've found is impossible for any consumer to keep up with before hitting the pending put! limit. What's people's opinion on the best way to address this? Just use a fixed buffer large enough to definitely account for all cases of n? From what I gather from quickly skimming the async source, a fixed buffer doesn't actually allocate anything up front, so having a very liberal fixed buffer of, say, 1 million, even when most of the time it will only buffer one or two items tops, wouldn't actually be a big inefficiency, right?

2019-08-28T19:25:41.005200Z

Alternatively I could implement an unlimited buffer, but I'm thinking that would probably be worse

2019-08-28T19:31:50.005600Z

or a dropping-buffer, or a sliding-buffer

2019-08-28T19:32:36.006200Z

but if you know ahead of time a fixed number of messages will be put on a channel, it is often best to create the channel with a buffer that size

2019-08-28T19:33:42.007300Z

a very common case of this is, if you know you will only ever put one item to a channel, create the channel with buffer size 1, you'll see this used in a number of places in the core.async source
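
A minimal sketch of that size-1 pattern, assuming core.async is on the classpath. The helper name `single-result-chan` is made up for illustration and is not part of core.async:

```clojure
(require '[clojure.core.async :as a])

;; Hypothetical helper: deliver exactly one value on a channel, promise-style.
(defn single-result-chan
  [value]
  (let [ch (a/chan 1)]     ; buffer of 1: the lone put never has to wait
    (a/put! ch value)      ; accepted straight into the buffer
    (a/close! ch)          ; the buffered value is still delivered after close
    ch))

(def result (a/<!! (single-result-chan 42)))
;; result => 42
```

Because the buffer always has room for the one value, the producer never ends up with a pending put, whether or not a consumer is already waiting.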

2019-08-28T19:38:43.008500Z

But if you are hitting the limit on puts, that is usually an indication of a bad pattern

2019-08-28T19:39:14.009Z

i can't lose data so sliding/dropping buffers are out

2019-08-28T19:39:49.009700Z

and sometimes i can know the exact number of expected messages but there will be times when I can guesstimate at best

2019-08-28T19:40:16.009900Z

are you using put! ?

2019-08-28T19:40:25.010Z

yes

2019-08-28T19:40:56.010900Z

if you are on clojure, switching to >!! will likely superficially solve your problem

2019-08-28T19:41:18.011600Z

Actually I believe I tried that but still hit the pending puts error

2019-08-28T19:41:47.012400Z

because it's like 10k messages coming in basically "simultaneously"

2019-08-28T19:42:03.012800Z

coming in from where?

2019-08-28T19:42:20.013200Z

external api that pushes data to me

2019-08-28T19:42:34.013600Z

ok, so you need to communicate backpressure to that

2019-08-28T19:42:51.014Z

but what if there's no mechanism for it to "slow down"

2019-08-28T19:43:33.015200Z

then you need an infinite buffer

2019-08-28T19:43:42.015500Z

which core.async purposefully doesn't give you

2019-08-28T19:44:38.016800Z

I think I can get fancy, and when possible choose an exact buffer size needed, or in the case of the large burst messages, choose the minimal buffer size that allows the messages to be kept up with. But yeah I was just wondering if there's a reason to implement the infinite buffer instead of just choosing a fixed buffer size that's effectively infinite?

2019-08-28T19:45:21.018200Z

there almost certainly is some kind of backpressure mechanism in whatever way the api is pushing data to you

2019-08-28T19:45:32.018600Z

tcp has one built in

2019-08-28T19:46:08.019700Z

Are you saying the infinite buffer is preferable because the fact that a fixed buffer doesn't currently allocate anything proportional to its buffer size is just an implementation detail and may change in the future?

2019-08-28T19:46:13.020Z

but using put! without a callback, or something else, can break the chain of backpressure

alexmiller 2019-08-28T19:46:17.020200Z

the buffer you're hitting is not the channel buffer though and is not configurable

alexmiller 2019-08-28T19:47:09.021200Z

if I understand correctly

2019-08-28T19:47:16.021500Z

yeah, I get that there's no way around that, I need to either implement an unlimited buffer or choose an effectively infinite fixed buffer size

2019-08-28T19:47:23.021700Z

just wondering which of those is better

2019-08-28T19:48:29.023400Z

it's for a wrapper of a java stock broker api, most of the request types return a single message, but you can request historical data which can return like 10k messages, and unfortunately they all come in one burst and the api provides no mechanism of control for this

2019-08-28T19:49:40.025Z

core.async isn't a perfect fit for this use case, I know, but currently I'm using my own hand-rolled combo of atoms and promises and I'm starting to think bending core.async to my will would be better than managing all the concurrency myself

2019-08-28T19:49:52.025300Z

but somewhere you have a single bit of code that is taking those messages and putting them into a channel

2019-08-28T19:50:06.025700Z

ah, true

2019-08-28T19:50:24.026100Z

and you are losing the backpressure right there, you should not move on to put in the next thing until the current thing has been accepted

2019-08-28T19:52:08.027200Z

ah! this makes sense. so maybe doing something like put!ing the next item in the callback of the first put!

2019-08-28T19:53:12.027800Z

yes, or use >!! which will block the real java thread until the input is accepted

2019-08-28T19:54:34.029200Z

ah ok cool. I was pretty sure that I tried both >!! and put! and was getting the pending puts exceeded error message either way, but I don't have the example immediately in front of me

2019-08-28T19:56:29.030500Z

using >!! will block that single thread, so if you have a single loop taking messages and putting them in to a channel that will work, if you have multiple concurrent places that is happening you can still run into the issue with >!!
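
A rough sketch of the single-loop case described here, assuming one producer thread. `burst` is a made-up stand-in for the messages the external API pushes, and the buffer size of 16 is arbitrary:

```clojure
(require '[clojure.core.async :as a])

;; Hypothetical stand-in for the burst of messages pushed by the external API.
(def burst (range 10000))

(def ch (a/chan 16))   ; small buffer; the size is arbitrary for this sketch

;; Single producer: >!! blocks this real thread whenever the buffer is full,
;; so this loop never has more than one put waiting on the channel.
(def producer
  (a/thread
    (doseq [msg burst]
      (a/>!! ch msg))
    (a/close! ch)))

;; Consumer: collect everything until the channel is closed.
(def received (a/<!! (a/into [] ch)))
```

With several threads doing the same thing against one channel, each of them can have a put waiting at once, which is how the limit can still be reached with >!!.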

2019-08-28T19:57:20.031Z

gotcha, thanks for all the help

2019-08-28T19:58:14.031800Z

for simple cases, using >!! will often solve backpressure issues; for more complicated cases, use put! with a callback. And of course a lot of example docs show put! without a callback, which is asking for a backpressure issue
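
One way the put!-with-callback approach could look, as a sketch only. `put-all!` and `on-done` are hypothetical names, not core.async functions. The fourth argument to put! (`on-caller?`) is set to false here so the callback always runs on the dispatch pool instead of recursing on the calling thread when a put is accepted immediately:

```clojure
(require '[clojure.core.async :as a])

;; Hypothetical helper: put items one at a time, issuing the next put! only
;; from the callback of the previous one, so at most one put is ever pending.
(defn put-all!
  [ch items on-done]
  (if-let [[x & more] (seq items)]
    (a/put! ch x
            (fn [accepted?]
              (if accepted?
                (put-all! ch more on-done)
                (on-done false)))   ; channel was closed before this put
            false)                  ; on-caller? false: avoid deep recursion
    (on-done true)))

;; Usage: close the channel once every item has been accepted.
(def ch (a/chan 4))
(def done (promise))
(put-all! ch (range 5000) (fn [ok?] (a/close! ch) (deliver done ok?)))
(def received (a/<!! (a/into [] ch)))
```

The calling thread is never blocked, yet the producer still advances only as fast as the channel accepts values.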

2019-08-29T08:41:14.041300Z

> which is asking for a backpressure issue

Do you mean put! without a callback is asking for issues caused by not exerting backpressure? i.e. you're at the mercy of the system, getting backpressure incidentally rather than explicitly, enforced only by running out of resources, such as the 1024 limit, memory leaks due to excessive spawned go blocks, etc.?

2019-08-28T19:58:44.032Z

gotcha yeah that's good to know

2019-08-28T19:59:19.032600Z

I don't think I've ever used the put! callback before

mpenet 2019-08-28T20:15:41.034Z

offer! also lets you check whether the put succeeded, in a non-blocking way
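
A small sketch of what offer! returns, assuming a fixed buffer of 2 and no consumer:

```clojure
(require '[clojure.core.async :as a])

(def ch (a/chan 2))

;; offer! never blocks: it returns true when the value is accepted right away
;; and nil when the buffer is full and no taker is waiting.
(def results (mapv #(a/offer! ch %) [:a :b :c]))
;; results => [true true nil]
```

What to do on a nil result (retry later, queue elsewhere, signal upstream) is left to the caller. offer! only reports that the value was not accepted.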

mpenet 2019-08-28T20:20:02.037400Z

You're hitting the limit of 1024 pending puts per channel. As others mentioned, you need to be able to signal backpressure upstream or use something like credit-based flow control or rate limiting.
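
For reference, a sketch that reproduces the limit on purpose, assuming an unbuffered channel with no consumer. The keywords in `outcome` are made up for the example:

```clojure
(require '[clojure.core.async :as a])

(def ch (a/chan))   ; unbuffered and never read, so every put! stays pending

(def outcome
  (try
    (dotimes [i 1025]
      (a/put! ch i))   ; fire-and-forget: nothing waits for acceptance
    :no-error
    (catch AssertionError _
      :pending-put-limit-hit)))
```

The queue of pending puts is separate from the channel's buffer, so a larger buffer only delays the point at which fire-and-forget puts start piling up.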