so, I've been moving some of my design around based on the stateful transducer issue talked about above https://clojurians.slack.com/archives/C05423W6H/p1603202921084800 Does the following seem like a reasonable example? I often have a large-ish collection of compressed data files (represented by the (take 20 (range)) here) that I want to run through a series of steps: mapping, grouping, reducing, then mapping again. Is this a reasonable setup?
(let [inc-chan    (a/chan 32 (map inc))
      reduce-eo   (a/reduce (fn [acc n]
                              (if (even? n)
                                (update acc :even conj n)
                                (update acc :odd conj n)))
                            {:even []
                             :odd  []}
                            inc-chan)
      mapcat-chan (a/chan 8 (comp
                             (mapcat (fn [m] (map identity m)))
                             (mapcat (fn [[num-type nums]]
                                       (println "Numtype: " num-type " Nums: " nums)
                                       (into []
                                             (map (fn [n] [num-type n]))
                                             nums)))
                             (map (fn [[numtype n]] [numtype (inc n)]))))
      _           (a/pipe reduce-eo mapcat-chan)
      last-reduce (a/reduce (fn [acc [numtype n]]
                              (println "Numtype: " numtype " Num: " n)
                              (if (= :odd numtype)
                                (update acc :used-to-be-odd conj n)
                                (update acc :used-to-be-even conj n)))
                            {:used-to-be-even []
                             :used-to-be-odd  []}
                            mapcat-chan)]
  ;; Fire in events
  (a/onto-chan inc-chan (take 20 (range)))
  ;; collect events
  (a/<!! last-reduce))
I like using the higher-order core.async functions rather than doing something in a go block, unless I need to keep a lookup table up to date or similar
this pattern of setting up the "machinery" in a let, then firing in the events, then retrieving the results from the channels is one I'm following as well, and it seems to be working for me, but I don't know if there is a better way for this kind of problem
I'm mostly using this for embarrassingly parallel problems so the pipe would be replaced with some kind of pipeline for a lot of things
is this different than a/pipeline?
the overall thing? or just the use of a/pipe?
I didn't think I could have a reducing stage in the middle of a/pipeline
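right, a/pipeline applies its transducer to each item independently across n workers, which is why a reducing/accumulating stage doesn't fit in the middle. Just to show what I mean, a minimal sketch (the parallelism, channel sizes, and transform are made up):

```clojure
(require '[clojure.core.async :as a])

;; sketch: a/pipeline runs the transducer on each item independently
;; across 4 workers; per-item transforms like (map inc) are fine here,
;; but a stateful/reducing step across items is not.
(let [in  (a/chan 32)
      out (a/chan 32)]
  (a/pipeline 4 out (map inc) in)
  [in out])
```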
this is a toy example of the kind of data processing I'd like to do
read in the records from a file, scrub each record, group them by some key (user id, say), then process each group and put the results into some kind of output (CSV, Excel, database, API)
I've not found good examples of people chaining together the higher level core.async things (my google-fu has failed me)
and I'm just wondering if I'm using the library in a way it wasn't really meant to be used
Clojure Applied has a chapter on this but I think the key is - make components that are given the channels to use
then whatever is assembling the units can make channels and hand them, as an input to one and an output to another (and you don't even need pipe then)
it also lets the assembler choose buffer policies and sizes etc
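something like this, I take it? a sketch of the component-gets-handed-channels idea (the component name, scrub function, and buffer sizes are all made up):

```clojure
(require '[clojure.core.async :as a])

(defn scrub-records
  "Hypothetical component: reads raw records from in-ch, writes scrubbed
  records to out-ch. `scrub-fn` stands in for real cleaning logic."
  [in-ch out-ch scrub-fn]
  (a/pipeline 4 out-ch (map scrub-fn) in-ch))

;; the assembler owns channel creation, buffer policy, and sizes,
;; and the same channel serves as output of one unit and input of the next
(let [raw      (a/chan 32)
      scrubbed (a/chan 64)]
  (scrub-records raw scrubbed #(select-keys % [:user-id :value]))
  scrubbed)
```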
yeah, I think the pipe in here is a bit gratuitous looking at it again. That would probably be where I'd put in an a/pipeline if that was appropriate
I'll have a look again at the Clojure Applied chapter
re-reading it with that in mind should help
thx 🙂
I think it's ch 6 but I don't have it in front of me
I'm looking about for my dead tree version. I might have to go to safari
I assume you mean Programming Clojure ;)
I've only got one copy of that
I think I gave away my dead tree copy of Clojure Applied (unless it is in another room). I push it at people who want to go from noodling at the repl to actually building things
it is the best book for that
the authors of all of them are really friendly
mmm... vertical slices
re: https://clojurians.slack.com/archives/C05423W6H/p1603469711097400
sounds then that things like a/mult should mostly live in the assembler namespaces
If I had a component doing a reduce/transduce/into then the component would have a pipe at the end to put the data on the passed-in output channel I suppose (as those functions don't take output channels, but just return them)
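i.e. something like this sketch (the component name is made up): since a/reduce returns a fresh result channel, the component pipes that onto the output channel it was handed:

```clojure
(require '[clojure.core.async :as a])

(defn sum-component
  "Hypothetical reducing component: a/reduce returns its own result
  channel, so we pipe that single result onto the caller's out-ch."
  [in-ch out-ch]
  (a/pipe (a/reduce + 0 in-ch) out-ch))
```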
@alexmiller thx for this. This makes me think that a little assembly function I wrote for some sugar is almost completely a bad idea
I was thinking that was true before, but didn't have a good way of thinking how things should be assembled