
ghaskins 2020-11-02T00:00:07.161900Z

@smith.adriane thats a good thought...let me play with that

phronmophobic 2020-11-02T00:13:33.162700Z

also, I'm just realizing that you probably want (comp (drop offset) (take limit)) rather than (comp (take limit) (drop offset))

phronmophobic 2020-11-02T00:17:26.164300Z

while you typically supply functions to comp in "reverse order", you don't with transducers. see for a longer explanation
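
A quick REPL sketch of the ordering point, using `into` on a plain range rather than a channel (the numbers 2 and 3 are illustrative stand-ins for offset and limit):

```clojure
;; Transducers composed with comp are applied to the input left to right.
(def drop-then-take (comp (drop 2) (take 3)))
(def take-then-drop (comp (take 3) (drop 2)))

;; Skip 2 items, then keep the next 3: the pagination behaviour.
(def paged (into [] drop-then-take (range 10)))
;; => [2 3 4]

;; Keep the first 3 items, then skip 2 of those: only one item survives.
(def clipped (into [] take-then-drop (range 10)))
;; => [2]
```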

ghaskins 2020-11-02T00:31:14.164700Z

@smith.adriane oh wow, I didnt know that either..ty

ghaskins 2020-11-02T00:31:18.164900Z

your idea did work, btw

ghaskins 2020-11-02T00:31:42.165500Z

it was a little awkward, as the "to" channel was supplied to me, so I had to put two async/pipes in to get the transducer in the right spot

ghaskins 2020-11-02T00:31:44.165700Z

but nbd

ghaskins 2020-11-02T00:31:51.166Z

its functional..thank you

ghaskins 2020-11-02T00:32:22.166700Z

(would be nice if async/pipe had an arity for an xform...

ghaskins 2020-11-02T00:32:25.167Z

but i digress
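
For reference, the two-pipe workaround might look roughly like the sketch below. `pipe-xform` is a hypothetical helper name, not part of core.async, and the buffer size of 1 on the intermediate channel is an arbitrary choice:

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical helper: route `from` through an intermediate channel that
;; carries the transducer, then on to the caller-supplied `to` channel.
(defn pipe-xform [from xform to]
  (let [mid (async/chan 1 xform)]
    (async/pipe from mid)
    (async/pipe mid to)))   ; async/pipe returns its `to` argument

(def out (pipe-xform (async/to-chan! (range 10))
                     (comp (drop 2) (take 3))
                     (async/chan)))

;; Collect everything that arrives on `out` until it closes.
(def piped (async/<!! (async/into [] out)))
;; => [2 3 4]
```

Because both hops are ordinary pipes, a slow consumer on `to` applies backpressure all the way back to `from`.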

phronmophobic 2020-11-02T00:32:36.167300Z

there are other alternatives

phronmophobic 2020-11-02T00:32:44.167500Z

you can run the transducer yourself

ghaskins 2020-11-02T00:33:30.167800Z

do tell

phronmophobic 2020-11-02T00:34:36.168400Z

(def xform (comp
            (drop 1)
            (take 3)))

(def my-rf (xform #(do %2)))

(my-rf nil 1) ;; nil
(my-rf nil 2) ;; 2
(my-rf nil 3) ;; 3
(my-rf nil 4) ;; (reduced 4)
which could be used like:
(def xform (comp
            (drop 1)
            (take 3)))

(def my-rf (xform #(do %2)))

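(go-loop []
  (when-some [x (<! ch)]                 ; nil means ch was closed
    (let [v (my-rf nil x)]
      (cond
        (reduced? v) (do-thing-with-v (unreduced v)) ; last item, stop looping
        (nil? v)     (recur)                         ; item was dropped, keep going
        :else        (do (do-thing-with-v v)
                         (recur))))))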

ghaskins 2020-11-02T00:35:21.169200Z

interesting, ty for the details

phronmophobic 2020-11-02T00:35:23.169300Z

I'm sure there's a nice way to wrap up that functionality if that's not already done somewhere in core.async

ghaskins 2020-11-02T00:36:04.170100Z

im somewhat green on transducers, so ill take a bit to decode that but, very cool

phronmophobic 2020-11-02T00:36:46.170900Z

every time I try to do something with transducers that's not the typical example, it always starts out as like 30 lines of code and gets reduced (😉) to like 5 lines once I figure it out

phronmophobic 2020-11-02T00:38:12.171800Z

I'm not sure what process you're building that's consuming from the channel, but async/reduce might also be a good building block to look into

👍 1
ghaskins 2020-11-02T00:39:00.172500Z

im basically building a join from two cassandra CQL queries, with pagination

phronmophobic 2020-11-02T00:39:36.173Z

async/transduce uses async/reduce in its implementation

phronmophobic 2020-11-02T00:41:41.173200Z

do the results get aggregated into a collection or are they streamed to some other process?

phronmophobic 2020-11-02T00:43:57.173500Z

either way, you should be able to use async/transduce:
• stream to collection:
  (comp (drop offset) (take limit))
• stream to other process:
  (comp (drop offset) (take limit))
it should even work if you're streaming to another process using something besides a core async channel:
  (comp (drop offset) (take limit))
  socket-send! ;; assuming usage like (socket-send! conn data)
edit: all the examples were missing the channel to consume argument, ch
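
The snippets above show only the xform argument. A full `async/transduce` call takes the xform, a reducing function, an initial value, and the channel to consume, so the collection and socket cases might look something like the sketch below. The values of `offset` and `limit`, the `:conn` placeholder, and the atom-backed `socket-send!` stub are all assumptions made so the example can run on its own:

```clojure
(require '[clojure.core.async :as async])

(def offset 2)   ; illustrative values
(def limit 3)

;; Stream to a collection: conj each surviving item onto a vector.
;; async/transduce returns a channel that yields the final result.
(def page
  (async/<!! (async/transduce (comp (drop offset) (take limit))
                              conj
                              []
                              (async/to-chan! (range 10)))))
;; => [2 3 4]

;; Stream to something that is not a channel. This stub stands in for a
;; real (socket-send! conn data) and just records what was sent.
(def sent (atom []))
(defn socket-send! [conn data] (swap! sent conj data))

;; The connection is threaded through as the accumulator; `completing`
;; supplies the one-argument arity that async/transduce calls at the end.
(async/<!! (async/transduce (comp (drop offset) (take limit))
                            (completing (fn [conn x] (socket-send! conn x) conn))
                            :conn
                            (async/to-chan! (range 10))))
```

Note that the reducing function runs inside the go loop that `async/transduce` uses, so a real `socket-send!` that blocks would tie up a go thread.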

ghaskins 2020-11-02T00:46:49.174100Z

that looks promising...ty for the tips

👍 1
ghaskins 2020-11-02T00:47:22.174200Z

oh wow, that is really cool

ghaskins 2020-11-02T00:47:48.174500Z

its streaming out to a grpc client via protojure, which uses core.async

ghaskins 2020-11-02T00:47:56.174700Z

so your async/>!! example is likely perfect

🤞 1
phronmophobic 2020-11-02T00:52:26.175100Z

oh whoops, pretty sure async/>!! should be async/put!

ghaskins 2020-11-02T01:16:36.175400Z

I made one slight variation

           (comp (drop offset) (take limit))
             ([] nil)
             ([acc] (async/close! output-ch))
             ([acc x] (async/>!! output-ch x)))

ghaskins 2020-11-02T01:16:57.175600Z

otherwise I was getting an arity error for the [acc] case

ghaskins 2020-11-02T01:17:11.175800Z

im sure there are more elegant ways to do this...

phronmophobic 2020-11-02T01:42:55.176Z

that looks pretty good to me, but I wouldn't be surprised if there was a more idiomatic solution. I might pass output-ch as the initial value and use it as the accumulator value rather than ignoring acc, but it's a minor nitpick. more importantly, async/>!! should probably be async/put! so it doesn't block the go loop used by async/transduce if output-ch isn't consuming as fast as the input channel is producing
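
Putting those two suggestions together, a version that passes `output-ch` as the initial value and uses `async/put!` might look like this sketch. `stream-page!` and `input-ch` are hypothetical names, and the output channel is given a buffer of 10 only so the example can be drained after the fact:

```clojure
(require '[clojure.core.async :as async])

(defn stream-page!
  "Hypothetical helper: forwards items offset..offset+limit from input-ch
  to output-ch, then closes output-ch."
  [input-ch output-ch offset limit]
  (async/transduce
    (comp (drop offset) (take limit))
    (fn
      ([out] (async/close! out) out)       ; completion: close the output
      ([out x] (async/put! out x) out))    ; step: non-blocking put
    output-ch                              ; initial value, so no 0-arity needed
    input-ch))

(def output-ch (async/chan 10))

;; Wait for the transduce to finish, then drain what was forwarded.
(async/<!! (stream-page! (async/to-chan! (range 10)) output-ch 2 3))
(def streamed (async/<!! (async/into [] output-ch)))
;; => [2 3 4]
```

Supplying the 1-arity completion step is what avoids the arity error mentioned above, since `async/transduce` calls the reducing function with just the accumulator once the input is exhausted.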

ghaskins 2020-11-02T02:17:22.176200Z

ah, good point...the only problem with put! is the opposite case: a slow reader can run into put!'s limit of 1024 pending puts. Since it's an external client I was trying to be defensive, but blocking the go thread pool isn't attractive either

ghaskins 2020-11-02T02:17:56.176400Z

anyway, i think i understand the issues now, so I can tune this

ghaskins 2020-11-02T02:18:02.176600Z

ty very much, again

Jakub Holý 2020-11-02T08:14:57.181800Z

Hello! I am trying to make a variant of take-while that closes the source and destination channels once it takes all it can. Is there a better way to implement it than this:

(defn chan-while [src dst pred]
    (let [filtered (a/chan 1 (take-while pred))]
      (a/pipe src filtered)
      (a/go-loop []
        (if-let [val (a/<! filtered)]
          (and (a/>! dst val)
               (recur))
          (do (a/close! src)
              (a/close! dst))))))

Jakub Holý 2020-11-02T08:16:17.181900Z

To test it I use this as the src:

(def src (a/to-chan! [1 3 5 2 4 6]))
this as dst:
(defn print-ch []
    (let [ch (a/chan)]
      (a/go-loop []
        (if-let [val (a/<! ch)]
          (do (println "VAL: " val)
              (recur))
          (println "CLOSED dst")))
      ch))
and invoke it all via:
(chan-while src (print-ch) odd?)

Jan K 2020-11-02T11:48:35.188900Z

I'm actually getting the output you expect from that code.

✅ 1
Jakub Holý 2020-11-02T18:57:01.189600Z

Thanks a lot! You are right, I must have screwed up my testing.