Hi all... first time here, I hope I don't break anything... I am a Clojure beginner evaluating core.async and I have a situation I cannot understand
(defonce server (atom nil)) (def flag (atom false)) (def rawreqchan (async/chan 1)) (def decreqchan (async/chan 1)) (def seye-auth-chan (async/chan 1)) (def reqpub (async/pub decreqchan (fn [arg] (:fid arg)))) (defn handle-msg [rawreqchan rawreq] (log/debug "handle-msg: " rawreq) (async/>!! rawreqchan rawreq)) (defn pump! [inchan outchan] (let [dict nil] ;;(build-dict) (async/go (loop [] (log/debug "waiting for req...") (let [msg (async/<! inchan)] (if-not (some? msg) (do (log/debug "closing!...") (async/close! outchan)) (do (log/debug "decoding req...") (let [decdata "data"];; (decode-msg! msg dict) (log/debug "forwarding decoded req...") (async/>! outchan decdata) (recur))))))))) (defn start [] (let [_ nil] (async/go (pump! rawreqchan decreqchan)) (async/go (loop [] (when-some [v (async/<! seye-auth-chan)] (log/debug "dec: " v) (recur)))) (async/sub reqpub 2 seye-auth-chan) (future (async/>!! rawreqchan {:fid 2 :sid 1 :data "dummy"})))) ;;(reset! server (msg/listen! 21000 (partial handle-msg rawreqchan) {:endianess :LE}))))
calling (start) gives this result asyncedge.bootstrap> (start) #<Future@15349450: true>19:44:46.712 [async-dispatch-5] DEBUG asyncedge.bootstrap - waiting for req... 19:44:46.713 [async-dispatch-5] DEBUG asyncedge.bootstrap - decoding req... 19:44:46.714 [async-dispatch-5] DEBUG asyncedge.bootstrap - forwarding decoded req... 19:44:46.714 [async-dispatch-5] DEBUG asyncedge.bootstrap - waiting for req...
but...this produces the right effect asyncedge.bootstrap> (async/>!! decreqchan {:fid 2 :sid 1 :data "data"}) true19:45:14.410 [async-dispatch-1] DEBUG asyncedge.bootstrap - dec: {:fid 2, :sid 1, :data data}
The pub & sub seem to be working OK but, somehow, the call to (async/>! outchan decdata) inside the pump! function does not produce the same result...
If, instead of (when-some [v (async/<! seye-auth-chan)], I put (when-some [v (async/<! decreqchan)] and call (start), things work fine, so the "pipeline" between rawreqchan and decreqchan is working...
I've been thinking about this for 24 hours and I cannot get it... is it some subtle detail of core.async internals??
trying to take all this in. if you reformat the example into a code block, it makes it easier to process.
thanks so much...sorry but first time using Slack...how do I reformat it?
(defonce server (atom nil))
(def flag (atom false))
(def rawreqchan (async/chan 1))
(def decreqchan (async/chan 1))
(def seye-auth-chan (async/chan 1))
(def reqpub (async/pub decreqchan (fn [arg] (:fid arg))))

(defn handle-msg [rawreqchan rawreq]
  (log/debug "handle-msg: " rawreq)
  (async/>!! rawreqchan rawreq))

(defn pump! [inchan outchan]
  (let [dict nil] ;;(build-dict)
    (async/go
      (loop []
        (log/debug "waiting for req...")
        (let [msg (async/<! inchan)]
          (if-not (some? msg)
            (do
              (log/debug "closing!...")
              (async/close! outchan))
            (do
              (log/debug "decoding req...")
              (let [decdata "data"] ;; (decode-msg! msg dict)
                (log/debug "forwarding decoded req...")
                (async/>! outchan decdata)
                (recur)))))))))

(defn start []
  (let [_ nil]
    (async/go
      (pump! rawreqchan decreqchan))
    (async/go
      (loop []
        (when-some [v (async/<! seye-auth-chan)]
          (log/debug "dec: " v)
          (recur))))
    (async/sub reqpub 2 seye-auth-chan)
    (future (async/>!! rawreqchan {:fid 2 :sid 1 :data "dummy"}))))
    ;;(reset! server (msg/listen! 21000 (partial handle-msg rawreqchan) {:endianess :LE}))))
ok I see
:thumbsup:
asyncedge.bootstrap> (start)
#<Future@15349450: true>
19:44:46.712 [async-dispatch-5] DEBUG asyncedge.bootstrap - waiting for req...
19:44:46.713 [async-dispatch-5] DEBUG asyncedge.bootstrap - decoding req...
19:44:46.714 [async-dispatch-5] DEBUG asyncedge.bootstrap - forwarding decoded req...
19:44:46.714 [async-dispatch-5] DEBUG asyncedge.bootstrap - waiting for req...
asyncedge.bootstrap> (async/>!! decreqchan {:fid 2 :sid 1 :data "data"})
true
19:45:14.410 [async-dispatch-1] DEBUG asyncedge.bootstrap - dec: {:fid 2, :sid 1, :data data}
do you have a high level description of what this code is trying to do?
what's the difference between pump! and the built-in pipe?
I am just trying to play a bit with core.async, so I am doing the transformation and copy between channels "by hand"
pump function will decode a request...
and put the result in decreqchan
a pub will publish it and a sub should receive it
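For comparison with the hand-written pump!, here is a minimal sketch of the same copy-and-transform step using the built-in pipe plus a transducer on the output channel. The names decode, in, and out are hypothetical stand-ins (decode replaces the disabled decode-msg!), not part of the code above.

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical stand-in for the disabled decode-msg!.
;; It returns a map that still carries :fid, so a pub could route on it.
(defn decode [msg]
  (assoc msg :data "decoded"))

(def in (async/chan 1))

;; The (map decode) transducer transforms each value as it is put on `out`.
(def out (async/chan 1 (map decode)))

;; pipe copies every value from `in` to `out`;
;; by default it closes `out` when `in` is closed.
(async/pipe in out)

(async/>!! in {:fid 2 :sid 1 :data "dummy"})
(def result (async/<!! out))
```

With this shape the loop, the nil check, and the close! call in pump! are all handled by pipe, and the decoding lives in one pure function.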
I can't get why a direct >!! into decreqchan works (pub & sub are triggered)
but the put inside pump! does not,
even though it ends up writing to decreqchan anyway...
TCP server -> msg put in rawreqchan -> pump -> decreqchan -> pub -> sub -> seye-auth-chan
TCP server is disabled now, just inserting directly in rawreqchan
the data is different: in one case it's {:fid 2 :sid 1 :data "dummy"}, in the other it's "data"
yeah that is not relevant...all the decoding mechanism is now disabled...
right, but isn't the pub/sub based on the :fid key?
so I think it is relevant
I would start by making sure you aren't shooting yourself in the foot by running multiple copies of this code in your repl, all sharing the global channels
you only have a subscription for data that has an :fid value of 2
umm the map is {:fid :sid :data}; at this moment the relevant part is :fid, yes, this is the dispatch key, but the :data content is not being used
you can make sure of that by verifying the behavior in a freshly started repl
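One way to avoid shared global channels at the repl, as a rough sketch: build the channels inside a function so each call gets its own set. make-system and stop! are hypothetical names introduced for illustration; they are not in the original code.

```clojure
(require '[clojure.core.async :as async])

(defn make-system
  "Builds a fresh, unshared set of channels on every call."
  []
  (let [decoded (async/chan 1)
        auth    (async/chan 1)
        p       (async/pub decoded :fid)]
    (async/sub p 2 auth)
    {:decoded decoded :auth auth}))

(defn stop!
  "Closes the pub's source channel. With the default close? setting on sub,
  the subscribed channel is closed as well."
  [{:keys [decoded]}]
  (async/close! decoded))

(def sys (make-system))
(async/>!! (:decoded sys) {:fid 2 :sid 1 :data "dummy"})
(def received (async/<!! (:auth sys)))
(stop! sys)
```

Calling make-system again after stop! gives a clean set of channels, so go blocks left over from an earlier experiment cannot steal messages from the new one.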
yeah but in both cases (future & direct call) :fid = 2
in pump, you're putting "data". try changing the let from
(let [decdata "data"] ...
to
(let [decdata {:fid 2 :sid 1 :data "dummy"}] ...
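Why the value matters: the pub routes on (:fid value), and a keyword looked up in a string returns nil, so "data" lands on a topic nobody subscribed to and is silently dropped. A small self-contained sketch of that behavior follows; source, sink, and take-or-timeout are hypothetical names used only for this illustration.

```clojure
(require '[clojure.core.async :as async])

(def source (async/chan 1))
(def sink   (async/chan 1))
(def p      (async/pub source :fid))
(async/sub p 2 sink)

;; A keyword lookup on a non-map value yields nil, so this is the topic
;; the pub computes for the string "data".
(def topic-of-string (:fid "data"))

(defn take-or-timeout
  "Takes from ch, or returns :timeout if nothing arrives within ms milliseconds."
  [ch ms]
  (let [[v port] (async/alts!! [ch (async/timeout ms)])]
    (if (= port ch) v :timeout)))

;; Topic is nil and there is no subscriber for nil, so the pub drops the value.
(async/>!! source "data")
(def first-result (take-or-timeout sink 200))

;; Topic is 2, which matches the subscription, so the value reaches sink.
(async/>!! source {:fid 2 :sid 1 :data "dummy"})
(def second-result (take-or-timeout sink 500))
```

This also matches the logs above: pump! reports "forwarding decoded req..." because the put onto decreqchan succeeds; the value is only discarded afterwards, inside the pub.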
OMG you are right
this is stupid
so sorry
if you have a persistent bug, it's always hiding in the place where you assume everything is working
thanks so much for taking time to help. very much appreciated!