I've been working with core.async for a decent amount of time, but mostly with a small subset consisting of parking put and take, basic channels with sized buffers, and the thread macro. At the moment I'm making a more complex set of processes which need to interact with each other, and as a result I've found need of mult. It seems like sometimes my messages are getting lost on the mult though. Are there any common pitfalls with mults which would cause this?
There are a number of open mult issues, I think they don't lead to dropped messages, but I am not sure
Alright. Well I guess in this case I can just keep a vector of open channels since that will allow individual communication as well as broadcast communication.
You're not taking from the source Chan used to make the mult right?
No, I don't believe so.
I think the open issues are mostly about cleaning up on untap and closing channels
Fair enough. And also can confirm I'm not pulling from the channel used to make the mult.
just checking that I'm not missing something: If I have a channel, is there some way to replicate "attaching a callback" to the closing of that chan, and then passing the chan along, without wrapping take!
/`<!!`?
I have a function that creates a chan that is designed to close, and I want it to return the chan, but there is also some cleanup stuff that needs to be done when the chan closes
There is no way to attach anything to respond to the closing of a channel. I often just use another channel to signal that kind of thing
gotcha, thanks
I guess I could use a mult
and just take everything from a tapped chan and wait for it to return nil
maybe you could make something like pipe
that takes a callback. just loop over the results and if you get null from the source channel, do the callback and close the sink channel
and of course, pass non-nil things onto the sink
oh yeah that's probably better
of course, if the sink is being taken from slowly, then there's the possibility that the source channel will be closed for some time before the pipe would see that the source is closed
so it depends on when exactly the callback needs to be called
nah this should be perfect for my use case
This is an example of what I cam up with:
https://gist.github.com/jjttjj/854e725c2ae1e0a1f3b5a4b235252df3
I'm wondering if there's an obvious way to get around the fact that I have to either use a predefined out-ch
in put-msgs!
there, or pass the output channel as a parameter and have to mange 2 channels everywhere? I beleive the way it currently is, with out-ch
as just a channel with a preset buffer won't work in situations where that buffer isn't appropriate
looks vaguely like maybe a custom implementation of the Pub protocol for pub/sub
it is hard to tell what is going on there, I think maybe the intent is for the process and handler stuff to simulate so external queue or pubsub system?
it is weird that put-msgs! removes a single handler, but stops the loop that would be feeding any other handlers
oh yeah you're right it shouldn't stop the loop there
basically i have an incoming stream of messages and i basically want to be able to call a function that puts a copy of each message into a chan so that I can subject it to a transducer
and sometimes there will be a point that makes sense for these messages to end, which is expressed via the transducer
I do think a mult
would work, just pass all messages to a mult
and tap the new transducer-channels as needed. But I was sort of trying to avoid having to keep the mult somewhere and tapping it. (Though perhaps that's not any worse than just calling add handlers on the connection anyway)
why not have whatever is consuming result-chan trigger the clean up?
because then i'd have to keep the handler value h
around so it could be removed, though I suppose there are design choices around this as well
(let [c (chan xf)
_ (give-me-messages whatever c)
messages (async/<! (async/into [] c))]
(stop-giving-me-messages whatever c)
messages)
hmmmm my thinking was that sometimes the processes are longer running and won't actually be put into []
and will be handled more asynchronously
some of the transducer-channels will end up in a reduced state but sometimes they will just be closed later on "manually" and the messages handled as they come in, but either way I want to remove the handler on the closing of the chan
and my thinking was it'd be convenient to do that at the same place where it's attached and not have to worry about it
basically extending mult's handling of closed downstream channels to whatever your source of information is
maybe implement async/Mult for your source
interesting
or Pub, depending on if your handler stuff is meant to simulate different subscriptions or not
or, if you want to be really terrible(and if you are doing something pubsubish), create a standard async/pub, then use reflection to get access to the atom inside it, and add a watch to that atom to be notified when the topics subscribed to change
haha wow. I think I'll try the mult thing first