core-async

2020-10-27T13:57:44.107400Z

so, I've started using this:

;; NB: these reach into the channel's internals (the .buf field);
;; handy for REPL diagnostics, not for application logic
(defn full? [c]
  (.full? (.buf c)))

(defn msgs-on-chan [c]
  (.count (.buf c)))

;; Thank you Dan Compton <https://danielcompton.net/2015/08/04/core-async-channel-size>
(defn chan-status [chan-map chan-keys]
  (into {}
        (map (fn [[k c]]
               [k {:full? (full? c)
                   :backlog (msgs-on-chan c)}]))
        (select-keys chan-map chan-keys)))
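For reference, a usage sketch of chan-status at the REPL (the channel names here are made up):

```clojure
(require '[clojure.core.async :as a])

;; a named map of channels, as chan-status expects
(def chans {:in  (a/chan 10)
            :out (a/chan 10)})

(a/>!! (:in chans) :msg)

(chan-status chans [:in :out])
;; => {:in {:full? false, :backlog 1}, :out {:full? false, :backlog 0}}
```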

2020-10-27T13:58:18.108100Z

to just get an idea of whether my channels are stalled (it has already found one bug for me, where I tapped a channel on a mult but didn't put anything on to consume from that channel)

2020-10-27T13:59:27.109500Z

I see that I shouldn't use this kind of thing in production, but it has been pretty good for seeing what I might have done wrong in development. It is also giving me a clue about which channels might be running slowly and how to size things. Is there a better way of doing this?

2020-10-27T13:59:46.110Z

I'm trying to (1) see if things are actually flowing through my channels, and (2) see if I've got my channels appropriately sized

Jan K 2020-10-27T14:08:03.111500Z

You can also implement your own buffer (based on the built-in ones) and add logging when it's getting full.
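A rough sketch of that idea: wrap one of the built-in buffers and log when a put arrives while it is already full. Note this implements core.async's internal Buffer protocol, which is not a public API and could change between releases:

```clojure
(require '[clojure.core.async :as a]
         '[clojure.core.async.impl.buffers :as buffers]
         '[clojure.core.async.impl.protocols :as impl])

;; A buffer that delegates to a built-in one and warns on overflow.
(deftype LoggingBuffer [name buf]
  impl/Buffer
  (full? [_] (impl/full? buf))
  (remove! [_] (impl/remove! buf))
  (add!* [this itm]
    (when (impl/full? buf)
      (println "buffer" name "is full"))
    (impl/add!* buf itm)
    this)
  (close-buf! [_] (impl/close-buf! buf))
  clojure.lang.Counted
  (count [_] (count buf)))

;; use it anywhere a buffer is accepted:
;; (a/chan (->LoggingBuffer :work (buffers/fixed-buffer 10)))
```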

2020-10-27T16:34:05.112300Z

@otfrom logic that uses full? or msgs-on-chan can have race conditions that core.async would usually make impossible

2020-10-27T16:34:17.112700Z

this is why these sorts of queries don't exist in the lib itself

2020-10-27T16:34:59.113Z

but I see you are just using it for diagnostics, not application logic

2020-10-27T16:43:08.113800Z

that's right. I had 4 channels hanging off a mult and I forgot to put the consuming process on one of them. The other 3 drained and the last one was full

2020-10-27T16:43:20.114300Z

It helped me figure it out right away

2020-10-27T16:50:38.114900Z

one could even make a metrics dashboard for core.async channels, like the ones used in big distsys frameworks :D

2020-10-27T17:01:08.116500Z

something like

(require '[clojure.core.async.impl.protocols :as impl])

(deftype MeteredChannel [take-meter put-meter ch]
  impl/Channel
  (close! [_]
    (impl/close! ch))
  (closed? [_]
    (impl/closed? ch))
  impl/ReadPort
  (take! [_ fn1]
    (take-meter)
    (impl/take! ch fn1))
  impl/WritePort
  (put! [_ val fn1]
    (put-meter)
    (impl/put! ch val fn1)))

might be useful for that kind of thing, and it's less prone to abuse; you would look for discrepancies in the rate of calls to take-meter and put-meter
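For what it's worth, a hypothetical way to wire that up with plain atoms as the meters:

```clojure
(require '[clojure.core.async :as a])

(def takes (atom 0))
(def puts  (atom 0))

;; wrap an ordinary channel; puts and takes bump the counters
(def ch (->MeteredChannel #(swap! takes inc)
                          #(swap! puts inc)
                          (a/chan 10)))

(a/>!! ch :x)
(a/<!! ch)
[@puts @takes] ;; => [1 1]
```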

2👍
2020-10-27T17:03:35.118Z

full? and msgs-on-chan above both directly examine the buffer, which is a mutable, non-thread-safe collection (the channel uses locks when mutating it)

dazld 2020-10-27T17:06:30.120500Z

hey, i’m converting an async tree walker from futures to core.async, and a couple of things seem to be not quite right with how I’m going about it.
• using core.async recursively isn’t quite working out - it stalls at a certain depth. I’m guessing this is because it’s blocking until a thread becomes available..?
• when I do block core.async like this, how can i reset the thread executor to start again?

2020-10-27T17:06:51.120700Z

you can't

2020-10-27T17:07:03.121Z

don't do blocking things in a go block

dazld 2020-10-27T17:07:56.121800Z

https://github.com/clj-pour/pour/pull/14/files?w=1 PR I’m working on, for reference

dazld 2020-10-27T17:08:58.123100Z

so, recursion where the parent blocks waiting for the child to complete is a bad idea? thought it might be, but was wondering about other ways to go about it

2020-10-27T17:09:23.123900Z

if you are calling a function that contains <!! that is blocking

dazld 2020-10-27T17:10:14.124300Z

understood

dazld 2020-10-27T17:10:40.125100Z

the only way to start again, if CA does get into a fully blocked state, is to restart the repl..?

dazld 2020-10-27T17:11:12.126Z

i’m using this as an opportunity to learn more about ca, making many mistakes like this along the way

2020-10-27T17:12:27.127100Z

there might be some nonsense you can do that could unstick things, but it would very likely just end up in a wilder, more unknown state

1👍
2020-10-27T17:13:30.128200Z

you cannot do any real blocking operations in the call graph of a go block, only >! <! and the flavors of alts!
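To illustrate the difference with a minimal sketch: <! parks the go block without tying up a pool thread, while genuinely blocking work belongs in a/thread, which hands you back a channel you can take from:

```clojure
(require '[clojure.core.async :as a])

;; Parking: <! suspends the go block, freeing its pool thread.
(def c (a/chan))
(a/go (println "got" (a/<! c)))
(a/>!! c 42)  ;; prints "got 42"

;; Blocking work goes in a/thread, which runs on its own thread
;; and returns a channel carrying the result:
(def result (a/thread (Thread/sleep 50) :done))
(a/<!! result)  ;; => :done
```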

dazld 2020-10-27T17:13:36.128400Z

so, on that PR, I only have (afaict) one blocking call, at the root

2020-10-27T17:16:13.129900Z

hard to say, I dunno what knit does, and there are some callbacks passed around

2020-10-27T17:17:23.131500Z

if you are using datomic, I don't know, but my guess would be the datomic async api is also using core.async, and the sync api uses the async api with a real blocking <!! at the "root"

2020-10-27T17:18:18.133100Z

so you could be filling the threadpool with tasks that are blocked on some datomic operation that is itself being run in the threadpool

2020-10-27T17:19:08.133300Z

behind it

dazld 2020-10-27T17:20:38.134700Z

I am using datomic yep, and this is a kind of experiment in using a custom pull-like approach where keywords can be given behaviour and woven into the results

dazld 2020-10-27T17:21:54.135900Z

seems like core async is punishing me a bit for not defining a bit more precisely what it should do

2020-10-27T17:56:14.137100Z

the thing about measuring buffer usage instead of the flow of messages through the channel is that it doesn't tell you anything useful for unbuffered channels, or for channels with dropping or sliding buffers
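For example, with a sliding buffer the put side never blocks and the oldest values are silently dropped, so inspecting the buffer can never show you the loss:

```clojure
(require '[clojure.core.async :as a])

(def c (a/chan (a/sliding-buffer 2)))
(dotimes [_ 5] (a/>!! c :x))  ;; never blocks; three values were silently dropped
(a/<!! c)  ;; => :x
```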

dazld 2020-10-27T18:21:43.138700Z

if interested, figured it out - i think. I was using <! inside reduce, map, keep etc - I turned this around to get a collection of channels instead of a collection of values, and all was ok 🙂

dazld 2020-10-27T18:22:28.139100Z

would be interested in why, though

2020-10-27T18:23:48.140Z

you were recursively calling parse which was blocking the thread with <!!

dazld 2020-10-27T18:26:30.142Z

interestingly, not - parse wasn’t using any blocking calls

2020-10-27T18:26:39.142400Z

parse was blocking

dazld 2020-10-27T18:26:41.142500Z

it did at one point, sorry if you saw an older version

dazld 2020-10-27T18:27:07.143200Z

I changed it out to return a channel from go-loop
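The shape of that change might look something like this (names are hypothetical, not the actual pour code):

```clojure
(require '[clojure.core.async :as a])

;; Instead of blocking with <!!, return the go-loop's channel and let
;; the caller take from it; the recursion parks rather than blocks.
(defn parse [node]
  (a/go-loop [children (:children node), acc []]
    (if (seq children)
      (recur (rest children)
             (conj acc (a/<! (parse (first children)))))
      (assoc node :children acc))))

;; only the root does a real blocking take:
;; (a/<!! (parse tree))
```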

2020-10-27T18:27:24.143700Z

yes, you made that change since I last looked

dazld 2020-10-27T18:28:11.144500Z

sorry, i thought had updated it

2020-10-27T18:29:24.146400Z

combining sequence processing with core.async is tricky because map, reduce, etc. take a function, and it is an easy mistake to introduce blocking in that function

dazld 2020-10-27T18:29:36.146700Z

bingo, that was exactly what i did

dazld 2020-10-27T18:29:48.147100Z

have to process into channels

dazld 2020-10-27T18:30:32.148100Z

I was taking inside a keep for example, and that complained about <! not being inside a go block too

2020-10-27T18:30:36.148200Z

you might want to look at using transducers, either on channels directly, or as part of a pipeline
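e.g., a sketch of both options: a transducer attached to the channel itself, or a pipeline between two channels:

```clojure
(require '[clojure.core.async :as a])

;; transducer on the channel: values are transformed as they pass through
(def c (a/chan 10 (comp (filter odd?) (map inc))))

;; or a pipeline: runs the (map inc) transducer across 4 threads
;; between an input and an output channel
(def in  (a/chan 10))
(def out (a/chan 10))
(a/pipeline 4 out (map inc) in)
```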

2020-10-27T18:30:44.148400Z

correct

2020-10-27T18:31:00.148800Z

the go transformation does not cross function boundaries
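Concretely, a sketch: <! inside an anonymous fn passed to map sits outside the go macro's transformation and throws at runtime, whereas a take written lexically inside the go body works:

```clojure
(require '[clojure.core.async :as a])

;; Throws "<! used not in (go ...) block" at runtime: the fn boundary
;; hides <! from the go macro's transformation.
;; (a/go (mapv #(a/<! %) chans))

;; Works: the take is lexically inside the go-loop body.
(defn gather [chans]
  (a/go-loop [cs chans, acc []]
    (if (seq cs)
      (recur (rest cs) (conj acc (a/<! (first cs))))
      acc)))
```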

1👍
dazld 2020-10-27T18:31:08.149Z

ah hah, good to know

dazld 2020-10-27T18:31:33.149500Z

this is quite a bit faster than the future version now too ❤️

dazld 2020-10-27T18:32:13.149800Z

do you have some examples..?

2020-10-27T18:33:51.150600Z

nope

dazld 2020-10-27T18:47:27.151300Z

All good, I’ll look into it, thank you for help