I do not want to put the data into a coll first. The main reason I use async is that the input data is huge, and I need to run it one by one through a transducer that extracts only the small part I really need; only then will it all fit into memory so I can use into. And obviously I want to run the extraction transducer only on the data items, which is why I split the throwables off into a separate channel.
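A minimal sketch of that setup, assuming a core.async version that provides to-chan! and a hypothetical extract-xf that keeps only the :id of each item: a/split routes throwables and data items into two channels, and a/pipe feeds the data into a channel whose transducer does the extraction item by item, so a/into only ever accumulates the extracted parts.

```clojure
(require '[clojure.core.async :as a])

(defn throwable? [x] (instance? Throwable x))

;; Hypothetical extraction transducer: keep only the :id of each data item.
;; Replace it with whatever picks out the small part you actually need.
(def extract-xf (map :id))

(defn collect
  "Splits in-ch into errors and data, runs extract-xf over the data items
  one at a time, and returns two channels that each yield a vector."
  [in-ch]
  (let [[errors-ch data-ch] (a/split throwable? in-ch)
        ;; the transducer runs as each item is put onto this channel
        extracted-ch        (a/pipe data-ch (a/chan 16 extract-xf))]
    ;; both a/into calls consume concurrently, so a/split never blocks on
    ;; an unread output channel
    {:data   (a/into [] extracted-ch)
     :errors (a/into [] errors-ch)}))

(comment
  (let [{:keys [data errors]}
        (collect (a/to-chan! [{:id 1 :payload "big"}
                              (ex-info "boom" {})
                              {:id 2 :payload "big"}]))]
    [(a/<!! data) (count (a/<!! errors))]))
;; => [[1 2] 1]
```

Both channels close once the input closes, so each a/into delivers its vector when the input is exhausted.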
Not sure how cat applies here?
Thank you all for your contributions!
I think I misinterpreted the question because I was missing some context. I debug something like that with an atom, using (swap! debug conj x); then I can easily play with the data in the REPL even while the process is running
you don't need channel semantics to debug this, so using a channel just adds complexity - just swap the data in from the same code that handles the channel
you can combine the conj with random-sample logic to avoid excess memory usage while still seeing representative data
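A minimal sketch of the atom-plus-sampling idea; the names debug and record! are hypothetical. The sampling test is the same one clojure.core/random-sample applies, and returning x unchanged lets the call sit inline in the code that already handles each item.

```clojure
;; Hypothetical debug atom; defonce keeps its contents across REPL reloads.
(defonce debug (atom []))

(defn record!
  "Conj x onto the debug atom with probability p (the same sampling test
  clojure.core/random-sample uses), then return x unchanged."
  [p x]
  (when (< (rand) p)
    (swap! debug conj x))
  x)

;; e.g. keep roughly 1% of the items passing through:
;; (record! 0.01 item)
```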
often a group-by on the intermediate values in the atom will be helpful for debugging. Also consider storing a tuple [timestamp ::code-context-keyword value], because time and code location are the two big differentiators when doing complex debugging of live data. Derive the keyword from the function doing the recording, e.g. (defn my-broken-fn [blah] (swap! debug conj [(now) ::my-broken-fn {:args {:blah blah}}]) ...)
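A runnable version of that pattern, as a sketch: System/currentTimeMillis stands in for now, and the body of my-broken-fn (inc blah) is a hypothetical placeholder for the real work. group-by second then buckets the entries by their code-context keyword.

```clojure
(defonce debug (atom []))

(defn my-broken-fn [blah]
  ;; each entry is [timestamp code-context-keyword value];
  ;; ::my-broken-fn resolves to a keyword in the current namespace
  (swap! debug conj [(System/currentTimeMillis) ::my-broken-fn {:args {:blah blah}}])
  ;; hypothetical body standing in for the real work
  (inc blah))

;; In the REPL: bucket the recorded entries by where they came from.
(comment
  (group-by second @debug))
```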
well, the input of the fn is a channel so I would need to wrap it with another channel with a transducer that puts stuff into the debug atom and use this wrapped channel further on, no?
right, the transducer would be a noop in terms of channel data, but it would have the side effect of putting the data in a growing mutable collection
(map (fn [x] (swap! debug conj {... ...}) x))
or even an agent, since asynchronous updates don't hurt here
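A sketch of that wrapping step, assuming a hypothetical helper tap-debug: the pass-through transducer leaves every item unchanged and records it as a side effect, either with swap! on an atom or with send on an agent (send queues the conj and returns immediately).

```clojure
(require '[clojure.core.async :as a])

(defonce debug (atom []))
(defonce debug-agent (agent []))

;; Pass-through transducers: items flow on unchanged, but each one is also
;; recorded as a side effect.
(def debug-xf (map (fn [x] (swap! debug conj x) x)))
;; Agent variant: send applies the conj asynchronously on the agent.
(def debug-agent-xf (map (fn [x] (send debug-agent conj x) x)))

(defn tap-debug
  "Hypothetical helper: returns a channel carrying the same items as in-ch,
  recording each one via xf on the way through. Use the returned channel
  further on in place of in-ch."
  [in-ch xf]
  (a/pipe in-ch (a/chan 1 xf)))
```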
One thing you could do is build the resulting map {:result ... :errors ...} inside the reducing function: if the item is a throwable, conj it onto :errors, otherwise merge it into :result.
The reducing function does not see the errors, because it only runs on data-ch, not errors-ch. Or do you mean not splitting the input channel in two like this? But I want to differentiate errors coming in from errors happening during reducing.
Maybe something like this:
```clojure
(a/reduce
  (fn [result curr]
    (if (throwable? curr)
      (update result :errors conj curr)
      (update result :data extract-and-combine-data curr)))
  {:data nil :errors nil}
  data-ch)
```
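To also tell input errors apart from errors raised while reducing, one option is to wrap the combining step in try/catch inside the reducing function; otherwise an exception thrown there would escape into the go block that a/reduce runs in and the accumulated result would be lost. A sketch, where safe-reducer and the :input-errors / :reduce-errors keys are hypothetical names:

```clojure
(require '[clojure.core.async :as a])

(defn throwable? [x] (instance? Throwable x))

(defn safe-reducer
  "Wraps combine, a (fn [data item] ...), so that throwables arriving on the
  channel and exceptions thrown by combine land under different keys."
  [combine]
  (fn [result curr]
    (if (throwable? curr)
      ;; the error arrived on the channel
      (update result :input-errors conj curr)
      (try
        (update result :data combine curr)
        ;; the error happened while combining; keep the previous result
        (catch Exception e
          (update result :reduce-errors conj e))))))

(comment
  (a/<!! (a/reduce (safe-reducer (partial merge-with +))
                   {:data nil :input-errors [] :reduce-errors []}
                   (a/to-chan! [{:a 1} (ex-info "bad input" {}) {:a "x"}]))))
```

In that example, {:a "x"} makes (+ 1 "x") throw, so it ends up under :reduce-errors while :data stays {:a 1}.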
I’ve put together a working snippet:
```clojure
(require '[clojure.core.async :as async])

(defn new-error []
  (ex-info "some error..." {:code 999}))

(defn new-data []
  (let [k (rand-nth [:a :b :c :d :e :f])
        v (rand-int 10)]
    {k v}))

;; n items, roughly 5% of them errors
(defn emulate-load [n]
  (->> (repeatedly #(if (> (rand) 0.05) (new-data) (new-error)))
       (take n)))

(defn throwable? [x] (instance? Throwable x))

(let [from (async/chan)
      to   (async/reduce
             (fn [result curr]
               (if (throwable? curr)
                 (update result :errors conj curr)
                 (update result :data (partial merge-with +) curr)))
             {:data nil :errors nil}
             from)]
  ;; onto-chan! puts every item, then closes `from` so the reduce can finish
  ;; (older core.async versions only provide onto-chan)
  (async/onto-chan! from (emulate-load 15))
  (println (async/<!! to)))
```
thank you!