Can you look at my question here: https://clojurians.slack.com/archives/C03S1KBA2/p1577696618239500 ? Am I using async correctly, or is this not the right pattern for async? When should I use async and when not? Is it possible to solve my issue with async?
^ the discussion is in the thread of this message
does it need to be async rather than a thread pool?
well, in general this was made to avoid the situation where jobs are processed at the same time, because that causes issues. So it is a queue.
anyway, I would like to learn about this topic
so is it the wrong pattern for async?
not wrong to my eyes. i just saw a queue taking runnable functions which is essentially a simpler version of a thread pool
how can I fix my issue using async?
the issue is (I guess) that worker and worker-warehouse overflow their capacity after 2-4 weeks
because nothing reads from them, these two are just workers that do the hard work
but the output doesn't matter
and all in all what is the best solution for my case?
could it be one of these two?
* go-loop works fine, but whoever is enqueueing has a problem
* some other (unrelated) piece of code uses core.async in such a way that it causes trouble to c.a's internal thread pool
also, the JMX suggestion you received is on point. Similarly you can run Yourkit in prod/staging and connect to it from your machine.
Tools like yourkit tell you exactly where/how your threads are stalling
We can assume the go-loop overflows after some time, so debugging is not really needed. But the question is how to fix it?
> We can assume go-loop overflow after some time
What makes you believe so? Your go-loop consumes work in an orderly way thanks to its <!
There's no such thing as consumption overflow (there is the notion of production overflow)
hmm that is what @delaguardo suggested
but yes, I don't understand how it overflows either
hmm
unless it is about recur overflow?
He seems to talk about the case of using #'worker as a channel. Are you even doing that?
can it be?
I think not
no, if recur compiles it's tail-recursive and doesn't accrete a stack
> * go-loop works fine, but whoever is enqueueing has a problem
What do you mean by this point then?
> He seems to talk about the case of using #'worker as a channel. Are you even doing that?
I don't, and he said it is an issue, because this channel keeps growing and growing
but I think it doesn't get even 1 value, because it is an endless loop with recur
but maybe I am missing something
> What do you mean by this point then?
Hypothetically, the problem is in the code that enqueues the work with >!. Maybe it's blocking indefinitely, making you believe the go-loop doesn't work
Also, using put! can be dangerous because it disregards queue sizes. Do you have any calls to put!?
> Hypothetically, the problem is in the code that enqueues the work with >!. Maybe it's blocking indefinitely, making you believe the go-loop doesn't work
it works, because I see in the logs that it goes further and finishes adding things to the queue. I have a message at the end to debug it.
I don't use put!
So adding to the queue doesn't get stuck
there are only 3 possibilities:
1. the channel is closed for some magic reason
2. the worker died
3. magic issues with memory or something overflowing
but the strangest thing is both workers stop doing jobs at the same time - at least I think it is like that
2 can be checked with (<!! worker) in the repl, right? if it blocks, then it hasn't died
...it could be many things. thread uses an unbounded (but cached) thread pool which might make the OS upset.
not really, because I can't reproduce it on localhost
it happens after 2-4 weeks on production only
something like that
but because both workers stop doing jobs at the same time... something probably overflows somehow
but I think it is not go-loop
because it never returns even 1 value
go-loop, as its name implies, is made for looping so... I doubt it. It'd be extremely fragile if it didn't support infinite loops
I'd use something like AWS SQS for never losing work and having a greater degree of observability
I can lose work, it is designed to not be a problem. I would like to keep it in the app to decrease complexity
using a third-party queue system increases complexity, which I don't need
I am running a simple job 100 000 times to see if it will get stuck on localhost
so far it looks like it works
ok it finished 100 000 times. It is probably not about overflowing any channel or worker
ech, maybe run a REPL on production temporarily...
to connect to it during an issue and debug then
> I can lose work
That's an interesting requirement I'd say. In that case you should be able to use whatever you feel more comfortable with. Could be agents, futures, core.async, j.u.concurrent, claypoole... Obviously they have different demands in case something goes wrong (i.e., many people will recommend j.u.c to you, but if something goes wrong will you pick up a Java book? Many devs won't)
> That's an interesting requirement
The processing is resistant to a crash at any moment
But for other reasons it can process only 1 thing at a time
because of a limitation of the third-party system
so this is why the app needs a queue
a thread pool of size 1 also works like that
maybe, but will switching to a thread pool (which I haven't used so far) solve anything?
it doesn't look like it is directly an async issue
well, I have no idea what the issue is
it doesn't make sense
but from the logs I am sure both workers stop doing new jobs at the same time, which probably should tell me something, but it doesn't
if you are comfortable with it, it's more observable
say it's a thread pool with thread size 1, backed by a queue of max size 1000
if something goes wrong you can fetch the queue size, its contents, etc. Likewise you might interact with the thread pool itself (check the javadocs).
also the error handling will be more straightforward. Less magic overall (vs. core.async)
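For reference, a minimal sketch of the thread-pool idea above, assuming plain java.util.concurrent interop. The names job-pool, submit-job! and pool-stats are made up for illustration and are not from the code under discussion; println stands in for the real logger.

```clojure
(import '(java.util.concurrent ThreadPoolExecutor TimeUnit ArrayBlockingQueue))

;; Hypothetical names: job-pool, submit-job! and pool-stats are illustrative only.
(defonce job-pool
  (ThreadPoolExecutor. 1 1                          ; core and max size 1: one job at a time
                       0 TimeUnit/MILLISECONDS      ; keep-alive, irrelevant when core = max
                       (ArrayBlockingQueue. 1000))) ; at most 1000 pending jobs

(defn submit-job!
  "Queues f for execution. With the default rejection policy this throws
  RejectedExecutionException when the queue is already full."
  [f]
  (.execute job-pool
            (fn []
              (try
                (f)
                (catch Throwable ex
                  (println "job failed:" (.getMessage ex)))))))

(defn pool-stats
  "Snapshot of the pool, useful when the worker looks stuck."
  []
  {:queued    (.size (.getQueue job-pool))
   :active    (.getActiveCount job-pool)
   :completed (.getCompletedTaskCount job-pool)})
```

Note that the pool's thread is not a daemon thread here, so the pool has to be shut down (.shutdown job-pool) before the JVM can exit on its own.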
Does it tell you anything?
make sure your channels (queues and workers) are open
(clojure.core.async.impl.protocols/closed? chan)
and be careful with that function, because it is intended to be an "implementation detail" and it should not be part of your code base
I can't recreate it on localhost, so the only thing I can do is push it to production and wait 2-4 weeks
or more
these are the things which I already added to the logs today to be sure
(clojure.core.async.impl.protocols/closed? chan)
do you know how to run it on production?
No implementation of method: :closed? of protocol: #'clojure.core.async.impl.protocols/Channel found for class: nil
while it works on localhost in the REPL and even compiles with it
@delaguardo ^ do you know how to run it?
you are trying to call it with nil as an argument, but it expects a channel
but it is not true
I'm guessing here, based on the error message
(defonce worker
  (go-loop []
    (let [job (<! queue)]
      (l/debug "run job" job)
      (when-not (nil? job)
        (<! (thread
              (try
                (job)
                (catch Throwable ex
                  (l/error ex)))))
        (recur)))))
(defonce worker-warehouse
  (dotimes [x 3]
    (go-loop []
      (let [job (<! queue-warehouse)]
        (l/debug "run warehouse job" job)
        (when-not (nil? job)
          (<! (thread
                (try
                  (job)
                  (catch Throwable ex
                    (l/error ex)))))
          (recur))))))
(defn add-job [ch f]
  (l/debug "async: add" f "to" ch ": " (>!! ch f))
  (l/debug "worker:" (clojure.core.async.impl.protocols/closed? worker)
           "worker-warehouse" (clojure.core.async.impl.protocols/closed? worker-warehouse)))
but worker-warehouse should be nil, shouldn't it?
there is a dotimes call inside
ah... I am dumb. true
there is also a strange (when-not (nil? job
The only way to get nil from a channel is to consume from a closed and exhausted channel
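A small sketch of that rule, assuming only clojure.core.async is on the classpath. The function name drain-after-close is made up for illustration.

```clojure
(require '[clojure.core.async :as a])

(defn drain-after-close
  "Puts two values on a buffered channel, closes it, then takes three times."
  []
  (let [ch (a/chan 2)]
    (a/>!! ch :job-1)
    (a/>!! ch :job-2)
    (a/close! ch)
    ;; Buffered values are still delivered after close!;
    ;; only once the buffer is exhausted does a take yield nil.
    [(a/<!! ch) (a/<!! ch) (a/<!! ch)]))

(drain-after-close) ; => [:job-1 :job-2 nil]
```

So a nil job in the go-loop can only mean the queue channel was closed somewhere.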
yes, it is added as a good habit to not pass nil
for processing in such case
but should be never true
I mean false in this case
but after consuming everything your code goes into the recur branch and will try to consume from the already closed channel, then it will go to the same branch again
ok
but this is a big assumption, I mean the statement that it will never be true
ah, sorry, my bad
If it is true, then worker will close itself, because of the closed channel
misreading
but I never close the channel, so... ok good
the funny thing is it started to happen after some update, maybe a new version of async or who knows. It wasn't like that from the beginning
Like I said in other thread, your go-loop is pretty weird. I would rewrite it so it logs on nil and stops looping. Since when the channel take returns nil, it means the channel has closed. At least that way you'd know if the issue you're encountering is due to the channel closing
it stops looping on nil already
(defonce worker
  (go-loop []
    (if-let [job (<! queue)]
      (do
        (l/debug "run job" job)
        (<! (thread
              (try
                (job)
                (catch Throwable ex
                  (l/error ex)))))
        (recur))
      (l/debug "channel closed"))))
Oh is the recur inside the when
Ok, well do you know if that's the case?
why is this issue happening? I have no idea. I added extra debug logs and I have to wait a few weeks
I meant, do you know if it exited the go-loop ?
I think it doesn't, but next time I will have proof in the logs
I have 0 close! calls in this channel code
it can only close itself because of some magic bug in async
But I also updated async now, so we will see
But this issue is probably the most challenging one I have ever had in Clojure
at least one of them
Hum, well the other thing I can think of is a race condition
I mean a deadlock
Like one job waiting on another
doesn't happen - I am sure from the logs
https://clojurians.slack.com/archives/C05423W6H/p1577702684086300
this is the interesting point
Well, that could be a deadlock
They'd both stop simultaneously if they deadlocked each other
no, because I clearly see from the logs that the functions do not block
and these fns don't depend on each other in any way
So what do you mean then by they both stop at the same time ?
I have logs
(l/debug "run job" job)
and I see both workers stop running new jobs at the same time
it is not like one worker stops running jobs and the second one stops later
the issue happens for both of them at the same time
If I was you though, I'd just do this instead:
(defonce worker
  (thread
    (loop [job (<!! queue)]
      (l/debug "run job" job)
      (when-not (nil? job)
        (try
          (job)
          (catch Throwable ex
            (l/error ex)))
        (recur (<!! queue))))))
(defn worker [process-fn]
  (let [in (chan)
        out (chan)]
    (go-loop []
      (let [msg (<! in)]
        (>! out (process-fn msg)))
      (recur))
    [in out]))
consider something like this: a worker function that returns in and out queues connected by process-fn. It will give you more control over the created channels
So, if you stop seeing "run job" at the same time, it could be the queues are empty and your puts are failing, it could be they are deadlocked, or it could be the channels have closed. That's all I can think of
yes, today I added debug logs to see if the queue and workers are open / closed
this also allows you to restart the worker on some condition
and for the fn that adds to the channel
so I will know these 3 things next time
Is there any other thing in your app that would keep it alive if those loops exited?
it looks like the out channel will be full at some point and it will get stuck
not sure what you mean?
small question - from the code above:
(loop [job (<! queue)]
  ...
  (job))
does it mean that you are passing functions through channels?
yes
what is the advantage of this fn?
Well, your process would exit if it stops looping. Normally an application starts and closes when it runs out of instructions to run, unless it has a main loop of some sort. In your case, your go-loop seems to be what does it. So if there's no other infinite loop, your program should shut down if the channels were closed and returned nil, which would have exited the loops.
is it possible for those functions to throw checked exceptions? they are not runtime exceptions so they will not be caught by try ... Throwable
something like ClassNotFoundException
Well, since you want to process one job per worker at a time. This doesn't create a new thread on every job, it just reuses the same thread, and runs one job after another on it.
the app wouldn't exit if the workers stopped working
Also, it isn't as much at risk of having issues where you've blocked the parking threadpool.
Since each worker gets their own thread
@delaguardo which function?
those exceptions will be thrown in the thread, so your worker will still be there
(job)
What do you mean?
(try
  (job)
  (catch Throwable ex
    (l/error ex)))
is this not exactly that?
There's another possibility I'm thinking of. I'm not sure what happens to the channel returned by thread in that exception case
there is 0 errors in logs
*for this case
Hum, ok. And all (job) return a non nil value ?
if (job) throws a checked exception it will not be caught by your try/catch block
whatever (job) returns, it doesn't matter
why is that? I'm not restricting chan by any buffer-size
hmm I have to read about this
but it is not the case, because I don't see `(l/debug "run job" job)`
Throwable does catch everything
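To back that up with a quick sketch: Clojure does not distinguish checked from unchecked exceptions, and (catch Throwable ...) also covers Error subclasses. The helper name run-safely is made up for illustration.

```clojure
(defn run-safely
  "Hypothetical helper: runs job and reports what, if anything, was caught."
  [job]
  (try
    (job)
    :ok
    (catch Throwable ex
      [:caught (class ex)])))

;; A checked exception (in Java terms) is caught like any other:
(run-safely #(throw (ClassNotFoundException. "missing")))
;; So is an Error, which is not an Exception at all:
(run-safely #(throw (Error. "boom")))
```

So an uncaught exception escaping (job) is unlikely to be the reason the workers go quiet.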
I think we won't solve it today. We have to wait 2-4 weeks for new logs...
I can post the solution here if I find it
Ya, you could try and wait and see if others have ideas, but it seems to be a very rare occurrence so it will be hard
I know, but I guess there is a memory limit or some other limit. It can't be endless
And you're sure none of the jobs use core.async inside them?
yes
What does the producer code look like?
(defn sync-auto []
  (if (state/get-sync)
    (do
      (slack/send-info "> > > Sync auto start add to queue")
      (let [shop-ids (keys conf/shops)]
        (doseq [shop-id shop-ids]
          (add-job queue #(orders/atomstore-sync->wfirma shop-id :bk)))
        (add-job queue sync-warehouse)
        (doseq [shop-id shop-ids]
          (add-job queue #(invoices/send-ivoices shop-id))))
      (slack/send-info "< < < Sync auto added tasks to queue"))
    (slack/send-warning "! ! ! Synchronization if off. Nothing to do.")))
I have a super simple fn to add jobs
Maybe there's an issue on their side
Hum... And what triggers sync-auto? Maybe that's what stops working after a while?
once per 30 minutes
And those (slack/send-info "> > > Sync auto start add to queue") logs keep showing?
it is working, because I see the logs Sync auto start add to queue and Sync auto added tasks to queue
Hum... ya that's pretty strange
I'll be curious to know if you ever find out the problem
I appreciate your engagement with the topic, but probably we can only wait
heh me too, give me a few months :troll:
Good Luck!
thank you
a/thread threads are daemon threads so the JVM does not wait for them when exiting. Maybe something else is making your JVM terminate....
forget what I said, your loop is not in an a/thread
My guess would be something in (job) blocks the thread, meaning the loop never completes
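If that guess is right, one way to make it visible is to bound how long the loop waits for each job. A rough sketch, assuming clojure.core.async; run-with-timeout is a made-up helper name and println stands in for the real logger.

```clojure
(require '[clojure.core.async :as a])

(defn run-with-timeout
  "Hypothetical helper: runs job on a core.async thread and waits at most
  timeout-ms for it. Returns :done or :timed-out. A timed-out job is NOT
  cancelled; it keeps occupying its thread until it returns."
  [job timeout-ms]
  (let [done (a/thread
               (try
                 (job)
                 (catch Throwable ex
                   (println "job failed:" (.getMessage ex))))
               :finished)
        ;; alts!! returns [value port] for whichever channel is ready first
        [_ port] (a/alts!! [done (a/timeout timeout-ms)])]
    (if (= port done) :done :timed-out)))
```

Logging the :timed-out case would show which job hangs, instead of both workers just going silent. Whether moving on to the next job is safe depends on the one-job-at-a-time constraint mentioned earlier, since the stuck job is still running.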