so, I've started using this:
(defn full? [c]
  (.full? (.buf c)))

(defn msgs-on-chan [c]
  (.count (.buf c)))

;; Thank you Dan Compton <https://danielcompton.net/2015/08/04/core-async-channel-size>
(defn chan-status [chan-map chan-keys]
  (into {}
        (map (fn [[k c]]
               [k {:full? (full? c)
                   :backlog (msgs-on-chan c)}]))
        (select-keys chan-map chan-keys)))
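For anyone following along, a quick REPL sketch of how chan-status might be used (the channel names here are invented, and note this only makes sense for channels that actually have a buffer, since .buf is nil for unbuffered ones):

```clojure
(require '[clojure.core.async :as a])

;; two buffered channels, one with a message sitting in its buffer
(def chans {:in (a/chan 10) :out (a/chan 10)})
(a/>!! (:in chans) {:id 1})

;; using chan-status from above
(chan-status chans [:in :out])
;; something like {:in {:full? false, :backlog 1}, :out {:full? false, :backlog 0}}
```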
to just get an idea of whether my channels are stalled (it has already found one bug for me, where I tapped a channel on a mult but didn't put anything on to consume from that channel)
I see that I shouldn't use this kind of thing in production, but it has been pretty good for seeing what I might have done wrong in development. It is also giving me a clue about which channels might be running slowly and how to size things. Is there a better way of doing this?
I'm trying to 1. See if things are actually flowing through my channels 2. See if I've got my channels appropriately sized
You can also implement your own buffer (based on the built-in ones) and add logging when it's getting full.
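a minimal sketch of that idea, wrapping one of the built-in buffers (this leans on core.async's internal Buffer protocol, which is not a public API, so treat it as illustrative only; the names are made up):

```clojure
(require '[clojure.core.async :as a]
         '[clojure.core.async.impl.protocols :as impl]
         '[clojure.core.async.impl.buffers :as buffers])

(deftype LoggingBuffer [buf chan-name]
  impl/Buffer
  (full? [_]
    (let [f? (impl/full? buf)]
      (when f?
        (println "channel" chan-name "buffer is full"))
      f?))
  (remove! [_] (impl/remove! buf))
  (add!* [this itm] (impl/add!* buf itm) this)
  (close-buf! [_] (impl/close-buf! buf))
  clojure.lang.Counted
  (count [_] (count buf)))

(defn logging-chan
  "A fixed-buffer channel of size n that logs when its buffer fills."
  [n chan-name]
  (a/chan (->LoggingBuffer (buffers/fixed-buffer n) chan-name)))
```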
@otfrom logic that uses full?
or msgs-on-chan
can have race conditions that core.async would usually make impossible
this is why these sorts of queries don't exist in the lib itself
but I see you are just using it for diagnostics, not application logic
that's right. I had 4 channels hanging off a mult and I forgot to put the consuming process on one of them. The other 3 drained and the last one was full
It helped me figure it out right away
one could even make a metrics dashboard for core.async channels, like the ones used in big distsys frameworks :D
something like
(deftype MeteredChannel [take-meter put-meter ch]
  impl/Channel
  (close! [_]
    (impl/close! ch))
  (closed? [_]
    (impl/closed? ch))
  impl/ReadPort
  (take! [_ fn1]
    (take-meter)
    (impl/take! ch fn1))
  impl/WritePort
  (put! [_ val fn1]
    (put-meter)
    (impl/put! ch val fn1)))
might be useful for that kind of thing, and less prone to abuse; you would look for discrepancies in the rate of calls to take-meter and put-meter. full? and msgs-on-chan above both directly examine the buffer, which is a mutable, not-threadsafe collection (the channel uses locks when mutating it)
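for example, one could wire that up with plain atoms as the meters (this is just a sketch using the MeteredChannel deftype above; the names are invented):

```clojure
(require '[clojure.core.async :as a])

(def takes (atom 0))
(def puts (atom 0))

;; wrap a normal buffered channel; ->MeteredChannel is the
;; deftype constructor for the MeteredChannel shown above
(def mch (->MeteredChannel #(swap! takes inc)
                           #(swap! puts inc)
                           (a/chan 10)))

(a/>!! mch :hello)
(a/<!! mch)
;; a growing gap between @puts and @takes over time suggests
;; a stalled consumer (or producer)
```

note put! and take! count attempts, not completed transfers, so treat the numbers as rates to compare rather than exact message counts.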
hey, i’m converting an async tree walker from futures to core.async, and a couple of things seem to be not quite right with how I’m going about it.
• using core.async recursively isn’t quite working out - it stalls at a certain depth. I’m guessing this is because it’s blocking until a thread becomes available..?
• when I do block core.async like this, how can i reset the thread executor to start again?
you can't
don't do blocking things in a go block
https://github.com/clj-pour/pour/pull/14/files?w=1 PR I’m working on, for reference
so, recursion where the parent blocks waiting for the child to complete is a bad idea? thought it might be, but was wondering about other ways to go about it
if you are calling a function that contains <!! that is blocking
understood
the only way to start again, if CA does get into a fully blocked state, is to restart the repl..?
i’m using this as an opportunity to learn more about ca, making many mistakes like this along the way
there might be some nonsense you can do, that could unstick things, but very likely will just end up in a wilder more unknown state
you cannot do any real blocking operations in the call graph of a go block, only >! <! and the flavors of alts!
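to make the distinction concrete (ch is just some example channel):

```clojure
(require '[clojure.core.async :as a :refer [go <! <!!]])

(def ch (a/chan 1))

;; BAD: <!! blocks the OS thread, tying up one of the small
;; fixed pool of threads that run all go blocks
#_(go (println "got" (<!! ch)))

;; GOOD: <! only parks the go block; the thread is released
;; back to the pool while waiting
(go (println "got" (<! ch)))

(a/>!! ch :x)
```

with enough go blocks doing the BAD version at once, the whole pool is blocked and no go block anywhere can make progress — which matches the stall you're seeing.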
so, on that PR, I only have (afaict) one blocking call, at the root
hard to say, I dunno what knit does, and there are some callbacks passed around
if you are using datomic, I don't know, but my guess would be the datomic async api is also using core.async, and the sync api uses the async api with a real blocking <!! at the "root"
so you could be filling the threadpool with tasks that are blocked on some datomic operation that is itself queued behind them in the threadpool
I am using datomic yep, and this is a kind of experiment in using a custom pull-like approach where keywords can be given behaviour and woven into the results
seems like core async is punishing me a bit for not defining a bit more precisely what it should do
the thing about measuring buffer usage instead of the flow of messages through the channel is that it doesn't tell you anything useful for unbuffered channels, or for channels with dropping or sliding buffers
if interested, figured it out - i think. I was using <!
inside reduce, map, keep etc - I turned this around to get a collection of channels instead of a collection of values, and all was ok 🙂
would be interested in why, though
you were recursively calling parse which was blocking the thread with <!!
interestingly, not - parse wasn’t using any blocking calls
parse was blocking
it did at one point, sorry if you saw an older version
I changed it out to return a channel from go-loop
yes, you made that change since I last looked
sorry, i thought had updated it
combining sequence processing with core.async is tricky because of map, reduce, etc; it is an easy mistake to introduce blocking in the function you pass to them
bingo, that was exactly what i did
have to process into channels
I was taking inside a keep
for example, and that complained about <! not being inside a go block too
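the fix described above looks roughly like this (process here is a stand-in for any function that returns a channel):

```clojure
(require '[clojure.core.async :as a :refer [go <!]])

(defn process [x]
  (go (* x 10)))  ;; stand-in for real async work

;; BROKEN: <! is inside the fn passed to keep, so it is outside
;; the go machinery (the go transformation stops at fn boundaries)
#_(go (vec (keep (fn [x] (<! (process x))) [1 2 3])))

;; WORKS: build a collection of channels first, then take from
;; each one directly inside the go block (loop/recur is fine in go)
(def result
  (go (let [chs (mapv process [1 2 3])]
        (loop [[c & more] chs
               acc []]
          (if c
            (recur more (conj acc (<! c)))
            acc)))))

(a/<!! result)
;; => [10 20 30]
```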
you might want to look at using transducers, either on channels directly, or as part of a pipeline
correct
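a small sketch of both options (channel sizes and fns invented for illustration):

```clojure
(require '[clojure.core.async :as a])

;; 1. transducer attached to the channel itself: values are
;; transformed as they pass through the channel's buffer
(def xf-ch (a/chan 10 (comp (filter odd?) (map inc))))

;; 2. pipeline: parallel (here 4-way) transformation between
;; two ordinary channels
(def in (a/chan 10))
(def out (a/chan 10))
(a/pipeline 4 out (map inc) in)

(a/>!! in 1)
(a/<!! out)
;; => 2
```

either way the transformation stays out of hand-written go blocks, which sidesteps the function-boundary problem entirely.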
the go transformation does not cross function boundaries
ah hah, good to know
this is quite a bit faster than the future version now too ❤️
do you have some examples..?
nope
All good, I’ll look into it, thank you for help