core-async

seralbdev 2020-08-26T17:52:44.015400Z

Hi all... first time here I hope I don't break anything...I am Clojure beginner evaluating core.async and I have a situation I cannot understand

seralbdev 2020-08-26T17:52:48.015600Z

(defonce server (atom nil)) (def flag (atom false)) (def rawreqchan (async/chan 1)) (def decreqchan (async/chan 1)) (def seye-auth-chan (async/chan 1)) (def reqpub (async/pub decreqchan (fn [arg] (:fid arg)))) (defn handle-msg [rawreqchan rawreq] (log/debug "handle-msg: " rawreq) (async/>!! rawreqchan rawreq)) (defn pump! [inchan outchan] (let [dict nil] ;;(build-dict) (async/go (loop [] (log/debug "waiting for req...") (let [msg (async/<! inchan)] (if-not (some? msg) (do (log/debug "closing!...") (async/close! outchan)) (do (log/debug "decoding req...") (let [decdata "data"];; (decode-msg! msg dict) (log/debug "forwarding decoded req...") (async/>! outchan decdata) (recur))))))))) (defn start [] (let [_ nil] (async/go (pump! rawreqchan decreqchan)) (async/go (loop [] (when-some [v (async/<! seye-auth-chan)] (log/debug "dec: " v) (recur)))) (async/sub reqpub 2 seye-auth-chan) (future (async/>!! rawreqchan {:fid 2 :sid 1 :data "dummy"})))) ;;(reset! server (msg/listen! 21000 (partial handle-msg rawreqchan) {:endianess :LE}))))

seralbdev 2020-08-26T17:53:44.016600Z

calling (start) gives this result asyncedge.bootstrap> (start) #<Future@15349450: true>19:44:46.712 [async-dispatch-5] DEBUG asyncedge.bootstrap - waiting for req... 19:44:46.713 [async-dispatch-5] DEBUG asyncedge.bootstrap - decoding req... 19:44:46.714 [async-dispatch-5] DEBUG asyncedge.bootstrap - forwarding decoded req... 19:44:46.714 [async-dispatch-5] DEBUG asyncedge.bootstrap - waiting for req...

seralbdev 2020-08-26T17:54:31.017300Z

but...this produces the right effect asyncedge.bootstrap> (async/>!! decreqchan {:fid 2 :sid 1 :data "data"}) true19:45:14.410 [async-dispatch-1] DEBUG asyncedge.bootstrap - dec: {:fid 2, :sid 1, :data data}

seralbdev 2020-08-26T17:56:09.018300Z

The pub & sub seems to be working OK but, somehow, the call to (async/>! outchan decdata) inside the pump! function does not produce the same result...

seralbdev 2020-08-26T18:00:37.020400Z

If instead of this (when-some [v (async/<! seye-auth-chan)] put this (when-some [v (async/<! decreqchan)] and call (start) things work fine, so the "pipeline" between rawreqchan and decreqchan is working...

seralbdev 2020-08-26T18:01:27.021200Z

Its been 24 hours thinking about this and I cannot get it... some subtle detail from core.async internals??

phronmophobic 2020-08-26T18:02:54.021300Z

trying to take all this in. if you reformat the example into a code block, it makes it easier to process.

seralbdev 2020-08-26T18:03:53.021500Z

thanks so much...sorry but first time using Slack...how do I reformat it?

seralbdev 2020-08-26T18:04:40.021800Z

(defonce server (atom nil))
(def flag (atom false))
(def rawreqchan (async/chan 1))
(def decreqchan (async/chan 1))
(def seye-auth-chan (async/chan 1))
(def reqpub (async/pub decreqchan (fn [arg] (:fid arg))))

(defn handle-msg [rawreqchan rawreq]
  (log/debug "handle-msg: " rawreq)
  (async/&gt;!! rawreqchan rawreq))

(defn pump! [inchan outchan]
  (let [dict nil] ;;(build-dict)
    (async/go
      (loop []      
        (log/debug "waiting for req...")
        (let [msg (async/&lt;! inchan)]
          (if-not (some? msg)
            (do
              (log/debug "closing!...")
              (async/close! outchan))
            (do
              (log/debug "decoding req...")
              (let [decdata "data"];; (decode-msg! msg dict)
                (log/debug "forwarding decoded req...")
                (async/&gt;! outchan decdata)
                (recur)))))))))


(defn start []
  (let [_ nil]

    (async/go
      (pump! rawreqchan decreqchan))

    (async/go
      (loop []
        (when-some [v (async/&lt;! seye-auth-chan)]
          (log/debug "dec: " v)
          (recur))))  

    (async/sub reqpub 2 seye-auth-chan)

    (future (async/&gt;!! rawreqchan {:fid 2 :sid 1 :data "dummy"}))))
    ;;(reset! server (msg/listen! 21000 (partial handle-msg rawreqchan) {:endianess :LE}))))

