I want to pipe one channel's values to another at a strict rate limit, no bursts, just ct
messages per ms
, starting upon getting the first message. Is there an obvious better way to write this:
(defn limit [ct ms in out]
(go
(let [fst (<! in)
time (a/timeout ms)]
(>! out fst)
(dotimes [_ (dec ct)]
(>! out (<! in)))
(<! time)
(loop [time (a/timeout ms)]
(dotimes [_ ct]
(>! out (<! in)))
(<! time)
(recur (a/timeout ms))))))
(def a (chan))
(def b (chan))
(go-loop []
(when-some [x (<! b)]
(println "got" x "from b")
(recur)))
(limit 5 1000 a b)
(dotimes [x 20] (a/put! a x))
@jjttjj so you want to wait indefinitely if less than ct messages come in before the timeout?
async channels implements closeable?
looks like no
Clojure 1.10.1
(ins)user=> (require '[clojure.core.async :as >])
nil
(ins)user=> (.close (>/chan))
Execution error (IllegalArgumentException) at user/eval9270 (REPL:1).
No matching field found: close for class clojure.core.async.impl.channels.ManyToManyChannel
hum :thinking_face:
good point. I don't think so. So I think I need a second loop to put stuff on the timeout chan every x ms
more generally
(ins)user=> (pprint (supers (class (>/chan))))
#{clojure.lang.IType clojure.core.async.impl.protocols.WritePort
clojure.core.async.impl.protocols.Channel
clojure.core.async.impl.channels.MMC
clojure.core.async.impl.protocols.ReadPort java.lang.Object}
nil
it would be awesome if was
wow supers! thanks!
supers is in core?
yes, given a class it gives a set of superclasses (which will mostly be interfaces of course)
yup, understood, I use in general clojure.reflect/reflect, good to know =D
also, it seems like you could move the duplicate dotimes into the loop, by adding a parameter for ct (always using ct from the recur, using (dec ct) on the first call)
(defn limit [ct ms in out]
(go
(let [fst (<! in)]
(>! out fst)
(loop [n (dec ct)
time (a/timeout ms)]
(dotimes [_ n]
(>! out (<! in)))
(<! time)
(recur cnt (a/timeout ms))))))
oh yeah, awesome, I knew I was missing something like that
also, if time
was eg. a delay, you could start it with the first message of each cycle
(defn limit [ct ms in out]
(go-loop [td (delay (timeout ms))]
(dotimes [_ n]
(>! out (<! in))
(force td))
(<! @td)
(recur (delay (timeout ms))))
significantly simplified code, if you are OK with the changed behavior (each timer starting with the first item in that time range)
yes, that's actually what I want
just fixed it - the force of td still has to be after the first take (after the first force, the rest are no-ops)