core-async

2020-07-27T18:49:15.098800Z

I want to pipe one channel's values to another at a strict rate limit, no bursts, just ct messages per ms, starting upon getting the first message. Is there an obvious better way to write this:

(defn limit [ct ms in out]
  (go
    (let [fst (<! in)
          time  (a/timeout ms)]
      (>! out fst)
      (dotimes [_ (dec ct)]
        (>! out (<! in)))
      (<! time)
      (loop [time (a/timeout ms)]
        (dotimes [_ ct]
          (>! out (<! in)))
        (<! time)
        (recur (a/timeout ms))))))

(def a (chan))
(def b (chan))

(go-loop []
  (when-some [x (<! b)]
    (println "got" x "from b")
    (recur)))

(limit 5 1000 a b)

(dotimes [x 20] (a/put! a x))

2020-07-27T19:15:45.099500Z

@jjttjj so you want to wait indefinitely if less than ct messages come in before the timeout?

2020-07-27T19:17:27.099900Z

async channels implements closeable?

2020-07-27T19:21:18.100300Z

looks like no

Clojure 1.10.1
(ins)user=> (require '[clojure.core.async :as >])
nil
(ins)user=> (.close (>/chan))
Execution error (IllegalArgumentException) at user/eval9270 (REPL:1).
No matching field found: close for class clojure.core.async.impl.channels.ManyToManyChannel

2020-07-27T19:21:41.100700Z

hum :thinking_face:

2020-07-27T19:21:42.100800Z

good point. I don't think so. So I think I need a second loop to put stuff on the timeout chan every x ms

2020-07-27T19:22:04.101500Z

more generally

(ins)user=> (pprint (supers (class (>/chan))))
#{clojure.lang.IType clojure.core.async.impl.protocols.WritePort
  clojure.core.async.impl.protocols.Channel
  clojure.core.async.impl.channels.MMC
  clojure.core.async.impl.protocols.ReadPort java.lang.Object}
nil

2020-07-27T19:22:07.101600Z

it would be awesome if was

2020-07-27T19:22:25.101900Z

wow supers! thanks!

2020-07-27T19:22:39.102100Z

supers is in core?

2020-07-27T19:23:09.102300Z

yes, given a class it gives a set of superclasses (which will mostly be interfaces of course)

2020-07-27T19:23:39.102500Z

yup, understood, I use in general clojure.reflect/reflect, good to know =D

2020-07-27T19:32:38.102700Z

also, it seems like you could move the duplicate dotimes into the loop, by adding a parameter for ct (always using ct from the recur, using (dec ct) on the first call)

2020-07-27T19:34:48.102900Z

(defn limit [ct ms in out]
  (go
    (let [fst (<! in)]
      (>! out fst)
      (loop [n (dec ct)
             time (a/timeout ms)]
         (dotimes [_ n]
            (>! out (<! in)))
         (<! time)
         (recur cnt (a/timeout ms))))))

2020-07-27T19:35:03.103100Z

oh yeah, awesome, I knew I was missing something like that

2020-07-27T19:39:48.103300Z

also, if time was eg. a delay, you could start it with the first message of each cycle

(defn limit [ct ms in out]
  (go-loop [td (delay (timeout ms))]
     (dotimes [_ n]
       (>! out (<! in))
       (force td))
      (<! @td)
      (recur (delay (timeout ms))))

2020-07-27T19:40:14.103500Z

significantly simplified code, if you are OK with the changed behavior (each timer starting with the first item in that time range)

2020-07-27T19:40:40.103700Z

yes, that's actually what I want

2020-07-27T19:41:10.104Z

just fixed it - the force of td still has to be after the first take (after the first force, the rest are no-ops)