Hi,
I have a general question about consuming messages from a Kafka topic: is it better to collect all messages into an async channel and handle them with pure functions that take a channel, process it, and return a new output channel, and so on? Or what is the best approach to model that?
I already responded in the other channel, but more specifically about core.async here: what would core.async give you when processing Kafka? What feature are you looking for?
I'd say use transducers instead of "pure" functions that take a channel. Attach transducers to channels or pipelines.
also you'll need to decouple the interaction between Kafka and core.async; either can introduce lag that breaks the other
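To make the transducer suggestion concrete, here is a minimal sketch on the JVM. The transformation (`inc` then `even?`), the buffer sizes, and the names `xform`, `direct`, `in`, `out`, and `result` are all placeholders chosen for illustration; nothing here talks to Kafka, and a real consumer would feed `in` from its own poll loop.

```clojure
(require '[clojure.core.async :as a])

;; Illustrative transformation only; a real one would parse and
;; filter whatever the Kafka consumer hands over.
(def xform
  (comp (map inc)
        (filter even?)))

;; Option 1: attach the transducer to the channel itself.
;; A channel with a transducer must have a buffer.
(def direct (a/chan 16 xform))

;; Option 2: run the transducer in a pipeline between two channels.
(def in  (a/chan 16))
(def out (a/chan 16))
(a/pipeline 2 out xform in)

;; Feed a few values, then close the input so the pipeline finishes
;; and closes the output channel.
(doseq [n [1 2 3 4]]
  (a/>!! in n))
(a/close! in)

;; Collect everything that came out of the pipeline.
(def result (a/<!! (a/into [] out)))
;; result is [2 4]
```

The buffered channels on both sides are one simple way to keep a slow consumer from immediately stalling the producer, though buffer sizing for a real Kafka workload is a separate decision.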
hello, I just hit an issue while trying to use loop + try/catch, this is the code that breaks:
(defn x [some-chan]
  (go
    (loop []
      (try
        (when (<! some-chan)
          (recur))
        (catch :default e
          (recur))))))
(reduced to make the point), but this code doesn't compile
it gives this error:
------ ERROR -------------------------------------------------------------------
File: /Users/wilkerlucio/Development/relemma/src/cljs/relemma/shared/logic/feature_flags.cljs:57:3
--------------------------------------------------------------------------------
54 | unlisten-cb))
55 |
56 | (defn x [some-chan]
57 | (go
---------^----------------------------------------------------------------------
Encountered error when macroexpanding cljs.core/cond.
IllegalArgumentException: No implementation of method: :emit-instruction of protocol: #'cljs.core.async.impl.ioc-macros/IEmittableInstruction found for class: cljs.core.async.impl.ioc_macros.Jmp
clojure.core/-cache-protocol-fn (core_deftype.clj:583)
clojure.core/-cache-protocol-fn (core_deftype.clj:575)
is there a way around it?
you can recur outside of the try
(defn x [some-chan]
  (go
    (loop []
      (try
        (a/<! some-chan))
      (recur))))
@wilkerlucio recur doesn't work across try boundaries, whether core.async or not
user=> (loop [] (try (recur) (catch Exception e)))
Syntax error (UnsupportedOperationException) compiling recur at (REPL:1:15).
Cannot recur across try
yeah, but then when the channel is closed this would loop like crazy, right?
that behavior would be shared by the original
not really, the original has a when
for no errors, and no recur in that case
well my expanded not pasted version does too 🙂
oh, you guys are right, because of the reduced code
@wilkerlucio I don't see a when, and both the try and the catch recur to the same place
but in the original I had, there was a when around it
no worries.
snippet updated
you could just do an if-let and catch _ e nil
err, they would recur to the same place if recur across try was valid
which it isn't
got it working with this:
(defn x [some-chan]
  (go
    (let [continue* (atom true)]
      (loop []
        (try
          (if (<! some-chan)
            (do-something)
            (reset! continue* false))
          (catch :default e
            (handle-error)))
        (if @continue*
          (recur))))))
for that sort of thing, I like promise or delay, which convey the extra semantics that it's a one-way switch
in my case it's a streaming thing
agreed, for a single hit a promise would be great
right, but nothing ever changes the atom to true - it's either not-done, or done
it does (snippet updated): when the channel yields nil, it stops
I still don't see a case where it's assigned to true - you create a new one
not sure if I get it, the point is just that one message can come as an error, and in that case I want to report it and wait for the next message
the continue starts as true, but if the channel closes it's set to false, and the recur stops
the point is that you have created a one-way-switch - once created it only changes once, from true to false
yeah, that's when the channel gets closed, so nothing more to get from it
after closing there is nothing else to do
I'm not saying the atom is incorrect, I'm saying delay / promise are more specialized, and they are less error prone and more clear to a reader for that reason
it's a style suggestion not a bug fix
principle of least power
but how could I use that, since this is a streaming channel?
I expect multiple messages from it
it literally replaces the atom: you create it in the same place, instead of using @ as the check you use realized?, and instead of reset! you use deliver or force
ah, gotcha, also I'm in CLJS land, do you still think it would be better to use something other than an atom there?
(let [done? (promise)] (if (not (realized? done?)) (try ... (catch ... (deliver done? :error-state)))))
oh, I'm forgetting cljs might not have those constructs
cljs appears to have delay but not promise, in the random cljs repl I checked
then you could bind it with (let [done (delay true)] ...)
then later (force done)
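Putting the delay suggestion together with the working snippet, a ClojureScript version might look roughly like this. It is a sketch, not a tested drop-in: the namespace name `example.consume` and the function name `consume!` are made up, `do-something` and `handle-error` are passed in as arguments and stand in for the handlers from the snippet above, and the `some?` check is an added assumption so that a `false` message is not mistaken for a closed channel.

```clojure
(ns example.consume
  (:require [cljs.core.async :refer [<!]])
  (:require-macros [cljs.core.async.macros :refer [go]]))

(defn consume!
  "Reads from some-chan until it closes. do-something and handle-error
   are caller-supplied (hypothetical) handler functions."
  [some-chan do-something handle-error]
  (go
    ;; The delay is a one-way switch: unrealized means keep going,
    ;; realized means the channel has closed.
    (let [done (delay true)]
      (loop []
        (try
          (let [msg (<! some-chan)]
            (if (some? msg)
              (do-something msg)
              ;; nil means the channel is closed: flip the switch.
              (force done)))
          (catch :default e
            ;; Report the error and fall through to the next message.
            (handle-error e)))
        ;; recur sits outside the try, so it compiles.
        (when-not (realized? done)
          (recur))))))
```

Compared with the atom, nothing can flip the delay back once it is forced, which is the "one-way switch" property discussed above.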