So I have an event bus, and I want to selectively push events from this bus to clients over websockets according to their ‘subscriptions’ (each client subscribes to and is authorized to view specific messages). How should I design this using core.async?
Why not use async/pub
and async/sub
and set up "draining" go loops which will consume from the subscription and push to the websocket?
I'm not sure it will handle backpressure right
but you can try
do not use go blocks for IO, use async/thread
IO can starve the limited core.async thread pool, which impacts all go blocks
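A minimal sketch of the pub/sub idea with the blocking IO moved onto a real thread, as suggested above. The names `events`, `event-pub`, `start-drain!`, the `:topic` key, and the `send!` callback are all assumptions made for illustration; `send!` stands in for whatever blocking websocket write the app uses.

```clojure
(require '[clojure.core.async :as a])

;; Hypothetical event bus: a buffered channel, published by the :topic key.
(def events (a/chan 16))
(def event-pub (a/pub events :topic))

(defn start-drain!
  "Subscribes a fresh channel to `topic` and drains it on a real thread,
  calling the (assumed blocking) `send!` for each message.
  Returns the subscription channel so the caller can unsub and close it."
  [topic send!]
  (let [sub-ch (a/chan 8)]
    (a/sub event-pub topic sub-ch)
    ;; a/thread, not go: the blocking write must not occupy a go-block thread.
    (a/thread
      (loop []
        ;; <!! returns nil once sub-ch is closed, which ends the loop.
        (when-some [msg (a/<!! sub-ch)]
          (send! msg)
          (recur))))
    sub-ch))
```

To stop a consumer, something like `(a/unsub event-pub topic sub-ch)` followed by `(a/close! sub-ch)` should end its loop. Note that `pub` waits for every subscriber of a topic to accept an item before distributing the next one, so a slow client with a full buffer can hold up the others on that topic.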
you're right! my oversight. but what would you do about an unbounded number of consumers?
you can use the channels created by pub and sub inside thread, and core.async channels impose backpressure unless you actively try to subvert it
To clarify - if you start an infinite loop (until the socket is closed) in a thread for every user, you'll be allocating an unbounded number of threads. I'm not saying you can't use the channels, I'm wondering how to handle this unbounded thread creation
you can use alts!!
on a collection of channels in a single thread
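A rough sketch of that single-thread approach, assuming each client has its own outbound channel. `drain-all!` and the two-argument `send!` callback are hypothetical names, and the set of channels is fixed up front here to keep the example short.

```clojure
(require '[clojure.core.async :as a])

(defn drain-all!
  "Services every channel in `chans` from ONE thread until all of them are
  closed. `send!` is an assumed blocking function of [channel message].
  Returns the channel produced by a/thread, which closes when the loop ends."
  [chans send!]
  (a/thread
    (loop [open (set chans)]
      (when (seq open)
        ;; alts!! blocks until any one of the channels yields a value.
        (let [[msg ch] (a/alts!! (vec open))]
          (if (nil? msg)
            (recur (disj open ch))   ; nil means that channel was closed
            (do (send! ch msg)
                (recur open))))))))
```

One thread per group of clients keeps the thread count bounded, at the cost that one slow write delays every other client served by the same thread.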
you can isolate the io into a thread, and move other coordination code into go blocks
yeah that sounds good
I don’t think I want pub/sub because nearly every websocket connection is going to want different events
I must say, the core.async limitations about IO and CPU heavy processes inside go blocks seem to constantly violate the principle of least surprise. It seems to be an issue with more questions on this channel than all other issues combined...
oh! sounds like you want a router!
…and also I’m thinking I’d like to handle authorization on the ‘pub’ side
a router does sound like something I might want 🙂
but I don’t see anything about routers in the core.async api?
I wonder how closely "router with auth middleware and a peer to peer chat between clients" maps to this LOL - classic async 101 example
clojure has routers that are data-in / data-out
but I'm talking about the abstraction here, not necessarily a drop in implementation (because of course you don't want an HTTP router and that's what most people mean when they say router...)
ah, got it
yeah, essentially I’d like something like an ‘inverted’ router, that reads events from a single source and ‘fans out’ to deliver them to the subscription websockets
one idea I had was to have an atom containing the websocket ‘routes’/subscriptions, and a ‘router’ thread that reads messages and forwards them according to what’s in the atom
you don't even have to invert anything if you abstract the incoming message and outgoing sends to function calls - it just happens that your input io layer is an event bus and your "handlers" are channel writes
I guess the difference is that the result of the channel write doesn't need to go back to the event bus
right, yeah
so I’m thinking I could have a simple per-websocket loop in a future or similar, that receives messages from the router over a dedicated channel
or - a go-loop owning a set of clients it manages via recur arg, and it reads the channel that provides incoming event-bus events
the only thing that needs a real thread is the IO, and that can be abstracted out
hmm yeah I guess
how would I add/remove clients with that setup?
starting/stopping a websocket would happen independently of the event loop
a channel could provide sub / unsub operations to the router loop
or perhaps more elegantly, you only need a channel for sub, and closing a channel (which should already be detected and handled) can signal its removal
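The router loop discussed above might look roughly like this. It is only a sketch: `start-router!`, `events-ch`, and `sub-ch` are made-up names, every client receives every event (no filtering or authorization yet), and a closed client channel is detected by `>!` returning false.

```clojure
(require '[clojure.core.async :as a])

(defn start-router!
  "Go-loop that owns the client set as a loop argument.
  New client channels arrive on `sub-ch`; events arrive on `events-ch`.
  Stops when either input closes and yields the remaining client set."
  [events-ch sub-ch]
  (a/go-loop [clients #{}]
    (let [[v port] (a/alts! [sub-ch events-ch])]
      (cond
        ;; Either input channel was closed: stop and return the state.
        (nil? v) clients

        ;; A value on sub-ch is a new client channel to register.
        (= port sub-ch) (recur (conj clients v))

        ;; Otherwise v is an event: deliver it, dropping closed clients.
        :else
        (let [live (loop [cs (seq clients)
                          live clients]
                     (if-let [c (first cs)]
                       ;; >! returns false when c is already closed.
                       (let [ok? (a/>! c v)]
                         (recur (next cs) (if ok? live (disj live c))))
                       live))]
          (recur live))))))
```

Because `>!` parks when a client's buffer is full, a slow client stalls the whole router in this version. That is backpressure working as designed, but a real setup would probably want per-client buffers sized with that in mind.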
you mean signal over the same channel as for subs?
also, what are the advantages of this approach over keeping an atom?
one advantage of the atom approach is that it’ll make it trivial for me to inspect the ‘subscription state’, e.g. from a repl
the advantage of the loop param option is that it doesn't tie a block of code to a single global resource, which has implications for testing and refactoring
ah okay got it, thanks
also, you end up with coordination questions between the atom and the go-loop, e.g. removing a chan from the atom while the go-loop is mid channel operation - not a likely problem, but one you need to keep in mind
makes sense
if the advantage is repl visibility, you can use reset!
inside the loop to update an atom, and still do all the internal ops in a way that leverages the advantages of core.async coordination
without mixing in atom swap! semantics
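As a small illustration of that `reset!` idea: the loop below keeps its state in the loop argument and only mirrors a snapshot into an atom for inspection. `subscription-view` and `start-registry!` are hypothetical names, and nothing is meant to read the atom for coordination.

```clojure
(require '[clojure.core.async :as a])

;; Hypothetical REPL-only view of the loop state.
(defonce subscription-view (atom #{}))

(defn start-registry!
  "Owns the client set as a loop argument. On every iteration it publishes
  a snapshot into `view` with reset!, so the atom is written from exactly
  one place and never swapped concurrently."
  [sub-ch view]
  (a/go-loop [clients #{}]
    (reset! view clients)
    ;; <! returns nil when sub-ch is closed, which ends the loop.
    (when-some [c (a/<! sub-ch)]
      (recur (conj clients c)))))
```

At the REPL, `@subscription-view` would then show the most recent state the loop published, while tests can pass in their own atom and channel.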
all that said, you know your app better than I do, sometimes the benefit of using an atom is simpler code, and you can prove the gotchas I mentioned won't matter
yeah, I think I’ll have to try both out and see what feels best 🙂
I think I have a pretty good idea of how to make this happen now, so thanks
one other small thing: a bare bones router - have a "dispatch" map from key to targets {dispatch-x #{chan0 chan2} dispatch-y #{chan1 chan2}}
then (let [subscribers (get dispatch (key-fn message))] ...)
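Filling in that bare-bones router as a runnable sketch. `route!` and its return value are assumptions for illustration; `dispatch` and `key-fn` follow the shape described above. It uses the blocking `>!!`, so it is meant to be called from a regular thread rather than from inside a go block.

```clojure
(require '[clojure.core.async :as a])

(defn route!
  "Looks up the subscriber channels for `message` in the `dispatch` map
  (key -> set of channels) and puts the message on each one.
  Returns the number of subscribers the message was routed to."
  [dispatch key-fn message]
  (let [subscribers (get dispatch (key-fn message))]
    (doseq [ch subscribers]
      ;; Blocks while a subscriber's buffer is full, which is the backpressure.
      (a/>!! ch message))
    (count subscribers)))
```

For example, with `{:x #{chan0 chan2} :y #{chan1 chan2}}` as the dispatch map and `:type` as the key function, a message `{:type :x}` would go to `chan0` and `chan2` only, and a message with an unknown key would go nowhere.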