Hi all!
I am having "random" issues with async pubs (or maybe their interaction with the REPL); could you please help me diagnose what I am doing wrong?
Below is the sample code. A stream of news messages comes in through the all-news chan. The :country of a message shows which country the news piece relates to; an absent or nil :country means the message relates to the whole world.
What I want is to route the messages through a/pub so that individual subscribers get only the news they want. To that end I create a chan russian-news which subscribes to news from country "RU" and to global news at the same time.
(ns myproject.test-async
  (:require [clojure.core.async :as a]))

(def all-news (a/chan (a/sliding-buffer 1000)))

;; topic is the :country of each message; each topic channel gets a sliding buffer
(def the-pub (a/pub all-news :country (fn [_] (a/sliding-buffer 1000))))

(def russian-news (a/chan (a/sliding-buffer 1000)))

;; the same channel is subscribed to both the "RU" topic and the nil (global) topic
(a/sub the-pub "RU" russian-news)
(a/sub the-pub nil russian-news)

(a/offer! all-news {:country "RU" :title "News from Russia 1"})
(a/offer! all-news {:country "TR" :title "News from Turkey 1"})
(a/offer! all-news {:title "Global news 1"})
(a/offer! all-news {:country "TR" :title "News from Turkey 2"})
(a/offer! all-news {:country "RU" :title "News from Russia 2"})
(a/offer! all-news {:title "Global news 2"})

(a/close! all-news)

(print (a/<!! (a/into [] russian-news)))
When I run the code, I expect two Russian and two global news items to be printed. I don't rely on the order of Russian news relative to global news, but I would like to rely on all the news messages eventually being properly routed (in this case the input channel is closed at the end, so eventuality is ensured).
The problem is that when I run the same code in the REPL multiple times, one after the other, I get different results:
Loading src/myproject/test_async.clj...
[{:country RU, :title News from Russia 1} {:title Global news 1} {:country RU, :title News from Russia 2} {:title Global news 2}]Loaded
Loading src/myproject/test_async.clj...
[{:country RU, :title News from Russia 1} {:title Global news 1} {:country RU, :title News from Russia 2} {:title Global news 2}]Loaded
Loading src/myproject/test_async.clj...
[{:country RU, :title News from Russia 1} {:title Global news 1} {:title Global news 2}]Loaded
Loading src/myproject/test_async.clj...
[{:country RU, :title News from Russia 1} {:title Global news 1} {:country RU, :title News from Russia 2} {:title Global news 2}]Loaded
Loading src/myproject/test_async.clj...
[{:country RU, :title News from Russia 1} {:title Global news 1} {:country RU, :title News from Russia 2} {:title Global news 2}]Loaded
Loading src/myproject/test_async.clj...
[{:country RU, :title News from Russia 1} {:country RU, :title News from Russia 2}]Loaded
In short, I sometimes (2 out of 6 times) don't get some of the news in the resulting stream.
What could be the problem?
You have a race condition when subscribing the same channel to more than one topic. When you close the source chan all-news, the go-loop created by pub closes the topic channels when it's done, which causes the go-loops created by mult for each topic to close the subscribed channel by default. One of those mult go-loops will close russian-news before the other, and the other could still be in the middle of putting values onto russian-news. It will silently fail and untap automatically.
@yonatanel thank you!
Would you please suggest what the proper way is to subscribe a single channel to multiple topics? I'd assume it's a/mix, but the whole thing starts looking really heavy-weight with multiple (4-5) intermediary buffers.
you could use a mult directly and tap with a channel using a filter transducer.
not sure if you'll run into similar races with that though
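Something like this, reusing all-news from the example above (the exact predicate is just an assumption):

;; one mult over the source; every tapped channel is offered every message,
;; but this tap's transducer keeps only RU and global news
(def news-mult (a/mult all-news))

(def russian-news
  (a/chan (a/sliding-buffer 1000)
          (filter #(contains? #{"RU" nil} (:country %)))))

(a/tap news-mult russian-news)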
any particular reason to use offer! instead of put!?
@noisesmith I guess put! without a callback is equivalent to offer!; I didn't know the callback-less form existed. I usually use offer!/poll! in the REPL for convenience.
it's different in that offer! fails to put anything on the channel (without error) if there's no buffer space or immediate consumer
put! has its own buffer
the reason I bring this up is that offer! silently drops your data if you don't account for that; I didn't look closely, but it can be a source of unexpected behavior for that reason
@noisesmith ah yes, that's the expected behavior of offer! for me when I'm REPLing; it returns false if the buffer is full, I believe.
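For anyone following along, the rough difference at the REPL (buffer size 1 is just for illustration; the a alias is from the example above):

(def c (a/chan 1))

(a/offer! c :a)   ;=> true, the buffer had room
(a/offer! c :b)   ;=> logical false, :b is silently dropped

(a/put! c :c)     ;=> true, :c is queued as a pending put on the channel
(a/<!! c)         ;=> :a
(a/<!! c)         ;=> :c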
In my case above, @yonatanel was 100% correct: it was indeed a race condition. The topic channel gets closed when the input channel is closed, so I sometimes end up not receiving all the messages.
You can subscribe two different channels and a/merge them into one. The merged channel will be closed only after both sources are closed.
Or subscribe without closing the destination channel
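Concretely, something like either of these (reusing the names from the example above; buffer sizes are arbitrary):

;; option 1: one channel per topic, merged into a single output channel;
;; the merged channel closes only after both sources close
(def ru-ch     (a/chan (a/sliding-buffer 1000)))
(def global-ch (a/chan (a/sliding-buffer 1000)))
(a/sub the-pub "RU" ru-ch)
(a/sub the-pub nil  global-ch)
(def russian-news (a/merge [ru-ch global-ch] (a/sliding-buffer 1000)))

;; option 2: keep a single channel but pass close? = false, so neither
;; topic's mult closes it when all-news closes; close russian-news yourself
(a/sub the-pub "RU" russian-news false)
(a/sub the-pub nil  russian-news false)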
Thank you! I even have uses for both of the ways :)
Clojure 1.10.0
should I close every channel I open?
Probably not
is this an okay way to implement delayed retries (assuming fn asynchronously puts one value onto fn-ch, and delays is a seq of millis)?
(defn with-retries [fn fn-ch delays out-ch]
  ;; kick off the first attempt; `fn` puts one value (result or Exception) onto fn-ch
  (fn fn-ch)
  (async/go-loop [val    (async/<! fn-ch)
                  delays delays]
    (if (instance? Exception val)
      (let [[delay & delays] delays]
        (if delay
          (do (async/<! (async/timeout delay))
              (fn fn-ch)                        ; retry after waiting
              (recur (async/<! fn-ch) delays))
          (async/>! out-ch val)))               ; out of retries: pass the Exception on
      (async/>! out-ch val))))                  ; success: pass the value on
Do you intend for the process to take another value from the channel while the previous is being timed out?
no, I know there will be at most one value from fn-ch
Then it makes sense at first glance
thanks for the review!
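For context, roughly the (made-up) usage I have in mind, with the same async alias as above; flaky-fetch here is hypothetical and fails twice before succeeding:

(def attempts (atom 0))

;; puts an Exception onto ch on the first two calls, then a result
(defn flaky-fetch [ch]
  (async/put! ch (if (< (swap! attempts inc) 3)
                   (Exception. "boom")
                   {:status 200})))

(def fn-ch  (async/chan 1))
(def out-ch (async/chan 1))

(with-retries flaky-fetch fn-ch [100 500 1000] out-ch)
(async/<!! out-ch)   ;=> {:status 200}, after two delayed retries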
Closing a channel means you will not put any more values on it, and consumers know to stop working (when they take a nil value from the channel)
I see, thanks!
I was just worried that creating a channel registers it somewhere, which might lead to memory leaks
No, it doesn't even release pending puts on a closed channel; that's why it's the producer's responsibility to close the channel after it's done. Also, by default, piping a channel into another channel will close the destination channel when the source is drained, causing a cascading automatic closing of all channels.
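For example, with a/pipe (a small sketch):

(def from (a/chan 10))
(def to   (a/chan 10))

;; by default pipe closes `to` once `from` is drained and closed;
;; (a/pipe from to false) would leave `to` open
(a/pipe from to)

(a/>!! from 1)
(a/close! from)
(a/<!! to)   ;=> 1
(a/<!! to)   ;=> nil, `to` was closed automatically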
Was there any plan for pipeline and pipeline-blocking to differ in implementation? (they are identical atm afaik)
I guess that would be a question for @alexmiller
they used to be different; we decided blocking was not correct, so atm they are the same. they may be modified to be different in the future
this was a recent change
makes sense. thx
I would recommend still picking the function that best reflects whether it is computation only or potentially blocking operations
does it use a separate thread pool or do they share one between blocking and "normal"?
pipeline and pipeline-blocking are the same and they both use async/thread internally (so the internal core.async cached thread pool). pipeline-async doesn't assume any execution context; it runs in go blocks
not sure what's "normal" 🙂
note that the pool used by thread is different from the pool used by go blocks
the former is a cached thread pool (grows without bound, reuses threads if possible, and idle threads die if not used)
the latter is a fixed size thread pool
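Roughly (a small sketch of the difference; only park in go blocks, put real blocking work on a/thread):

;; go blocks share the small fixed-size pool, so only park in them
(a/go (a/<! (a/timeout 100)) :done)      ; <! parks, frees the pool thread

;; blocking work goes through a/thread, i.e. the cached, unbounded pool
(a/thread (Thread/sleep 100) :done)      ; blocks, but on its own thread

Both return a channel that yields :done.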
pipeline or pipeline-blocking will have concurrency up to N
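e.g. (a rough sketch; channel sizes and the transform are made up):

(def in  (a/chan 100))
(def out (a/chan 100))

;; up to 8 items are transformed in parallel; results land on `out` in order
(a/pipeline 8 out (map #(* % %)) in)
;; pipeline-blocking has the same signature (and, at the moment, the same
;; behavior); using it just documents that the transform may block

(a/onto-chan in (range 5))    ; onto-chan closes `in` when the coll is exhausted
(a/<!! (a/into [] out))       ;=> [0 1 4 9 16]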
yes, it's quite similar to using an ExecutorService in practice
since it's bounded
it is literally using an ExecutorService
I mean per pipeline
it would be reasonable to use a fixed-size pool for pipeline instead, as the tasks should be computation-only and not block
but the behavior would not be much different in practice
I am saying that because I used to be a proponent of being able to specify an executor pretty much everywhere in the core.async API, but with usage/time I am not sure it's really useful anymore; the defaults are good
as long as the user knows what runs where
the only downside I can think of now is that we don't get nice thread names in logs 🙂, which is ok
and writing a variant of async/thread that takes an executor as an arg takes ~10 lines
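A rough sketch of such a variant, modeled on async/thread-call (not the library's implementation; the pool below is just an example):

(import '(java.util.concurrent ExecutorService Executors))

(defn thread-call-on
  "Like core.async/thread-call, but runs f on the supplied ExecutorService.
   Returns a channel that receives f's return value (unless nil), then closes."
  [^ExecutorService exec f]
  (let [c (a/chan 1)]
    (.execute exec
              (fn []
                (let [ret (try (f) (catch Throwable _ nil))]
                  (when (some? ret) (a/>!! c ret))
                  (a/close! c))))
    c))

;; usage
(def pool (Executors/newFixedThreadPool 4))
(a/<!! (thread-call-on pool #(+ 1 2)))   ;=> 3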
you'll get something like "async-thread-macro-1"
so it's somewhat nice
yes, sometimes you'd prefer names that indicate what a thread is responsible for, but that's ok; there are other places in logs to express that and enough information to figure it out
the issue with cached threads is that, over time, they can of course be more than one thing
the issue with naming you mean?
yeah
it is possible to change thread names dynamically (and this is a super sneaky debugging technique) but there are perf impacts
I guess you could cheat and use Thread/setName at run time, but yuk
yes
I've totally abused this for debugging :)
me too 🙂