Hey all, I'm trying to write my first transducer. I'm processing an endless channel of data and would like to simply do something every n elements processed. Does this look right?
(defn do-every
  "Returns a transducer that executes f every n steps."
  [n f]
  (fn [rf]
    (let [i (volatile! 0)]
      (fn
        ([] (rf))
        ([result] (rf result))
        ([result input]
         (when (zero? (mod (vswap! i inc) n))
           (f))
         (rf result input))))))
I copied the volatile! from an example on http://clojure.org/transducers, but, I admit, I'm not entirely sure what it does...
@devon: have you tried partition-all?
I may have misunderstood, were you just trying to implement similar behavior?
nvm, it’s just side-effecting f with no args
looks reasonable. I think of volatile as an atom that can only be written to from a single thread at a time, but is faster than an atom because of that. If you find good docs explaining its usage, please share!
oh, that makes sense, great to know! thanks!
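For anyone following along, here is a minimal sketch of how do-every behaves outside of a channel, using plain `into`. The definition is repeated from the question so the snippet stands alone; the `calls` atom and the sample input are made up for illustration. As far as the core docstrings go, `vswap!` is a non-atomic read-then-write, which is why the volatile is only appropriate when a single thread at a time steps the transducer.

```clojure
;; do-every, repeated from the question so this snippet runs on its own.
(defn do-every
  "Returns a transducer that executes f every n steps."
  [n f]
  (fn [rf]
    (let [i (volatile! 0)]          ; per-transduction counter, not shared
      (fn
        ([] (rf))
        ([result] (rf result))
        ([result input]
         ;; vswap! is a plain (non-atomic) update of the volatile
         (when (zero? (mod (vswap! i inc) n))
           (f))
         (rf result input))))))

;; Hypothetical side effect for the demo: count how often f fires.
(def calls (atom 0))

;; Elements pass through unchanged; f fires on the 3rd, 6th and 9th element.
(def out (into [] (do-every 3 #(swap! calls inc)) (range 10)))
```

After this runs, `out` holds the ten input elements unchanged and `@calls` is 3.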
devon: note that channel transducers run inside the internal channel locks, so that should be a quick side-effect, not a potentially blocking one
if it is not in that category, it should probably run outside the transducer
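A rough sketch of that alternative: do the every-n side effect in the consumer instead of in a channel transducer, so a slow or blocking f never runs while the channel's lock is held. This assumes org.clojure/core.async is on the classpath; `consume-with-every` and its `handle` parameter are hypothetical names, not part of any library.

```clojure
(require '[clojure.core.async :as a])

(defn consume-with-every
  "Drains ch on a dedicated thread, passing each element to handle and
  calling (f) after every n-th element. Returns a channel that yields
  the number of elements consumed once ch is closed."
  [ch n f handle]
  (a/thread
    (loop [i 0]
      (if-some [x (a/<!! ch)]        ; nil means ch is closed and drained
        (let [i (inc i)]
          (handle x)
          (when (zero? (mod i n))
            (f))                     ; may block; no channel lock is held here
          (recur i))
        i))))
```

Because `a/thread` runs the loop on its own thread, a blocking f only stalls this one consumer, and that cost stays visible at the call site.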
This one is, it's just logging. But, out of curiosity, is it any worse to have potentially blocking code in a transducer on the channel vs. in something that's consuming off of it? Assuming that only one thread is consuming?
it is worse because it's easy to forget about it and do it in multiple places in the application and generate surprising deadlocks
channel monitoring is on the roadmap, and I'm sure Rich and Stu have thoughts.
great to know, thanks!