core-async

kwladyka 2019-12-30T09:41:53.068Z

Can you look at my question https://clojurians.slack.com/archives/C03S1KBA2/p1577696618239500 here? Am I using async correctly, or is it not the right pattern for async? When should I use async and when not? Is it possible to solve my issue with async?

kwladyka 2019-12-30T09:42:15.068500Z

^ the discussion is in the thread of this message

dpsutton 2019-12-30T09:44:32.068800Z

does it need to be async rather than a thread pool?

kwladyka 2019-12-30T09:46:07.069800Z

well, in general this was made to avoid a situation where processing runs at the same time, because that causes issues. So it is a queue.

👍 1
kwladyka 2019-12-30T09:46:32.070200Z

anyway, I would like to learn more about the topic

kwladyka 2019-12-30T09:47:32.070500Z

so is it the wrong pattern for async?

dpsutton 2019-12-30T09:48:37.071600Z

not wrong to my eyes. I just saw a queue taking runnable functions, which is essentially a simpler version of a thread pool

kwladyka 2019-12-30T09:49:07.071900Z

how can I fix my issue using async?

kwladyka 2019-12-30T09:51:52.072700Z

the issue is (I guess) that worker and worker-warehouse overflow their capacity after 2-4 weeks

kwladyka 2019-12-30T09:53:02.073200Z

because nothing reads from them; these two are the only workers, and they work hard 🙂

kwladyka 2019-12-30T09:53:19.073600Z

but the output doesn’t matter

kwladyka 2019-12-30T09:53:27.073800Z

😉

kwladyka 2019-12-30T09:58:40.074200Z

and all in all what is the best solution for my case?

vemv 2019-12-30T10:04:50.074500Z

could it be one of these two?
* go-loop works fine, but whoever is enqueueing has a problem
* some other (unrelated) piece of code uses core.async in such a way that it causes trouble to c.a's internal thread pool

Also, the JMX suggestion you received is on point. Similarly you can run YourKit in prod/staging and connect to it from your machine. Tools like YourKit tell you exactly where/how your threads are stalling

kwladyka 2019-12-30T10:06:52.074700Z

We can assume the go-loop overflows after some time, so debugging is not really needed. But the question is: how to fix it?

vemv 2019-12-30T10:11:30.074900Z

> We can assume go-loop overflow after some time
What makes you believe so? Your go-loop consumes work in an orderly way thanks to its <!
There's no such thing as consumption overflow (there is the notion of production overflow)
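
For context, a minimal sketch of what "production overflow" means here (the channel names are made up): with a small fixed buffer, blocking puts park once the buffer is full, which is backpressure on the producer rather than a crash; a dropping or sliding buffer is the usual way to shed load instead.

(require '[clojure.core.async :as async :refer [chan >!! dropping-buffer]])

;; hypothetical channel with room for 2 pending values
(def small-queue (chan 2))

(>!! small-queue :job-1) ;; => true, buffer has space
(>!! small-queue :job-2) ;; => true, buffer now full
;; a third >!! would block the calling thread until a consumer takes

;; alternative: shed load instead of blocking the producer
(def lossy-queue (chan (dropping-buffer 100)))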

kwladyka 2019-12-30T10:12:24.075100Z

hmm that is what @delaguardo suggested

kwladyka 2019-12-30T10:14:05.075300Z

but yes, I don’t understand either how it overflows

kwladyka 2019-12-30T10:14:19.075500Z

hmm

kwladyka 2019-12-30T10:14:26.075700Z

unless it is about recur overflow?

vemv 2019-12-30T10:14:32.075900Z

He seems to talk about the case of using #'worker as a channel. Are you even doing that?

kwladyka 2019-12-30T10:14:35.076100Z

can it be?

kwladyka 2019-12-30T10:15:29.076300Z

I think not

vemv 2019-12-30T10:15:37.076500Z

no, if recur compiles it's tail-recursive and doesn't accrete a stack

kwladyka 2019-12-30T10:16:13.076700Z

> go-loop works fine, but whoever is enqueueing has a problem
What do you mean by this point then?

kwladyka 2019-12-30T10:18:03.076900Z

> He seems to talk about the case of using #'worker as a channel. Are you even doing that?
I don’t, and he said it is an issue because this channel keeps growing and growing

kwladyka 2019-12-30T10:18:30.077100Z

but I think it never even gets 1 value, because it is an endless loop with recur

kwladyka 2019-12-30T10:18:38.077300Z

but maybe I miss something

vemv 2019-12-30T10:19:17.077500Z

> What do you mean by this point then?
Hypothetically, the problem is in the code that enqueues the work with >!. Maybe it's blocking indefinitely, making you believe the go-loop doesn't work.
Also, using put! can be dangerous because it disregards queue sizes. Do you have any calls to put!?
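
For illustration, a small sketch of why unchecked put! calls can bite (the channel name is made up): put! never blocks, it just queues a pending put, and core.async allows at most 1024 pending puts per channel, so a producer that permanently outruns the consumer eventually triggers an AssertionError.

(require '[clojure.core.async :as async :refer [chan put!]])

(def unread-queue (chan)) ;; unbuffered, nobody is taking from it

;; each put! parks a pending put on the channel; after 1024 of them the next call throws:
;; AssertionError: No more than 1024 pending puts are allowed on a single channel.
(dotimes [i 1025]
  (put! unread-queue i))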

kwladyka 2019-12-30T10:20:13.077700Z

> Hypothetically, the problem is in the code that enqueues the work with >!. Maybe it’s blocking indefinitely, making you believe the go-loop doesn’t work
It works, because I see in the logs that it goes further and finishes adding things to the queue. I have a message at the end to debug it.

kwladyka 2019-12-30T10:20:22.077900Z

I don’t use put!

kwladyka 2019-12-30T10:20:47.078100Z

So adding to the queue doesn’t get stuck

kwladyka 2019-12-30T10:20:55.078300Z

there are only 3 possibilities:

kwladyka 2019-12-30T10:21:23.078500Z

1. channels are closed for some magic reason
2. worker died
3. magic issues with memory or something overflowing

kwladyka 2019-12-30T10:21:54.078700Z

but the strangest thing is that both workers stop doing jobs at the same time - at least I think so

vemv 2019-12-30T10:23:54.078900Z

2 can be checked with (<!! worker) in the REPL, right? If it blocks, then it hasn't died
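
A non-blocking variant of that REPL check, sketched with alts!! and a timeout so the REPL doesn't hang if the worker is still alive (worker is the go-loop channel from the code under discussion):

(require '[clojure.core.async :as async :refer [alts!! timeout]])

;; if the go-loop is still running, its channel has no value yet and isn't closed,
;; so the timeout wins and we get ::still-running back
(let [[_ port] (alts!! [worker (timeout 1000)])]
  (if (= port worker)
    ::worker-exited   ;; the go-loop returned (its channel delivered/closed)
    ::still-running))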

vemv 2019-12-30T10:24:38.079100Z

...it could be many things. thread uses an unbounded (but cached) thread pool which might make the OS upset.

kwladyka 2019-12-30T10:24:48.079300Z

not really because I can’t reproduce it on localhost

kwladyka 2019-12-30T10:25:14.079500Z

it happens after 2-4 weeks on production only

kwladyka 2019-12-30T10:25:24.079700Z

something like that

kwladyka 2019-12-30T10:26:09.079900Z

but because both workers stop doing jobs at the same time… something probably overflows somehow

kwladyka 2019-12-30T10:26:54.080100Z

but I think it is not go-loop because it never returns even 1 value

vemv 2019-12-30T10:28:14.080300Z

go-loop, as its name implies, is made for looping, so... I doubt it. It'd be extremely fragile if it didn't support infinite loops

vemv 2019-12-30T10:29:26.080500Z

I'd use something like AWS SQS for never losing work and having a greater degree of observability

kwladyka 2019-12-30T10:30:26.080700Z

I can lose work; it is designed so that's not a problem. I would like to keep it in the app to decrease complexity

kwladyka 2019-12-30T10:30:39.080900Z

using a third-party queue system increases complexity, which I don’t need

kwladyka 2019-12-30T10:32:46.081700Z

I am running a simple job 100 000 times to see if it will get stuck on localhost

👍 1
kwladyka 2019-12-30T10:32:54.082Z

so far it looks like it works

kwladyka 2019-12-30T10:34:48.082500Z

ok, it finished 100 000 times. It is probably not about overflowing any channels.

kwladyka 2019-12-30T10:34:55.082700Z

or worker

kwladyka 2019-12-30T10:36:06.083100Z

ech, maybe run a REPL on production temporarily…

kwladyka 2019-12-30T10:36:21.083500Z

to connect to it during an issue and debug then

vemv 2019-12-30T10:36:24.083600Z

> I can lose work
That's an interesting requirement. I'd say, in that case, you should be able to use whatever you feel more comfortable with. Could be agents, futures, core.async, j.u.concurrent, claypoole... obviously they have different demands in case something goes wrong (i.e., many people will recommend j.u.c, but if something goes wrong will you pick up a Java book? Many devs won't)

kwladyka 2019-12-30T10:37:35.083900Z

> That’s an interesting requirement
The processing is resistant to a crash at any moment

kwladyka 2019-12-30T10:38:21.084100Z

But for other reasons, the processing can process only 1 thing at a time

kwladyka 2019-12-30T10:38:37.084300Z

because of the limitation of a third-party system

kwladyka 2019-12-30T10:38:50.084500Z

so this is why the app needs a queue

vemv 2019-12-30T10:39:43.084700Z

a thread pool of size 1 also works like that

kwladyka 2019-12-30T10:40:29.084900Z

maybe, but will it solve anything to switch to a thread pool (which I haven’t used so far)?

kwladyka 2019-12-30T10:40:46.085100Z

it doesn’t look like it is directly an async issue

kwladyka 2019-12-30T10:40:52.085300Z

well, I have no idea what the issue is

kwladyka 2019-12-30T10:41:01.085500Z

it doesn’t make sense 😛

kwladyka 2019-12-30T10:44:44.086300Z

but from the logs I am sure both workers stop doing new jobs at the same time, which probably should tell me something, but it doesn’t

vemv 2019-12-30T10:45:17.086500Z

if you are comfortable with it, it's more observable. Say it's a thread pool with thread size 1, backed by a queue of max size 1000: if something goes wrong you can fetch the queue size, its contents, etc. Likewise you might interact with the thread pool itself (check the javadocs). Also the error handling will be more straightforward. Less magic overall (vs. core.async)
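
A minimal interop sketch of that suggestion, assuming plain java.util.concurrent (the names job-executor and add-job! are made up): one worker thread, a bounded queue of 1000, and direct access to the queue and task counters for debugging.

(import '[java.util.concurrent ThreadPoolExecutor TimeUnit ArrayBlockingQueue])

;; one worker thread, backed by a bounded queue of max 1000 jobs
(defonce job-executor
  (ThreadPoolExecutor. 1 1 0 TimeUnit/MILLISECONDS (ArrayBlockingQueue. 1000)))

(defn add-job! [f]
  ;; Clojure fns implement Runnable, so they can be submitted directly;
  ;; throws RejectedExecutionException if the queue is full
  (.execute job-executor ^Runnable f))

;; observability when something goes wrong:
(.size (.getQueue job-executor))        ;; jobs waiting
(.getActiveCount job-executor)          ;; jobs currently running (0 or 1)
(.getCompletedTaskCount job-executor)   ;; jobs finished so far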

kwladyka 2019-12-30T10:53:53.087Z

Does it tell you anything?

2019-12-30T10:59:53.088500Z

make sure your channels (queues and workers) are open, e.g. with (clojure.core.async.impl.protocols/closed? chan), and be careful with that function, because it is intended to be an "implementation detail" and should not be part of your code base

👍 1
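
If that check is kept for debugging anyway, a small sketch (the helper name is made up) that guards against nil, which is exactly the error that shows up below, and keeps the implementation-detail call in one clearly marked place:

(require '[clojure.core.async.impl.protocols :as async-impl])

(defn debug-closed?
  "Debug-only helper; relies on a core.async implementation detail.
   Returns :no-channel instead of throwing when given nil."
  [ch]
  (if (some? ch)
    (async-impl/closed? ch)
    :no-channel))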
kwladyka 2019-12-30T11:01:19.089100Z

I can’t recreate it on localhost, so the only thing I can do is push it to production and wait 2-4 weeks

kwladyka 2019-12-30T11:01:26.089300Z

or more

kwladyka 2019-12-30T11:07:46.089900Z

these are things which I already added to the logs today, to be sure

kwladyka 2019-12-30T11:19:41.090700Z

(clojure.core.async.impl.protocols/closed? chan) do you know how to run it on production? I get
No implementation of method: :closed? of protocol: #'clojure.core.async.impl.protocols/Channel found for class: nil
while it works on localhost in the REPL and even compiles with it

kwladyka 2019-12-30T11:20:17.091100Z

@delaguardo ^ do you know how to run it?

2019-12-30T11:21:25.091600Z

you are trying to call it with nil as an argument, but it expects a channel

kwladyka 2019-12-30T11:21:42.092Z

but it is not true 😕

2019-12-30T11:22:48.092700Z

I’m guessing here, based on the error message :)

kwladyka 2019-12-30T11:23:22.093100Z

(defonce worker
  (go-loop []
    (let [job (<! queue)]
      (l/debug "run job" job)
      (when-not (nil? job)
        (<! (thread
              (try
                (job)
                (catch Throwable ex
                  (l/error ex)))))
        (recur)))))

(defonce worker-warehouse
  (dotimes [x 3]
    (go-loop []
      (let [job (<! queue-warehouse)]
        (l/debug "run warehouse job" job)
        (when-not (nil? job)
          (<! (thread
                (try
                  (job)
                  (catch Throwable ex
                    (l/error ex)))))
          (recur))))))

(defn add-job [ch f]
  (l/debug "async: add" f "to" ch ": " (>!! ch f))
  (l/debug "worker:" (clojure.core.async.impl.protocols/closed? worker)
           "worker-warehouse" (clojure.core.async.impl.protocols/closed? worker-warehouse)))

2019-12-30T11:24:58.093700Z

but worker-warehouse should be nil, shouldn’t it?

2019-12-30T11:25:11.094Z

there is a dotimes call inside

kwladyka 2019-12-30T11:25:29.094300Z

ah… I am dumb. True
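
For reference, a sketch of how worker-warehouse could hold onto its three go-loop channels instead of the nil that dotimes returns (everything else kept as in the original code above):

(defonce worker-warehouse
  ;; a vector of the three go-loop channels, so the var is not nil
  (vec (for [_ (range 3)]
         (go-loop []
           (let [job (<! queue-warehouse)]
             (l/debug "run warehouse job" job)
             (when-not (nil? job)
               (<! (thread
                     (try
                       (job)
                       (catch Throwable ex
                         (l/error ex)))))
               (recur)))))))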

2019-12-30T11:26:34.095300Z

there is also a strange (when-not (nil? job) …). The only way to get nil from a channel is to consume from a closed and exhausted channel
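
A tiny REPL sketch of that rule, with a made-up channel:

(require '[clojure.core.async :as async :refer [chan close! >!! <!!]])

(def c (chan 1))
(>!! c :x)
(close! c)
(<!! c) ;; => :x   (buffered value still delivered after close)
(<!! c) ;; => nil  (closed and exhausted, so takes return nil)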

kwladyka 2019-12-30T11:28:21.096Z

yes, it is added as a good habit, to not pass nil for processing in such a case

kwladyka 2019-12-30T11:28:53.096600Z

but it should never be true

kwladyka 2019-12-30T11:29:02.097Z

I mean false in this case

2019-12-30T11:29:12.097200Z

but after consuming everything, your code goes into the recur branch and will try to consume from the already-closed channel, then will go to the same branch

2019-12-30T11:29:49.097400Z

ok

2019-12-30T11:30:19.098100Z

but this is a big assumption, I mean the statement that it will never be true

2019-12-30T11:30:49.098700Z

ah, sorry, my bad

kwladyka 2019-12-30T11:30:51.099Z

If it is ever true, then the worker will close itself, because of the closed channel

2019-12-30T11:31:02.099300Z

misreading

kwladyka 2019-12-30T11:31:10.099500Z

but I never close the channel, so ….. ok good 🙂

kwladyka 2019-12-30T11:32:25.100300Z

the funny thing is it started to happen after some update, maybe a version of async or who knows. It wasn’t like that from the beginning

2019-12-30T11:37:38.101600Z

Like I said in the other thread, your go-loop is pretty weird. I would rewrite it so it logs on nil and stops looping, since when the channel take returns nil, it means the channel has closed. At least that way you'd know if the issue you're encountering is due to the channel closing

kwladyka 2019-12-30T11:39:06.102500Z

it stops looping on nil already

2019-12-30T11:40:42.102700Z

(defonce worker
  (go-loop []
    (if-let [job (<! queue)]
      (do
        (l/debug "run job" job)
        (<! (thread
              (try
                (job)
                (catch Throwable ex
                  (l/error ex)))))
        (recur))
      (l/debug "channel closed"))))

2019-12-30T11:41:07.102900Z

Oh, is the recur inside the when?

2019-12-30T11:41:29.103400Z

Ok, well do you know if that's the case?

kwladyka 2019-12-30T11:42:00.103900Z

why is this issue happening? I have no idea. I added extra debug logging and I have to wait a few weeks

2019-12-30T11:42:29.104200Z

I meant, do you know if it exited the go-loop ?

kwladyka 2019-12-30T11:43:15.104600Z

I think it doesn’t, but next time I will have proof in the logs

kwladyka 2019-12-30T11:44:08.105Z

I have 0 code that would close! this channel

kwladyka 2019-12-30T11:44:19.105300Z

it can only close itself, caused by some magic bug in async

kwladyka 2019-12-30T11:44:29.105600Z

But I did an async update now also, so we will see

kwladyka 2019-12-30T11:45:28.106Z

But the issue is probably the most challenging one I’ve ever had in Clojure

kwladyka 2019-12-30T11:45:36.106200Z

at least one of them

2019-12-30T11:46:02.106600Z

Hum, well the other thing I can think of is a race condition

2019-12-30T11:46:07.106800Z

I mean a deadlock

2019-12-30T11:46:39.107Z

Like one job waiting on another

kwladyka 2019-12-30T11:50:22.107300Z

doesn’t happen - I am sure from logs

kwladyka 2019-12-30T11:52:55.107900Z

this is the interesting point

2019-12-30T11:53:37.108200Z

Well, that could be a deadlock

2019-12-30T11:53:57.108700Z

They'd both stop simultaneously if they deadlocked each other

kwladyka 2019-12-30T11:54:05.109Z

no, because I clearly see from the logs that the functions don’t block

kwladyka 2019-12-30T11:54:21.109500Z

and these fns don’t depend on each other in any way

2019-12-30T11:54:36.109800Z

So what do you mean then by "they both stop at the same time"?

kwladyka 2019-12-30T11:55:10.110400Z

I have logs

(l/debug "run job" job)
and I see both workers stop running new jobs at the same time

kwladyka 2019-12-30T11:55:23.110800Z

it is not like one worker stops running jobs and later the second one does

kwladyka 2019-12-30T11:55:36.111200Z

the issue happens for both of them at the same time

2019-12-30T11:57:55.111900Z

If I was you though, I'd just do this instead:

(defonce worker
  (thread
    (loop [job (<!! queue)]
      (l/debug "run job" job)
      (when-not (nil? job)
        (try
          (job)
          (catch Throwable ex
            (l/error ex)))
        (recur (<!! queue))))))

2019-12-30T11:59:52.113300Z

(defn worker [process-fn]
  (let [in (chan)
        out (chan)]
    (go-loop []
      (let [msg (<! in)]
        (>! out (process-fn msg)))
      (recur))
    [in out]))
consider something like this: a worker function that returns in and out channels connected by process-fn. It will give you more control over the created channels
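
A possible usage sketch of that shape (the draining loop and the ::done marker are made up): because in and out are unbuffered, out has to be consumed (or buffered), otherwise the go-loop parks on the >! and the worker stalls; process-fn also must not return nil, since nil can't be put on a channel.

;; hypothetical usage of the worker fn above
(let [[in out] (worker (fn [job] (job) ::done))]
  ;; results are not needed here, so just drain out to keep the worker moving
  (go-loop []
    (when (some? (<! out))
      (recur)))
  ;; enqueue a job (a plain function, as in the original design)
  (>!! in #(l/debug "job ran"))
  [in out])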

2019-12-30T12:00:19.113800Z

So, if you stop seeing "run job" at the same time, it could be that the queues are empty and your puts are failing, it could be that they are deadlocked, or it could be that the channels have closed. That's all I can think of

kwladyka 2019-12-30T12:00:51.114200Z

yes, I added debug logging today to see if the queue and workers are open / closed

2019-12-30T12:01:18.114700Z

this also allows you to restart the worker on some condition

kwladyka 2019-12-30T12:01:22.115Z

and for the fn that adds to the channel

kwladyka 2019-12-30T12:01:33.115300Z

so I will know these 3 things next time

2019-12-30T12:02:15.115900Z

Is there any other thing in your app that would keep it alive if those loops exited?

kwladyka 2019-12-30T12:02:45.116500Z

it looks like the out channel will be full at some point

kwladyka 2019-12-30T12:03:05.117100Z

and it will get stuck

kwladyka 2019-12-30T12:03:24.117500Z

not sure what you mean?

2019-12-30T12:03:44.117700Z

small question - from the code above:

(loop [job (<! queue)]
   ...
   (job))
does it mean that you are passing functions through channels?

kwladyka 2019-12-30T12:04:00.118200Z

yes

kwladyka 2019-12-30T12:04:48.119400Z

what is the advantage of this fn?

2019-12-30T12:05:31.120800Z

Well, your process would exit if it stops looping. Normally an application starts and closes when it runs out of instructions to run, unless it has a main loop of some sort. In your case, your go-loops seem to be what does it. So if there's no other infinite loop, your program should shut down if the channels were closed and returned nil, which would have exited the loops.

2019-12-30T12:05:42.120900Z

is it possible for those functions to throw checked exceptions? they are not runtime exceptions, so they will not be caught by try ... Throwable

2019-12-30T12:06:28.121400Z

something like ClassNotFoundException

2019-12-30T12:06:32.121600Z

Well, since you want to process one job per worker at a time, this doesn't create a new thread on every job; it just reuses the same thread and runs one job after another on it.

kwladyka 2019-12-30T12:06:50.122100Z

the app wouldn’t exit if the workers stopped working

2019-12-30T12:07:24.122200Z

Also, it isn't as much at risk of having issues where you've blocked the parking threadpool.

2019-12-30T12:07:54.122400Z

Since each worker gets their own thread

kwladyka 2019-12-30T12:08:32.123200Z

@delaguardo which function?

2019-12-30T12:08:36.123400Z

those exceptions will be thrown in the thread, so your worker will be there

2019-12-30T12:08:47.123700Z

(job)

kwladyka 2019-12-30T12:09:13.124200Z

What do you mean?

(try
  (job)
  (catch Throwable ex
    (l/error ex)))
isn't this exactly that?

2019-12-30T12:09:24.124500Z

There's another possibility I'm thinking of. I'm not sure what happens to the channel returned by thread in that exception case

kwladyka 2019-12-30T12:09:38.124700Z

there are 0 errors in the logs

kwladyka 2019-12-30T12:10:02.125400Z

*for this case

2019-12-30T12:10:03.125500Z

Hum, ok. And do all (job) calls return a non-nil value?

2019-12-30T12:10:23.126100Z

if (job) throws a checked exception, it will not be caught by your try/catch block

kwladyka 2019-12-30T12:10:30.126300Z

whatever (job) returns, it doesn’t matter

2019-12-30T12:11:51.126400Z

why is that? I’m not restricting the chan by any buffer size

kwladyka 2019-12-30T12:12:44.126900Z

hmm I have to read about this

kwladyka 2019-12-30T12:13:25.127300Z

but it is not the case, because I don’t see the output of

(l/debug "run job" job)

2019-12-30T12:14:40.127700Z

Throwable does catch everything
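
A quick REPL check of that, using a checked exception type:

;; java.io.IOException is a checked exception in Java, and catch Throwable still sees it
(try
  (throw (java.io.IOException. "checked"))
  (catch Throwable ex
    (.getMessage ex)))
;; => "checked"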

kwladyka 2019-12-30T12:15:17.128200Z

I think we won’t solve it today. We have to wait 2-4 weeks for new logs…

kwladyka 2019-12-30T12:15:50.128900Z

I can post the solution here if I find it out

2019-12-30T12:16:02.129200Z

Ya, you could try to wait and see if others have ideas, but it seems to be a very rare occurrence, so it will be hard

kwladyka 2019-12-30T12:16:46.129300Z

I know, but I guess there is a memory limit or some other limit. It can’t be endless

2019-12-30T12:17:14.129700Z

And you're sure none of the jobs use core.async inside them?

kwladyka 2019-12-30T12:17:35.129900Z

yes

2019-12-30T12:18:05.130500Z

What does the producer code look like?

kwladyka 2019-12-30T12:18:16.130700Z

(defn sync-auto []
  (if (state/get-sync)
    (do
      (slack/send-info "> > > Sync auto start add to queue")
      (let [shop-ids (keys conf/shops)]
        (doseq [shop-id shop-ids]
          (add-job queue #(orders/atomstore-sync->wfirma shop-id :bk)))
        (add-job queue sync-warehouse)
        (doseq [shop-id shop-ids]
          (add-job queue #(invoices/send-ivoices shop-id))))
      (slack/send-info "< < < Sync auto added tasks to queue"))
    (slack/send-warning "! ! ! Synchronization if off. Nothing to do.")))
I have a super simple fn to add jobs

2019-12-30T12:18:22.130900Z

Maybe there's an issue on their side

2019-12-30T12:20:10.131600Z

Hum... And what triggers sync-auto? Maybe that's what stops working after a while?

kwladyka 2019-12-30T12:20:35.131800Z

once per 30 minutes

2019-12-30T12:21:04.132400Z

And those (slack/send-info "> > > Sync auto start add to queue") logs keep showing?

kwladyka 2019-12-30T12:21:07.132600Z

it is working, because I see the logs "Sync auto start add to queue" and "Sync auto added tasks to queue"

2019-12-30T12:21:30.132900Z

Hum... ya that's pretty strange

2019-12-30T12:21:50.133500Z

I'll be curious to know if you ever find out the problem

kwladyka 2019-12-30T12:21:58.133800Z

I appreciate your engagement with the topic, but probably we can only wait

kwladyka 2019-12-30T12:22:13.134100Z

heh me too, give me a few months :troll:

2019-12-30T12:22:28.134300Z

Good Luck!

kwladyka 2019-12-30T12:22:39.134500Z

thank you

fmjrey 2019-12-30T15:31:24.134600Z

a/thread threads are daemon threads, so the JVM does not wait for them when exiting. Maybe something else is making your JVM terminate....

fmjrey 2019-12-30T15:33:24.134900Z

forget what I said, your loop is not in an a/thread

fmjrey 2019-12-30T16:03:27.137500Z

My guess would be that something in (job) blocks the thread, meaning the loop never completes
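
If that last guess turns out to be right, one way to at least make it visible would be to race the job's thread channel against a timeout inside the go-loop (the 30-minute limit and the ::done marker are made up):

;; inside the go-loop, instead of (<! (thread ...)) directly:
(let [job-ch (thread
               (try
                 (job)
                 (catch Throwable ex
                   (l/error ex)))
               ::done)
      [_ port] (alts! [job-ch (timeout (* 30 60 1000))])]
  (when-not (= port job-ch)
    ;; the stuck thread itself keeps running; this only makes the stall show up in the logs
    (l/error "job still running after 30 minutes, worker moving on" job)))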