seralbdev 2020-08-26T18:04:51.021900Z

ok I see

phronmophobic 2020-08-26T18:05:04.022100Z

:thumbsup:

seralbdev 2020-08-26T18:05:27.022400Z

asyncedge.bootstrap&gt; (start)
#&lt;Future@15349450: true&gt;19:44:46.712 [async-dispatch-5] DEBUG asyncedge.bootstrap - waiting for req...
19:44:46.713 [async-dispatch-5] DEBUG asyncedge.bootstrap - decoding req...
19:44:46.714 [async-dispatch-5] DEBUG asyncedge.bootstrap - forwarding decoded req...
19:44:46.714 [async-dispatch-5] DEBUG asyncedge.bootstrap - waiting for req...

seralbdev 2020-08-26T18:05:42.022700Z

asyncedge.bootstrap&gt; (async/&gt;!! decreqchan {:fid 2 :sid 1 :data "data"})
true19:45:14.410 [async-dispatch-1] DEBUG asyncedge.bootstrap - dec:  {:fid 2, :sid 1, :data data}

phronmophobic 2020-08-26T18:07:17.023500Z

do you have a high level description of what this code is trying to do?

phronmophobic 2020-08-26T18:08:17.024900Z

what's the difference between pump! and the built in pipe?

seralbdev 2020-08-26T18:09:22.026200Z

I am just trying to play a bit with core.async so doing the transformation and copy between channels "by hand"

seralbdev 2020-08-26T18:09:53.026900Z

pump function will decode a request...

seralbdev 2020-08-26T18:10:10.027300Z

and put the result in decreqchan

seralbdev 2020-08-26T18:10:26.027700Z

a pub will publish it and a sub should receive it

seralbdev 2020-08-26T18:11:02.028400Z

I can't get why a direct >!! in decreqchan works (pub & sub are triggered)

seralbdev 2020-08-26T18:11:43.029100Z

but inside pump! not

seralbdev 2020-08-26T18:12:35.029700Z

which ends up writing in decreqchan anyway...

seralbdev 2020-08-26T18:13:53.030500Z

TCP server -> msg put in rawreqchan -> pump -> decreqchan -> pub -> sub -> seye-auth-chan

seralbdev 2020-08-26T18:14:11.030900Z

TCP server is disabled now, just inserting directly in rawreqchan

phronmophobic 2020-08-26T18:16:55.031600Z

the data is different, in one case, it's {:fid 2 :sid 1 :data "dummy"}, in the other, it's "data"

seralbdev 2020-08-26T18:17:35.032300Z

yeah that is not relevant...all the decoding mechanism is now disabled...

phronmophobic 2020-08-26T18:17:52.032800Z

right, but isn't the pub/sub based on the :fid key?

phronmophobic 2020-08-26T18:18:00.033100Z

so I think it is relevant

2020-08-26T18:18:24.033600Z

I would start by making sure you aren't shooting your self in the foot by running multiple copies of this code in your repl all sharing the global channels

phronmophobic 2020-08-26T18:19:30.034900Z

you only have a subscription for data that has an :fid value of 2

seralbdev 2020-08-26T18:19:31.035Z

umm the map is {:fid :sid :data} at this moment the relevant part is :fid yes, this is the dispatch key but the :data content is not being used

2020-08-26T18:19:43.035400Z

you can make sure of that by verifying the behavior in a fresh started repl

seralbdev 2020-08-26T18:20:29.036Z

yeah but in both cases (future & direct call) :fid = 2

phronmophobic 2020-08-26T18:20:33.036100Z

in pump, you're putting "data". try changing the let from

(let [decdata "data"] ...
to
(let [decdata {:fid 2 :sid 1 :data "dummy"}] ...

seralbdev 2020-08-26T18:21:18.036700Z

OMG you are right

seralbdev 2020-08-26T18:21:28.036900Z

this is stupid

seralbdev 2020-08-26T18:21:33.037100Z

so sorry

phronmophobic 2020-08-26T18:22:18.037300Z

if you have a persistent bug, it's always hiding in the place where you assume everything is working

seralbdev 2020-08-26T18:24:35.038100Z

thanks so much for taking time to help. very much appreciated!

👍 1