core-async

Jakub Holý 2019-10-24T08:37:00.043200Z

I do not want to put the data into a coll first. The main reason I use async is that the input data is huge, and I need to run it item by item through a transducer that extracts only the small part I really need; only then will it all fit into memory so that I can use into. And obviously I want to run the extraction transducer only on the data items, which is why I route throwables into their own channel. I'm not sure how cat applies here. Thank you all for your contributions!
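For reference, the pipeline described here (throwables split off into their own channel, the extraction transducer applied only to data items, then into) might look roughly like the sketch below. It is not Jakub's actual code: process, extract-xf, and the :id key are hypothetical stand-ins.

```clojure
(require '[clojure.core.async :as async])

(defn throwable? [x] (instance? Throwable x))

;; Hypothetical extraction transducer: keep only the small part we need.
(def extract-xf (map :id))

(defn process
  "Splits in into throwables and data items, runs xf over the data items
  only, and returns a map of channels, each delivering one vector."
  [in xf]
  (let [;; split returns [channel-for-true channel-for-false]
        [errors-ch data-ch] (async/split throwable? in)
        ;; the transducer runs item by item as values flow through
        extracted-ch (async/pipe data-ch (async/chan 1 xf))]
    ;; both into calls start consuming immediately, so neither output
    ;; of split is left unread
    {:data   (async/into [] extracted-ch)
     :errors (async/into [] errors-ch)}))

;; Usage
(let [in (async/to-chan [{:id 1 :payload "big"} (ex-info "boom" {}) {:id 2 :payload "big"}])
      {:keys [data errors]} (process in extract-xf)]
  [(async/<!! data) (count (async/<!! errors))])
;; => [[1 2] 1]
```

Only the extracted values are ever collected, so memory use depends on the size of the extracted part, not of the raw input.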

2019-10-24T16:48:46.044900Z

I think I misinterpreted this by missing some context. I debug something like that with an atom: I use (swap! debug conj x), and then I can easily play with the data in the REPL even while the process is running.

2019-10-24T16:49:10.045500Z

you don't need channel semantics to debug this, so using a channel just adds complexity; just swap! the data into the atom from the same code that handles the channel
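A minimal sketch of that approach, assuming a go-loop consumer that sums numbers (consume! is a hypothetical name): the same loop that takes from the channel also swap!s each value into the debug atom, which you can deref from the REPL while the loop is still running.

```clojure
(require '[clojure.core.async :as async])

;; Debug sink: deref it from the REPL at any time, even mid-run.
(def debug (atom []))

(defn consume!
  "Hypothetical consumer: sums the numbers taken from ch, recording each
  one in debug along the way. Returns a channel that delivers the sum."
  [ch]
  (async/go-loop [sum 0]
    (if-some [x (async/<! ch)]
      (do (swap! debug conj x)  ; side effect only, for inspection
          (recur (+ sum x)))
      sum)))

;; Usage
(async/<!! (consume! (async/to-chan [1 2 3])))  ;; => 6
@debug                                          ;; => [1 2 3]
```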

2019-10-24T16:49:44.046300Z

you can combine the conj with random-sample logic to avoid excess memory usage while still seeing representative data
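One way to combine the two, sketched under the assumption of a hypothetical helper sample-debug!: record each value only with probability p. clojure.core/random-sample with a single argument gives the same filtering as a transducer, if you prefer to compose it into an xform.

```clojure
;; Debug sink, as before.
(def debug (atom []))

(defn sample-debug!
  "Records x in debug with probability p (0.0 to 1.0) and returns x
  unchanged, so on average only about p * n of n items are kept."
  [p x]
  (when (< (rand) p)
    (swap! debug conj x))
  x)

;; Usage: keep roughly 1% of a large stream.
(run! #(sample-debug! 0.01 %) (range 100000))
(count @debug)  ; about 1000, varies per run
```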

2019-10-24T16:50:51.047300Z

often a group-by on the intermediate values in the atom will be helpful for debugging, also consider a tuple [timestamp ::code-context-keyword value]

2019-10-24T16:51:10.047700Z

because those are the two big differentiators when doing complex debugging of live data

2019-10-24T16:52:07.048800Z

where you derive the keyword as (defn my-broken-fn [blah] (swap! debug conj [(now) ::my-broken-fn {:args {:blah blah}}]) ...)
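Fleshing out the tuple idea and the group-by suggestion together, a sketch could look like this. The thread leaves (now) undefined, so it is a hypothetical helper here, and other-fn is likewise made up to give group-by a second context to separate.

```clojure
;; Debug sink holding [timestamp context-keyword value] tuples.
(def debug (atom []))

;; Hypothetical helper; the thread leaves (now) undefined.
(defn now [] (System/currentTimeMillis))

(defn my-broken-fn [blah]
  (swap! debug conj [(now) ::my-broken-fn {:args {:blah blah}}])
  (inc blah))

;; Hypothetical second function, so there are two contexts to group.
(defn other-fn [x]
  (swap! debug conj [(now) ::other-fn {:args {:x x}}])
  (* 2 x))

;; Usage
(my-broken-fn 1)
(other-fn 5)
(my-broken-fn 2)

;; Group the tuples by their code-context keyword (the second element).
(def by-context (group-by second @debug))
(map (comp :blah :args last) (by-context ::my-broken-fn))  ;; => (1 2)
```

The timestamps then let you sort or window the tuples within each group.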

Jakub Holý 2019-10-25T09:20:45.051800Z

well, the input of the fn is a channel, so I would need to wrap it in another channel with a transducer that puts stuff into the debug atom, and use this wrapped channel from then on, no?

2019-10-25T17:27:26.052200Z

right, the transducer would be a noop in terms of channel data, but it would have the side effect of putting the data in a growing mutable collection

2019-10-25T17:27:59.052400Z

(map (fn [x] (swap! debug conj {... ...}) x))
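Putting Jakub's question and this answer together, the wrapping could be a small helper like the following sketch (tap-debug is a hypothetical name). The buffer of 1 is required because core.async only accepts a transducer on a buffered channel.

```clojure
(require '[clojure.core.async :as async])

(def debug (atom []))

(defn tap-debug
  "Returns a new channel carrying the same values as in, in the same order.
  Its transducer records every value in debug as a side effect."
  [in]
  (async/pipe in (async/chan 1 (map (fn [x] (swap! debug conj x) x)))))

;; Usage: hand the wrapped channel to the rest of the pipeline.
(async/<!! (async/into [] (tap-debug (async/to-chan [:a :b :c]))))  ;; => [:a :b :c]
@debug  ;; => [:a :b :c]
```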

2019-10-25T17:28:15.052600Z

or even an agent, since asynchronous updates don't hurt here
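A sketch of the agent variant, assuming a hypothetical record! helper: send queues the conj on the agent and returns immediately, so the pipeline never waits on the debug bookkeeping. Sends from one thread to one agent are applied in order.

```clojure
;; Agent as the debug sink.
(def debug (agent []))

(defn record!
  "Hypothetical helper: queues x for the debug agent and returns x."
  [x]
  (send debug conj x)
  x)

;; Usage
(run! record! [1 2 3])
(await debug)  ; block until the sends queued so far have been applied
@debug         ;; => [1 2 3]
```

In a standalone script, call (shutdown-agents) at the end so the agent thread pool doesn't keep the JVM alive.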

joaohgomes 2019-10-24T20:36:02.051700Z

One thing you could do is construct the resulting map {:result ... :errors ...} inside the reducing function. If the item is a Throwable, conj it onto :errors; otherwise merge it into :result.

Jakub Holý 2019-10-25T09:21:41.052Z

the reducing function does not see the errors, because it only runs on data-ch, not on errors-ch. Or do you mean not splitting the input ch in two like this? But I want to distinguish errors that come in from errors that happen during reducing.

joaohgomes 2019-10-25T17:51:00.052800Z

Maybe something like this:

(a/reduce
    (fn [result curr]
      (if (throwable? curr)
        (update result :errors conj curr)
        (update result :data extract-and-combine-data curr)))
    {:data nil :errors nil}
    data-ch)

joaohgomes 2019-10-25T18:45:22.054500Z

I’ve put together a working snippet:

(require '[clojure.core.async :as async])

(defn new-error []
  (ex-info "some error..." {:code 999}))

(defn new-data []
  (let [k (rand-nth [:a :b :c :d :e :f])
        v (rand-int 10)]
    {k v}))

;; n items, roughly 5% of them errors
(defn emulate-load [n]
  (->> (repeatedly #(if (> (rand) 0.05) (new-data) (new-error)))
       (take n)))

(defn throwable? [x] (instance? Throwable x))

(let [from (async/chan)
      to (async/reduce
           (fn [result curr]
             (if (throwable? curr)
               (update result :errors conj curr)
               ;; sum the values per key across all data maps
               (update result :data (partial merge-with +) curr)))
           {:data nil :errors nil}
           from)]
  ;; onto-chan closes from once all items are put, which lets reduce finish
  (async/onto-chan from (emulate-load 15))
  (println (async/<!! to)))
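Jakub also wanted to tell errors that arrive on the channel apart from errors raised while reducing. One way to get both out of a single reducing function is to catch exceptions around the combining step; this is a sketch under that assumption, with reduce-step and the summing combine fn as hypothetical names.

```clojure
(require '[clojure.core.async :as async])

(defn throwable? [x] (instance? Throwable x))

(defn reduce-step
  "Builds a reducing fn for async/reduce. Throwables arriving on the channel
  go to :input-errors; exceptions thrown by combine go to :reduce-errors.
  Catching them matters: an exception escaping the reducing fn would abort
  async/reduce and the accumulated result would be lost."
  [combine]
  (fn [result curr]
    (if (throwable? curr)
      (update result :input-errors conj curr)
      (try
        (update result :data combine curr)
        (catch Exception e
          (update result :reduce-errors conj e))))))

;; Usage: combine is a hypothetical summing fn; {:n "x"} makes it throw.
(def result
  (let [from (async/chan)
        to   (async/reduce (reduce-step (fn [acc x] (+ (or acc 0) (:n x))))
                           {:data nil :input-errors nil :reduce-errors nil}
                           from)]
    (async/onto-chan from [{:n 1} (ex-info "bad input" {}) {:n 2} {:n "x"}])
    (async/<!! to)))

(:data result)  ;; => 3
```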

Jakub Holý 2019-10-26T06:51:25.066100Z

thank you!
