I need to use a chan that expects n values put on it and then closes. n will usually be small, less than 10, but in certain scenarios it will be a few thousand messages that arrive within a few milliseconds, which I've found is impossible for any consumer to keep up with before hitting the pending put! limit.
What's people's opinion on the best way to address this?
Just use a fixed buffer of a number large enough to definitely account for all cases of n?
From what I gather from quickly skimming the async source, a fixed buffer doesn't actually allocate anything up front, so having a very liberal fixed buffer for a chan of, say, 1 million, even when most of the time it will only buffer one or two items tops, wouldn't actually be a big inefficiency, right?
Alternatively I could implement an unlimited buffer, but I'm thinking that would probably be worse
or a dropping-buffer, or a sliding-buffer
but if you know ahead of time a fixed number of messages will be put on a channel, it is often best to create the channel with a buffer that size
a very common case of this is, if you know you will only ever put one item to a channel, create the channel with buffer size 1, you'll see this used in a number of places in the core.async source
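A minimal sketch of the sizing advice above, assuming the message count n is known up front and is positive. `collect-n` is a hypothetical helper for illustration, not something from the conversation.

```clojure
(require '[clojure.core.async :as a])

(defn collect-n
  "Hypothetical helper: puts n values on a channel whose fixed buffer holds
  exactly n items, closes it, then drains it into a vector."
  [n]
  (let [ch (a/chan n)]            ; buffer sized to the known message count
    (dotimes [i n]
      (a/put! ch i))              ; lands in the buffer, so no put is left pending
    (a/close! ch)                 ; buffered values stay readable after close
    (a/<!! (a/into [] ch))))      ; block until everything has been collected
```

Because every put! finds room in the buffer, the pending-put queue is never used, even when n is in the thousands.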
But if you are hitting the limit on puts, that is usually an indication of a bad pattern
i can't lose data so sliding/dropping buffers are out
and sometimes i can know the exact number of expected messages but there will be times when I can guesstimate at best
are you using put! ?
yes
if you are on clojure, switching to >!! will likely superficially solve your problem
Actually I believe I tried that but still hit the pending puts error
because it's like 10k messages coming in basically "simultaneously"
coming in from where?
external api that pushes data to me
ok, so you need to communicate backpressure to that
but what if there's no mechanism for it to "slow down"
then you need an infinite buffer
which core.async purposefully doesn't give you
I think I can get fancy, and when possible choose an exact buffer size needed, or in the case of the large burst messages, choose the minimal buffer size that allows the messages to be kept up with. But yeah I was just wondering if there's a reason to implement the infinite buffer instead of just choosing a fixed buffer size that's effectively infinite?
there almost certainly is some kind of backpressure mechanism in whatever way the api is pushing data to you
tcp has one built in
Are you saying the infinite buffer is preferable because the fact that a fixed buffer doesn't currently allocate anything proportional to its buffer size is just an implementation detail and may change in the future?
but using put! without a callback, or something else, can break the chain of backpressure
the buffer you're hitting is not the channel buffer though and is not configurable
if I understand correctly
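To make the distinction concrete, here is a small sketch that counts how many put! calls an unbuffered channel with no reader accepts before core.async refuses the next one. `accepted-before-limit` and its `cap` argument are hypothetical names; the sketch assumes the JVM implementation, where the refusal surfaces as an AssertionError thrown in the calling thread.

```clojure
(require '[clojure.core.async :as a])

(defn accepted-before-limit
  "Hypothetical helper: counts how many put! calls an unbuffered, never-read
  channel accepts before the next one is refused. Gives up at cap."
  [cap]
  (let [ch (a/chan)]
    (loop [i 0]
      (if (and (< i cap)
               (try
                 (a/put! ch i)    ; no taker and no buffer, so this put stays pending
                 true
                 (catch AssertionError _ false)))
        (recur (inc i))
        i))))
```

Giving the channel a buffer only moves the failure point by the buffer's size; the queue of pending puts behind it stays the same length.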
yeah, I get that there's no way around that, I need to either implement an unlimited buffer or choose an effectively infinite fixed buffer size
just wondering which of those is better
it's for a wrapper of a java stock broker api, most of the request types return a single message, but you can request historical data which can return like 10k messages, and unfortunately they all come in one burst and the api provides no mechanism of control for this
core.async isn't a perfect fit for this use case, I know, but currently I'm using my own hand-rolled combo of atoms and promises and I'm starting to think bending core.async to my will would be better than managing all the concurrency myself
but somewhere you have a single bit of code that is taking those messages and putting them into a channel
ah, true
and you are losing the backpressure right there, you should not move on to put in the next thing until the current thing has been accepted
ah! this makes sense. so maybe doing something like put!ing the next item in the callback of the first put!
yes, or use >!! which will block the real java thread until the input is accepted
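A minimal sketch of the callback-chaining idea, assuming the messages are already available as a collection of non-nil values. `put-all!` is a hypothetical helper name; only `put!` and `close!` come from core.async.

```clojure
(require '[clojure.core.async :as a])

(defn put-all!
  "Hypothetical helper: puts each item of coll on ch one at a time, issuing
  the next put! only from the callback of the previous one, then closes ch."
  [ch coll]
  (if-let [[x & more] (seq coll)]
    (a/put! ch x
            (fn [accepted?]
              ;; accepted? is false once ch has been closed; stop in that case
              (when accepted?
                (put-all! ch more)))
            ;; false = never run the callback on the calling thread, so the
            ;; chain cannot recurse deeply when puts complete immediately
            false)
    (a/close! ch)))
```

At most one put is pending at any moment, so the pending-put limit is never approached no matter how long coll is.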
ah ok cool. I was pretty sure that I tried both >!! and put! and was getting the pending puts exceeded error message either way, but I don't have the example immediately in front of me
using >!! will block that single thread, so if you have a single loop taking messages and putting them into a channel, that will work. if you have multiple concurrent places where that is happening, you can still run into the issue with >!!
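The single-loop case might look like the sketch below, assuming one place in the code sees every message. `pump-blocking!` is a hypothetical helper name.

```clojure
(require '[clojure.core.async :as a])

(defn pump-blocking!
  "Hypothetical helper: on one dedicated thread, forwards each message to ch
  with >!!, which blocks that thread until the value is accepted. Closes ch
  once messages is exhausted."
  [ch messages]
  (a/thread
    (doseq [m messages]
      (a/>!! ch m))               ; does not move on until m has been accepted
    (a/close! ch)))
```

Since the loop cannot get ahead of the consumer, there is never more than one put waiting from this thread. Many threads doing the same against one channel would each contribute a waiting put, which is how the limit can still be reached.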
gotcha, thanks for all the help
for simple cases, using >!! will often solve backpressure issues, for more complicated cases put! with a callback, and of course a lot of example docs show using put! without a callback, which is asking for a backpressure issue
> which is asking for a backpressure issue
Do you mean put! without a callback is asking for issues caused by not exerting backpressure? i.e. you’re at the mercy of the system incidentally rather than explicitly, with backpressure enforced through a lack of resources, such as the 1024 limit, memory leaks due to excessive spawned go blocks, etc.?
gotcha yeah that's good to know
I don't think I've ever used the put! callback before
offer! also lets you check whether the put succeeded or not, in a non-blocking way
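A small sketch of checking the result of offer!, which returns a truthy value only when the item was accepted immediately. `offer-all` is a hypothetical helper that hands back whatever did not fit, since here nothing may be silently dropped.

```clojure
(require '[clojure.core.async :as a])

(defn offer-all
  "Hypothetical helper: offers each item to ch without blocking and returns
  a vector of the items that were not accepted, in their original order."
  [ch items]
  (vec (remove #(a/offer! ch %) items)))

;; Usage: a buffer of 2 accepts the first two items and rejects the rest.
(def demo-ch (a/chan 2))
(def rejected (offer-all demo-ch [:a :b :c :d]))
```

The caller still has to decide what to do with the rejected items (retry later, spill to another store, and so on), so this reports the overload but does not exert backpressure by itself.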
You're hitting the limit of 1024 pending puts per channel. As others mentioned, you need to be able to signal backpressure upstream or use something like credit-based flow control or rate limiting.