@smith.adriane that's a good thought...let me play with that
also, I'm just realizing that you probably want (comp (drop offset) (take limit)) rather than (comp (take limit) (drop offset))
while you typically supply functions to comp in "reverse order" (the rightmost function runs first), you don't with transducers: the leftmost transducer sees each item first. see https://groups.google.com/g/clojure/c/OeXZOJYydAs for a longer explanation
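A quick REPL sketch of the ordering point, assuming an offset of 2 and a limit of 3 over (range 10). The values are picked only for illustration and are not from the original query:

```clojure
;; Transducers composed with comp see each item left to right.
(def offset 2) ; illustrative value
(def limit 3)  ; illustrative value

;; drop first, then take: skips `offset` items and keeps the next `limit`
(def page (into [] (comp (drop offset) (take limit)) (range 10)))
;; page => [2 3 4]

;; take first, then drop: keeps only `limit` items, then drops `offset` of those
(def wrong (into [] (comp (take limit) (drop offset)) (range 10)))
;; wrong => [2]
```

The second form returns at most (- limit offset) items, which is rarely what pagination wants.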
@smith.adriane oh wow, I didn't know that either..ty
your idea did work, btw
it was a little awkward, as the "to" channel was supplied to me, so I had to put two async/pipes in to get the transducer in the right spot
but nbd
it's functional..thank you
(would be nice if async/pipe had an arity for an xform...)
but i digress
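For reference, the two-pipe workaround described above could be wrapped up roughly like this. pipe-xform is a hypothetical helper name, not something core.async provides, and the buffer sizes are arbitrary:

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical helper, not part of core.async: pipe `from` into `to`
;; through an intermediate channel that carries the transducer.
(defn pipe-xform [from xform to]
  (let [mid (async/chan 1 xform)]
    (async/pipe from mid)   ; feed the source into the transducing channel
    (async/pipe mid to)))   ; forward transformed items; closes `to` when `mid` closes

(def to (async/chan 10))    ; stand-in for a supplied "to" channel
(pipe-xform (async/to-chan! (range 10)) (comp (drop 2) (take 3)) to)
(def piped (async/<!! (async/into [] to)))
;; piped => [2 3 4]
```

Because the second pipe uses >! inside a go block, a slow consumer on `to` parks the pipe rather than blocking a thread.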
there are other alternatives
you can run the transducer yourself
do tell
(def xform (comp
            (drop 1)
            (take 3)))
(def my-rf (xform #(do %2)))
(my-rf nil 1) ;; nil
(my-rf nil 2) ;; 2
(my-rf nil 3) ;; 3
(my-rf nil 4) ;; (reduced 4)
which could be used like:
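(def xform (comp
            (drop 1)
            (take 3)))
;; ::skip is passed as the accumulator, so it comes back whenever the xform drops an item
(def my-rf (xform #(do %2)))
(go-loop []
  (when-some [x (<! ch)] ;; nil means ch was closed
    (let [v (my-rf ::skip x)]
      (cond
        (reduced? v) (do-thing-with-v (unreduced v)) ;; last item the xform allows, stop
        (= ::skip v) (recur) ;; item was dropped, keep reading
        :else (do
                (do-thing-with-v v)
                (recur))))))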
interesting, ty for the details
I'm sure there's a nice way to wrap up that functionality if that's not already done somewhere in core.async
I'm somewhat green on transducers, so I'll take a bit to decode that but, very cool
every time I try to do something with transducers that's not the typical example, it always starts out as like 30 lines of code and gets reduced (😉) to like 5 lines once I figure it out
I'm not sure what process you're building to consume from the channel, but async/reduce might also be a good building block to look into
I'm basically building a join from two cassandra CQL queries, with pagination
async/transduce uses async/reduce for implementation, https://github.com/clojure/core.async/blob/e9dc83dad06161176f176cad0f56f8bb235c51e5/src/main/clojure/clojure/core/async.clj#L651
do the results get aggregated into a collection or are they streamed to some other process?
either way, you should be able to use async/transduce:
• stream to collection:
(async/transduce
 (comp (drop offset) (take limit))
 conj
 []
 ch)
• stream to other process:
(async/transduce
 (comp (drop offset) (take limit))
 async/put!
 other-process-chan
 ch)
it should even work if you're streaming to another process using something besides a core async channel:
(async/transduce
 (comp (drop offset) (take limit))
 socket-send! ;; assuming usage like (socket-send! conn data)
 socket-connection
 ch)
edit: all the examples were missing the channel to consume argument, ch
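A runnable sketch of the stream-to-collection case, assuming a stand-in source channel built with async/to-chan! and illustrative values for offset and limit:

```clojure
(require '[clojure.core.async :as async])

(def ch (async/to-chan! (range 10))) ; stand-in for the real source channel
(def offset 2)                       ; illustrative value
(def limit 3)                        ; illustrative value

;; async/transduce returns a channel that yields the final reduced value
(def result-ch
  (async/transduce (comp (drop offset) (take limit)) conj [] ch))

(def result (async/<!! result-ch))
;; result => [2 3 4]
```

conj works here because it has both the one-argument (completion) and two-argument (step) arities that async/transduce calls.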
that looks promising...ty for the tips
oh wow, that is really cool
it's streaming out to a grpc client via protojure, which uses core.async
so your async/>!! example is likely perfect
oh whoops, pretty sure async/>!! should be async/put!
I made one slight variation
(async/transduce
 (comp (drop offset) (take limit))
 (fn
   ([] nil)
   ([acc] (async/close! output-ch))
   ([acc x] (async/>!! output-ch x)))
 nil
 ch)
otherwise I was getting an arity error for the [acc] case
I'm sure there are more elegant ways to do this...
that looks pretty good to me, but I wouldn't be surprised if there was a more idiomatic solution. I might pass output-ch as the initial value and use it as the accumulator value rather than ignoring acc, but it's a minor nitpick.
more importantly, async/>!! should probably be async/put! so it doesn't block the go loop used by async/transduce if output-ch isn't consuming as fast as the input channel is producing
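A sketch of both suggestions together, under the assumption that the output channel is buffered. put-rf is a hypothetical name, and the buffer size and source channel are stand-ins:

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical reducing fn: the accumulator is the output channel itself.
(defn put-rf
  ([out] (async/close! out) out)      ; completion arity: close the output
  ([out x] (async/put! out x) out))   ; step arity: async put, keep `out` as the accumulator

(def output-ch (async/chan 10))       ; buffered so this sketch never queues pending puts
(def done-ch
  (async/transduce (comp (drop 2) (take 3))
                   put-rf
                   output-ch
                   (async/to-chan! (range 10))))

(async/<!! done-ch)                   ; wait for the transduce to finish
(def streamed (async/<!! (async/into [] output-ch)))
;; streamed => [2 3 4]
```

put! never parks or blocks, so this applies no backpressure: if the reader falls far enough behind, pending puts queue up on the channel.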
ah, good point...the only problem with put! is the opposite case of a slow reader hitting the limit of 1024 pending puts. Since it's an external client I was trying to be defensive, but blocking the go pool isn't attractive either
anyway, i think i understand the issues now, so I can tune this
ty very much, again
Hello! I am trying to make a variant of take-while that closes the source and destination channels once it takes all it can. Is there a better way to implement it than this:
(defn chan-while [src dst pred]
  (let [filtered (a/chan 1 (take-while pred))]
    (a/pipe src filtered)
    (a/go-loop []
      (if-let [val (a/<! filtered)]
        (and (a/>! dst val)
             (recur))
        (do (a/close! src)
            (a/close! dst))))))
To test it I use this as the src:
(def src (a/to-chan! [1 3 5 2 4 6]))
this as dst:
(defn print-ch []
  (let [ch (a/chan)]
    (a/go-loop []
      (if-let [val (a/<! ch)]
        (do (println "VAL: " val)
            (recur))
        (println "CLOSED dst")))
    ch))
and invoke it all via:
(chan-while src (print-ch) odd?)
I'm actually getting the output you expect from that code.
Thanks a lot! You are right, I must have screwed up my testing.