core-async

2019-09-06T00:41:00.085500Z

I've been working with core.async for a decent amount of time, but mostly with a small subset consisting of parking put and take, basic channels with sized buffers, and the thread macro. At the moment I'm making a more complex set of processes which need to interact with each other, and as a result I've found need of mult. It seems like sometimes my messages are getting lost on the mult though. Are there any common pitfalls with mults which would cause this?

2019-09-06T01:06:40.087200Z

There are a number of open mult issues, I think they don't lead to dropped messages, but I am not sure

2019-09-06T01:16:24.087800Z

Alright. Well I guess in this case I can just keep a vector of open channels since that will allow individual communication as well as broadcast communication.

2019-09-06T01:17:23.088700Z

You're not taking from the source Chan used to make the mult right?

2019-09-06T01:22:58.089900Z

No, I don't believe so.

2019-09-06T01:23:14.090300Z

I think the open issues are mostly about cleaning up on untap and closing channels

2019-09-06T01:23:44.090900Z

Fair enough. And also can confirm I'm not pulling from the channel used to make the mult.

2019-09-06T16:58:18.094800Z

just checking that I'm not missing something: If I have a channel, is there some way to replicate "attaching a callback" to the closing of that chan, and then passing the chan along, without wrapping take!/`<!!`?

2019-09-06T16:59:13.095500Z

I have a function that creates a chan that is designed to close, and I want it to return the chan, but there is also some cleanup stuff that needs to be done when the chan closes

2019-09-06T17:02:49.098300Z

There is no way to attach anything to respond to the closing of a channel. I often just use another channel to signal that kind of thing

2019-09-06T17:03:27.098700Z

gotcha, thanks

2019-09-06T17:04:34.099700Z

I guess I could use a mult and just take everything from a tapped chan and wait for it to return nil

markmarkmark 2019-09-06T17:14:57.100900Z

maybe you could make something like pipe that takes a callback. just loop over the results and if you get null from the source channel, do the callback and close the sink channel

markmarkmark 2019-09-06T17:15:13.101200Z

and of course, pass non-nil things onto the sink

2019-09-06T17:16:35.101500Z

oh yeah that's probably better

markmarkmark 2019-09-06T17:17:39.102500Z

of course, if the sink is being taken from slowly, then there's the possibility that the source channel will be closed for some time before the pipe would see that the source is closed

markmarkmark 2019-09-06T17:17:49.102800Z

so it depends on when exactly the callback needs to be called

2019-09-06T17:19:51.103100Z

nah this should be perfect for my use case

2019-09-06T21:21:14.106900Z

This is an example of what I cam up with: https://gist.github.com/jjttjj/854e725c2ae1e0a1f3b5a4b235252df3 I'm wondering if there's an obvious way to get around the fact that I have to either use a predefined out-ch in put-msgs! there, or pass the output channel as a parameter and have to mange 2 channels everywhere? I beleive the way it currently is, with out-ch as just a channel with a preset buffer won't work in situations where that buffer isn't appropriate

2019-09-06T21:28:10.107600Z

looks vaguely like maybe a custom implementation of the Pub protocol for pub/sub

2019-09-06T21:29:29.108500Z

it is hard to tell what is going on there, I think maybe the intent is for the process and handler stuff to simulate so external queue or pubsub system?

2019-09-06T21:30:13.109100Z

it is weird that put-msgs! removes a single handler, but stops the loop that would be feeding any other handlers

2019-09-06T21:30:32.109500Z

oh yeah you're right it shouldn't stop the loop there

2019-09-06T21:31:29.110700Z

basically i have an incoming stream of messages and i basically want to be able to call a function that puts a copy of each message into a chan so that I can subject it to a transducer

2019-09-06T21:32:16.111400Z

and sometimes there will be a point that makes sense for these messages to end, which is expressed via the transducer

2019-09-06T21:34:09.113100Z

I do think a mult would work, just pass all messages to a mult and tap the new transducer-channels as needed. But I was sort of trying to avoid having to keep the mult somewhere and tapping it. (Though perhaps that's not any worse than just calling add handlers on the connection anyway)

2019-09-06T21:37:23.114600Z

why not have whatever is consuming result-chan trigger the clean up?

2019-09-06T21:40:00.115300Z

because then i'd have to keep the handler value h around so it could be removed, though I suppose there are design choices around this as well

2019-09-06T21:40:12.115600Z

(let [c (chan xf)
      _ (give-me-messages whatever c)
      messages (async/&lt;! (async/into [] c))]
  (stop-giving-me-messages whatever c)
  messages)

2019-09-06T21:44:15.117700Z

hmmmm my thinking was that sometimes the processes are longer running and won't actually be put into [] and will be handled more asynchronously

2019-09-06T21:45:36.118800Z

some of the transducer-channels will end up in a reduced state but sometimes they will just be closed later on "manually" and the messages handled as they come in, but either way I want to remove the handler on the closing of the chan

2019-09-06T21:46:24.119400Z

and my thinking was it'd be convenient to do that at the same place where it's attached and not have to worry about it

2019-09-06T21:47:44.120300Z

basically extending mult's handling of closed downstream channels to whatever your source of information is

2019-09-06T21:48:10.120600Z

maybe implement async/Mult for your source

2019-09-06T21:48:42.121Z

interesting

2019-09-06T21:49:15.121700Z

or Pub, depending on if your handler stuff is meant to simulate different subscriptions or not

2019-09-06T21:56:22.123700Z

or, if you want to be really terrible(and if you are doing something pubsubish), create a standard async/pub, then use reflection to get access to the atom inside it, and add a watch to that atom to be notified when the topics subscribed to change

2019-09-06T22:00:14.124100Z

haha wow. I think I'll try the mult thing first