core-async

tianshu 2019-04-24T13:16:55.003300Z

Sometimes core.async throws an assert failure if you put a nil on a chan.

Exception in thread "async-thread-macro-39" java.lang.AssertionError: Assert failed: (not (nil? itm))
but usually this error gives me no information about where it happened in my source code.

alexmiller 2019-04-24T15:19:58.005500Z

nils are invalid channel values. if you're putting a nil on a channel in a go block or thread, you need to have some try/catch handling around the puts to be able to catch those errors
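e.g. something like this (produce-value and out are just stand-ins here; the AssertionError matches the trace above):

(require '[clojure.core.async :as async :refer [chan go >!]])

(defn produce-value []               ; hypothetical producer that may return nil
  nil)

(def out (chan 1))

(go
  (try
    (>! out (produce-value))         ; the put asserts the value is not nil
    (catch AssertionError e
      ;; log enough context to know which put blew up
      (println "nil put attempted in this go block:" (.getMessage e)))))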

alexmiller 2019-04-24T15:26:31.006Z

although it would be better to ensure you don't put it in the first place

Ivan Koz 2019-04-24T16:26:12.007600Z

from the docs for pipeline:

Because it is parallel, the transducer will be applied independently to each element, not across elements, and may produce zero or more outputs per input.
But how can a transducer return multiple results?

Ivan Koz 2019-04-24T16:26:44.008600Z

a transducer calls the downstream reducing function with the result and a value (one value at a time)

alexmiller 2019-04-24T16:26:57.009100Z

It can do that more than once

alexmiller 2019-04-24T16:27:06.009700Z

Like mapcat

Ivan Koz 2019-04-24T16:27:09.009900Z

interesting

alexmiller 2019-04-24T16:28:17.012300Z

`mapcat` actually does it with a nested reduce iirc
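e.g. (plain clojure.core, nothing async):

;; the downstream reducing fn is called once per output element,
;; so a single input can yield zero or more outputs
(into [] (mapcat (fn [x] [x (* 10 x)])) [1 2 3])
;; => [1 10 2 20 3 30]

(into [] (mapcat (fn [x] (when (odd? x) [x]))) [1 2 3])
;; => [1 3]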

tianshu 2019-04-24T16:28:53.013100Z

I know I can't put a nil on a channel. But if this happens, how do I find where the problem occurred? Is there any trick, or do I have to check each put one by one? (there's a lot of channel usage in my code)

Ivan Koz 2019-04-24T16:29:17.013700Z

a breakpoint on exceptions in Cursive may help, I guess, to examine the threads' stacks

Ivan Koz 2019-04-24T16:30:07.014400Z

don't you see the source of your nil value in the stack trace?

2019-04-24T16:32:32.014800Z

core.async stack traces are often far from helpful

2019-04-24T16:33:20.015700Z

because the method that actually called your code is calling a part of the state machine that core.async made from your go block, not any named object identifiable directly in your code

Ivan Koz 2019-04-24T16:34:17.016200Z

that's bad; some metadata on the generated code would be helpful

robertfw 2019-04-24T16:37:02.018600Z

We have some chains of async blocks in our code, and I ended up writing a guarded-go macro and a guarded-take! macro for our async bits. guarded-go is a go block wrapped in a try/catch: if an exception occurs, it catches it and returns it on the channel. For consuming those channels, guarded-take! checks whether the value returned is an exception and, if so, re-raises it after attaching some information about the call location.
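Something in this general shape (a rough sketch, not our exact code; names, the ex-info wrapping, and the line capture are simplified):

(require '[clojure.core.async :as async :refer [go <!]])

(defmacro guarded-go
  "Like go, but if the body throws, the exception is returned on the channel
  instead of being lost on the go thread pool."
  [& body]
  `(go
     (try
       ~@body
       (catch Throwable t# t#))))

(defmacro guarded-take!
  "Take from a channel produced by guarded-go (inside another go block);
  rethrows exceptions, tagged with the call-site line."
  [ch]
  `(let [v# (<! ~ch)]
     (if (instance? Throwable v#)
       (throw (ex-info "async failure" {:line ~(:line (meta &form))} v#))
       v#)))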

robertfw 2019-04-24T16:37:27.019200Z

It's not the prettiest, definitely felt wrong writing it, but it has helped make exceptions easier to deal with in some of our more complex async code

robertfw 2019-04-24T16:37:55.019800Z

the exception eventually winds its way out to the caller and you can see exactly what path caused the issue

robertfw 2019-04-24T16:39:46.020400Z

(or in the case of something recoverable or that needs special handling, can be caught and dealt with)

tianshu 2019-04-24T16:41:24.021200Z

@robertfrederickwarner I think this works

tianshu 2019-04-24T16:42:07.021900Z

the full exception stack trace is

Exception in thread "async-thread-macro-15" java.lang.AssertionError: Assert failed: (not (nil? itm))
        at clojure.core.async.impl.protocols$add_BANG_.invokeStatic(protocols.clj:40)
        at clojure.core.async.impl.protocols$add_BANG_.invoke(protocols.clj:37)
        at clojure.core$map$fn__5864$fn__5865.invoke(core.clj:2742)
        at clojure.core.async.impl.channels$chan$fn__6842.invoke(channels.clj:300)
        at clojure.core.async.impl.channels.ManyToManyChannel.put_BANG_(channels.clj:143)
        at clojure.core.async$_GT__BANG__BANG_.invokeStatic(async.clj:143)
        at clojure.core.async$_GT__BANG__BANG_.invoke(async.clj:138)
        at clojure.core.async$pipeline_STAR_$process__11435.invoke(async.clj:492)
        at clojure.core.async$pipeline_STAR_$fn__11447.invoke(async.clj:507)
        at clojure.core.async$thread_call$fn__11358.invoke(async.clj:442)
        at clojure.lang.AFn.run(AFn.java:22)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
can I confirm it comes from a pipeline-*?

robertfw 2019-04-24T16:44:40.022400Z

That would be my first guess, but I couldn't say for sure

tianshu 2019-04-24T16:46:50.022700Z

If I pass the exception through the async channel, what is the downside?

markmarkmark 2019-04-24T16:52:36.023400Z

@doglooksgood is the issue at line 2742 of your core.clj file?

markmarkmark 2019-04-24T16:53:25.023600Z

ah, nevermind

markmarkmark 2019-04-24T16:53:30.023800Z

that's in clojure's core.clj

tianshu 2019-04-24T16:55:57.024500Z

😅

markmarkmark 2019-04-24T16:57:21.025300Z

I do think that the issue is happening in a pipeline, in which case you aren't going to be able to write any replacement macros that handle the case

robertfw 2019-04-24T16:57:47.025400Z

You'll need to check for exceptions manually at the other side; they won't be thrown automatically. That's why I pair my guarded-go with a guarded-take!

tianshu 2019-04-24T17:00:48.027Z

is there an elegant way to deal with the possible exceptions in core.async? I don't want to try/catch everywhere. I can write a predicate, but sometimes it may not cover every case.

Ivan Koz 2019-04-24T17:01:21.027600Z

always return a default value?

tianshu 2019-04-24T17:03:16.028300Z

I think maybe that's not a good way

2019-04-24T17:06:18.029Z

the elegant way is to not put nils on channels
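e.g. (maybe-result! here is a stand-in for whatever operation might return nil; the ::none sentinel is just one option):

(require '[clojure.core.async :as async :refer [chan go >!]])

(def out (chan 1))
(defn maybe-result! [] nil)          ; hypothetical operation that may return nil

;; skip the put entirely when there is nothing to put
(go
  (when-some [v (maybe-result!)]
    (>! out v)))

;; or make "nothing" an explicit, non-nil sentinel the consumer knows about
(go
  (>! out (if-some [v (maybe-result!)] v ::none)))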

2019-04-24T17:07:30.030Z

in general, if you have something that returns nil, it is likely some io operation, which people insist on putting in transducers on pipelines for some god-awful reason

2019-04-24T17:08:06.030500Z

like, if you don't consume and produce meaningful values, don't put it in the middle of a pipeline

2019-04-24T17:08:50.031700Z

I know some company wrote some terrible blog post that did that and it made the front page of various aggregators, but it is bad

tianshu 2019-04-24T17:11:25.034300Z

that's right, I should rewrite some pipeline-blocking

2019-04-24T17:12:32.035900Z

or just, you know, use an executor

robertfw 2019-04-24T17:15:10.038700Z

yup. ultimately the exception stuff I describe is our last ditch "something went wrong" handler to ensure that we return a valid response (this is for a web service). in "regular" usage, it shouldn't be hit, only when something goes terribly wrong. I wouldn't use it as a core building block

Ivan Koz 2019-04-24T17:16:28.039200Z

I'm looking at whether it's even possible to create a debouncing transducer which drops every element except the last one within a period of time. It looks like there is no way to deliver a value when the timeout is hit, because we'd need one more transducer call for that, which may never happen for a channel. Any ideas?

2019-04-24T17:17:50.039800Z

that is not a transducer

2019-04-24T17:18:15.040500Z

that is a special case of copying from channel A to channel B

Ivan Koz 2019-04-24T17:19:52.042Z

yeah, due to the sequential nature of transducers and the abstracted iteration, I don't think it's possible

Ivan Koz 2019-04-24T17:20:29.042800Z

channel to channel copying is easy tho

2019-04-24T17:20:43.043200Z

core.async is largely about writing machines, not data processing; transducers are for data processing. Therefore, if you are reaching for transducers while writing a core.async program, the odds are pretty good that you are making a mistake

Ivan Koz 2019-04-24T17:21:22.043500Z

data processing machines

Ivan Koz 2019-04-24T17:21:24.043800Z

excuse me

alexmiller 2019-04-24T17:21:28.043900Z

that makes no sense re transducers

alexmiller 2019-04-24T17:22:08.044800Z

one of the major reasons transducers exist is core.async

2019-04-24T17:22:59.045600Z

sure

2019-04-24T17:25:26.048100Z

but if you are writing little stateful machines, there is very little demand for sticking functional data processing (map, filter, etc) in the middle of the queues connecting those machines. And it is super weird to stick io operations in the middle of the queues connecting those machines.

2019-04-24T17:25:48.048600Z

if you are using core.async as a substrate for parallel data processing then it does make sense

Ivan Koz 2019-04-24T17:26:10.049400Z

you write data processing code and core.async turns it into a state machine; I think you're missing that part

2019-04-24T17:26:46.050200Z

so it depends on which type of core.async usage you have, and I would argue the more compelling reason to use core.async is for the machines, not parallel data processing

2019-04-24T17:27:03.050600Z

I am well aware of the go macro and the state machine transform

2019-04-24T17:27:20.051100Z

yeah, there's other stuff for clojure that is much better at parallelizing things

2019-04-24T17:28:06.052100Z

the state machine transform the go macro does is largely about stateful machines processing queues (channels)

2019-04-24T17:29:55.052900Z

and there are even programs that do both kinds of core.async usage, and pipelines are, in my mind, basically the interface between those two kinds of core.async programs

tianshu 2019-04-24T17:36:10.057500Z

I found my problem; thanks to mount I can start my program partially. 😎

2019-04-24T17:36:31.057800Z

(I have opinions about mount too)

tianshu 2019-04-24T17:38:12.059500Z

it took me about 30 mins to find out where the exception came from.

Ajay 2019-04-24T17:39:30.060600Z

hi, I have a use case where I am working with 30-40 items. For each item I need to make 3 http calls (each http call returns a piece of information about the item), and at the end I want aggregated results, so the total number of http calls will be between 90 and 120. I am using pipeline-async with 3 stages; although it works, I get javax.net.ssl.SSLException: SSLEngine is closing/closed or a timeout exception for some calls. I want to limit the number of http calls that are in progress at any point; is that possible here?

robertfw 2019-04-24T17:39:43.060700Z

Would love to hear them. We considered using it on our project but went with component instead

Ajay 2019-04-24T17:40:29.060900Z

here is my setup

tianshu 2019-04-24T17:41:47.062200Z

maybe you can try pipeline-blocking with a connection pool for the http client?
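something roughly like this, maybe (fetch-info! and the concurrency of 10 are made up):

(require '[clojure.core.async :as async
           :refer [chan pipeline-blocking onto-chan <!!]])

(defn fetch-info! [item]             ; hypothetical blocking http call
  (Thread/sleep 100)
  {:item item :info :fetched})

(let [in  (chan)
      out (chan)]
  ;; at most 10 fetch-info! calls run at a time, each on its own real thread
  (pipeline-blocking 10 out (map fetch-info!) in)
  (onto-chan in (range 40))          ; closes in when the items run out
  (dotimes [_ 40]
    (println (<!! out))))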

robertfw 2019-04-24T17:42:19.062400Z

but this is my first big clojure project, so trying to sponge up as much as I can these days

Ajay 2019-04-24T17:54:37.066900Z

Ok, what is the difference between pipeline-async and pipeline-blocking when applied to the above setup? Will pipeline-async trigger the 1st call and then trigger the next call without waiting for the result, while pipeline-blocking triggers at most n calls (as specified in the concurrency parameter)?

2019-04-24T17:54:44.067100Z

pipeline-async because of its construction barely limits anything in flight

2019-04-24T17:55:08.067700Z

so if you need to limit the number of external calls inflight pipeline-async isn't a great choice

2019-04-24T17:57:22.068300Z

I would start with creating an executor with a fixed thread count, and making sure all your http requests happen on that executor

2019-04-24T17:58:33.069700Z

then no matter what the code that makes http requests looks like (pipelines, different threads, etc) you can always limit the number in flight based on the number of threads available to the executor
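e.g. a minimal sketch (http-get! and the pool size of 10 are made up):

(import '(java.util.concurrent Executors ExecutorService))

(defn http-get! [url]                ; hypothetical blocking http call
  {:url url :status 200})

;; every request goes through this pool, so at most 10 are ever in flight
(def ^ExecutorService http-pool (Executors/newFixedThreadPool 10))

(defn fetch-async
  "Submit a request to the shared pool; returns a java.util.concurrent.Future."
  [url]
  (.submit http-pool ^Callable (fn [] (http-get! url))))

;; usage: submit everything, then block on the results
(def results
  (->> (range 40)
       (mapv #(fetch-async (str "https://example.com/item/" %)))
       (mapv deref)))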

2019-04-24T17:59:17.070100Z

mount encourages reliance on global singletons, which is not a good way to structure a program

Ajay 2019-04-24T17:59:55.070600Z

Ok, I think I will try pipeline-blocking first before going to the executor level

Ajay 2019-04-24T18:01:13.070900Z

thanks for the ideas! 👍

robertfw 2019-04-24T18:04:19.071Z

that was our conclusion as well. we went with component as it seemed to be a more manageable approach, plus being able to run multiple systems concurrently meant nice isolation for repl/test

Ivan Koz 2019-04-24T20:25:03.073900Z

Made a debouncer using just core.async; not even sure if it's correct. Clearly it could be done using a standalone resettable timer instead of the deliver function. Would be nice to get some feedback.

(require '[clojure.core.async :refer [chan go go-loop timeout <! >!]])

(defn debounce
  ([c ms] (debounce c (chan) ms))
  ([c c' ms]
   (let [vkey    (volatile! nil)
         ;; delivers val onto c' after ms, but only if no newer value arrived
         deliver (fn [key val]
                   (go (<! (timeout ms))
                       (when (= key @vkey)
                         (>! c' val))))]
     (go-loop []
       (let [val (<! c)
             ;; each take gets a fresh key; a newer take invalidates older ones
             key (vreset! vkey (keyword (gensym "id_")))]
         (deliver key val))
       (recur)))
   c'))

2019-04-24T20:31:07.074700Z

I don't even know what you are doing

2019-04-24T20:31:38.075300Z

the volatile thing is weird, the gensym keyword is weird

Ivan Koz 2019-04-24T20:32:25.075900Z

something like that

2019-04-24T20:34:27.076400Z

(defn debounce [in out t]
  (async/go
    (let [msg (async/<! in)]
      (if (some? msg)
        (if (async/>! out msg)
          (let [to (async/timeout t)]
            (loop []
              (async/alt! to
                          ([_]
                           (debounce in out t))
                          in
                          ([msg]
                           (if (some? msg)
                             (recur)
                             (async/close! out))))))
          (async/close! in))
        (async/close! out)))))

2019-04-24T20:37:52.076800Z

(never actually ran that or pasted it in a repl, so ymmv)

Ivan Koz 2019-04-24T20:38:50.077700Z

yeah, thanks for the example, I'm looking into it, but I need some feedback on my own implementation

2019-04-24T20:40:01.078300Z

volatile in core.async seems like a bad idea

Ivan Koz 2019-04-24T20:40:27.078700Z

even considering vreset?

2019-04-24T20:40:38.079100Z

the whole thing is just weird

2019-04-24T20:41:33.080100Z

creating go blocks without waiting for them

2019-04-24T20:42:11.081Z

@nxtk volatiles are faster than atoms because they do nothing to ensure values stay in sync between threads, and with core.async go you can't control what thread the code is running on

2019-04-24T20:42:24.081400Z

at the very least I would use (Object.) instead of that keyword gensym thing

Ivan Koz 2019-04-24T20:43:36.082100Z

@noisesmith isn't (go) about single-threaded CSP?

Ivan Koz 2019-04-24T20:44:11.083Z

I just started with core.async this morning; too much information for one day, I may be missing some pieces

2019-04-24T20:44:13.083100Z

on the JVM, go uses a thread pool, on JS there's no volatile! iirc

2019-04-24T20:44:26.083400Z

it's small, but it's usually going to be more than one thread

Ivan Koz 2019-04-24T20:44:52.083700Z

so the single thread is only a cljs-specific aspect of go?

2019-04-24T20:45:04.084Z

right

Ivan Koz 2019-04-24T20:46:01.084200Z

yeah then my solution is not optimal

2019-04-24T20:49:10.085Z

you are also generating timeouts for everything that goes through, regardless of whether it will be ignored or not

2019-04-24T20:49:28.085700Z

all the callbacks that go blocks turn into

2019-04-24T20:49:35.086Z

all of which are supposed to be relatively cheap

2019-04-24T20:50:24.086900Z

but timeouts, for example, are globals, so if you create a timeout and then wait on consuming a value from it, that isn't eligible for gc until the timeout ends

Ivan Koz 2019-04-24T20:59:35.088500Z

thanks guys