core-async

2019-07-30T10:28:54.031300Z

With core.async whats the recommended way to raise an error when a channel/buffer is full / blocked?

2019-07-30T10:30:19.032200Z

I’d like to exert back pressure up stream when my chan’s buffer is full by issuing a HTTP 503

2019-07-30T10:37:29.032600Z

ahh I can test full? on a buffer

2019-07-30T10:41:51.034100Z

though that doesn’t seem to solve it as another thread may put on the channel between when I check and add to the chan. I really need to raise an error on put rather than block.

2019-07-30T10:42:38.034400Z

ok offer! looks like what I want

Jan K 2019-07-30T12:06:18.035800Z

put! itself will eventually start throwing exceptions when the internal channel buffer fills up (1024 puts), that's in addition to the explicit channel buffer you can provide

2019-07-30T16:48:25.038800Z

One thing I’d like to do is dedupe duplicate requests on a core.async channel. I initially wrote (chan 10 (distinct)), which works but will eventually OOM the server as the volatile inside the distinct transducer isn’t bounded. I was thinking I could do this by creating my own DistinctBuffer deftype that implements the Buffer protocol. Does that sound like a good idea?

2019-07-30T16:55:07.040700Z

I there are broadly 3 places you could implement that: 1. a custom buffer 2. a custom transducer 3. a custom version of async/pipe (copies between 2 channels)

2019-07-30T16:55:31.041200Z

the last is maybe the quickest to prototype and try out

2019-07-30T16:56:28.042800Z

I did think about 2 and 3 also, but didn’t mention the other requirement; which was to enforce a fixed buffer size on the set.

2019-07-30T16:56:51.043300Z

so I think a custom buffer might be best

2019-07-30T16:57:00.043600Z

ah, you want to use the buffer as the set

2019-07-30T16:57:05.043800Z

yeah — essentially use a set as the buffer, with a max size

2019-07-30T17:02:25.045300Z

ok this should be pretty easy actually

2019-07-30T17:02:38.045700Z

but need to go home now 😞 so it’s gonna have to wait

alexmiller 2019-07-30T17:14:23.046Z

did you try the dedupe transducer?

alexmiller 2019-07-30T17:15:01.046400Z

that will remove consecutive dupes (but not overall dupes)

2019-07-30T18:29:50.047Z

yeah I did but I’d like to remove any dupes in the buffer

2019-07-30T18:31:20.047400Z

I wonder if some lib has implemented cache-chan yet

2019-07-30T18:31:46.048Z

with a clojure.core.cache cache, and a keying function, you could get TTL / FIFO / LIFO etc.

2019-07-30T18:33:37.048400Z

this lib looks like it tries that idea, can't vouch for how well it does so https://github.com/benashford/memoasync

Joe Lane 2019-07-30T18:34:28.049Z

Would it be simpler to make that part of a stateful transducer instead?

2019-07-30T18:34:46.049400Z

I think that's a better approach than the above lib, yeah

2019-07-30T18:56:11.050900Z

what does a "cache-chan" do? memoization works for functions becuase they map an input to an output, but a channel is input in and input out, so it is not clear to me what memoizing that means

2019-07-30T18:59:26.051700Z

that's why I suggested a keying function (I guess one would want a transform function as well)

2019-07-30T18:59:36.052Z

I guess it's a half baked idea really

alexmiller 2019-07-30T19:00:38.052700Z

promise-chan is a one-slot cache :)

2019-07-30T19:00:48.053200Z

for the original question, not passing along dups, the message itself would be the cache key

2019-07-30T19:01:03.053900Z

and then the cache rules would constrain the size of that dedupe cache

2019-07-30T19:01:12.054300Z

and promise-chan works great when memoizing

alexmiller 2019-07-30T19:01:15.054500Z

the problem with wanting to filter all dupes is that you need arbitrary sized memory to remember what you've seen

2019-07-30T19:01:26.054800Z

the memoasync project likely predates promise-chan

2019-07-30T19:42:00.057400Z

not all dupes ever, just all dupes in a bounded buffer

2019-07-30T19:43:44.058300Z

yeah it predates transducers too