core-async

2020-10-23T16:01:34.088900Z

so, I've been moving some of my design around based on the stateful transducer issue talked about above: https://clojurians.slack.com/archives/C05423W6H/p1603202921084800 Does the following seem like a reasonable example? I've often got a large-ish collection of compressed data files (represented here by the take 20 of range) that I want to run through some series of mapping, grouping, reducing, and mapping again. Is this a reasonable setup?

  ;; assumes (require '[clojure.core.async :as a])
  (let [inc-chan    (a/chan 32 (map inc))
        reduce-eo   (a/reduce (fn [acc n] (if (even? n)
                                            (update acc :even conj n)
                                            (update acc :odd conj n)))
                              {:even []
                               :odd  []}
                              inc-chan)
        mapcat-chan (a/chan 8 (comp
                               (mapcat (fn [m] (map identity m)))
                               (mapcat (fn [[num-type nums]]
                                         (println "Numtype: " num-type " Nums: " nums)
                                         (into []
                                               (map (fn [n] [num-type n]))
                                               nums)))
                               (map (fn [[numtype n]] [numtype (inc n)]))))
        _           (a/pipe reduce-eo mapcat-chan)
        last-reduce (a/reduce (fn [acc [numtype n]]
                                (println "Numtype: " numtype " Num: " n)
                                (if (= :odd numtype)
                                  (update acc :used-to-be-odd conj n)
                                  (update acc :used-to-be-even conj n)))
                              {:used-to-be-even []
                               :used-to-be-odd  []}
                              mapcat-chan)]
    ;; Fire in events
    (a/onto-chan inc-chan (take 20 (range)))
    ;; collect events
    (a/<!! last-reduce))

2020-10-23T16:02:25.089700Z

I like using the higher-order core.async functions rather than doing something in a go block, unless I need to keep a lookup table up to date or similar

2020-10-23T16:04:21.091Z

setting up the "machinery" in a let, then firing in the events, then retrieving the results from the channels is the pattern I've been following, and it seems to be working for me, but I don't know if there is a better way for this kind of problem

2020-10-23T16:04:47.091500Z

I'm mostly using this for embarrassingly parallel problems so the pipe would be replaced with some kind of pipeline for a lot of things
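
For reference, a minimal sketch of what swapping the pipe for a/pipeline could look like. `process-item` and `run-pipeline` are hypothetical names, and `process-item` is only a stand-in for the real per-file work. It uses `a/onto-chan!`, the newer name for the `a/onto-chan` call in the snippet above.

```clojure
(require '[clojure.core.async :as a])

;; Hypothetical stand-in for the per-item work (e.g. handling one data file).
(defn process-item [n]
  (inc n))

(defn run-pipeline
  "Runs xs through process-item on `parallelism` workers and collects the results."
  [parallelism xs]
  (let [in  (a/chan 32)
        out (a/chan 32)]
    ;; a/pipeline keeps outputs in input order and closes `out` once `in` is drained.
    (a/pipeline parallelism out (map process-item) in)
    ;; onto-chan! closes `in` after the last item, which lets the pipeline finish.
    (a/onto-chan! in xs)
    (a/<!! (a/into [] out))))

(run-pipeline 4 (take 20 (range)))
```

The transducer handed to a/pipeline is applied to each item independently, so this fits the per-item stages; a grouping or reducing stage would still sit outside it.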

alexmiller 2020-10-23T16:05:25.091800Z

is this different than a/pipeline?

2020-10-23T16:06:01.092200Z

the overall thing? or just the use of a/pipe?

2020-10-23T16:06:21.092700Z

I didn't think I could have a reducing stage in the middle of a/pipeline

2020-10-23T16:07:06.093300Z

this is a toy example of the kind of data processing I'd like to do

2020-10-23T16:08:26.094600Z

read in the records from a file, do scrubbing on each record, gather them up by some key (user id say), then process each data item grouped by a key and grab the results to put into some kind of output (csv, excel, database, api)

2020-10-23T16:10:51.095600Z

I've not found good examples of people chaining together the higher level core.async things (my google-fu has failed me)

2020-10-23T16:11:14.096100Z

and I'm just wondering if I'm using the library in a way it wasn't really meant to be used

alexmiller 2020-10-23T16:14:19.096600Z

Clojure Applied has a chapter on this but I think the key is - make components that are given the channels to use

alexmiller 2020-10-23T16:15:11.097400Z

then whatever is assembling the units can make the channels and hand the same channel to one unit as its output and to another as its input (and you don't even need pipe then)

alexmiller 2020-10-23T16:15:58.097700Z

it also lets the assembler choose buffer policies and sizes etc
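
A rough sketch of the shape described above, not taken from Clojure Applied: components are plain functions that receive their channels, and one assembling function creates the channels and wires them together. The names `scrub-stage`, `tag-stage`, and `assemble` are made up for illustration.

```clojure
(require '[clojure.core.async :as a])

;; Components are handed their channels; they never create them.
(defn scrub-stage
  "Hypothetical component: increments each value taken from `in` and puts it on `out`."
  [in out]
  (a/go-loop []
    (if-some [v (a/<! in)]
      (do (a/>! out (inc v))
          (recur))
      (a/close! out))))

(defn tag-stage
  "Hypothetical component: tags each value from `in` as :even or :odd on `out`."
  [in out]
  (a/pipeline 1 out (map (fn [n] [(if (even? n) :even :odd) n])) in))

(defn assemble
  "The assembler owns the channels, so buffer sizes and policies are chosen here."
  [xs]
  (let [raw      (a/chan 32)
        scrubbed (a/chan 8)   ; output of scrub-stage, input of tag-stage: no pipe needed
        tagged   (a/chan 8)]
    (scrub-stage raw scrubbed)
    (tag-stage scrubbed tagged)
    (a/onto-chan! raw xs)
    (a/<!! (a/into [] tagged))))

(assemble (range 4))
;; => [[:odd 1] [:even 2] [:odd 3] [:even 4]]
```

Because `scrubbed` is passed to both stages, changing its buffer (for example to `(a/chan (a/dropping-buffer 8))`) is a one-line change in `assemble` and touches neither component.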

2020-10-23T16:17:34.098500Z

yeah, I think the pipe in here is a bit gratuitous looking at it again. That would probably be where I'd put in an a/pipeline if that was appropriate

2020-10-23T16:17:52.098900Z

I'll have a look again at the Clojure Applied chapter

2020-10-23T16:18:00.099300Z

re-reading it with that in mind should help

2020-10-23T16:18:01.099500Z

thx 🙂

alexmiller 2020-10-23T16:18:10.099800Z

I think it's ch 6 but I don't have it in front of me

2020-10-23T16:18:59.100100Z

I'm looking about for my dead tree version. I might have to go to Safari

alexmiller 2020-10-23T16:20:50.100500Z

I assume you mean Programming Clojure ;)

2020-10-23T16:21:22.101Z

I've only got one copy of that

2020-10-23T16:22:42.101800Z

I think I gave away my dead tree copy of Clojure Applied (unless it is in another room). I push it at people who want to go from noodling at the REPL to actually building things

2020-10-23T16:22:47.102Z

it is the best book for that

2020-10-23T16:25:41.102800Z

the authors of all of them are really friendly

2020-10-23T16:27:02.103100Z

mmm... vertical slices

2020-10-23T16:30:45.104Z

sounds then that things like a/mult should mostly live in the assembler namespaces
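
One way that could look, as a sketch under the same assumptions: the assembler creates the mult and the taps, and the components only ever see ordinary in/out channels. `filter-stage` and `assemble-fan-out` are hypothetical names.

```clojure
(require '[clojure.core.async :as a])

(defn filter-stage
  "Hypothetical component: forwards the values from `in` that satisfy `pred` to `out`."
  [pred in out]
  (a/pipeline 1 out (filter pred) in))

(defn assemble-fan-out
  "The assembler owns the mult; each component just gets a tapped channel as its input."
  [xs]
  (let [source    (a/chan 8)
        m         (a/mult source)
        evens-in  (a/tap m (a/chan 8))
        odds-in   (a/tap m (a/chan 8))
        evens-out (a/chan 8)
        odds-out  (a/chan 8)
        ;; Start both collectors before feeding `source`: a mult waits for every
        ;; tap to accept an item, so both branches must be drained concurrently.
        evens     (a/into [] evens-out)
        odds      (a/into [] odds-out)]
    (filter-stage even? evens-in evens-out)
    (filter-stage odd? odds-in odds-out)
    (a/onto-chan! source xs)
    {:evens (a/<!! evens)
     :odds  (a/<!! odds)}))

(assemble-fan-out (range 6))
;; => {:evens [0 2 4], :odds [1 3 5]}
```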

2020-10-23T16:32:37.105300Z

If I had a component doing a reduce/transduce/into then the component would have a pipe at the end to put the data on the passed-in output channel, I suppose (as those functions don't take output channels, but just return them)
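
A small sketch of that idea, assuming a component that takes `in` and `out`: a/reduce returns its own channel, so the component pipes that channel onto the `out` it was given. `group-stage` and `run-group-stage` are illustrative names only.

```clojure
(require '[clojure.core.async :as a])

(defn group-stage
  "Hypothetical component: reduces everything on `in` into an even/odd map
  and delivers that single map on the caller-supplied `out`."
  [in out]
  ;; a/pipe copies the one reduced value to `out`, then closes `out`.
  (a/pipe (a/reduce (fn [acc n]
                      (update acc (if (even? n) :even :odd) conj n))
                    {:even [] :odd []}
                    in)
          out))

(defn run-group-stage
  "Tiny assembler used only to exercise group-stage."
  [xs]
  (let [in  (a/chan 8)
        out (a/chan 1)]
    (group-stage in out)
    (a/onto-chan! in xs)
    (a/<!! out)))

(run-group-stage (range 6))
;; => {:even [0 2 4], :odd [1 3 5]}
```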

2020-10-23T16:34:22.106200Z

@alexmiller thx for this. This makes me think that a little assembly function I wrote for some sugar is almost completely a bad idea

2020-10-23T16:34:41.106700Z

I was thinking that was true before, but didn't have a good way of thinking how things should be assembled