With core.async, what's the recommended way to raise an error when a channel/buffer is full or blocked?
I'd like to exert back pressure upstream when my chan's buffer is full by issuing an HTTP 503
ahh, I can test full? on a buffer
though that doesn't seem to solve it, as another thread may put on the channel between when I check and when I add to the chan.
I really need to raise an error on put rather than block.
ok, offer! looks like what I want
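A minimal sketch of that idea, assuming a Ring-style handler that returns a response map. The names `requests` and `enqueue-or-reject`, the buffer size, and the 202 status for accepted requests are made up for illustration. `offer!` never blocks and only returns true when the value was accepted immediately, so there is no check-then-put race:

```clojure
(require '[clojure.core.async :as a])

;; Hypothetical request queue with room for 2 pending requests.
(def requests (a/chan 2))

(defn enqueue-or-reject
  "Hypothetical handler: queue the request if the buffer has room,
  otherwise answer 503 so the caller can back off."
  [req]
  (if (a/offer! requests req)
    {:status 202 :body "queued"}
    {:status 503 :body "queue full, retry later"}))
```

With nothing taking from `requests`, the first two calls are accepted and the third is rejected with a 503.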
put! itself will eventually start throwing exceptions when the channel's internal queue of pending puts fills up (1024 puts); that's in addition to the explicit channel buffer you can provide
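A small sketch of that limit, assuming an unbuffered channel with no takers so that every `put!` stays pending. The function name `pending-put-limit-hit?` is hypothetical, and catching `AssertionError` reflects how current core.async versions appear to report the overflow, so treat the exception type as an assumption:

```clojure
(require '[clojure.core.async :as a])

(defn pending-put-limit-hit?
  "Hypothetical helper: queue 1025 puts on a channel nobody reads from
  and report whether core.async refused one of them."
  []
  (let [ch (a/chan)] ; unbuffered, so each put! waits for a taker
    (try
      (dotimes [i 1025]
        (a/put! ch i))
      false
      (catch AssertionError _
        true))))
```

This is a last-resort safety net rather than a back-pressure mechanism, which is why `offer!` is the better fit for the 503 case.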
One thing I'd like to do is dedupe duplicate requests on a core.async channel. I initially wrote (chan 10 (distinct)), which works but will eventually OOM the server, as the volatile inside the distinct transducer isn't bounded.
I was thinking I could do this by creating my own DistinctBuffer deftype that implements the Buffer protocol.
Does that sound like a good idea?
I think there are broadly 3 places you could implement that: 1. a custom buffer 2. a custom transducer 3. a custom version of async/pipe (copies between 2 channels)
the last is maybe the quickest to prototype and try out
I did think about 2 and 3 also, but didn't mention the other requirement, which was to enforce a fixed buffer size on the set.
so I think a custom buffer might be best
ah, you want to use the buffer as the set
yeah — essentially use a set as the buffer, with a max size
ok this should be pretty easy actually
but need to go home now 😞 so it’s gonna have to wait
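A rough sketch of the set-as-buffer idea, not a vetted implementation. It assumes a recent core.async where the `Buffer` protocol in `clojure.core.async.impl.protocols` has the methods `full?`, `remove!`, `add!*` and `close-buf!` (that namespace is an implementation detail and may change). `DistinctBuffer` and `distinct-buffer` are made-up names, and a `java.util.LinkedHashSet` is used so items come out in insertion order:

```clojure
(require '[clojure.core.async :as a]
         '[clojure.core.async.impl.protocols :as impl])
(import '(java.util LinkedHashSet))

;; Hypothetical buffer: holds at most n distinct items, FIFO order.
(deftype DistinctBuffer [^LinkedHashSet items ^long n]
  impl/Buffer
  (full? [_] (>= (.size items) n))
  (remove! [_]
    ;; Take the oldest item out of the set and return it.
    (let [it (.iterator items)
          v  (.next it)]
      (.remove it)
      v))
  (add!* [this itm]
    ;; Adding an item already in the set is a no-op, which is the dedupe.
    (.add items itm)
    this)
  (close-buf! [_])
  clojure.lang.Counted
  (count [_] (.size items)))

(defn distinct-buffer
  "Hypothetical constructor for a DistinctBuffer of capacity n."
  [n]
  (DistinctBuffer. (LinkedHashSet.) n))
```

Usage would be `(a/chan (distinct-buffer 10))`. One caveat of this sketch: once the buffer is full, a put of a value that is already buffered still counts as blocked, because `full?` is checked before the item is looked at.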
did you try the dedupe transducer?
that will remove consecutive dupes (but not overall dupes)
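To make the difference concrete, here is a quick comparison of the two built-in transducers on a plain vector (the sample data and var names are made up):

```clojure
;; dedupe only drops an item when it equals the one right before it.
(def deduped (into [] (dedupe) [1 1 2 1 3 3]))

;; distinct drops every repeat, so it must remember all items seen so far.
(def distincted (into [] (distinct) [1 1 2 1 3 3]))
```

Here `deduped` still contains the second `1`, while `distincted` does not.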
yeah I did but I’d like to remove any dupes in the buffer
I wonder if some lib has implemented cache-chan yet
with a clojure.core.cache cache, and a keying function, you could get TTL / FIFO / LIFO etc.
this lib looks like it tries that idea, can't vouch for how well it does so https://github.com/benashford/memoasync
Would it be simpler to make that part of a stateful transducer instead?
I think that's a better approach than the above lib, yeah
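One way the stateful-transducer route could look, as a sketch only. `distinct-within` is a hypothetical name; it remembers just the last `n` distinct items and evicts the oldest first, which bounds memory but is not the same as "distinct among what is currently in the buffer":

```clojure
(defn distinct-within
  "Hypothetical transducer: drops an item if it is among the last n
  distinct items seen; the oldest remembered item is evicted first."
  [n]
  (fn [rf]
    (let [seen (java.util.LinkedHashSet.)]
      (fn
        ([] (rf))
        ([result] (rf result))
        ([result x]
         (if (.contains seen x)
           result
           (do
             ;; Make room by forgetting the oldest remembered item.
             (when (>= (.size seen) n)
               (let [it (.iterator seen)]
                 (.next it)
                 (.remove it)))
             (.add seen x)
             (rf result x))))))))
```

It plugs into a channel the same way `distinct` does, for example `(chan 10 (distinct-within 100))`. The eviction policy here is plain FIFO; a clojure.core.cache cache could stand in for the set if TTL or other policies are wanted.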
what does a "cache-chan" do? memoization works for functions because they map an input to an output, but a channel is input in and input out, so it is not clear to me what memoizing that means
that's why I suggested a keying function (I guess one would want a transform function as well)
I guess it's a half-baked idea really
promise-chan is a one-slot cache :)
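A tiny illustration of the one-slot behaviour (the var name `result` and the values are made up): the first value put on a `promise-chan` sticks, later puts are dropped, and every take sees the same value.

```clojure
(require '[clojure.core.async :as a])

(def result (a/promise-chan))

(a/put! result 42) ; first value wins
(a/put! result 43) ; silently dropped

;; Taking does not consume the value, so both reads see 42.
(def first-read  (a/poll! result))
(def second-read (a/poll! result))
```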
for the original question, not passing along dups, the message itself would be the cache key
and then the cache rules would constrain the size of that dedupe cache
and promise-chan works great when memoizing
the problem with wanting to filter all dupes is that you need arbitrary sized memory to remember what you've seen
the memoasync project likely predates promise-chan
not all dupes ever, just all dupes in a bounded buffer
yeah it predates transducers too