core-async

Alexis Vincent 2020-04-16T15:38:02.063Z

Is there a way to override the MAX-QUEUE-SIZE for a particular chan? I have a case where I create a promise-chan that resolves to some status when some computation finishes, and there are more than 1024 listeners who need to block until the status is finished. The number of listeners scales with the complexity of our app and could reach up to 5000 pending takes; in another case it scales with the number of active connections in a partition, which again is unlikely to go above 2000. I want to be able to set these limits on a per-chan level. Setting this globally isn’t really a solution (although still better than nothing), since this is an internal lib and I don’t want to pass these limits on to the app level. It’s kind of OK for this to be some hack job that redefines MAX-QUEUE-SIZE with a patch. I’m thinking a (binding [clojure.core.async.impl.protocols/MAX-QUEUE-SIZE 2000] …) or something similar is what I’m after; however, clojure.core.async.impl.protocols/MAX-QUEUE-SIZE isn’t dynamic, so that wouldn’t work. Any recommendations?
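
For reference, a minimal sketch of the limit being described, assuming core.async on the JVM with the stock MAX-QUEUE-SIZE of 1024. `try-pending-takes` is a hypothetical helper name, not part of core.async; it registers pending takes on a fresh promise-chan and reports whether the pending-take assertion fired.

```clojure
(require '[clojure.core.async :as a])

(defn try-pending-takes
  "Hypothetical helper: registers n pending takes on a fresh promise-chan.
  Returns :ok if all were accepted, or :limit-hit if core.async rejected
  one of them with an AssertionError (the pending-take limit)."
  [n]
  (let [p (a/promise-chan)]
    (try
      ;; Nothing is ever put on p, so every take! stays pending.
      (dotimes [_ n]
        (a/take! p (fn [_] nil)))
      :ok
      (catch AssertionError _
        :limit-hit))))

;; With the default limit, 1024 pending takes should be accepted and the
;; 1025th should be rejected:
;; (try-pending-takes 1024)
;; (try-pending-takes 1025)
```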

alexmiller 2020-04-16T16:04:13.064600Z

This is not currently configurable and that’s intentional as it’s generally not something people should run into

alexmiller 2020-04-16T16:04:29.065200Z

Perhaps you should consider using an actual promise?

Alexis Vincent 2020-04-16T16:46:53.069300Z

The code is heavily reliant on core.async, and needs to be ClojureScript compatible. For this case I might need to work around the issue using mults or something similar. But this is a pattern that is widely used in the codebase. As far as I can tell, there’s no reason an arbitrary upper limit should be set on takes of a chan when the quantity is well known and finite?

Alexis Vincent 2020-04-16T16:47:25.069900Z

I do understand the motivation behind capping it low in general though.

2020-04-16T16:47:41.070500Z

it exposes subtle bugs that happen under load, earlier

Alexis Vincent 2020-04-16T16:48:31.071800Z

It’s also something I wouldn’t want to set globally in the software, precisely because it might delay finding bugs when careless code is written

2020-04-16T16:48:31.071900Z

the pedantic answer to the scenario is someone should be applying backpressure or using a proper queue somewhere

Alexis Vincent 2020-04-16T16:48:50.072300Z

right. Which is almost always the case

Alexis Vincent 2020-04-16T16:49:03.072900Z

But in this case, that's not what's happening

Alexis Vincent 2020-04-16T16:50:50.074600Z

Back pressure is carefully accounted for in this system; however, here we have a case where a finite, known number of values needs to sit in pending takes rather than in a buffer

Alexis Vincent 2020-04-16T16:52:01.075700Z

Imagine this value was 5, you would only be able to have 5 things sync around this point.

Alexis Vincent 2020-04-16T16:52:23.076100Z

For a promise-chan, this makes little sense

Alexis Vincent 2020-04-16T16:54:49.077500Z

@alexmiller Thanks for the response. Is there perhaps a way to simply make a different chan that would have this property, without copying the entire impl for ManyToManyChannel?

alexmiller 2020-04-16T17:17:09.078600Z

channels implement some protocols, so you could go that route, but it's probably not trivial if you want channels as generic as m2m

alexmiller 2020-04-16T17:17:30.079Z

but maybe for this narrow case it would work

alexmiller 2020-04-16T17:18:26.079200Z

look at timeout - that's a channel impl

ghadi 2020-04-16T17:18:53.079400Z

timeout reuses m2m

alexmiller 2020-04-16T17:19:16.079800Z

ah, well the relevant protocols are Channel, ReadPort, and WritePort

Alexis Vincent 2020-04-16T17:19:49.080600Z

awesome thanks. Will post results back here

ghadi 2020-04-16T17:20:11.081300Z

it's kind of subtle how to handle things, I would probably go for fixing your app with a mult before reimpl'ing chans

Alexis Vincent 2020-04-16T17:20:46.082100Z

Either going to use a mult or try the impl route

alexmiller 2020-04-16T17:20:52.082300Z

there are lots of facilities to do this kind of thing in java.util.concurrent in the jvm, but I don't know the range of options in cljs

Alexis Vincent 2020-04-16T17:22:13.083500Z

it's mainly a case of having a heavy core.async lib (virtual connection multiplexer), and wanting to use the primitives we have to interact with promise-like things

Alexis Vincent 2020-04-16T17:22:38.084100Z

also needs to play nice in a go block

Alexis Vincent 2020-04-16T17:22:52.084400Z

Will let you know what I come up with

Alexis Vincent 2020-04-16T17:23:26.085200Z

Although count this as a vote to add some super secret way to dynamise MAX-QUEUE

Alexis Vincent 2020-04-16T17:23:36.085400Z

😉

Alexis Vincent 2020-04-16T17:23:50.085700Z

No one will ever know

2020-04-16T17:38:11.087500Z

https://gist.github.com/hiredman/1788aa052f26d127c00a1679656026f0 is an example of extending ReadPort to CompletionStage (the completable future stuff in the jvm), which lets you use <! and alt! on a CompletableFuture, which doesn't have the channel limit, and you can do something similar in cljs with js promises
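
A rough sketch of the idea in that message, written from scratch rather than copied from the gist, so details may differ from it. It assumes the JVM build of core.async, where take handlers also implement java.util.concurrent.locks.Lock; the namespace name `example.completion-stage-port` is made up for illustration. A stage that completes with nil or exceptionally is delivered as nil, which `<!` reports the same way as a closed channel.

```clojure
(ns example.completion-stage-port
  (:require [clojure.core.async :as a]
            [clojure.core.async.impl.protocols :as impl])
  (:import (java.util.concurrent CompletableFuture CompletionStage)
           (java.util.concurrent.locks Lock)
           (java.util.function BiConsumer)))

;; Assumption: extending ReadPort is enough for <!, <!! and alt! to accept
;; a CompletionStage as a port. No channel queue is involved, so the
;; pending-take limit does not apply.
(extend-protocol impl/ReadPort
  CompletionStage
  (take! [stage handler]
    (.whenComplete stage
      (reify BiConsumer
        (accept [_ value _error]
          ;; Commit the handler under its lock so that an alt! sharing
          ;; this handler fires at most one of its clauses.
          (.lock ^Lock handler)
          (let [callback (try
                           (when (impl/active? handler)
                             (impl/commit handler))
                           (finally
                             (.unlock ^Lock handler)))]
            ;; value is nil when the stage completed exceptionally.
            (when callback
              (callback value))))))
    ;; nil tells the caller that the take was enqueued, not completed.
    nil))

;; Usage sketch:
;; (def cf (CompletableFuture.))
;; (a/go (println "status:" (a/<! cf)))
;; (.complete cf :finished)
```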

2020-04-16T17:39:22.087800Z

(I had the cljs version somewhere, but seem to have lost it)

Alexis Vincent 2020-04-16T19:20:59.089200Z

@hiredman This is cool, thanks!

Alexis Vincent 2020-04-16T23:24:34.090100Z

(defn big-promise []
  (let [p (a/promise-chan)
        c (a/chan)
        m (a/mult c)]

    (a/go
      (let [v (a/<! p)]
        (when v
          (a/>! c v))
        (a/close! c)))

    (reify
      impl/ReadPort
      (take! [this handler]
          (a/go
            ;; not thread safe but... yolo
            (let [v (or (a/poll! p)
                        (let [c' (a/chan)]
                          (a/tap m c')
                          (a/<! c')))]

              #?(:clj
                 (.lock handler))
              (let [good (and (impl/active? handler)
                              (impl/commit handler))]
                #?(:clj
                   (.unlock handler))
                (when good
                  (dispatch/run #(good v))))))
        nil)

      impl/WritePort
      (put! [this v handler]
        (impl/put! p v handler))

      impl/Channel
      (close! [this]
        (impl/close! p))
      (closed? [this]
        (impl/closed? p)))))

Alexis Vincent 2020-04-16T23:24:53.090500Z

This seems to work @alexmiller @hiredman

Alexis Vincent 2020-04-16T23:25:12.090800Z

@hiredman thanks for the gist!

2020-04-16T23:30:01.091400Z

😞

2020-04-16T23:33:30.093100Z

I strongly recommend you not do that, and instead go with the extension of ReadPort to CompletionStage, and if you really must extend WritePort to it as well, that will give you something that is kind of good and threadsafe instead of whatever this is