Sometimes core.async throws an assertion failure if I put a nil on a chan.
Exception in thread "async-thread-macro-39" java.lang.AssertionError: Assert failed: (not (nil? itm))
but usually this error gives me no information about where it happened in my source code.
nils are invalid channel values. if you're putting a nil on a channel in a go block or thread, you need some try/catch handling around the puts to be able to catch those errors
although it would be better to ensure you don't put it in the first place
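A minimal sketch of the failure and one way to guard a put site. `safe-put!!` and the `:none` sentinel are made up here for illustration:

```clojure
(require '[clojure.core.async :as async :refer [chan >!! <!!]])

;; (>!! (chan 1) nil) would throw:
;; AssertionError: Assert failed: (not (nil? itm))

;; one defensive pattern: translate nil to a sentinel at the put site,
;; so the channel never sees a literal nil
(defn safe-put!!
  "Puts v on ch, substituting the sentinel :none when v is nil."
  [ch v]
  (>!! ch (if (nil? v) :none v)))

(let [c (chan 1)]
  (safe-put!! c nil)
  (<!! c))
;=> :none
```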
from the docs for pipeline
Because it is parallel, the transducer will be applied independently to each element, not across elements, and may produce zero or more outputs per input.
But how can transducers return multiple results?
a transducer calls the downstream reducing function with a result and a value (one value)
It can do that more than once
Like mapcat
interesting
`mapcat` actually does it with a nested reduce iirc
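The mapcat point can be seen directly: a transducer is free to call the downstream reducing function zero or more times per input. A small sketch:

```clojure
(require '[clojure.core.async :as async])

;; mapcat calls the downstream step once per element of the expansion
(into [] (mapcat (fn [x] [x x])) [1 2 3])
;=> [1 1 2 2 3 3]

;; attached to a channel, one put can therefore yield several takes
(let [c (async/chan 10 (mapcat (fn [x] [x (* 10 x)])))]
  (async/>!! c 1)
  (async/close! c)
  [(async/<!! c) (async/<!! c)])
;=> [1 10]
```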
I know I can't put a nil on a channel. but if this happens, how do I find where the problem happened? is there any trick, or do I have to check one by one? (there's a lot of usage in my code)
a breakpoint on exception in Cursive may help, I guess, to examine thread stacks
don't you see the source of your nil value in the stacktrace?
core.async stack traces are often far from helpful
because the method that actually called your code is calling a part of the state machine that core.async made from your go block, not any named object identifiable directly in your code
bad, some metadata on the generated code would be helpful
We have some chains of async blocks in our code, and I ended up writing a guarded-go macro and a guarded-take! macro for our async bits. guarded-go is a go block wrapped in a try/catch; if an exception occurs it will catch it and return it on the channel. for consuming those channels, guarded-take! will check if the value returned is an exception, and re-raise it after attaching some information about the call location
It's not the prettiest, definitely felt wrong writing it, but it has helped make exceptions easier to deal with in some of our more complex async code
the exception eventually winds its way out to the caller and you can see exactly what path caused the issue
(or in the case of something recoverable or that needs special handling, can be caught and dealt with)
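The actual macros aren't shown in the conversation; a rough sketch of how such a pair might look (the ex-info message and the :line key are guesses, not the real implementation):

```clojure
(require '[clojure.core.async :as async :refer [go <!]])

(defmacro guarded-go
  "Like go, but any thrown Throwable is caught and returned on the
  result channel instead of dying silently inside the go block."
  [& body]
  `(go (try ~@body
            (catch Throwable t# t#))))

(defmacro guarded-take!
  "Takes from ch inside a go block; if the value is a Throwable,
  rethrows it wrapped with the source line of the take site."
  [ch]
  `(let [v# (<! ~ch)]
     (if (instance? Throwable v#)
       (throw (ex-info "async failure" {:line ~(:line (meta &form))} v#))
       v#)))
```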
@robertfrederickwarner I think this works
the full exception stack trace is
Exception in thread "async-thread-macro-15" java.lang.AssertionError: Assert failed: (not (nil? itm))
at clojure.core.async.impl.protocols$add_BANG_.invokeStatic(protocols.clj:40)
at clojure.core.async.impl.protocols$add_BANG_.invoke(protocols.clj:37)
at clojure.core$map$fn__5864$fn__5865.invoke(core.clj:2742)
at clojure.core.async.impl.channels$chan$fn__6842.invoke(channels.clj:300)
at clojure.core.async.impl.channels.ManyToManyChannel.put_BANG_(channels.clj:143)
at clojure.core.async$_GT__BANG__BANG_.invokeStatic(async.clj:143)
at clojure.core.async$_GT__BANG__BANG_.invoke(async.clj:138)
at clojure.core.async$pipeline_STAR_$process__11435.invoke(async.clj:492)
at clojure.core.async$pipeline_STAR_$fn__11447.invoke(async.clj:507)
at clojure.core.async$thread_call$fn__11358.invoke(async.clj:442)
at clojure.lang.AFn.run(AFn.java:22)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
can I confirm it comes from a pipeline-*?
That would be my first guess, but I couldn't say for sure
If I pass the exception through the async channel, what is the downside?
@doglooksgood is the issue at line 2742 of your core.clj file?
ah, nevermind
that's in clojure's core.clj
I do think that the issue is happening in a pipeline, in which case you aren't going to be able to write any replacement macros that handle the case
You'll need to check for exceptions manually on the other side; they won't be thrown automatically. That's why I pair my guarded-go with a guarded-take!
is there an elegant way to deal with possible exceptions in core.async? I don't want to try/catch everywhere. I can write a predicate, but sometimes it may not cover every case.
always return a default value?
maybe that's not a good way, I think
the elegant way is to not put nils on channels
in general, if you have something that returns nil, it is likely some io operation, which people insist on putting in transducers on pipelines for some god awful reason
like, if you don't consume and produce meaningful values, don't put it in the middle of a pipeline
I know some company wrote some terrible blog post that did that and it made the front page of various aggregators, but it is bad
that's right, I should rewrite some pipeline-blocking
or just, you know, use an executor
yup. ultimately the exception stuff I describe is our last ditch "something went wrong" handler to ensure that we return a valid response (this is for a web service). in "regular" usage, it shouldn't be hit, only when something goes terribly wrong. I wouldn't use it as a core building block
i'm looking at whether it's even possible to create a debouncing transducer which drops every element except the last one in a period of time. Looks like there is no way to deliver a value when the timeout is hit, because we'd need one more transducer call for that, which may never happen for a channel. Any ideas?
that is not a transducer
that is a special case of copying from channel A to channel B
yeah, due to the sequential nature of transducers and the abstracted iteration, i don't think it's possible
channel to channel copying is easy tho
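For reference, a plain A-to-B copy loop, which is the skeleton the timeout logic would be grafted onto (`copy-ch` is a name made up here):

```clojure
(require '[clojure.core.async :as async :refer [chan go-loop <! >! close!]])

(defn copy-ch
  "Copies every value from in to out; closes out when in closes."
  [in out]
  (go-loop []
    (if-some [v (<! in)]
      (do (>! out v)
          (recur))
      (close! out))))

(let [in (chan) out (chan)]
  (copy-ch in out)
  (async/>!! in :a)
  (async/<!! out))
;=> :a
</imports>
```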
core.async is largely about writing machines, not data processing. transducers are for data processing. therefore, if you are reaching for transducers while writing a core.async program, the odds are pretty good that you are making a mistake
data processing machines
excuse me
that makes no sense re transducers
one of the major reasons transducers exist is core.async
sure
but if you are writing little stateful machines, there is very little demand for sticking functional data processing (map, filter, etc.) in the middle of the queues connecting those machines. And it is super weird to stick io operations in the middle of those queues.
if you are using core.async as a substrate for parallel data processing then it does make sense
you write data processing code and core.async turns it into a state machine. i think you're missing that part
so it depends on which type of core.async usage you have, and I would argue the more compelling reason to use core.async is for the machines, not parallel data processing
I am well aware of the go macro and the state machine transform
yeah, there's other stuff for clojure that is much better at parallelizing things
the state machine transform the go macro does is largely about stateful machines processing queues (channels)
and there are even programs that do both kinds of core.async usage, and pipelines are, in my mind, basically the interface between those two kinds of core.async programs
I found my problem. thanks to mount I can start my program partly.
(I have opinions about mount too)
it took me about 30 mins to find out where the exception came from.
hi, I have a use case where I am working with 30-40 items. for each item I need to make 3 http calls (each http call returns a piece of information about the item), and at the end I want aggregated results, so the number of http calls will be between 90 and 120. I am using pipeline-async with 3 stages; although it works, I get javax.net.ssl.SSLException: SSLEngine is closing/closed or a timeout exception for some calls. I want to limit the number of http calls that are in progress at any point. is that possible here?
Would love to hear them. We considered using it on our project but went with component instead
here is my setup
maybe you can try pipeline-blocking with a connection-pool of http client?
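A hedged sketch of the pipeline-blocking approach; `fetch-info` stands in for the real blocking HTTP call:

```clojure
(require '[clojure.core.async :as async
           :refer [chan pipeline-blocking onto-chan <!!]])

(defn fetch-info
  "Placeholder for a blocking HTTP request for one item."
  [item]
  {:item item :status 200})

(let [in  (chan 8)
      out (chan 8)]
  (onto-chan in (range 5))                 ; feed the items and close in
  ;; at most 3 fetch-info calls run concurrently
  (pipeline-blocking 3 out (map fetch-info) in)
  (<!! (async/into [] out)))
;; five result maps, input order preserved
```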
but this is my first big clojure project, so trying to sponge up as much as I can these days
Ok, what is the difference between pipeline-async and pipeline-blocking when applied to the above setup? Will pipeline-async trigger the 1st call, then trigger the next call without waiting for the result, while pipeline-blocking triggers at most n calls (as specified in the concurrency parameter)?
pipeline-async, because of its construction, barely limits anything in flight
so if you need to limit the number of external calls inflight pipeline-async isn't a great choice
I would start with creating an executor with a fixed thread count, and making sure all your http requests happen on that executor
then no matter what the code that makes http requests looks like (pipelines, different threads, etc) you can always limit the number in flight based on the number of threads available to the executor
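The executor approach in a sketch; `do-http-call` is a stand-in for the real request function:

```clojure
(import '(java.util.concurrent Callable Executors ExecutorService))

;; a fixed pool caps in-flight work regardless of who submits it
(def ^ExecutorService http-pool (Executors/newFixedThreadPool 3))

(defn do-http-call
  "Placeholder for the real HTTP request."
  [item]
  {:item item})

;; submit from anywhere in the program; at most 3 calls run at once
(let [futures (mapv (fn [item]
                      (.submit http-pool ^Callable (fn [] (do-http-call item))))
                    (range 5))]
  (mapv deref futures))
;=> [{:item 0} {:item 1} {:item 2} {:item 3} {:item 4}]
```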
mount encourages reliance on global singletons which is not a good way to structure a program
Ok, I think I will try pipeline-blocking first before going to the executor level
thanks for the ideas!
that was our conclusion as well. we went with component as it seemed to be a more manageable approach, plus being able to run multiple systems concurrently meant nice isolation for repl/test
Made a debouncer using just core.async, not even sure if it's correct. Clearly it could be done using a standalone resettable timer instead of the deliver function. Would be nice to get some feedback.
(defn debounce
  ([c ms] (debounce c (chan) ms))
  ([c c' ms]
   (let [vkey (volatile! nil)
         deliver (fn [key val]
                   (go (<! (timeout ms))
                       (when (= key @vkey)
                         (>! c' val))))]
     (go-loop []
       (let [val (<! c)
             key (vreset! vkey (keyword (gensym "id_")))]
         (deliver key val))
       (recur)))
   c'))
I don't even know what you are doing
the volatile thing is weird, the gensym keyword is weird
http://benalman.com/code/projects/jquery-throttle-debounce/examples/debounce/
something like that
(defn debounce [in out t]
(async/go
(let [msg (async/<! in)]
(if (some? msg)
(if (async/>! out msg)
(let [to (async/timeout t)]
(loop []
(async/alt! to
([_]
(debounce in out t))
in
([msg]
(if (some? msg)
(recur)
(async/close! out))))))
(async/close! in))
(async/close! out)))))
(never actually ran that or pasted it in a repl, so ymmv)
yeah thanks for example, looking into it but i need some feedback on my implementation
volatile in core.async seems like a bad idea
even considering vreset!?
the whole thing is just weird
creating go blocks without waiting for them
@nxtk volatiles are faster than atoms because they do nothing to ensure values stay in sync between threads, and with core.async go you can't control what thread that code is running on
at the very least I would use (Object.)
instead of that keyword gensym thing
@noisesmith aren't (go) blocks about single-threaded CSP?
i just started with core.async this morning, too much information for one day, may be missing some pieces
on the JVM, go uses a thread pool, on JS there's no volatile! iirc
it's small, but it's usually going to be more than one thread
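Easy to confirm from a repl; the `async-dispatch-` thread-name prefix is an implementation detail of the JVM go pool, not a documented API:

```clojure
(require '[clojure.core.async :as async :refer [go <!!]])

;; ask a go block which thread it ran on
(def go-thread-name
  (<!! (go (.getName (Thread/currentThread)))))

;; typically something like "async-dispatch-1"
(.startsWith ^String go-thread-name "async-dispatch")
;=> true
```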
so being single-threaded is only a cljs-specific aspect of go?
right
yeah then my solution is not optimal
you are also generating timeouts for everything that goes through, regardless of whether it will be ignored or not
all the callbacks that go blocks turn into
all of which are supposed to be relatively cheap
but timeouts, for example, are globals, so if you create a timeout and then wait on consuming a value from it, that isn't eligible for gc until the timeout ends
thanks guys