core-async

wombawomba 2020-06-18T10:50:00.485500Z

So I have an event bus, and I want to selectively push events from this bus to clients over websockets according to their ‘subscriptions’ (each client subscribes to and is authorized to view specific messages). How should I design this using core.async?

Ben Sless 2020-06-18T14:41:22.485600Z

Why not use async/pub and async/sub and set up "draining" go loops which will consume from the subscription and push to the websocket?

Ben Sless 2020-06-18T14:41:36.485800Z

I'm not sure it will handle backpressure right

Ben Sless 2020-06-18T14:41:38.486Z

but you can try

2020-06-18T14:44:45.486200Z

do not use go blocks for IO, use async/thread

2020-06-18T14:45:12.486400Z

IO can starve the limited core.async thread pool, which impacts all go blocks

Ben Sless 2020-06-18T14:45:39.486600Z

you're right, my oversight! But what would you do about an unbounded number of consumers?

2020-06-18T14:46:52.486800Z

you can use the channels created by pub and sub inside thread, and core.async channels impose backpressure unless you actively try to subvert it
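A minimal sketch of the pub/sub plus `async/thread` combination, assuming core.async is on the classpath and that each event is a map with a `:topic` key. `send-fn` is a hypothetical stand-in for the blocking websocket write. `pub` only hands a topic's next value on once every subscriber of that topic has accepted the current one, and it runs a single loop. So a full client buffer pushes back on the bus (and stalls other topics too) rather than silently dropping data. Note that this version still starts one thread per client.

```clojure
(require '[clojure.core.async :as a])

;; Assumption: events look like {:topic :news ...}.
(def bus (a/chan 100))                  ; the event bus
(def publication (a/pub bus :topic))    ; routes each event by (:topic event)

(defn subscribe-client!
  "Subscribes one buffered channel to the given topics and drains it on a
  real thread, so the blocking `send-fn` (hypothetical) never runs on the
  go pool. Returns the client channel; closing it ends the drain loop."
  [topics send-fn]
  (let [ch (a/chan 10)]
    (doseq [t topics]
      (a/sub publication t ch))
    (a/thread
      (loop []
        (when-some [event (a/<!! ch)]   ; nil means ch was closed
          (send-fn event)
          (recur))))
    ch))
```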

Ben Sless 2020-06-18T14:48:45.487Z

To clarify - if you start an infinite loop (until the socket is closed) in a thread for every user, you'll be allocating an unbounded number of threads. I'm not saying you can't use the channels, I'm wondering how to handle this unbounded thread creation

2020-06-18T14:53:27.487200Z

you can use alts!! on a collection of channels in a single thread
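A sketch of the single-thread `alts!!` idea: one real thread serves any number of client channels, blocking until any of them has a value. `send-fn` is a hypothetical blocking send that takes the client channel and the event. The tradeoff is that one slow send delays every client this thread serves.

```clojure
(require '[clojure.core.async :as a])

(defn drain-all!
  "Serves many client channels from one thread. Closed channels are
  removed from the set; the thread exits once none remain."
  [chans send-fn]
  (a/thread
    (loop [chans (set chans)]
      (when (seq chans)
        ;; alts!! is documented as taking a vector of ports, hence vec
        (let [[event ch] (a/alts!! (vec chans))]
          (if (nil? event)
            (recur (disj chans ch))     ; ch is closed: stop serving it
            (do (send-fn ch event)      ; blocking IO on this real thread
                (recur chans))))))))
```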

2020-06-18T14:53:50.487400Z

you can isolate the io into a thread, and move other coordination code into go blocks
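A sketch of that split, using hypothetical `send-fn` and `wanted?` functions. The blocking write lives on one real thread fed by a channel. Filtering and routing stay in a go-loop that only parks on channel operations, so it never ties up a go-pool thread on IO.

```clojure
(require '[clojure.core.async :as a])

(defn start-io-worker
  "The only blocking work: a real thread that drains `out` and calls the
  hypothetical `send-fn` for each message. Returns `out`."
  [send-fn]
  (let [out (a/chan 100)]
    (a/thread
      (loop []
        (when-some [msg (a/<!! out)]
          (send-fn msg)
          (recur))))
    out))

(defn start-coordinator
  "Pure coordination in a go-loop: keep events that satisfy the
  hypothetical `wanted?` predicate and hand them to the IO worker."
  [events wanted? out]
  (a/go-loop []
    (when-some [e (a/<! events)]
      (when (wanted? e)
        (a/>! out e))                   ; parks, never blocks a pool thread
      (recur))))
```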

wombawomba 2020-06-18T14:56:59.487600Z

yeah that sounds good

wombawomba 2020-06-18T14:58:22.487800Z

I don’t think I want pub/sub, because nearly every websocket connection is going to want different events

2020-06-18T14:58:50.488Z

I must say, the core.async limitations on IO and CPU-heavy work inside go blocks seem to constantly violate the principle of least surprise; this issue seems to generate more questions on this channel than all other issues combined...

2020-06-18T14:58:50.488200Z

oh! sounds like you want a router!

wombawomba 2020-06-18T14:59:01.488400Z

…and also I’m thinking I’d like to handle authorization on the ‘pub’ side

wombawomba 2020-06-18T14:59:55.488600Z

a router does sound like something I might want 🙂

wombawomba 2020-06-18T15:00:17.488800Z

but I don’t see anything about routers in the core.async api?

2020-06-18T15:00:46.489Z

I wonder how closely "router with auth middleware and a peer to peer chat between clients" maps to this LOL - classic async 101 example

2020-06-18T15:00:59.489200Z

clojure has routers that are data-in / data-out

2020-06-18T15:01:51.489400Z

but I'm talking about the abstraction here, not necessarily a drop in implementation (because of course you don't want an HTTP router and that's what most people mean when they say router...)

wombawomba 2020-06-18T15:01:58.489600Z

ah, got it

wombawomba 2020-06-18T15:02:50.489800Z

yeah, essentially I’d like something like an ‘inverted’ router, that reads events from a single source and ‘fans out’ to deliver them to the subscription websockets

wombawomba 2020-06-18T15:04:28.490Z

one idea I had was to have an atom containing the websocket ‘routes’/subscriptions, and a ‘router’ thread that reads messages and forwards them according to what’s in the atom
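A sketch of the atom-based variant, assuming a hypothetical map from client id to an output channel and an `allowed?` predicate. The predicate is also one place to do the authorization check on the publishing side. Subscribing or unsubscribing is just `swap!` with `assoc`/`dissoc`, and `@subscriptions` can be inspected from a REPL.

```clojure
(require '[clojure.core.async :as a])

;; Hypothetical shape: client-id -> {:out channel, :allowed? predicate}
(def subscriptions (atom {}))

(defn start-router!
  "Reads every event from `bus` on one thread and forwards it to each
  client whose predicate accepts it, reading the atom afresh per event."
  [bus]
  (a/thread
    (loop []
      (when-some [event (a/<!! bus)]
        (doseq [{:keys [out allowed?]} (vals @subscriptions)]
          (when (allowed? event)
            (a/>!! out event)))         ; blocks while this client's buffer is full
        (recur)))))
```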

2020-06-18T15:04:30.490200Z

you don't even have to invert anything if you abstract the incoming message and outgoing sends to function calls - it just happens that your input io layer is an event bus and your "handlers" are channel writes

2020-06-18T15:04:50.490400Z

I guess the difference is that the result of the channel write doesn't need to go back to the event bus

wombawomba 2020-06-18T15:05:02.490600Z

right, yeah

wombawomba 2020-06-18T15:05:38.490800Z

so I’m thinking I could have a simple per-websocket loop in a future or similar, that receives messages from the router over a dedicated channel

2020-06-18T15:06:56.491Z

or a go-loop that owns the set of clients it manages as a recur arg, and reads from the channel that provides incoming event-bus events

2020-06-18T15:07:17.491200Z

the only thing that needs a real thread is the IO, and that can be abstracted out

wombawomba 2020-06-18T15:08:58.491400Z

hmm yeah I guess

wombawomba 2020-06-18T15:09:17.491600Z

how would I add/remove clients with that setup?

wombawomba 2020-06-18T15:09:50.491800Z

starting/stopping a websocket would happen independently of the event loop

2020-06-18T15:10:04.492Z

a channel could provide sub / unsub operations to the router loop

2020-06-18T15:10:39.492200Z

or perhaps more elegantly, you only need a channel for sub, and closing a channel (which should already be detected and handled) can signal its removal
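A sketch of the loop-owned alternative, relying on core.async's documented behavior that `>!` onto a closed channel returns false. New client channels arrive on `subs`. A client that has closed its channel is dropped the next time the loop tries to deliver to it, so no separate unsub message is needed. The client channels would then be drained by whatever thread-based IO layer you choose; the function and channel names here are hypothetical.

```clojure
(require '[clojure.core.async :as a])

(defn start-router
  "Go-loop owning the client set as its recur argument. Stops (closing
  all clients) when either `events` or `subs` is closed."
  [events subs]
  (a/go-loop [clients #{}]
    (let [[v port] (a/alts! [subs events])]
      (cond
        (nil? v) (doseq [c clients] (a/close! c))
        (= port subs) (recur (conj clients v))   ; v is a new client channel
        :else
        ;; fan the event out; keep only clients that accepted it
        (let [live (loop [cs (seq clients) live clients]
                     (if cs
                       (let [c (first cs)]
                         (if (a/>! c v)
                           (recur (next cs) live)
                           (recur (next cs) (disj live c))))
                       live))]
          (recur live))))))
```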

wombawomba 2020-06-18T15:22:24.492400Z

you mean signal over the same channel as for subs?

wombawomba 2020-06-18T15:22:43.492700Z

also, what are the advantages of this approach over keeping an atom?

wombawomba 2020-06-18T15:23:29.492900Z

one advantage of the atom approach is that it’ll make it trivial for me to inspect the ‘subscription state’, e.g. from a repl

2020-06-18T15:24:27.493100Z

the advantage of the loop param option is that it doesn't tie a block of code to a single global resource, which has implications for testing and refactoring

wombawomba 2020-06-18T15:25:01.493300Z

ah okay got it, thanks

2020-06-18T15:26:07.493500Z

also, you end up with coordination questions between the atom and the go-loop, e.g. removing a chan from the atom while the go-loop is in the middle of a channel operation on it; not a likely problem, but one you need to keep in mind

wombawomba 2020-06-18T15:26:32.493700Z

makes sense

2020-06-18T15:27:27.493900Z

if the advantage is repl visibility, you can use reset! inside the loop to update an atom, and still do all the internal ops in a way that leverages the advantages of core.async coordination

2020-06-18T15:27:43.494100Z

without mixing in atom swap! semantics
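A minimal sketch of that idea, with hypothetical names: the client set lives only in the recur argument, and each new value is published with `reset!`. The loop never reads the atom back, so the atom is a snapshot for the REPL and not shared mutable state.

```clojure
(require '[clojure.core.async :as a])

;; Read-only mirror for REPL inspection
(def client-view (atom #{}))

(defn start-tracker
  "Adds each client received on `subs` to the loop-owned set and mirrors
  the current set into `client-view`. Exits when `subs` is closed."
  [subs]
  (a/go-loop [clients #{}]
    (reset! client-view clients)
    (when-some [c (a/<! subs)]
      (recur (conj clients c)))))
```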

2020-06-18T15:28:37.494300Z

all that said, you know your app better than I do, sometimes the benefit of using an atom is simpler code, and you can prove the gotchas I mentioned won't matter

wombawomba 2020-06-18T15:29:07.494500Z

yeah, I think I’ll have to try both out and see what feels best 🙂

wombawomba 2020-06-18T15:29:58.494700Z

I think I have a pretty good idea of how to make this happen now, so thanks

2020-06-18T15:35:37.494900Z

one other small thing: a bare bones router - have a "dispatch" map from key to targets {dispatch-x #{chan0 chan2} dispatch-y #{chan1 chan2}} then (let [subscribers (get dispatch (key-fn message))] ...)
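The bare-bones dispatch map can be sketched as a plain function. `route!` and the `:type` routing key used below are hypothetical. Because it uses the blocking `>!!`, it belongs on a real thread (or it can be rewritten with `>!` inside a go block).

```clojure
(require '[clojure.core.async :as a])

(defn route!
  "Looks up the subscriber set for the message's routing key in
  `dispatch` and puts the message on each subscriber channel."
  [dispatch key-fn message]
  (doseq [ch (get dispatch (key-fn message))]
    (a/>!! ch message)))
```

For example, with `{:x #{chan0 chan2} :y #{chan1 chan2}}` as the dispatch map and `:type` as `key-fn`, a message `{:type :x}` lands on `chan0` and `chan2`.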
