Do you remember my magic issue about async getting stuck? I promised to write here if I learned something new:
(defonce queue (chan 100))
(defonce queue-warehouse (chan 100))

(defonce worker
  (go-loop []
    (let [job (<! queue)]
      (l/debug "run job" job)
      (when-not (nil? job)
        (<! (thread
              (try
                (job)
                (catch Throwable ex
                  (l/error ex)))))
        (recur)))))
(def worker-warehouse
  (doall
    (repeat 3
      (go-loop []
        (let [job (<! queue-warehouse)]
          (l/debug "run warehouse job" job)
          (when-not (nil? job)
            (<! (thread
                  (try
                    (job)
                    (catch Throwable ex
                      (l/error ex)))))
            (recur)))))))
(defn add-job [ch f]
  (l/debug "async: add" f "to" ch ": " (>!! ch f))
  (l/debug "worker:" (clojure.core.async.impl.protocols/closed? worker)
           "worker-warehouse" (pr-str (map clojure.core.async.impl.protocols/closed? worker-warehouse))))
And while everything stuck I see:
20-02-26 10:45:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:46] - async: add etingroup.sync$sync_auto$fn__24044@671e9b85 to clojure.core.async.impl.channels.ManyToManyChannel@7a4a107e : true
20-02-26 10:45:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:47] - worker: false worker-warehouse (false false false)
so it means the task was added correctly and the workers are not closed. But for some reason all the async things stop running at the same moment
still I have no idea why this is happening, but I have learned that the queues and workers didn't crash and are still running
I can only observe it is happening after about 3 weeks of running the app
so this is super hard to debug
so… like last time I have still no idea WTF
(l/debug "run job" job) and (l/debug "run warehouse job" job) don't appear in the logs
so it looks like the workers are running, judging by clojure.core.async.impl.protocols/closed?
and new tasks are added, judging by (l/debug "async: add" f "to" ch ": " (>!! ch f))
but anyway these things never run
How is it possible?
I can’t find any logic here
The only suspicious thing is the time needed for everything to get stuck
the only explanation which I have is that something in worker
starts doing an endless job, which doesn't make sense
because there are no loops there
ok I have a crazy idea… I am curious what will happen if the SOAP API (third-party system) keeps a connection open endlessly, because of a bug on their side. Is there any timeout? Probably not this, but worth checking
@kwladyka what calls add-job
?
do you add jobs to worker
or worker-warehouse
?
why both?
do you log activity inside your jobs? enter/exit etc?
why worker-warehouse
is not defonce?
my guess is your jobs are adding things to the work queue, which is a very easy way to lock things up
your workers are waiting for existing jobs to finish before pulling from the work queue again, and your jobs are waiting for space in the work queue to add items before exiting
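A minimal sketch of the feedback loop being described (illustrative names only, not the original code): a single worker consumes from an unbuffered channel, and the job it runs tries to put onto that same channel while the worker is parked waiting for the job to finish.

(require '[clojure.core.async :refer [chan go-loop <! >!! thread]])

;; hypothetical minimal reproduction of the feedback-loop deadlock described above
(def q (chan))                           ; unbuffered, so the effect shows immediately

(def worker
  (go-loop []
    (when-let [job (<! q)]
      (<! (thread (job)))                ; the worker parks here until the job returns
      (recur))))

(defn self-feeding-job []
  ;; the job tries to enqueue more work on the channel it came from; nothing is
  ;; consuming q right now (the worker is parked waiting for *this* job), so the
  ;; blocking put never completes and the whole pipeline is stuck
  (>!! q (fn [] (println "never runs"))))

(>!! q self-feeding-job)                 ; with a buffer of 100 the same deadlock
                                         ; happens once the buffer finally fills up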
@serioga
> what calls add-job
?
What do you mean? Exactly what is in the code. You are probably asking about the (>!! ch f) part.
> do you add jobs to worker
or worker-warehouse
?
both
> do you log activity inside your jobs?
> enter/exit etc?
yes and there is silence
> why worker-warehouse
is not defonce?
I was testing something and forgot to change it back before pasting
@hiredman, no, because I got clear feedback from add-job that the job was added
async: add etingroup.sync$sync_auto$fn__24044@671e9b85 to clojure.core.async.impl.channels.ManyToManyChannel@7a4a107e : true
but are you publishing to queue or warehouse-queue from the job?
(defn add-job [ch f]
(l/debug "async: add" f "to" ch ": " (>!! ch f))
(l/debug "worker:" (clojure.core.async.impl.protocols/closed? worker)
"worker-warehouse" (pr-str (map clojure.core.async.impl.protocols/closed? worker-warehouse))))
it gets 2 parameters; the first is the channel. I add this to both queues
«What do you mean? Exactly what is in the code.» In your code this function is declared but not used
the other very common thing is you've been messing around in a repl for a while, and who knows what state your repl is in
I mean from the job
like when f is run, does it call add-job
are your jobs recursive
@serioga oh sorry I misread
«yes and there is silence» so there are no jobs which are started but not finished? they just don't start?
> the other very common thing is you've been messing around in a repl for a while, and who knows what state your repl is in
the bug is on production
oh, then what I said the first time almost certainly
you have a feedback loop
Why do you think so?
> so there are no jobs which are started but not finished?
no job starts or finishes, silence in the logs. The logs show only adding to the queue
because it is an easy mistake to make when you are trying to build some kind of work queue thing in core.async instead of using executors directly
but it can be overlooked in testing if your buffer sizes are large enough
still I am adding to the queue and I am sure of it, because I see it in the logs. The workers still exist, because I see this in the logs.
but I don’t see (l/debug "run job" job)
or any other logs
so nothing is run anymore
right
I have only one explanation for that
because you have jobs running that cannot complete running
worker
has to be freezing for some reason in something endless, or there is a bug in the modules which I use
yes, right
because they are waiting for space in the queue, but there won't be space in the queue until the job exits, so the worker go-loops can consume from the queue again
but because I don’t use loops in my jobs… that is strange
unless an endless API query can exist
or something like that
literally you don't use loops? or do you mean your jobs don't have a recursive structure
the actual structure of your jobs doesn't matter, it only matters if they ever call add-job
> because they are waiting for space in the queue, but there won't be space in the queue until the job exits, so the worker go-loops can consume from the queue again
no. As I said, the logs clearly show the job is added and not waiting for space in the queue
I think I don't use loops in these jobs and there is no recursive structure
"And while everything stuck I see:"
is everything stuck or not?
It is mainly API queries and data processing in between
«unless an endless API query can exist» If you are not sure, then it's better to add timeout channels
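For illustration, racing the job thread against a timeout channel might look roughly like this (a sketch only; the 30-minute limit is an arbitrary placeholder, and a timed-out job thread keeps running in the background, it just no longer parks the worker):

(require '[clojure.core.async :refer [go-loop <! alts! thread timeout]])

(def worker
  (go-loop []
    (let [job (<! queue)]
      (when-not (nil? job)
        (let [result (thread
                       (try
                         (job)
                         (catch Throwable ex
                           (l/error ex))))
              ;; take whichever finishes first: the job thread or the timeout
              [_ port] (alts! [result (timeout (* 30 60 1000))])]
          (when-not (= port result)
            (l/error "job timed out" job)))
        (recur)))))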
do you call add-job in your jobs?
Making top level def go routines is an antipattern
everything is stuck except adding new items to the queue
to which queue?
«do you call add-job in your jobs?» this is top secret, who calls it and how 🙂
so to make this even clearer: I added 2 queues instead of 1 because of this bug. Before, there was only 1 queue and the issue was the same.
App add to queue once per 30 minutes
yeah
definitely what I am saying
which is why adding another queue just sort of works around it
it has the same effect as increasing the buffer size of the channel
> «do you call add-job in your jobs?»
now I add from queue to queue-warehouse; before I didn't do this and the issue was the same
you have the same feedback loop, just slightly larger
get rid of this code, use an executor directly
«Before there was only 1 queue» Which one? Warehouse?
> Making top level def go routines is an antipattern
What is the right pattern for this case then? @ghadi
> «Before there was only 1 queue»
> Which one? Warehouse?
no, queue
Call a function passing it the input chan, that fn returns a goroutine
executors by default have an unbounded queue which will break your feedback loop (but unbounded queues can introduce other problems)
«no, queue»
so you don't need parallel execution for your jobs?
you could do something like setting the buffer size on the channel to Long/MAX_VALUE
but like, just use an executor
As I mentioned before, I added queue-warehouse
to split things up for easier debugging and to speed up the process. But before, it was working with only one queue
I don’t think the buffer size is the issue
it is
and it is why you don't see the issue in testing
I did test running this 100000000 times and it didn't get stuck
you have a large enough buffer to avoid filling the channel in tests
just do (chan) without a buffer in your tests
«I don’t think the buffer size is the issue» Well, anyway you should decide on the behaviour when there is no room in the queue chan, e.g. a dropping or sliding buffer
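For reference, the two testing/buffering suggestions look roughly like this (a sketch; which policy to pick is a design decision, not something from the original code):

(require '[clojure.core.async :refer [chan dropping-buffer sliding-buffer]])

;; in tests: no buffer, so a feedback loop deadlocks immediately
;; instead of only after weeks of production traffic
(def queue (chan))

;; in production: decide explicitly what happens when the queue is full
(def queue-dropping (chan (dropping-buffer 100)))  ; new puts are dropped when full
(def queue-sliding  (chan (sliding-buffer 100)))   ; oldest values are dropped when full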
let's assume this gets stuck because of the buffer size. Even then, from time to time a worker will finish its job and a new job can come in.
but these things don't happen
it depends
@kwladyka do you have any other core.async operations in your project?
@serioga no, I only use async here
if the channel buffer ever fills up then it will stop
it is like feedback between a speaker and a mic
below some volume you won't get feedback
not really, because here things go in only one direction: cron -> queue -> queue-warehouse
and before queue-warehouse didn’t exist
do you ever call add-job from within a job
«queue -> queue-warehouse» I don't see this in your code
as I mentioned before only from queue
to add to queue-warehouse
> I don't see this in your code
I know, this is in a different part
(defn sync-warehouse []
  (let [shop-ids (keys conf/shops)
        products-reservations (->> (pmap #(vector % (orders/atomstore-orders-reservations->products-reservation %)) shop-ids)
                                   (into {}))
        products-from-orders-not-synchronized-yet (->> (pmap #(vector % (orders/atomstore-orders-not-synchronized-yet->products-reservation %)) shop-ids)
                                                       (into {}))
        products-reservations+not-synchronized-orders (reduce (fn [m shop-id]
                                                                 (assoc m shop-id
                                                                        (->> ((juxt products-reservations products-from-orders-not-synchronized-yet) shop-id)
                                                                             (apply merge-with +))))
                                                               {} shop-ids)
        stock-change (orders/products-reservations->stock-change-for-shop products-reservations+not-synchronized-orders)
        products (products/wfirma->products :bk)
        update-stock (fn [shop-id]
                       (->> products
                            (map (fn [product]
                                   (if-let [reservation (get-in stock-change [shop-id (:code product)])]
                                     (update product :quantity #(- % reservation))
                                     product)))))
        stock-round-down (fn [stock]
                           (map (fn [product]
                                  (update product :quantity #(Math/floor %)))
                                stock))]
    (add-job queue-warehouse #(products/products->atomstore :kh (update-stock :kh) {:prices? true
                                                                                     :stock? true}))
    (add-job queue-warehouse #(products/products->atomstore :yms (update-stock :yms) {:stock? true}))
    (add-job queue-warehouse #(products/products->atomstore :hurt (stock-round-down (update-stock :hurt)) {:stock? true}))
    (add-job queue-warehouse #(warehouse/update-cache-supplier-orders products products-reservations+not-synchronized-orders))))
(defn sync-auto []
  (if (state/get-sync)
    (do
      (slack/send-info "> > > Sync auto start add to queue")
      (let [shop-ids (keys conf/shops)]
        (doseq [shop-id shop-ids]
          (add-job queue #(orders/atomstore-sync->wfirma shop-id :bk)))
        (add-job queue sync-warehouse)
        (doseq [shop-id shop-ids]
          (add-job queue #(invoices/send-ivoices shop-id))))
      (slack/send-info "< < < Sync auto added tasks to queue"))
    (slack/send-warning "! ! ! Synchronization if off. Nothing to do.")))
there is no other place with add-job
so my guess is the queue worker is
stuck in something endless… But I don't see how. Also I don't see any CPU overuse
or memory issues
which would tell me there is an endless-loop issue
@kwladyka «And while everything stuck I see:» you see it every 30 minute after everything is stuck?
uh, I lost track of which of my messages you are referring to
Where is sync-auto called from? 🙂
every 30 minutes by some kind of cron solution
all of which points to a deadlock waiting on your queues
I refer to https://clojurians.slack.com/archives/C05423W6H/p1582798197042900
> all of which points to a deadlock waiting on your queues
Where, in your opinion, can the deadlock be?
Nothing blocks queue-warehouse
things go in only one direction
@serioga yes, I see this every 30 minutes
https://clojurians.slack.com/archives/C05423W6H/p1582830301101200 and jobs don't run?
20-02-26 09:45:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:47] - worker: false worker-warehouse (false false false)
20-02-26 09:45:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.slack:26] - Slack send: {:color "#0000FF", :text "< < < Sync auto added tasks to queue"}
20-02-26 10:15:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.slack:26] - Slack send: {:color "#0000FF", :text "> > > Sync auto start add to queue"}
20-02-26 10:15:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:46] - async: add etingroup.sync$sync_auto$fn__24036@1ebb3fc to clojure.core.async.impl.channels.ManyToManyChannel@7a4a107e : true
20-02-26 10:15:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:47] - worker: false worker-warehouse (false false false)
20-02-26 10:15:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:46] - async: add etingroup.sync$sync_auto$fn__24036@40a18579 to clojure.core.async.impl.channels.ManyToManyChannel@7a4a107e : true
20-02-26 10:15:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:47] - worker: false worker-warehouse (false false false)
20-02-26 10:15:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:46] - async: add etingroup.sync$sync_auto$fn__24036@28d752b5 to clojure.core.async.impl.channels.ManyToManyChannel@7a4a107e : true
20-02-26 10:15:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:47] - worker: false worker-warehouse (false false false)
20-02-26 10:15:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:46] - async: add etingroup.sync$sync_warehouse@647d254d to clojure.core.async.impl.channels.ManyToManyChannel@7a4a107e : true
20-02-26 10:15:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:47] - worker: false worker-warehouse (false false false)
20-02-26 10:15:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:46] - async: add etingroup.sync$sync_auto$fn__24044@1b54dc3e to clojure.core.async.impl.channels.ManyToManyChannel@7a4a107e : true
20-02-26 10:15:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:47] - worker: false worker-warehouse (false false false)
20-02-26 10:15:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:46] - async: add etingroup.sync$sync_auto$fn__24044@36c12338 to clojure.core.async.impl.channels.ManyToManyChannel@7a4a107e : true
20-02-26 10:15:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:47] - worker: false worker-warehouse (false false false)
20-02-26 10:15:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:46] - async: add etingroup.sync$sync_auto$fn__24044@5f1cb6ba to clojure.core.async.impl.channels.ManyToManyChannel@7a4a107e : true
20-02-26 10:15:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:47] - worker: false worker-warehouse (false false false)
20-02-26 10:15:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.slack:26] - Slack send: {:color "#0000FF", :text "< < < Sync auto added tasks to queue"}
20-02-26 10:45:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.slack:26] - Slack send: {:color "#0000FF", :text "> > > Sync auto start add to queue"}
20-02-26 10:45:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:46] - async: add etingroup.sync$sync_auto$fn__24036@141dd6dc to clojure.core.async.impl.channels.ManyToManyChannel@7a4a107e : true
20-02-26 10:45:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:47] - worker: false worker-warehouse (false false false)
20-02-26 10:45:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:46] - async: add etingroup.sync$sync_auto$fn__24036@3b5a82d3 to clojure.core.async.impl.channels.ManyToManyChannel@7a4a107e : true
20-02-26 10:45:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:47] - worker: false worker-warehouse (false false false)
20-02-26 10:45:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:46] - async: add etingroup.sync$sync_auto$fn__24036@65462cf9 to clojure.core.async.impl.channels.ManyToManyChannel@7a4a107e : true
20-02-26 10:45:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:47] - worker: false worker-warehouse (false false false)
20-02-26 10:45:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:46] - async: add etingroup.sync$sync_warehouse@647d254d to clojure.core.async.impl.channels.ManyToManyChannel@7a4a107e : true
20-02-26 10:45:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:47] - worker: false worker-warehouse (false false false)
20-02-26 10:45:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:46] - async: add etingroup.sync$sync_auto$fn__24044@2ed18f00 to clojure.core.async.impl.channels.ManyToManyChannel@7a4a107e : true
20-02-26 10:45:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:47] - worker: false worker-warehouse (false false false)
20-02-26 10:45:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:46] - async: add etingroup.sync$sync_auto$fn__24044@671e9b85 to clojure.core.async.impl.channels.ManyToManyChannel@7a4a107e : true
20-02-26 10:45:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:47] - worker: false worker-warehouse (false false false)
yes and jobs don’t start
so, as @hiredman said, you call add-job
from inside your jobs
yes, as I mentioned, now I call add-job from queue to queue-warehouse
and before I didn’t have queue-warehouse
the process was running in 1 shot
only cron was adding to queue
every 30 minutes
nothing more
@hiredman can you point to where you see the possibility of a deadlock? I don't see it.
well, only ManyToManyChannel@7a4a107e is alive in the log, it looks like it is queue
I don't know, I don't have all your code, and I don't really want to read it, but the situation you are describing matches one
> well, only ManyToManyChannel@7a4a107e is alive in the log, it looks like it is queue
yes, it makes sense, because things are added to queue-warehouse only from queue
look, just remove the buffers from your channels and run your tests and see if things lock up
@hiredman I know it looks like that, but I don't see a logical possibility for it. Especially since before there was only 1 queue, to which cron was adding
I was trying this
I was running this even 1000000 times to see the result
when logic and reality fight, reality always wins
I think there is no other way to solve this than to add more debugging to determine the exact line where it stops
At least I can’t find another solution
this is the most mysterious bug which I ever had
or you could just use an executor 🙂
@kwladyka
well, I feel like >!! is blocked inside your thread — too many jobs for a fixed-size chan
my suggestions:
1. replace >!! with non-blocking put!
2. add logging in add-job before calling >!! (right now you will see nothing if >!! gets stuck)
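Put together, those two suggestions might look like this (a sketch of the proposed change, not the original code; put! accepts an optional callback that receives true/false depending on whether the value was accepted):

(require '[clojure.core.async :refer [put!]])

(defn add-job [ch f]
  ;; log *before* touching the channel, so a blocked put can no longer hide
  (l/debug "async: adding" f "to" ch)
  ;; non-blocking put!; the callback reports whether the value was accepted
  (put! ch f (fn [accepted?]
               (l/debug "async: added" f "to" ch ":" accepted?))))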
(def queue (java.util.concurrent.Executors/newFixedThreadPool 3))

(defn add-job [queue f]
  (.execute queue f))
replaces all the core.async code above (well, you'll need to wrap f to get the logging)
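The wrapping might look roughly like this (a sketch under the assumption that the enter/exit logging from the original workers should be preserved; the pool size of 3 mirrors the intended number of warehouse workers):

(import '(java.util.concurrent ExecutorService Executors))

(def ^ExecutorService job-pool (Executors/newFixedThreadPool 3))

(defn add-job [^ExecutorService pool f]
  (l/debug "async: add" f "to" pool)
  (.execute pool
            (fn []
              (l/debug "run job" f)
              (try
                (f)
                (catch Throwable ex
                  (l/error ex)))
              (l/debug "job done" f))))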
> well, I feel like >!! is blocked inside your thread — too many jobs for a fixed-size chan
> my suggestions:
let's assume this can be true. So what about:
1. we see in the logs that new jobs are added to queue
2. even if queue -> queue-warehouse waits because of the buffer size, from time to time worker-warehouse will finish a job, so a new job can be added.
Right? Do I miss something?
of course
like your analysis is based on the code you are looking at, are you sure it is exactly the same as the code in production?
yes
the logs you shared, as already mentioned, only show add-job being called with the same channel over and over again
which is not the behavior you'd get from the code you shared
it is
(defn add-job [ch f]
  (l/debug "async: add" f "to" ch ": " (>!! ch f))
  (l/debug "worker:" (clojure.core.async.impl.protocols/closed? worker)
           "worker-warehouse" (pr-str (map clojure.core.async.impl.protocols/closed? worker-warehouse))))
it isn't
(>!! ch f)
because add-job, in the code you shared, is called for both queues
so you should get logging where 'ch' is two different channels
is it about current implementation or future one? https://clojurians.slack.com/archives/C05423W6H/p1582831471121700
@serioga it works like that currently
sync-auto adds to queue. Jobs from queue add to queue-warehouse. Everything is correct.
also you only have 1 go loop consuming from queue-warehouse
you just repeat the channel returned 3 times
which makes 3 workers out of this
no
(repeat 3 x) makes a seq of the value of x, 3 times
it doesn't evaluate x three times
user=> (repeat 3 (println "foo"))
foo
(nil nil nil)
user=>
hmm I have to check this, but I found this somewhere as a solution to make X workers
you want repeatedly
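So the three-worker version would look roughly like this (a sketch of the fix; repeatedly evaluates its function three times, and doall forces the lazy seq so the go-loops actually start):

(defonce worker-warehouse
  (doall
    (repeatedly 3
      (fn []
        ;; each call creates its own go-loop, unlike (repeat 3 ...) which
        ;; starts one go-loop and repeats its channel three times
        (go-loop []
          (let [job (<! queue-warehouse)]
            (l/debug "run warehouse job" job)
            (when-not (nil? job)
              (<! (thread
                    (try
                      (job)
                      (catch Throwable ex
                        (l/error ex)))))
              (recur))))))))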
yes, I fixed it
1. We don't see failed attempts to add a job in the logs for the current implementation.
2. worker-warehouse cannot finish any job; it waits to add a job to a channel which is not consumed anymore.
https://clojurians.slack.com/archives/C05423W6H/p1582831471121700
@serioga correct
at least I think it is
there is nothing to check, it is empirically the case
Ah! Now I understand why testing queue-warehouse did not work in parallel https://clojurians.slack.com/archives/C05423W6H/p1582831795128100
damn… you are right here
my mistake
tricky bug 🙂
but initially there was a single queue anyway...
yes, not everything you find on Google works like it should 😉
and even with 1 warehouse-worker this should be fine
only a little slower
but nice catch @hiredman 👍
I don’t know how I missed that
but that is an example of the kind of human factor thing
I'd return to implementation without queue-warehouse
you need to make a diagram, not code
no one wants to read code
s/no one/I/
I will fix this 3-workers thing, add more debugging, etc. Thanks for the help. I know this was a hard discussion, but the issue is also very hard to figure out.
@kwladyka just do these two simple changes and remove queue-warehouse back (as not relevant) https://clojurians.slack.com/archives/C05423W6H/p1582831143118000
sure, I will also try decreasing the buffer size. I just need to add more things to debug this
last time I thought maybe the worker crashed in some way, but it looks like it didn't
what for? https://clojurians.slack.com/archives/C05423W6H/p1582832534142300
I don't know. To determine some magic 😉 For example, if with a buffer of 10 it always gets stuck after a week, that will mean something. I don't think this is the case, but I have to experiment.
programming is not magic; better to think longer but decide logically
I have limited time to solve this issue
need to get rid of those top level processes
but I will do my best
and stop logging I/O inside a go block
pass in a log channel (with dropping buffer) to which workers can send log entries
or use a thread
and not care
my opinion: using put! you eliminate the possibility of blocking
adding logging is “just in case”
thanks for the tips, I will tell you if I find the solution
using put! is terrible
«and stop logging I/O inside a go block» generally it is correct, but I don't worry about it so much 🙂 I configure async loggers instead
asking to hit the 1024 pending puts limit
«asking to hit the 1024 pending puts limit» fine. Then he will need to think about back-pressure, but I doubt it in this particular case
it's incorrect/problematic advice
I think the architecture of this solution is problematic in itself; I'm not ready to offer architectural changes quickly.
The function put! exists and should be used when it makes sense
right, it doesn't make sense here @serioga
architecture is the most important part of getting these little concurrent machines to work well
where does new data come from? where does it go? what happens when the data is produced faster than it can be consumed? etc. etc.
if you're able to reproduce while running in debug mode (say in intellij) then have you tried to inspect your threads to see what blocks?
go routines will not show up in thread stack dumps except when they're active
if they're waiting on a channel op they're not actually running on a thread
async/thread will show up
you can also inspect channels and see their internal state
> you can also inspect channels and see their internal state
How?
> if you're able to reproduce while running in debug mode (say in intellij) then have you tried to inspect your threads to see what blocks?
I can't
This video helps a lot in understanding internals: https://vimeo.com/100518968
as one of the authors of core.async, I would avoid breaking into the channel
you need to clarify intent, develop hypotheses, and validate with experiments
observing the impl details of channels is inherently racey
> I would avoid breaking into the channel
What do you mean?
I think @fmjrey was suggesting looking inside the channel
oh, I thought you were writing to me
(racey, for the same reason the closed? method is not public API -- it's almost guaranteed to be racey)
about something that I said
I am writing to you too
whatever is responsible for creating workers should pass their input channels as arguments to those go routines, along with maybe a logging channel
(defn launch-worker [in cancel log]
(go ....))
(defonce worker
  (go-loop []
    (let [job (<! queue)]
      (l/debug "run job" job)
      (when-not (nil? job)
        (<! (thread
              (try
                (job)
                (catch Throwable ex
                  (l/error ex)))))
        (recur)))))
hmm what is wrong with this?
that's a top level side effect that starts a process when the namespace loads
oh I see
so the best practice is to launch-worker each time a job is run? And create a channel each time for this purpose? Or launch the workers only once during initialization?
you still want to launch your workers once, I mean don't do it with global defs
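Spelled out, the earlier launch-worker suggestion might look roughly like this (a sketch; the cancel and log channels and the start! function are illustrative, not taken from the original code):

(defn launch-worker [in cancel log]
  (go-loop []
    (let [[job port] (alts! [cancel in] :priority true)]
      (when (and (= port in) (some? job))
        (<! (thread
              (try
                (job)
                (catch Throwable ex
                  (>!! log ex)))))
        (recur)))))

;; called once from -main or a system "start" function, not at namespace load time
(defn start! []
  (let [cancel (chan)
        log    (chan (dropping-buffer 100))]
    {:cancel  cancel
     :log     log
     :workers (doall (repeatedly 3 #(launch-worker queue-warehouse cancel log)))}))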
@kwladyka you should read and understand the pmax example: https://stuartsierra.com/2013/12/08/parallel-processing-with-core-async
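Alternatively, the same bounded parallelism can be had from core.async itself with pipeline-blocking, roughly in the spirit of the linked article (a sketch, not the original code; the results channel uses a dropping buffer because the job return values are ignored here):

(require '[clojure.core.async :refer [chan dropping-buffer pipeline-blocking]])

(def jobs    (chan 100))
(def results (chan (dropping-buffer 10)))

;; three blocking workers take no-arg job fns from `jobs` and run them;
;; the transducer must never produce nil, hence the :ok / :error markers
(pipeline-blocking 3
                   results
                   (map (fn [job]
                          (try
                            (job)
                            :ok
                            (catch Throwable ex
                              (l/error ex)
                              :error))))
                   jobs)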
ok thanks, I need to take a break. It is too much for me today 🙂
I did not mean to change the internal state, only to inspect it so you know if the buffer is full, the number of pending puts, etc.
still a bad idea
observing concurrent devices changes the rhythm of the program
just like stopping the world in a debugger might not show you races that happen commonly without the debugger
you don't need to stop execution, just wait for the deadlock state and then inspect