if you have a use of try/catch/finally in your real code, you might try without it; there are some bugs in how the go macro compiles that, which can lead to weirdness
I don't believe so
Oh. It does take place in an add-watch fn.
will try to add that to the repro. code
I bet if you go through and put printlns everywhere, you will find two printlns that happen in a different order than you expect
and it'll be something like the tap isn't happening until after all the go loops have published their values to a mult where nobody is listening
hmmmmm
(def ch1 (a/chan 1))
(def m (a/mult ch1))
(def aa (atom nil))
(add-watch aa ::aa
  (fn [k r o n]
    (let [ch2 (a/tap m (a/chan 1))]
      (log/debug "watch changed")
      (a/go
        (when-let [x (a/<! ch2)]
          (println "GOT2" x)
          ;;(a/close! ch2)
          )))))
(doseq [x (range 10)]
  (a/put! ch1 x)
  ;;(a/go (a/>! ch1 x))
  )
(def but
  (d/div
    (d/button {:onclick #(a/put! ch1 (rand))} "put")
    (d/button {:onclick #(reset! aa 2)} "reset")))
So this is closer to the real thing, and now the go loop never prints, with or without the close!
yeah
(i realize now the watch function is dumb, and i could stream the changes as another channel but should this work anyway?)
all the puts flow through the mult and go nowhere because there is no one listening
and then later you click the button, it resets the atom, which creates a tap, but by that point there is nothing there
by all the puts I mean the a/put! in the doseq
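To make the point above concrete, here is a minimal sketch (JVM Clojure, so the blocking `>!!`/`<!!` ops are available; in ClojureScript the same calls would live inside go blocks). All names are hypothetical. The `early` tap is only there as a witness so the timing is deterministic; the thing to look at is that `late`, tapped after the first three values went through, never sees them.

```clojure
(require '[clojure.core.async :as a])

;; Hypothetical names; unbuffered source so each >!! returns only
;; once the mult has actually taken the value.
(def src (a/chan))
(def m (a/mult src))

;; Witness tap, attached before any puts.
(def early (a/tap m (a/chan 10)))

(doseq [x (range 3)]
  (a/>!! src x))

;; Once `early` has received all three, the mult has already
;; decided who gets them.
(def early-seen (vec (repeatedly 3 #(a/<!! early))))  ; [0 1 2]

;; A tap attached now only sees values distributed from here on.
(def late (a/tap m (a/chan 10)))
(a/>!! src :after-tap)

(def late-first (a/<!! late))   ; :after-tap
(def late-rest  (a/poll! late)) ; nil, the earlier values are gone
```

A mult keeps no history, so anything that has to be seen by a tap must be put after the tap is in place.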
ah, i think that's still a case of a bad repro. sorry. The real thing creates the tap, then makes a request that should result in the puts, then does the go block
it should result in puts, but does it?
(def ch1 (a/chan 1))
(def m (a/mult ch1))
(def aa (atom nil))
(defn do-puts! []
  (doseq [x (range 10)]
    (a/put! ch1 x)
    ;;(a/go (a/>! ch1 x))
    ))
(add-watch aa ::aa
  (fn [k r o n]
    (let [ch2 (a/tap m (a/chan 1))]
      (do-puts!)
      (log/debug "watch changed")
      (a/go
        (when-let [x (a/<! ch2)]
          (println "GOT2" x)
          ;;(a/close! ch2)
          )))))
(def but
  (d/div
    (d/button {:onclick #(a/put! ch1 (rand))} "put")
    (d/button {:onclick #(reset! aa 2)} "reset")))
and now it's back to working with and without the close!
(the go block print executes on reset of the atom)
you are getting at least one println out
and what is happening is the mult is getting clogged up because you never untap after your watch is done
the close is effectively an untap
in your real world case you can replace the a/close with a/untap, see that it makes it work, and confirm that this is correct
oh yeah, sure
which means the whole watch thing happens at least once, so you should see at least one println, and if you don't, you can start trying to figure out why
ok this is confusing. The untap works, but doesn't seem to actually untap anything, judging from liberal printlns
i mean the print happens with the untap, same as no close! or untap.
but the tapped chans are still receiving values after the untap
just for the sake of it, try untap-all
untap-all works as expected!
I suspect somewhere, somehow you do have a tap, where the channel being tapped to is not being consumed from
which will block the mult
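A small sketch of that failure mode, assuming JVM Clojure and hypothetical channel names: one tap (`stuck`) is never read, and once its buffer is full the mult stops taking from the source, so even the healthy tap (`fast`) stops receiving. Untapping `stuck` and draining the one pending value gets things moving again.

```clojure
(require '[clojure.core.async :as a])

(def src (a/chan))                    ; unbuffered source
(def m (a/mult src))
(def fast  (a/tap m (a/chan 10)))     ; plenty of room
(def stuck (a/tap m (a/chan 1)))      ; nobody reads this one

(a/>!! src 1)   ; fits in both tap buffers
(a/>!! src 2)   ; taken by the mult, but delivery to `stuck` is pending

;; The mult waits for every tap before taking again, so the source
;; has no taker right now and a non-blocking put is refused.
(def accepted? (a/offer! src :probe))   ; falsey

;; Remove the unread tap, then drain it once so the pending
;; delivery of 2 can complete and the mult can move on.
(a/untap m stuck)
(a/<!! stuck)

(a/>!! src 3)   ; accepted again; only `fast` is tapped now
(def fast-seen (vec (repeatedly 3 #(a/<!! fast))))  ; [1 2 3]
```

Closing a tapped channel has a similar effect for later values, since the mult drops a tap when a put to it fails, but it is worth checking that no delivery is already pending on that channel.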
i can't seem to find one at the moment (which doesn't mean it doesn't exist).
gotta run out for now, probably best to step away from this for a bit anyway, but thanks for all the help/patience!
I finally tracked this down! I was basically using code inspired by/equivalent to this: https://github.com/capitalone/cqrs-manager-for-distributed-reactive-services/blob/master/src/com/capitalone/commander/api.clj#L288-L292 This provides a way to get a copy of a single source chan, but with its events transformed (by piping through a chan with a transducer). The transformation chan that's actually tapped is closed over and thus unable to be closed/untapped
Out of curiosity, this code is probably inherently wrong/bad, right? The transformation (i.e. the (map command-map) transducer above) should be done before the mult is created, right?
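One possible alternative, sketched under assumptions (this is not the linked project's code, and `tap-with-xf` is a hypothetical helper): put the transducer on the channel that is actually tapped and hand that same channel back to the caller, so the caller can untap or close it when done.

```clojure
(require '[clojure.core.async :as a])

(defn tap-with-xf
  "Hypothetical helper: taps mult `m` with a fresh channel that applies
  `xf`, and returns that channel so the caller can untap/close it."
  [m xf]
  ;; a/tap returns the channel it was given
  (a/tap m (a/chan 1 xf)))

(def src (a/chan))
(def m (a/mult src))
(def view (tap-with-xf m (map inc)))

(a/>!! src 1)
(def first-seen (a/<!! view))    ; 2, transformed copy of the source value

;; The caller holds the tapped channel, so it can detach cleanly.
(a/untap m view)
(a/>!! src 41)
(def after-untap (a/poll! view)) ; nil, nothing arrives after the untap
```

Doing the transformation before the mult, as asked above, would also work when every consumer wants the same transformed values; the per-tap version is only useful when consumers need different views.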
Putting a sliding/dropping buffer on the int chan gets the job done also, but it still doesn't allow for untapping
What is the ideal usage of mult anyway? I have a clojurescript app and have a single channel that serves as the point of entry for all messages from a server. I have to distribute those messages to various ui components. I've tended to approach this by making a mult wrapper for that channel at the top level, creating helper functions on that mult such as the commander code above, and calling these from the ui components. Is it better to just use the main channel instead and have each component create its own mult off that if needed?
Actually I guess the mult per component wouldn't work because then nothing could take from the main incoming channel, multiple mults would race to take messages, etc
Sounds like pub/sub (which is implemented using mults) would be more suitable for that
if I understand the doc string correctly, a pub won't globally get "stuck" on one held-up sub on any topic, but rather each topic is its own mult and a held-up sub can only hold up other sub'ed chans with the same topic, right?
With the defaults a pub can actually get stuck globally if there's a slow subscriber, since the mults are attached to unbuffered channels. You could pass a buf-fn to the pub to avoid it (drop messages instead of blocking), or just make sure to subscribe to non-blocking channels.
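A rough sketch of the buf-fn option, assuming JVM Clojure, a hypothetical message shape with a `:topic` key, and an arbitrary buffer size of 16. Each topic's internal channel gets a sliding buffer, so a subscriber that never reads on one topic should not stop messages on another topic (old messages on the slow topic get dropped instead).

```clojure
(require '[clojure.core.async :as a])

(def in (a/chan))

;; Third arg to a/pub is the buf-fn: given a topic, return the buffer
;; for that topic's internal channel. Size 16 is an arbitrary choice.
(def p (a/pub in :topic (fn [_topic] (a/sliding-buffer 16))))

(def chat-ch  (a/chan 1))
(def stuck-ch (a/chan 1))   ; subscribed but never read
(a/sub p :chat chat-ch)
(a/sub p :slow stuck-ch)

;; Flood the slow topic; its subscriber backs up almost immediately.
(doseq [n (range 5)]
  (a/>!! in {:topic :slow :n n}))

;; A message on another topic still gets through.
(a/>!! in {:topic :chat :body "hi"})

;; Timeout guards against hanging if the pub were blocked.
(def chat-msg (first (a/alts!! [chat-ch (a/timeout 1000)])))
;; chat-msg is {:topic :chat :body "hi"}
```

The trade-off is the usual one for sliding/dropping buffers: nothing blocks, but a slow subscriber silently misses messages.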
ok gotcha