Is there a way to override the MAX-QUEUE-SIZE for a particular chan. I have a case where I create a promise-chan that resolves to some status when some computation finishes. And there are more then 1024 listeners who need to block until the status is finished. The amount of listeners scales with the complexity of our APP and could up to 5000 pending takes, and in another case scales with the amount of active connections in a partition, again is unlikely to go above 2000. I want to be able to set these limits on a per chan level. Not really a solution to set this globally (although still better), since this is an internal lib and I don’t want to pass on these limits to the app level. Its kind of ok for this to be some hack job with redefining MAX-QUEUE-SIZE with a patch. I’m kind of thinking a (binding [clojure.core.async.impl.protocols/MAX-QUEUE-SIZE 2000] …) or something similar is what im after, however clojure.core.async.impl.protocols/MAX-QUEUE-SIZE isnt dynamic so that wouldnt work. Any recomendations?
This is not currently configurable and that’s intentional as it’s generally not something people should run into
Perhaps you should consider using an actual promise?
The code is heavily reliant on core.async, and needs to be clojurescript compat. For this case I might need to work around the issue using mult’s or something similar. But this is a pattern that is widely used in the codebase. As far as I can tell, theres no reason an arbitary upper-limit should be set on takes of a chan when quantity is well known and finite?
I do understand the motivation behind capping it low in general though.
it exposes subtle bugs that happen under load, earlier
It’s also something I wouldnt want to set globally in the software precisely because it might delay finding bugs when careless code is written
the pedantic answer to the scenario is someone should be applying backpressure or using a proper queue somewhere
right. Which is almost always the case
But in this case, thats not whats happening
Back pressure is carefully accounted for in this system, however here we have a case where a finite known value needs to be used for pending takes, rather then in a buffer
Imagine this value was 5, you would only be able to have 5 things sync around this point.
For a promise-chan, this makes little sense
@alexmiller Thanks for the response. Is there perhaps a way to simply make a different chan that would have this property, without copying the entire impl for ManyToManyChannel.
channels implement some protocols, so you could go that route, but it's probably not trivial if you want channels as generic as m2m
but maybe for this narrow case it would work
look at timeout - that's a channel impl
timeout reuses m2m
ah, well the relevant protocols are Channel, ReadPort, and WritePort
awesome thanks. Will post results back here
it's kind of subtle how to handle things, I would probably for fixing your app with a mult before reimpl'ing chans
Either going to use a mult or try the impl route
there are lots of facilities to do this kind of thing in java.util.concurrent in the jvm, but I don't know the range of options in cljs
its mainly a case of having a heavy core.async lib (virtual connection multiplexer), and wanting to use the primitives we have to interact with promise like things
also needs to place nice in a go block
Will let you know what I come up with
Although count this as a vote to add some super secret way to dynamise MAX-QUEUE
😉
No one will ever know
https://gist.github.com/hiredman/1788aa052f26d127c00a1679656026f0 is an example of extending ReadPort to CompletionStage (the the completable future stuff in the jvm), which lets you use <! and alt! on a CompletableFuture, which doesn't have the channel limit, and you can do something similar in cljs with js promises
(I had the cljs version somewhere, but seemed to have lost it)
@hiredman This is cool, thanks!
(defn big-promise []
(let [p (a/promise-chan)
c (a/chan)
m (a/mult c)]
(a/go
(let [v (a/<! p)]
(when v
(a/>! c v))
(a/close! c)))
(reify
impl/ReadPort
(take! [this handler]
(a/go
;; not thread safe but... yolo
(let [v (or (a/poll! p)
(let [c' (a/chan)]
(a/tap m c')
(a/<! c')))]
#?(:clj
(.lock handler))
(let [good (and (impl/active? handler)
(impl/commit handler))]
#?(:clj
(.unlock handler))
(when good
(dispatch/run #(good v))))))
nil)
impl/WritePort
(put! [this v handler]
(impl/put! p v handler))
impl/Channel
(close! [this]
(impl/close! p))
(closed? [this]
(impl/closed? p)))))
This seems to work @alexmiller @hiredman
@hiredman thanks for the gist!
😞
I strongly recommend you not do that, and instead go with the extension of ReadPort to CompletionStage, and if you really must extend WritePort to it as well, that will give you something that is kind of good and threadsafe instead of whatever this is