core-async

kwladyka 2020-02-27T10:09:57.042900Z

Do you remember my mysterious issue with async getting stuck? I promised to write here if I learned something new:

(defonce queue (chan 100))
(defonce queue-warehouse (chan 100))

(defonce worker
  (go-loop []
    (let [job (<! queue)]
      (l/debug "run job" job)
      (when-not (nil? job)
        (<! (thread
              (try
                (job)
                (catch Throwable ex
                  (l/error ex)))))
        (recur)))))

(def worker-warehouse
  (doall
    (repeat 3
      (go-loop []
        (let [job (<! queue-warehouse)]
          (l/debug "run warehouse job" job)
          (when-not (nil? job)
            (<! (thread
                  (try
                    (job)
                    (catch Throwable ex
                      (l/error ex)))))
            (recur)))))))

(defn add-job [ch f]
  (l/debug "async: add" f "to" ch ": " (>!! ch f))
  (l/debug "worker:" (clojure.core.async.impl.protocols/closed? worker)
           "worker-warehouse" (pr-str (map clojure.core.async.impl.protocols/closed? worker-warehouse))))
And while everything is stuck I see:
20-02-26 10:45:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:46] - async: add etingroup.sync$sync_auto$fn__24044@671e9b85 to clojure.core.async.impl.channels.ManyToManyChannel@7a4a107e :  true
20-02-26 10:45:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:47] - worker: false worker-warehouse (false false false)
so it means the task was added correctly and workers are not closed.

kwladyka 2020-02-27T10:10:29.043300Z

but for some reason all async things stop running at the same moment

kwladyka 2020-02-27T10:11:19.044Z

still I have no idea why this is happening, but I have learned the queues and workers didn't crash and are still running

kwladyka 2020-02-27T10:12:13.044800Z

I can only observe it is happening after about 3 weeks of running the app

kwladyka 2020-02-27T10:12:22.045Z

so this is super hard to debug

kwladyka 2020-02-27T10:12:39.045400Z

so… like last time I have still no idea WTF

kwladyka 2020-02-27T10:17:14.045800Z

(l/debug "run job" job) and (l/debug "run warehouse job" job) do not appear in the logs

kwladyka 2020-02-27T10:18:12.046700Z

so it looks like the workers are running, because of clojure.core.async.impl.protocols/closed?, and new tasks are added, because of (l/debug "async: add" f "to" ch ": " (>!! ch f)), but these jobs are never run

kwladyka 2020-02-27T10:18:57.047300Z

How is it possible?

kwladyka 2020-02-27T10:21:45.047500Z

I can’t find any logic here

kwladyka 2020-02-27T10:22:08.048Z

The only suspicious thing is the time it takes for everything to get stuck

kwladyka 2020-02-27T10:24:55.048700Z

the only explanation I have is that something in the worker starts running a job endlessly, which doesn't make sense

kwladyka 2020-02-27T10:25:15.049Z

because there are no loops there

kwladyka 2020-02-27T10:46:15.050700Z

ok I have a crazy idea… I am curious what will happen if the SOAP API (third-party system) keeps the connection open endlessly, because of a bug on their side. If there is any timeout. Probably not this, but worth checking

serioga 2020-02-27T18:20:02.051200Z

@kwladyka what calls add-job ?

serioga 2020-02-27T18:21:04.051800Z

do you add jobs to worker or worker-warehouse ? why both?

serioga 2020-02-27T18:21:58.052500Z

do you log activity inside your jobs? enter/exit etc?

serioga 2020-02-27T18:23:43.053Z

why is worker-warehouse not defonce?

2020-02-27T18:26:23.053600Z

my guess is your jobs are adding things to the work queue, which is a very easy way to lock things up

2020-02-27T18:27:10.054400Z

your workers are waiting for existing jobs to finish before pulling from the work queue again, and your jobs are waiting for space in the work queue to add items before exiting
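A minimal sketch of that failure mode, assuming a core.async setup like the one above (illustrative code, not the original application): a worker consumes from a channel, and the job itself puts more work onto that same channel. With a small buffer this deadlocks, because the worker is parked inside the job, so nothing drains the channel, and the job blocks waiting for buffer space.

(require '[clojure.core.async :as a])

(def q (a/chan 1))                        ; tiny buffer to make the deadlock easy to hit

(a/go-loop []
  (when-let [job (a/<! q)]
    (a/<! (a/thread (job)))               ; worker waits for the job to finish before taking again
    (recur)))

(defn self-feeding-job []
  (a/>!! q (fn [] :more-work))            ; fills the single buffer slot
  (a/>!! q (fn [] :even-more-work)))      ; blocks forever: the only consumer is waiting for us

(a/>!! q self-feeding-job)                ; accepted into the queue, then everything stalls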

kwladyka 2020-02-27T18:35:05.055800Z

@serioga
> what calls add-job?
What do you mean? Exactly what is in the code. You are probably asking about the (>!! ch f) part.
> do you add jobs to worker or worker-warehouse?
both
> do you log activity inside your jobs? enter/exit etc?
yes, and there is silence
> why is worker-warehouse not defonce?
I was testing something and forgot to change it back before pasting

kwladyka 2020-02-27T18:36:05.056500Z

@hiredman, no, because I got clear feedback from add-job that the job was added

kwladyka 2020-02-27T18:36:46.057100Z

async: add etingroup.sync$sync_auto$fn__24044@671e9b85 to clojure.core.async.impl.channels.ManyToManyChannel@7a4a107e : true

2020-02-27T18:37:00.057400Z

but are you publishing to queue or warehouse-queue from the job?

kwladyka 2020-02-27T18:37:40.058400Z

(defn add-job [ch f]
  (l/debug "async: add" f "to" ch ": " (>!! ch f))
  (l/debug "worker:" (clojure.core.async.impl.protocols/closed? worker)
           "worker-warehouse" (pr-str (map clojure.core.async.impl.protocols/closed? worker-warehouse))))
it takes 2 parameters, the first is the channel. I add jobs to both queues

serioga 2020-02-27T18:37:57.059100Z

«What do you mean? Exactly what is in the code.» In your code this function is declared but not used

2020-02-27T18:38:09.059400Z

the other very common thing is you've been messing around in a repl for a while, and who knows what state your repl is in

2020-02-27T18:38:16.059600Z

I mean from the job

2020-02-27T18:38:24.060Z

like when f is run, does it call add-job

2020-02-27T18:38:41.060700Z

are your jobs recursive

kwladyka 2020-02-27T18:38:58.061200Z

@serioga oh sorry I misread

serioga 2020-02-27T18:39:05.061500Z

«yes and there is silence» so there are no jobs which are started but not finished? they just don't start?

kwladyka 2020-02-27T18:39:23.061900Z

> the other very common thing is you've been messing around in a repl for a while, and who knows what state your repl is in
the bug is in production

2020-02-27T18:39:44.062300Z

oh, then what I said the first time almost certainly

2020-02-27T18:39:55.062600Z

you have a feedback loop

kwladyka 2020-02-27T18:40:10.062900Z

Why do you think so?

kwladyka 2020-02-27T18:41:19.065300Z

> so there are no jobs which are started but not finished?
no jobs start or finish, there is silence in the logs. The logs show only adding to the queue

2020-02-27T18:41:27.065500Z

because it is an easy mistake to make when you are trying to build some kind of work queue thing in core.async instead of using executors directly

2020-02-27T18:41:55.066100Z

but it can be overlooked in testing if your buffer sizes are large enough

kwladyka 2020-02-27T18:42:18.066500Z

still, I am adding to the queue and I am sure of it, because I see this in the logs. The workers still exist, because I see this in the logs.

kwladyka 2020-02-27T18:43:02.067Z

but I don’t see (l/debug "run job" job) or any other logs

kwladyka 2020-02-27T18:43:08.067300Z

so nothing is run anymore

2020-02-27T18:43:11.067600Z

right

kwladyka 2020-02-27T18:43:19.068Z

I have only one explanation for that

2020-02-27T18:43:22.068100Z

because you have jobs running that cannot complete running

kwladyka 2020-02-27T18:43:47.068900Z

the worker must be freezing for some reason in something endless, or there is a bug in the modules I use

kwladyka 2020-02-27T18:44:02.069300Z

yes, right

2020-02-27T18:44:07.069600Z

because they are waiting for space in the queue, but there won't be space in the queue until a job exits and the worker go loops consume from the queue again

kwladyka 2020-02-27T18:44:16.069900Z

but because I don’t use loops in my jobs… that is strange

kwladyka 2020-02-27T18:44:40.070500Z

unless an endless API query can exist

kwladyka 2020-02-27T18:44:46.070800Z

or something like that

2020-02-27T18:44:49.071Z

literally you don't use loops? or do you mean your jobs don't have a recursive structure

2020-02-27T18:45:15.071900Z

the actual structure of your jobs doesn't matter, it only matters if they ever call add-job

kwladyka 2020-02-27T18:45:15.072Z

> because they are waiting for space in the queue, but there won't be space in the queue until a job exits and the worker go loops consume from the queue again
no. As I said, the logs show clearly that the job is added and is not waiting for space in the queue

kwladyka 2020-02-27T18:46:08.072800Z

I think I don't use loops in these jobs and there is no recursive structure

2020-02-27T18:46:08.072900Z

"And while everything stuck I see:"

2020-02-27T18:46:13.073300Z

is everything stuck or not?

kwladyka 2020-02-27T18:46:20.073700Z

It is mainly API queries and data processing in between

serioga 2020-02-27T18:46:23.074100Z

«unless an endless API query can exist» If you are not sure, then it's better to add timeout channels
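One way to do that, sketched here with a hypothetical call-soap-api standing in for the real blocking call: run the call on a thread and race it against a timeout channel.

(require '[clojure.core.async :as a])

(defn call-with-timeout [f timeout-ms]
  (let [result-ch (a/thread
                    (try (f)
                         (catch Throwable ex ex)))
        [v port]  (a/alts!! [result-ch (a/timeout timeout-ms)])]
    (if (= port result-ch)
      v
      ;; note: the underlying call may still be running on its thread,
      ;; but the worker is no longer blocked on it
      ::timed-out)))

;; (call-with-timeout #(call-soap-api args) 60000)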

2020-02-27T18:46:33.074700Z

do you call add-job in you jobs?

ghadi 2020-02-27T18:46:44.075300Z

Making top level def go routines is an antipattern

kwladyka 2020-02-27T18:46:47.075400Z

everything is stuck except adding new items to the queue

2020-02-27T18:46:56.075600Z

to which queue?

serioga 2020-02-27T18:47:21.076200Z

«do you call add-job in your jobs?» this is top secret, who calls it and how 🙂

kwladyka 2020-02-27T18:47:47.076800Z

so, to make this even more clear: I added 2 queues instead of 1 because of this bug. Before, there was only 1 queue and the issue was the same.

kwladyka 2020-02-27T18:47:58.077200Z

The app adds to the queue once per 30 minutes

2020-02-27T18:47:58.077300Z

yeah

2020-02-27T18:48:07.077600Z

definitely what I am saying

2020-02-27T18:48:20.078100Z

which is why adding another queue just sort of works around it

2020-02-27T18:48:45.078800Z

it has the same effect as increasing the buffer size of the channel

kwladyka 2020-02-27T18:48:45.078900Z

> «do you call add-job in your jobs?»
now I add from queue to queue-warehouse; before I didn't do this and the issue was the same

2020-02-27T18:49:14.079500Z

you have the same feedback loop, just slightly larger

2020-02-27T18:50:14.080400Z

get rid of this code, use an executor directly

serioga 2020-02-27T18:50:24.080900Z

«Before there was only 1 queue» Which one? Warehouse?

kwladyka 2020-02-27T18:50:35.081200Z

> Making top level def go routines is an antipattern
What is the right pattern for this case then? @ghadi

kwladyka 2020-02-27T18:50:51.081800Z

> «Before there was only 1 queue»
> Which one? Warehouse?
no, queue

ghadi 2020-02-27T18:51:13.082900Z

Call a function passing it the input chan, that fn returns a goroutine

2020-02-27T18:51:13.083Z

executors by default have an unbounded queue which will break your feedback loop (but unbounded queues can introduce other problems)

serioga 2020-02-27T18:51:50.083600Z

«no, queue» so you don't need parallel execution for your jobs?

2020-02-27T18:52:29.084500Z

you could do something like setting the buffer size on the channel to Long/MAX_VALUE

2020-02-27T18:52:39.085Z

but like, just use an executor

kwladyka 2020-02-27T18:52:50.085300Z

As I mentioned before, I added queue-warehouse to split things up for easier debugging and to speed up the process. But before, it was working with only one queue

kwladyka 2020-02-27T18:53:12.085600Z

I don’t think the buffer size is the issue

2020-02-27T18:53:15.085800Z

it is

2020-02-27T18:53:28.086300Z

and it is why you don't see the issue in testing

kwladyka 2020-02-27T18:53:40.086800Z

I did test running this 100000000 times and it didn't get stuck

2020-02-27T18:53:44.086900Z

you have a large enough buffer to avoid filling the channel in tests

2020-02-27T18:54:03.087300Z

just do (chan) without a buffer in your tests
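For example, something like this in the test setup should surface the feedback loop immediately, since with no buffer every put has to rendezvous with a take:

(def queue (chan))   ; instead of (chan 100)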

serioga 2020-02-27T18:55:12.088800Z

«I don't think the buffer size is the issue» Well, anyway, you should decide the behaviour for when there is no room in the queue chan, e.g. a dropping or sliding buffer
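A sketch of those two options, using the core.async buffer constructors; with either of them a put never blocks, at the cost of silently discarding jobs when the channel is full:

(require '[clojure.core.async :as a])

(def queue-drop  (a/chan (a/dropping-buffer 100)))  ; a full channel drops new jobs
(def queue-slide (a/chan (a/sliding-buffer 100)))   ; a full channel discards the oldest job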

kwladyka 2020-02-27T18:56:51.089900Z

let's assume this gets stuck because of the buffer size. Even then, from time to time a worker will finish a job and a new job can come in.

kwladyka 2020-02-27T18:57:10.090800Z

but these things don't happen

2020-02-27T18:57:10.090900Z

it depends

serioga 2020-02-27T18:57:13.091100Z

@kwladyka do you have any other core.async operations in your project?

kwladyka 2020-02-27T18:57:28.091700Z

@serioga no, async is used only here

2020-02-27T18:57:33.091900Z

if the channel ever fills up then it will stop

2020-02-27T18:57:36.092100Z

buffer

2020-02-27T18:57:59.092400Z

it is like feedback between a speaker and a mic

2020-02-27T18:58:17.092900Z

below some volume you won't get feedback

kwladyka 2020-02-27T18:58:23.093200Z

not really, because it goes in only one direction: cron -> queue -> queue-warehouse

kwladyka 2020-02-27T18:58:38.093700Z

and before queue-warehouse didn’t exist

2020-02-27T18:58:52.094200Z

do you ever call add-job from within a job

serioga 2020-02-27T18:59:12.094900Z

«queue -> queue-warehouse» I don't see this in your code

kwladyka 2020-02-27T18:59:15.095Z

as I mentioned before, only from a queue job, to add to queue-warehouse

kwladyka 2020-02-27T18:59:34.095300Z

> I don't see this in your code
I know, this is in a different part

kwladyka 2020-02-27T18:59:54.095600Z

(defn sync-warehouse []
  (let [shop-ids (keys conf/shops)
        products-reservations (->> (pmap #(vector % (orders/atomstore-orders-reservations->products-reservation %)) shop-ids)
                                   (into {}))
        products-from-orders-not-synchronized-yet (->> (pmap #(vector % (orders/atomstore-orders-not-synchronized-yet->products-reservation %)) shop-ids)
                                                       (into {}))
        products-reservations+not-synchronized-orders (reduce (fn [m shop-id]
                                                                (assoc m shop-id
                                                                         (->> ((juxt products-reservations products-from-orders-not-synchronized-yet) shop-id)
                                                                              (apply merge-with +))))
                                                              {} shop-ids)
        stock-change (orders/products-reservations->stock-change-for-shop products-reservations+not-synchronized-orders)
        products (products/wfirma->products :bk)
        update-stock (fn [shop-id]
                       (->> products
                            (map (fn [product]
                                   (if-let [reservation (get-in stock-change [shop-id (:code product)])]
                                     (update product :quantity #(- % reservation))
                                     product)))))
        stock-round-down (fn [stock]
                           (map (fn [product]
                                  (update product :quantity #(Math/floor %)))
                                stock))]
    (add-job queue-warehouse #(products/products->atomstore :kh (update-stock :kh) {:prices? true :stock? true}))
    (add-job queue-warehouse #(products/products->atomstore :yms (update-stock :yms) {:stock? true}))
    (add-job queue-warehouse #(products/products->atomstore :hurt (stock-round-down (update-stock :hurt)) {:stock? true}))
    (add-job queue-warehouse #(warehouse/update-cache-supplier-orders products products-reservations+not-synchronized-orders))))

(defn sync-auto []
  (if (state/get-sync)
    (do
      (slack/send-info "> > > Sync auto start add to queue")
      (let [shop-ids (keys conf/shops)]
        (doseq [shop-id shop-ids]
          (add-job queue #(orders/atomstore-sync->wfirma shop-id :bk)))
        (add-job queue sync-warehouse)
        (doseq [shop-id shop-ids]
          (add-job queue #(invoices/send-ivoices shop-id))))
      (slack/send-info "< < < Sync auto added tasks to queue"))
    (slack/send-warning "! ! ! Synchronization if off. Nothing to do.")))

kwladyka 2020-02-27T19:00:29.096Z

there is no other place with add-job

kwladyka 2020-02-27T19:01:16.097Z

so my guess is the queue worker is stuck in something endless… But I don't see how. Also I don't see any CPU overuse.

kwladyka 2020-02-27T19:01:27.097400Z

or memory issues

kwladyka 2020-02-27T19:01:34.097700Z

which would tell me there is an endless-loop issue

serioga 2020-02-27T19:01:47.098100Z

@kwladyka «And while everything is stuck I see:» you see it every 30 minutes after everything is stuck?

kwladyka 2020-02-27T19:02:47.098600Z

uh, I lost track of which of my replies you are referring to

serioga 2020-02-27T19:03:03.099Z

Where sync-auto is called from? 🙂

kwladyka 2020-02-27T19:03:31.099600Z

every 30 minutes by some kind of cron solution

2020-02-27T19:03:32.099700Z

all of which points to a deadlock waiting on your queues

kwladyka 2020-02-27T19:04:28.100900Z

> all of which points to a deadlock waiting on your queues
Where, in your opinion, can the deadlock be? Nothing blocks queue-warehouse; things go in only 1 direction

kwladyka 2020-02-27T19:05:01.101200Z

@serioga yes, I see this every 30 minutes

serioga 2020-02-27T19:05:40.101800Z

https://clojurians.slack.com/archives/C05423W6H/p1582830301101200 and jobs don't run?

kwladyka 2020-02-27T19:05:41.102100Z

20-02-26 09:45:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:47] - worker: false worker-warehouse (false false false)
20-02-26 09:45:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.slack:26] - Slack send: {:color "#0000FF", :text "< < < Sync auto added tasks to queue"}
20-02-26 10:15:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.slack:26] - Slack send: {:color "#0000FF", :text "> > > Sync auto start add to queue"}
20-02-26 10:15:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:46] - async: add etingroup.sync$sync_auto$fn__24036@1ebb3fc to clojure.core.async.impl.channels.ManyToManyChannel@7a4a107e :  true
20-02-26 10:15:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:47] - worker: false worker-warehouse (false false false)
20-02-26 10:15:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:46] - async: add etingroup.sync$sync_auto$fn__24036@40a18579 to clojure.core.async.impl.channels.ManyToManyChannel@7a4a107e :  true
20-02-26 10:15:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:47] - worker: false worker-warehouse (false false false)
20-02-26 10:15:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:46] - async: add etingroup.sync$sync_auto$fn__24036@28d752b5 to clojure.core.async.impl.channels.ManyToManyChannel@7a4a107e :  true
20-02-26 10:15:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:47] - worker: false worker-warehouse (false false false)
20-02-26 10:15:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:46] - async: add etingroup.sync$sync_warehouse@647d254d to clojure.core.async.impl.channels.ManyToManyChannel@7a4a107e :  true
20-02-26 10:15:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:47] - worker: false worker-warehouse (false false false)
20-02-26 10:15:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:46] - async: add etingroup.sync$sync_auto$fn__24044@1b54dc3e to clojure.core.async.impl.channels.ManyToManyChannel@7a4a107e :  true
20-02-26 10:15:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:47] - worker: false worker-warehouse (false false false)
20-02-26 10:15:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:46] - async: add etingroup.sync$sync_auto$fn__24044@36c12338 to clojure.core.async.impl.channels.ManyToManyChannel@7a4a107e :  true
20-02-26 10:15:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:47] - worker: false worker-warehouse (false false false)
20-02-26 10:15:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:46] - async: add etingroup.sync$sync_auto$fn__24044@5f1cb6ba to clojure.core.async.impl.channels.ManyToManyChannel@7a4a107e :  true
20-02-26 10:15:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:47] - worker: false worker-warehouse (false false false)
20-02-26 10:15:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.slack:26] - Slack send: {:color "#0000FF", :text "< < < Sync auto added tasks to queue"}
20-02-26 10:45:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.slack:26] - Slack send: {:color "#0000FF", :text "> > > Sync auto start add to queue"}
20-02-26 10:45:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:46] - async: add etingroup.sync$sync_auto$fn__24036@141dd6dc to clojure.core.async.impl.channels.ManyToManyChannel@7a4a107e :  true
20-02-26 10:45:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:47] - worker: false worker-warehouse (false false false)
20-02-26 10:45:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:46] - async: add etingroup.sync$sync_auto$fn__24036@3b5a82d3 to clojure.core.async.impl.channels.ManyToManyChannel@7a4a107e :  true
20-02-26 10:45:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:47] - worker: false worker-warehouse (false false false)
20-02-26 10:45:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:46] - async: add etingroup.sync$sync_auto$fn__24036@65462cf9 to clojure.core.async.impl.channels.ManyToManyChannel@7a4a107e :  true
20-02-26 10:45:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:47] - worker: false worker-warehouse (false false false)
20-02-26 10:45:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:46] - async: add etingroup.sync$sync_warehouse@647d254d to clojure.core.async.impl.channels.ManyToManyChannel@7a4a107e :  true
20-02-26 10:45:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:47] - worker: false worker-warehouse (false false false)
20-02-26 10:45:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:46] - async: add etingroup.sync$sync_auto$fn__24044@2ed18f00 to clojure.core.async.impl.channels.ManyToManyChannel@7a4a107e :  true
20-02-26 10:45:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:47] - worker: false worker-warehouse (false false false)
20-02-26 10:45:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:46] - async: add etingroup.sync$sync_auto$fn__24044@671e9b85 to clojure.core.async.impl.channels.ManyToManyChannel@7a4a107e :  true
20-02-26 10:45:33 etingroup-sync-ff7dd5fdb-sf24n DEBUG [etingroup.sync:47] - worker: false worker-warehouse (false false false)

kwladyka 2020-02-27T19:07:26.102800Z

yes and jobs don’t start

serioga 2020-02-27T19:08:02.103200Z

so, as @hiredman said, you call add-job from inside your jobs

kwladyka 2020-02-27T19:08:34.103800Z

yes, as I mentioned, now I call add-job from queue to queue-warehouse

kwladyka 2020-02-27T19:08:56.104100Z

and before I didn’t have queue-warehouse

kwladyka 2020-02-27T19:09:04.104400Z

the process was running in 1 shot

kwladyka 2020-02-27T19:09:47.104800Z

only cron was adding to queue every 30 minutes

kwladyka 2020-02-27T19:09:54.105Z

nothing more

kwladyka 2020-02-27T19:11:31.105600Z

@hiredman can you point out where you see the possibility of a deadlock? I don't see it.

serioga 2020-02-27T19:12:14.106800Z

well, only ManyToManyChannel@7a4a107e is alive in the log, it looks like it is queue

2020-02-27T19:12:14.106900Z

I don't know, I don't have all your code, and I don't really want to read it, but the situation you are describing matches one

kwladyka 2020-02-27T19:12:40.107300Z

> well, only ManyToManyChannel@7a4a107e is alive in the log, it looks like it is queue

kwladyka 2020-02-27T19:12:56.107900Z

yes, it makes sense, because jobs are added to queue-warehouse only from queue

2020-02-27T19:13:29.109Z

look, just remove the buffers from your channels and run your tests and see if things lock up

kwladyka 2020-02-27T19:13:43.109600Z

@hiredman I know it looks like that, but I don't see a logical possibility of it. Especially since before there was only 1 queue, to which cron was adding

kwladyka 2020-02-27T19:13:53.109800Z

I was trying this

kwladyka 2020-02-27T19:14:05.110400Z

I was running this even 1000000 times to see the result

2020-02-27T19:14:58.111500Z

when logic and reality fight, reality always wins

🔥 1
kwladyka 2020-02-27T19:16:54.114900Z

I think there is no other way to solve this than to add more debugging to determine the exact line where it stops

kwladyka 2020-02-27T19:17:13.115600Z

At least I can’t find another solution

kwladyka 2020-02-27T19:17:32.116100Z

this is the most mysterious bug I have ever had

2020-02-27T19:18:15.117100Z

or you could just use an executor 🙂

serioga 2020-02-27T19:19:03.118Z

@kwladyka well, I feel like >!! is blocked inside your thread — too many jobs for a fixed-size chan. My suggestions:
1. replace >!! with the non-blocking put!
2. add logging in add-job before calling >!! (right now you will see nothing if >!! is stuck)
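A sketch of those two suggestions applied to add-job (l/debug is the same logging alias used above; see the caveat about put! and the pending-puts limit later in the thread):

(defn add-job [ch f]
  (l/debug "async: adding" f "to" ch)                 ; logged even if a put would otherwise block
  (let [accepted? (clojure.core.async/put! ch f)]     ; non-blocking; returns false only if ch is closed
    (l/debug "async: added" f "to" ch ":" accepted?)))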

2020-02-27T19:20:56.119Z

(def queue (java.util.concurrent.Executors/newFixedThreadPool 3))

(defn add-job [queue f]
  (.execute queue f))
replaces all the core.async code above

2020-02-27T19:21:28.119300Z

(well, you'll need to wrap f to get the logging)
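A sketch of that executor-based version, with f wrapped so the enter/exit logging from the original workers is preserved (l/debug and l/error are the same logging aliases used above):

(import '(java.util.concurrent ExecutorService Executors))

(def ^ExecutorService queue (Executors/newFixedThreadPool 3))

(defn add-job [^ExecutorService pool f]
  (.execute pool
            (fn []
              (l/debug "run job" f)
              (try
                (f)
                (catch Throwable ex
                  (l/error ex))
                (finally
                  (l/debug "job done" f))))))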

kwladyka 2020-02-27T19:24:31.121700Z

> well, I feel like >!! is blocked inside your thread — too many jobs for a fixed-size chan. My suggestions:
let's assume this can be true. So what about:
1. we see in the logs that new jobs are added to queue
2. even if queue -> queue-warehouse waits because of the buffer size, from time to time worker-warehouse will finish a job, so a new job can be added. Right?
Do I miss something?

2020-02-27T19:25:33.122400Z

of course

2020-02-27T19:26:02.123Z

like your analysis is based on the code you are looking at, are you sure it is exactly the same as the code in production?

kwladyka 2020-02-27T19:26:18.123500Z

yes

2020-02-27T19:26:29.123800Z

the logs you shared, as already mentioned, only show add-job being called with the same channel over and over again

2020-02-27T19:26:42.124100Z

which is not the behavior you'd get from the code you shared

kwladyka 2020-02-27T19:26:49.124300Z

it is

kwladyka 2020-02-27T19:27:02.124600Z

(defn add-job [ch f]
  (l/debug "async: add" f "to" ch ": " (>!! ch f))
  (l/debug "worker:" (clojure.core.async.impl.protocols/closed? worker)
           "worker-warehouse" (pr-str (map clojure.core.async.impl.protocols/closed? worker-warehouse))))

2020-02-27T19:27:09.124900Z

it isn't

kwladyka 2020-02-27T19:27:10.125100Z

(>!! ch f)

2020-02-27T19:27:26.125500Z

because add-job, in the code you shared, is called for both queues

2020-02-27T19:27:39.125900Z

so you should get logging where 'ch' is two different channels

serioga 2020-02-27T19:28:10.126500Z

is this about the current implementation or a future one? https://clojurians.slack.com/archives/C05423W6H/p1582831471121700

kwladyka 2020-02-27T19:28:42.126900Z

@serioga it works like that currently

kwladyka 2020-02-27T19:29:36.127600Z

sync-auto adds to queue. Jobs in queue add to queue-warehouse. Everything is correct.

2020-02-27T19:29:55.128100Z

also you only have 1 go loop consuming from queue-warehouse

2020-02-27T19:30:05.128500Z

you just repeat the channel returned 3 times

kwladyka 2020-02-27T19:30:24.129100Z

which makes 3 workers out of this

2020-02-27T19:30:28.129400Z

no

2020-02-27T19:30:54.130300Z

(repeat 3 x) makes a seq of the value of x 3 times

2020-02-27T19:31:05.130700Z

it doesn't evaluate x three times

2020-02-27T19:31:29.131400Z

user=> (repeat 3 (println "foo"))
foo
(nil nil nil)
user=>

kwladyka 2020-02-27T19:31:41.131900Z

hmm I have to check this, but I found this somewhere as a solution to make X workers

2020-03-05T06:52:45.208Z

you want repeatedly
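A sketch of that fix applied to the code above: repeatedly, unlike repeat, calls the thunk 3 times, so three separate go-loops are actually started.

(defonce worker-warehouse
  (doall
    (repeatedly 3
      (fn []
        (go-loop []
          (let [job (<! queue-warehouse)]
            (l/debug "run warehouse job" job)
            (when-not (nil? job)
              (<! (thread
                    (try
                      (job)
                      (catch Throwable ex
                        (l/error ex)))))
              (recur))))))))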

kwladyka 2020-03-05T07:54:14.214Z

yes, I fixed it

serioga 2020-02-27T19:32:08.132400Z

1. We don't see failed attempts to add a job in the logs for the current implementation.
2. worker-warehouse cannot finish any job; it waits on adding a job to a channel which is not consumed anymore. https://clojurians.slack.com/archives/C05423W6H/p1582831471121700

kwladyka 2020-02-27T19:32:46.132800Z

@serioga correct

kwladyka 2020-02-27T19:32:57.133Z

at least I think it is

2020-02-27T19:33:42.133400Z

there is nothing to check, it is empirically the case

serioga 2020-02-27T19:34:11.134Z

Ah! Now I understand why testing queue-warehouse did not run in parallel https://clojurians.slack.com/archives/C05423W6H/p1582831795128100

kwladyka 2020-02-27T19:34:17.134200Z

damn… you are right here

kwladyka 2020-02-27T19:34:28.134400Z

my mistake

serioga 2020-02-27T19:35:04.134900Z

tricky bug 🙂

serioga 2020-02-27T19:35:26.135900Z

but initially there was single queue anyway...

kwladyka 2020-02-27T19:35:30.136100Z

yes, not everything found on Google works like it should 😉

kwladyka 2020-02-27T19:35:46.136400Z

and even with 1 warehouse-worker this should be fine

kwladyka 2020-02-27T19:35:50.136600Z

only a little slower

kwladyka 2020-02-27T19:36:09.136900Z

but nice catch @hiredman 👍

kwladyka 2020-02-27T19:36:27.137300Z

I don’t know how I missed that

2020-02-27T19:36:48.137600Z

but that is an example of the kind of human factor thing

serioga 2020-02-27T19:37:21.138400Z

I'd return to implementation without queue-warehouse

ghadi 2020-02-27T19:38:23.138700Z

you need to make a diagram, not code

ghadi 2020-02-27T19:38:36.139Z

no one wants to read code

ghadi 2020-02-27T19:38:48.139300Z

s/no one/I/

kwladyka 2020-02-27T19:41:33.141500Z

I will fix this 3-workers thing, add more debugging, etc. Thanks for the help. I know this was a hard discussion, but the issue is also very hard to figure out.

serioga 2020-02-27T19:41:41.141700Z

@kwladyka just make these two simple changes and remove queue-warehouse again (as not relevant) https://clojurians.slack.com/archives/C05423W6H/p1582831143118000

kwladyka 2020-02-27T19:42:14.142300Z

sure, I will also try decreasing the buffer size. I just need to add more things to debug this

kwladyka 2020-02-27T19:42:27.142800Z

last time I thought maybe the worker crashed in some way, but it looks like it didn't

serioga 2020-02-27T19:43:09.143100Z

what for? https://clojurians.slack.com/archives/C05423W6H/p1582832534142300

kwladyka 2020-02-27T19:44:30.144500Z

I don't know. To pin down some of the magic 😉 For example, if with buffer 10 it always gets stuck after a week, that will mean something. I don't think this is the case, but I have to experiment.

serioga 2020-02-27T19:45:17.145300Z

programming is not magic — better to think longer but decide logically

kwladyka 2020-02-27T19:45:33.145900Z

I have limited time to solve this issue

ghadi 2020-02-27T19:45:37.146200Z

need to get rid of those top level processes

kwladyka 2020-02-27T19:45:40.146500Z

but I will do my best

ghadi 2020-02-27T19:45:48.146900Z

and stop logging I/O inside a go block

ghadi 2020-02-27T19:46:15.147700Z

pass in a log channel (with dropping buffer) to which workers can send log entries

ghadi 2020-02-27T19:46:23.148200Z

or use a thread and not care

serioga 2020-02-27T19:46:33.148400Z

my opinion: by using put! you eliminate the possibility of blocking; adding the logging is "just in case"

kwladyka 2020-02-27T19:47:25.148700Z

thanks for the tips, I will tell you if I find the solution

2020-02-27T19:48:29.149900Z

using put! is terrible

serioga 2020-02-27T19:48:38.150300Z

«and stop logging I/O inside a go block» generally it is correct, but I don't worry about it so much 🙂 I configure async loggers instead

2020-02-27T19:49:01.150900Z

asking to hit the 1024 pending puts limit

💯 1
serioga 2020-02-27T19:51:03.152300Z

«asking to hit the 1024 pending puts limit» fine. Then he will need to think about back-pressure, but I doubt it in this particular case

ghadi 2020-02-27T19:51:35.152800Z

it's incorrect/problematic advice

serioga 2020-02-27T19:55:21.155100Z

I think the architecture of this solution is itself problematic. I'm not ready to offer architectural changes quickly.

serioga 2020-02-27T19:57:01.156200Z

The function put! exists and should be used when it makes sense

ghadi 2020-02-27T20:00:42.159Z

right, it doesn't make sense here @serioga

ghadi 2020-02-27T20:01:25.160300Z

architecture is the most important part of getting these little concurrent machines to work well

ghadi 2020-02-27T20:02:02.161700Z

where does new data come from? where does it go? what happens when the data is produced faster than it can be consumed? etc. etc.

fmjrey 2020-02-27T20:02:33.162Z

if you're able to reproduce while running in debug mode (say in intellij) then have you tried to inspect your threads to see what blocks?

ghadi 2020-02-27T20:03:28.162600Z

go routines will not show up in thread stack dumps except when they're active

ghadi 2020-02-27T20:03:44.163100Z

if they're waiting on a channel op they're not actually running on thread

ghadi 2020-02-27T20:04:22.164100Z

async/thread will show up

fmjrey 2020-02-27T20:04:29.164500Z

you can also inspect channels and see their internal state

kwladyka 2020-02-27T20:09:24.164800Z

> you can also inspect channels and see their internal state
How?

kwladyka 2020-02-27T20:11:25.165700Z

> if you're able to reproduce while running in debug mode (say in intellij) then have you tried to inspect your threads to see what blocks?
I can't

fmjrey 2020-02-27T20:11:26.165800Z

This video helps a lot in understanding internals: https://vimeo.com/100518968

ghadi 2020-02-27T20:12:02.166500Z

as one of the authors of core.async, I would avoid breaking into the channel

➕ 1
ghadi 2020-02-27T20:13:01.167600Z

you need to clarify intent, develop hypotheses, and validate with experiments

ghadi 2020-02-27T20:13:25.168100Z

observing the impl details of channels is inherently racey

kwladyka 2020-02-27T20:13:31.168600Z

> I would avoid breaking into the channel
What do you mean?

ghadi 2020-02-27T20:13:51.169Z

I think @fmjrey was suggesting looking inside the channel

kwladyka 2020-02-27T20:14:09.169600Z

oh, I thought you were writing to me

ghadi 2020-02-27T20:14:13.169800Z

(racey, for the same reason the closed? method is not public API -- it's almost guaranteed to be racey)

kwladyka 2020-02-27T20:14:15.170Z

about something I said

ghadi 2020-02-27T20:14:17.170100Z

I am writing to you too

ghadi 2020-02-27T20:15:54.171500Z

whatever is responsible for creating workers should pass their input channels as arguments to those go routines, along with maybe a logging channel

ghadi 2020-02-27T20:16:16.172Z

(defn launch-worker [in cancel log]
  (go ....))
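Filled out as a hedged sketch for the worker above (assuming the usual core.async refers, plus a log channel with a dropping buffer as suggested earlier; the cancel channel lets the caller shut the worker down):

(defn launch-worker [in cancel log]
  (go-loop []
    (let [[job port] (alts! [cancel in] :priority true)]
      (when (and (= port in) (some? job))
        (>! log [:run-job job])
        (<! (thread
              (try
                (job)
                (catch Throwable ex
                  (>!! log [:job-failed ex])))))
        (recur)))))

;; started explicitly from initialization code, not as a top-level def:
;; (def log-ch (chan (dropping-buffer 1024)))
;; (def workers (doall (repeatedly 3 #(launch-worker queue-warehouse cancel-ch log-ch))))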

kwladyka 2020-02-27T20:17:34.172900Z

(defonce worker
  (go-loop []
    (let [job (<! queue)]
      (l/debug "run job" job)
      (when-not (nil? job)
        (<! (thread
              (try
                (job)
                (catch Throwable ex
                  (l/error ex)))))
        (recur)))))
hmm what is wrong with this?

ghadi 2020-02-27T20:17:55.173500Z

that's a top level side effect that starts a process when the namespace loads

kwladyka 2020-02-27T20:18:23.173700Z

oh I see

kwladyka 2020-02-27T20:20:42.174900Z

so the best practice is to launch-worker each time a job runs? And each time create a channel for this purpose? Or only launch the workers once during initialization?

ghadi 2020-02-27T20:22:28.176800Z

you still want to launch your workers once, I mean don't do it with global defs

👍 1
ghadi 2020-02-27T20:23:08.177400Z

@kwladyka you should read and understand the pmax example https://stuartsierra.com/2013/12/08/parallel-processing-with-core-async

👍 1
kwladyka 2020-02-27T20:24:04.178600Z

ok thanks, I need to take a break. It is too much for me today 🙂

fmjrey 2020-02-27T20:38:04.181600Z

I did not mean to change the internal state, only to inspect it, so you know if the buffer is full, the number of pending puts, etc.

ghadi 2020-02-27T20:40:03.182100Z

still a bad idea

ghadi 2020-02-27T20:40:30.182600Z

observing concurrent devices changes the rhythm of the program

ghadi 2020-02-27T20:40:51.183100Z

just like stopping the world in a debugger might not show you races that happen commonly without the debugger

fmjrey 2020-02-27T20:41:47.184300Z

you don't need to stop execution, just wait for the deadlock state and then inspect