core-async

Timur Latypoff 2020-02-25T06:18:28.008500Z

Hi all! I am having "random" issues with async pubs (or maybe with how they interact with the REPL). Would you please help me diagnose what I am doing wrong? Below is the sample code. A stream of news messages comes through the all-news chan. The :country of a message shows which country the news piece relates to; an absent or nil :country means the message relates to the whole world. I want to route the messages through an a/pub so that individual subscribers get only the news they want. Here I create a chan russian-news which subscribes to news from country "RU" and to global news at the same time.

(ns myproject.test-async
  (:require [clojure.core.async :as a]))

(def all-news (a/chan (a/sliding-buffer 1000)))
(def the-pub (a/pub all-news :country (fn [_] (a/sliding-buffer 1000))))
(def russian-news (a/chan (a/sliding-buffer 1000)))
(a/sub the-pub "RU" russian-news)
(a/sub the-pub nil russian-news)

(a/offer! all-news {:country "RU" :title "News from Russia 1"})
(a/offer! all-news {:country "TR" :title "News from Turkey 1"})
(a/offer! all-news {:title "Global news 1"})
(a/offer! all-news {:country "TR" :title "News from Turkey 2"})
(a/offer! all-news {:country "RU" :title "News from Russia 2"})
(a/offer! all-news {:title "Global news 2"})
(a/close! all-news)

(print (a/<!! (a/into [] russian-news)))

When I run the code, I expect two Russian and two global news items to be printed. I don't rely on the order of Russian news relative to global news, but I would like to rely on all the news messages eventually being routed properly (here the input channel is closed at the end, so there is a well-defined point by which everything should have arrived). The problem is that when I run the same code in the REPL several times in a row, I get different results:
Loading src/myproject/test_async.clj... 
[{:country RU, :title News from Russia 1} {:title Global news 1} {:country RU, :title News from Russia 2} {:title Global news 2}]Loaded
Loading src/myproject/test_async.clj... 
[{:country RU, :title News from Russia 1} {:title Global news 1} {:country RU, :title News from Russia 2} {:title Global news 2}]Loaded
Loading src/myproject/test_async.clj... 
[{:country RU, :title News from Russia 1} {:title Global news 1} {:title Global news 2}]Loaded
Loading src/myproject/test_async.clj... 
[{:country RU, :title News from Russia 1} {:title Global news 1} {:country RU, :title News from Russia 2} {:title Global news 2}]Loaded
Loading src/myproject/test_async.clj... 
[{:country RU, :title News from Russia 1} {:title Global news 1} {:country RU, :title News from Russia 2} {:title Global news 2}]Loaded
Loading src/myproject/test_async.clj... 
[{:country RU, :title News from Russia 1} {:country RU, :title News from Russia 2}]Loaded

In short, sometimes (2 out of 6 runs) some of the news is missing from the resulting stream. What could be the problem?

yonatanel 2020-02-25T12:10:27.012300Z

You have a race condition when subscribing the same channel to more than one topic. When you close the source chan all-news, the go-loop created by pub closes the topic channels once it's done, which in turn causes the go-loop created by mult for each topic to close its subscribed channels (sub defaults to close? true). One of these mult go-loops will close russian-news before the other, while the other may still be in the middle of putting values on russian-news. Those puts silently fail, and the channel is untapped automatically.

👍 1
Timur Latypoff 2020-02-25T12:38:15.015700Z

@yonatanel thank you! Could you please suggest the proper way to subscribe a single channel to multiple topics? I'd assume it's a/mix, but the whole thing starts looking really heavyweight with multiple (4 to 5) intermediary buffers.

markmarkmark 2020-02-25T13:24:03.015900Z

you could use a mult directly and tap it with a channel that has a filter transducer.
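A minimal sketch of that suggestion, assuming the news messages from the original question: one mult over the source and a single tap whose channel carries a filter transducer. The predicate name russian-or-global? is hypothetical. Since only one tap feeds russian-news, only one go-loop can close it, which should sidestep the two-closers race described above. Note that a mult drops values while it has no taps, so the tap is created before anything is put.

```clojure
(require '[clojure.core.async :as a])

(def all-news (a/chan 1000))
(def news-mult (a/mult all-news))

;; Hypothetical predicate: Russian news or global news (no :country).
(defn russian-or-global? [{:keys [country]}]
  (or (nil? country) (= "RU" country)))

;; Single tap; the transducer drops everything else on the way in.
(def russian-news (a/chan 1000 (filter russian-or-global?)))
(a/tap news-mult russian-news)

(doseq [msg [{:country "RU" :title "News from Russia 1"}
             {:country "TR" :title "News from Turkey 1"}
             {:title "Global news 1"}
             {:country "RU" :title "News from Russia 2"}
             {:title "Global news 2"}]]
  (a/>!! all-news msg))
(a/close! all-news)

;; The mult closes russian-news only after it has delivered every item.
(def result (a/<!! (a/into [] russian-news)))
```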

👍 1
markmarkmark 2020-02-25T13:24:21.016100Z

not sure if you'll run into similar races with that though

noisesmith 2020-02-25T20:20:53.035700Z

any particular reason to use offer! instead of put!?

Timur Latypoff 2020-02-26T07:36:52.039300Z

@noisesmith I guess put! without a callback is equivalent to offer!; I didn't know the callback-less form existed. I usually use offer!/poll! in the REPL for convenience.

noisesmith 2020-02-26T19:05:03.040500Z

it's different: offer! silently puts nothing on the channel (no error) if there's no buffer space or waiting consumer

noisesmith 2020-02-26T19:05:20.040700Z

put! has its own queue of pending puts (up to 1024 per channel)

noisesmith 2020-02-26T19:06:03.040900Z

the reason I bring this up is that offer! silently drops your data if you don't account for that; I didn't look closely, but it can be a source of unexpected behavior for that reason
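A small REPL sketch of the difference being described, using an unbuffered channel with no consumer (the channel name is illustrative). offer! only succeeds when the value can be accepted immediately and otherwise returns a falsey value, dropping the value; put! without a callback queues the put and returns true as long as the channel is open.

```clojure
(require '[clojure.core.async :as a])

;; Unbuffered channel, nobody waiting to take.
(def ch (a/chan))

;; Cannot complete immediately, so the value is simply not put.
(def offered (a/offer! ch :dropped))

;; Queued as a pending put (at most 1024 pending puts per channel).
(def put-result (a/put! ch :queued))

;; The queued value is what a taker receives; :dropped never arrives.
(def taken (a/<!! ch))
```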

Timur Latypoff 2020-02-26T19:12:19.041100Z

@noisesmith ah yes, that's the behavior of offer! I expect when I'm REPLing; it returns false if the buffer is full, I believe. In my case above, @yonatanel was 100% correct: it was indeed a race condition. The topic channel gets closed when the input channel is closed, so I sometimes end up not receiving all the messages.

2020-02-26T19:18:25.041300Z

You can subscribe two different channels and a/merge them into one. The merged channel will be closed only after both sources are closed.
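A sketch of the merge approach, reusing the messages from the original question; ru-ch and global-ch are illustrative names. Each topic's mult now closes only its own channel, and a/merge closes its output only after every input has closed, so no put should be able to race with a close. The relative order of Russian and global items is still not guaranteed, hence comparing as a set.

```clojure
(require '[clojure.core.async :as a])

(def all-news (a/chan 1000))
(def the-pub (a/pub all-news :country))

;; One channel per topic: each is closed by exactly one mult.
(def ru-ch     (a/chan 1000))
(def global-ch (a/chan 1000))
(a/sub the-pub "RU" ru-ch)
(a/sub the-pub nil global-ch)

;; Output closes only once both ru-ch and global-ch are closed.
(def russian-news (a/merge [ru-ch global-ch] 1000))

(doseq [msg [{:country "RU" :title "News from Russia 1"}
             {:country "TR" :title "News from Turkey 1"}
             {:title "Global news 1"}
             {:country "RU" :title "News from Russia 2"}
             {:title "Global news 2"}]]
  (a/>!! all-news msg))
(a/close! all-news)

(def result (a/<!! (a/into [] russian-news)))
```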

2020-02-26T19:19:21.041500Z

Or subscribe without closing the destination channel (pass close? false to sub)
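A sketch of the second option: core.async's sub accepts an optional close? argument, and passing false means the topic mults never close russian-news. The trade-off is that the consumer has to decide when it is done; here, purely for illustration, it takes a known number of messages with a/take and then closes the channel itself.

```clojure
(require '[clojure.core.async :as a])

(def all-news (a/chan 1000))
(def the-pub (a/pub all-news :country))
(def russian-news (a/chan 1000))

;; close? = false: neither topic's mult closes russian-news when
;; all-news is closed, so there is no close to race with.
(a/sub the-pub "RU" russian-news false)
(a/sub the-pub nil russian-news false)

(doseq [msg [{:country "RU" :title "News from Russia 1"}
             {:country "TR" :title "News from Turkey 1"}
             {:title "Global news 1"}
             {:country "RU" :title "News from Russia 2"}
             {:title "Global news 2"}]]
  (a/>!! all-news msg))
(a/close! all-news)

;; russian-news stays open, so (a/into [] russian-news) would never
;; complete; the consumer picks its own stopping point (here: 4 items).
(def result (a/<!! (a/into [] (a/take 4 russian-news))))
(a/close! russian-news)
```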

Timur Latypoff 2020-02-26T19:38:29.041700Z

Thank you! I even have uses for both of the ways :)

Timur Latypoff 2020-02-25T07:24:51.008700Z

Clojure 1.10.0

vlaaad 2020-02-25T11:04:15.009400Z

should I close every channel I open?

2020-02-25T11:05:31.010Z

Probably not

vlaaad 2020-02-25T11:06:43.011100Z

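is it an okay way to implement delayed retries (assuming f asynchronously puts one value on fn-ch, and delays is a seq of millis)?

(require '[clojure.core.async :as async :refer [<! >!]])

(defn with-retries
  "Calls (f fn-ch), where f asynchronously puts one value on fn-ch.
  While that value is an Exception, waits for the next delay (ms) and
  calls f again; the final value (success or last error) goes to out-ch."
  [f fn-ch delays out-ch]
  (f fn-ch)
  (async/go-loop [val    (<! fn-ch)
                  delays delays]
    (if (instance? Exception val)
      (let [[delay-ms & more] delays]
        (if delay-ms
          (do (<! (async/timeout delay-ms))
              (f fn-ch)
              (recur (<! fn-ch) more))
          (>! out-ch val)))   ; retries exhausted: pass the last error on
      (>! out-ch val))))      ; success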

Ben Sless 2020-02-26T13:17:21.039700Z

Do you intend for the process to take another value from the channel while the previous is being timed out?

vlaaad 2020-02-26T13:26:43.039900Z

no, I know there will be at most one value from fn-ch

Ben Sless 2020-02-26T13:27:57.040100Z

Then it makes sense at first glance

vlaaad 2020-02-26T13:30:13.040300Z

thanks for the review!

2020-02-25T11:06:56.011300Z

Closing a channel signals that no more values will be put on it, and consumers know to stop working when they take nil from the channel (after any buffered values have been drained)
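A short sketch of those close semantics (the drain helper is illustrative): after close!, new puts are rejected, values already in the buffer are still delivered, and only then do takes return nil, which is the consumer's signal to stop.

```clojure
(require '[clojure.core.async :as a])

(defn drain
  "Takes from ch until it returns nil (closed and empty); returns the values."
  [ch]
  (loop [acc []]
    (if-some [v (a/<!! ch)]
      (recur (conj acc v))
      acc)))

(def ch (a/chan 2))
(a/>!! ch 1)
(a/>!! ch 2)
(a/close! ch)

;; Puts on a closed channel are rejected: >!! returns false.
(def late-put (a/>!! ch 3))

;; Buffered values survive the close; the nil after them ends the loop.
(def drained (drain ch))
```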

vlaaad 2020-02-25T11:10:33.011800Z

I see, thanks!

vlaaad 2020-02-25T11:11:05.012Z

I was just worried that creating a channel registers it somewhere, which might lead to memory leaks

2020-02-25T12:14:18.012500Z

No. Closing doesn't even discard pending puts on the channel; that's why it's the producer's responsibility to close the channel when it's done. Also, by default, piping a channel into another channel closes the destination channel when the source is drained, so closing cascades automatically through all the piped channels.
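A sketch of that cascading close via a/pipe (channel names are illustrative): closing src closes mid and then dst, while passing false as pipe's optional close? argument leaves the destination open after the source closes.

```clojure
(require '[clojure.core.async :as a])

;; src -> mid -> dst, each pipe using the default close? = true.
(def src (a/chan 10))
(def mid (a/chan 10))
(def dst (a/chan 10))
(a/pipe src mid)
(a/pipe mid dst)

(a/>!! src :x)
(a/close! src)
;; a/into only completes because the close cascaded down to dst.
(def result (a/<!! (a/into [] dst)))

;; Same shape with close? = false: dst2 stays open after src2 closes.
(def src2 (a/chan 10))
(def dst2 (a/chan 10))
(a/pipe src2 dst2 false)
(a/>!! src2 :y)
(a/close! src2)
(def from-dst2 (a/<!! dst2))
;; dst2 is still open with buffer space, so offer! succeeds immediately.
(def dst2-open? (a/offer! dst2 :z))
```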

mpenet 2020-02-25T13:45:43.017Z

Was there any plan for pipeline and pipeline-blocking to differ in implementation? (they are identical atm afaik)

mpenet 2020-02-25T13:46:28.017500Z

I guess that would be a question for @alexmiller

alexmiller 2020-02-25T14:04:09.018200Z

they used to be different; we decided blocking was not correct, so atm they are the same. they may be modified to be different in the future

alexmiller 2020-02-25T14:04:34.018400Z

this was a recent change

mpenet 2020-02-25T14:28:12.018600Z

makes sense. thx

alexmiller 2020-02-25T14:29:40.019200Z

I would recommend still picking the function that best reflects whether it is computation only or potentially blocking operations

2020-02-25T15:28:48.019800Z

does it use a separate thread pool or do they share one between blocking and "normal"?

mpenet 2020-02-25T15:32:49.021300Z

pipeline and pipeline-blocking are the same, and they both use async/thread internally (so the internal core.async cached thread pool). pipeline-async doesn't assume any execution context; it runs in go blocks
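For comparison, a sketch of the three variants side by side; the source helper and the transforms are illustrative. All three take a parallelism n plus to/from channels, keep outputs in input order, and close the output when the input closes by default. pipeline and pipeline-blocking take a transducer, while pipeline-async takes a function that receives each input plus a result channel, puts its results there, and then closes it. Per the discussion above, it still seems sensible to pick the variant that matches the kind of work, even while the first two share an implementation.

```clojure
(require '[clojure.core.async :as a])

(defn source
  "Illustrative helper: a closed channel pre-filled with the items of coll."
  [coll]
  (let [ch (a/chan (count coll))]
    (doseq [x coll] (a/>!! ch x))
    (a/close! ch)
    ch))

(def parallelism 4) ; max number of items processed concurrently

;; Computation-only transform.
(def out1 (a/chan 10))
(a/pipeline parallelism out1 (map inc) (source (range 5)))

;; Transform that may block (simulated with a short sleep).
(def out2 (a/chan 10))
(a/pipeline-blocking parallelism out2
                     (map (fn [x] (Thread/sleep 10) (* 2 x)))
                     (source (range 5)))

;; af gets the input and a result channel; it must put its result(s)
;; there and then close it. Here the work runs in a go block.
(def out3 (a/chan 10))
(a/pipeline-async parallelism out3
                  (fn [x result]
                    (a/go (a/>! result (- x))
                          (a/close! result)))
                  (source (range 5)))

(def r1 (a/<!! (a/into [] out1)))
(def r2 (a/<!! (a/into [] out2)))
(def r3 (a/<!! (a/into [] out3)))
```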

mpenet 2020-02-25T15:34:08.021800Z

not sure what's "normal" 🙂

alexmiller 2020-02-25T15:37:17.022500Z

note that the pool used by thread is different than the pool used by go blocks

alexmiller 2020-02-25T15:38:13.023200Z

the former is a cached thread pool (grows without bound, reuses if possible, goes to sleep and dies if not used)

alexmiller 2020-02-25T15:38:24.023500Z

the latter is a fixed size thread pool

alexmiller 2020-02-25T15:40:10.024100Z

pipeline or pipeline-blocking will have concurrency up to N

mpenet 2020-02-25T15:40:43.024700Z

yes, it's quite similar to using an ExecutorService in practice

mpenet 2020-02-25T15:40:49.025Z

since it's bounded

alexmiller 2020-02-25T15:40:57.025300Z

it is literally using an ExecutorService

mpenet 2020-02-25T15:41:06.025600Z

I mean per pipeline

alexmiller 2020-02-25T15:41:42.026600Z

it would be reasonable to use a fixed size pool for pipeline instead as the tasks should be computation-only and not block

alexmiller 2020-02-25T15:42:24.027900Z

but the behavior would not be much different in practice

mpenet 2020-02-25T15:42:29.028100Z

I am saying that because I used to be a proponent of being able to specify the executor almost everywhere in the core.async API, but with usage/time I am not sure it's really useful anymore; defaults are good

mpenet 2020-02-25T15:42:42.028400Z

as long as the user knows what runs where

mpenet 2020-02-25T15:44:15.029700Z

only downside I can think of now is we don't get nice thread names in logs 🙂, which is ok

mpenet 2020-02-25T15:45:19.030200Z

and writing a variant of async/thread that takes an executor as arg takes ~10 lines
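A sketch of what such a variant might look like; thread-call-on and named-pool are hypothetical helpers, not part of core.async. thread-call-on runs f on a caller-supplied ExecutorService, conveys dynamic bindings roughly the way a/thread-call does, and delivers f's non-nil result on a channel that then closes. The named thread factory also touches on the thread-naming point that follows.

```clojure
(require '[clojure.core.async :as a])
(import '(java.util.concurrent ExecutorService Executors ThreadFactory))

(defn thread-call-on
  "Hypothetical helper: like a/thread-call, but runs f on executor.
  Returns a channel that receives f's result (unless nil) and then closes."
  [^ExecutorService executor f]
  (let [c     (a/chan 1)
        binds (get-thread-bindings)]   ; convey dynamic bindings to the worker
    (.execute executor
              (fn []
                (with-bindings binds
                  (try
                    (let [ret (f)]
                      (when (some? ret) (a/>!! c ret)))
                    (finally (a/close! c))))))
    c))

(defn named-pool
  "Hypothetical helper: fixed-size pool of daemon threads named prefix-1, prefix-2, ..."
  [prefix n]
  (let [counter (atom 0)]
    (Executors/newFixedThreadPool
     n
     (reify ThreadFactory
       (newThread [_ r]
         (doto (Thread. ^Runnable r ^String (str prefix "-" (swap! counter inc)))
           (.setDaemon true)))))))

;; Usage:
(def pool (named-pool "news-worker" 2))
(def sum (a/<!! (thread-call-on pool #(+ 1 2))))
(def worker-name (a/<!! (thread-call-on pool #(.getName (Thread/currentThread)))))
(.shutdown pool)
```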

alexmiller 2020-02-25T15:48:29.030700Z

you'll get something like "async-thread-macro-1"

alexmiller 2020-02-25T15:49:09.031400Z

so it's somewhat nice

mpenet 2020-02-25T15:50:15.032500Z

yes, sometimes you'd prefer names that indicate what the thread is responsible for, but that's ok; there are other places in logs to express that, and enough information to figure it out

alexmiller 2020-02-25T15:51:15.033Z

the issue with cached threads, of course, is that they can be doing different things over time

mpenet 2020-02-25T15:52:02.033800Z

the issue with naming you mean?

alexmiller 2020-02-25T15:52:06.034Z

yeah

alexmiller 2020-02-25T15:52:14.034500Z

it is possible to change thread names dynamically (and this is a super sneaky debugging technique) but there are perf impacts

mpenet 2020-02-25T15:52:15.034600Z

I guess you could cheat and use Thread/setName

mpenet 2020-02-25T15:52:23.034900Z

at runtime, but yuck

mpenet 2020-02-25T15:52:32.035200Z

yes

alexmiller 2020-02-25T15:52:37.035400Z

I've totally abused this for debugging :)

mpenet 2020-02-25T15:52:42.035600Z

me too 🙂
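A sketch of that debugging trick as a macro; with-thread-name is a hypothetical helper, not a core.async API. It renames the current thread while body runs and restores the old name in finally, so a reused pool thread does not keep a stale name. Given the perf impact mentioned above, it is probably best kept to debugging, and it likely should not wrap parking operations inside a go block, since the rest of the block may resume on a different dispatch thread.

```clojure
(defmacro with-thread-name
  "Debugging aid: runs body with the current thread renamed to thread-name,
  restoring the previous name afterwards (even if body throws)."
  [thread-name & body]
  `(let [t#   (Thread/currentThread)
         old# (.getName t#)]
     (.setName t# ~thread-name)
     (try
       ~@body
       (finally
         (.setName t# old#)))))

;; Usage on the current thread (the same works inside a/thread):
(def before (.getName (Thread/currentThread)))
(def inside (with-thread-name "news-router"
              (.getName (Thread/currentThread))))
(def after  (.getName (Thread/currentThread)))
```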