core-async

abdullahibra 2020-04-29T19:33:05.118Z

Hi,

abdullahibra 2020-04-29T19:33:09.118100Z

I have general question, about consuming messages from Kafka topic, is it better to collect all messages into async channel and handle it by pure methods which take channel and process it and return new output channel and so on or what is the best approach to model that ?

2020-04-29T19:43:54.118800Z

I responded already in the other channel, but more specifically about core.async here - what specifically would core.async give you when processing kafka- what feature are you looking for?

nikolavojicic 2020-04-29T19:52:49.119900Z

I'd say use transducers instead of "pure" methods which take channel. Attach transducers to channels or pipelines.

2020-04-29T20:21:54.120900Z

also you'll need to decouple the interaction between kafka and core.async, either can introduce lags that break the other

wilkerlucio 2020-04-29T20:21:58.121100Z

hello, I just hit an issue while trying to use loop + try/catch, this is a code that breaks:

(defn x [some-chan]
  (go
    (loop []
      (try
        (when (<! some-chan)
          (recur))
        (catch :default e
          (recur))))))

wilkerlucio 2020-04-29T20:22:12.121400Z

(reduced to make a point), but this code doesn't compile

wilkerlucio 2020-04-29T20:22:24.121800Z

it does the error

------ ERROR -------------------------------------------------------------------
 File: /Users/wilkerlucio/Development/relemma/src/cljs/relemma/shared/logic/feature_flags.cljs:57:3
--------------------------------------------------------------------------------
  54 |     unlisten-cb))
  55 | 
  56 | (defn x [some-chan]
  57 |   (go
---------^----------------------------------------------------------------------
Encountered error when macroexpanding cljs.core/cond.
IllegalArgumentException: No implementation of method: :emit-instruction of protocol: #'cljs.core.async.impl.ioc-macros/IEmittableInstruction found for class: cljs.core.async.impl.ioc_macros.Jmp
	clojure.core/-cache-protocol-fn (core_deftype.clj:583)
	clojure.core/-cache-protocol-fn (core_deftype.clj:575)

wilkerlucio 2020-04-29T20:22:32.122Z

is there a way around it?

dpsutton 2020-04-29T20:25:31.122300Z

you can recur outside of the try

dpsutton 2020-04-29T20:25:34.122400Z

(defn x [some-chan]
  (go
    (loop []
      (try
        (a/<! some-chan))
      (recur))))

ghadi 2020-04-29T20:32:20.123200Z

@wilkerlucio recur doesn't work across try boundaries, whether core.async or not

ghadi 2020-04-29T20:32:24.123400Z

user=> (loop [] (try (recur) (catch Exception e)))
Syntax error (UnsupportedOperationException) compiling recur at (REPL:1:15).
Cannot recur across try

wilkerlucio 2020-04-29T20:33:03.123500Z

yeah, but then when the channel is closed this would loop like crazy, right?

2020-04-29T20:33:52.123700Z

that behavior would be shared by the original

wilkerlucio 2020-04-29T20:34:54.123900Z

not really, the original has a when for no errors, and no recur in that case

dpsutton 2020-04-29T20:35:07.124100Z

well my expanded not pasted version does too 🙂

wilkerlucio 2020-04-29T20:35:23.124300Z

oh, you guys are right, because of the reduced code

2020-04-29T20:35:28.124500Z

@wilkerlucio I don't see a when, and both the try and the catch recur to the same place

wilkerlucio 2020-04-29T20:35:31.124700Z

but on the original I had, there was a when aroudn

dpsutton 2020-04-29T20:35:34.124900Z

no worries.

wilkerlucio 2020-04-29T20:35:48.125200Z

snippet updated

dpsutton 2020-04-29T20:35:54.125400Z

you could just do an if-let and catch _ e nil

2020-04-29T20:35:55.125600Z

err, they would recur to the same place if recur across try was valid

2020-04-29T20:36:03.125800Z

which it isn't

wilkerlucio 2020-04-29T20:37:04.126Z

got it working with this:

(defn x [some-chan]
  (go
    (let [continue* (atom true)]
     (loop []
       (try
         (if (<! some-chan)
           (do-something)
           (reset! continue* false))
         (catch :default e
           (handle-error)))
       (if @continue*
         (recur))))))

2020-04-29T20:38:01.126300Z

for that sort of thing, I like promise or delay, which convey the extra semantic that it's a one way switch

wilkerlucio 2020-04-29T20:38:19.126500Z

in my case its a streaming thing

wilkerlucio 2020-04-29T20:38:36.126700Z

agreed for single hit a promise would be great

2020-04-29T20:38:43.127Z

right, but nothing ever changes the atom to true - it's either not-done, or done

wilkerlucio 2020-04-29T20:38:59.127200Z

it does, (snippet updated)

wilkerlucio 2020-04-29T20:39:09.127400Z

when channel comes with nil, it stops

2020-04-29T20:39:40.127600Z

I still don't see a case where it's assigned to true - you create a new one

wilkerlucio 2020-04-29T20:40:13.127800Z

not sure if I get it, the point is just that one message can come as an error, in this case I want to report and wait for the next message

wilkerlucio 2020-04-29T20:40:28.128Z

the continue starts with true, but if the channel closes its set to false, and recur stops

2020-04-29T20:40:53.128200Z

the point is that you have created a one-way-switch - once created it only changes once, from true to false

wilkerlucio 2020-04-29T20:41:09.128400Z

yeah, that's when the channel gets closed, so nothing more to get from it

wilkerlucio 2020-04-29T20:41:30.128700Z

after closing there is nothing else to do

2020-04-29T20:41:44.128900Z

I'm not saying the atom is incorrect, I'm saying delay / promise are more specialized, and they are less error prone and more clear to a reader for that reason

2020-04-29T20:42:01.129100Z

it's a style suggestion not a bug fix

2020-04-29T20:42:33.129300Z

principle of least power

wilkerlucio 2020-04-29T20:42:41.129500Z

but how I could use that since this is a stream channel?

wilkerlucio 2020-04-29T20:42:47.129700Z

I expect multiple messages from it

2020-04-29T20:43:35.129900Z

it literally replaces the atom, you create it in the same place, instead of using @ as the check, you use realized?, and instead of reset! you use deliver or force

wilkerlucio 2020-04-29T20:44:33.130100Z

ah, gotcha, also I'm on CLJS land, you still thing would be better to use something else than atom there?

2020-04-29T20:44:37.130300Z

(let [done? (promise)] (if (not (realized? done)) (try ... (catch ... (deliver done? :error-state)))

2020-04-29T20:44:53.130500Z

oh, I'm forgetting cljs might not have those constructs

2020-04-29T20:45:45.130700Z

cljs appears to have delay but not promise in the random cljs repl I checked

2020-04-29T20:47:54.130900Z

then you could use (done (delay true)) then later (force done)