aleph

2020-10-24T16:21:00.013100Z

Code review wanted for this automatic/reconnecting websocket client. The idea is basically to have something which acts exactly as the aleph websocket client except it uses the source/sink streams you provide it, and it reconnects to those streams when the connection is closed unintentionally. I'm mainly wondering if the stream splicing/connection logic can be better.

;;; Goal: pass input/output streams which connect to a websocket client duplex
;;; stream. When the inner websocket stream is closed unintentionally by the
;;; server, make a new one and reconnect the input/output streams. Close the
;;; websocket stream when the output stream is closed.


;; Namespaces used below. The HTTP/websocket client lives in `aleph.http`
;; (the original spelling `alep.http` does not exist and fails to load).
(require '[manifold.stream :as ms]
         '[manifold.deferred :as md]
         '[aleph.http :as http]
         '[taoensso.timbre :as log]
         '[clojure.data.json :as json])

(defn ws-conn
  "Opens a websocket client connection to `url` through
  `http/websocket-client`, passing 1e7 for both `:max-frame-payload` and
  `:max-frame-size`. Returns whatever `http/websocket-client` returns."
  [url]
  ;; NOTE(review): 1e7 is a double literal; confirm the client accepts a
  ;; non-integer value for these limits.
  (let [frame-limits {:max-frame-payload 1e7
                      :max-frame-size    1e7}]
    (http/websocket-client url frame-limits)))

(defn reconnecting-websocket-stream
  "Returns a duplex stream (a splice of `in-stream` and `out-stream`) that is
  backed by a websocket connection which is re-established whenever it closes
  while `out-stream` is still open.

  Options (keys namespaced to the current namespace):
    ::url        URL handed to `ws-conn` for every (re)connection.
    ::on-connect Optional fn, called with the returned stream after each
                 successful connection (e.g. to resend a subscription).
    ::in-stream  Stream whose messages are forwarded to the websocket.
                 Defaults to a fresh `ms/stream`.
    ::out-stream Stream that receives messages from the websocket. Closing it
                 closes the current websocket and stops reconnecting.
                 Defaults to a fresh `ms/stream`.

  Messages from `in-stream` pass through a permanent 1000-element buffer so
  that the upstream side survives the individual websocket connections.
  Failed connection attempts are logged and retried after one second; a
  connection that closes within the same wall-clock second in which it was
  started is likewise retried only after a one second pause."
  [{::keys [url on-connect
            in-stream
            out-stream]
    :or    {in-stream  (ms/stream)
            out-stream (ms/stream)}}]
  (let [in*     (ms/stream* {:permanent? true :buffer-size 1000})
        _       (ms/connect in-stream in*)
        s       (ms/splice in-stream out-stream)
        now-s   (fn [] (quot (System/currentTimeMillis) 1000))
        ;; Non-blocking one second delay: a deferred that is never delivered
        ;; and therefore realizes with the timeout value. Replaces
        ;; Thread/sleep, which would block the thread running the callback.
        pause   (fn [] (md/timeout! (md/deferred) 1000 ::elapsed))
        ;; Attempts a connection; both synchronous throws and an errored
        ;; deferred are turned into the ::failed sentinel so the caller can
        ;; schedule a retry instead of silently giving up.
        open    (fn []
                  (md/catch
                    (try
                      (ws-conn url)
                      (catch Exception e
                        (md/error-deferred e)))
                    (fn [e]
                      (log/warn e "websocket connection attempt failed")
                      ::failed)))
        ;; The live connection. A single on-closed callback on out-stream
        ;; reads it, rather than registering one callback per reconnect.
        current (atom nil)
        _       (ms/on-closed out-stream
                  (fn [] (some-> @current ms/close!)))
        renew-connection
        (fn this
          ([] (this 0 (now-s)))
          ([n last-reset-second]
           (let [restart (fn [wait? msg]
                           (md/chain (when wait? (pause))
                             (fn [_]
                               ;; out-stream may have been closed while waiting
                               (when-not (ms/closed? out-stream)
                                 (log/info msg)
                                 (this (inc n) (now-s))))))]
             (md/chain (open)
               (fn [conn]
                 (if (= ::failed conn)
                   (restart true "retrying after failed connection attempt")
                   (do
                     (log/debug "Starting connection. Restart count:" n)
                     (reset! current conn)
                     (if (ms/closed? out-stream)
                       ;; out-stream was closed while the connection was
                       ;; being established: drop the new connection.
                       (ms/close! conn)
                       (do
                         (ms/connect in* conn)
                         (ms/connect conn out-stream {:downstream? false})
                         (ms/on-closed conn
                           (fn []
                             (log/info "connection closed")
                             (when-not (ms/closed? out-stream)
                               (let [too-soon? (= last-reset-second (now-s))]
                                 (when too-soon?
                                   (log/info "attempted to reset more than once per second, waiting"))
                                 (restart too-soon?
                                          "restarting after connectiong closed closed")))))
                         (when on-connect (on-connect s)))))))))))]
    (renew-connection)
    s))
;;; usage:
(def subscribe-msg
  "Subscription request sent after each (re)connection: subscribes to the
  :heartbeat and :full channels for the BTC-USD product."
  {:type        :subscribe
   :product_ids ["BTC-USD"]
   :channels    [:heartbeat :full]})

;; Caller-owned streams: messages put on `in` are forwarded to the websocket,
;; messages received from the websocket are delivered on `out`.
(def in (ms/stream))
(def out (ms/stream))

;; The URL must be a bare websocket URL; the surrounding <...> in the original
;; literal was link markup, not part of the address.
(reconnecting-websocket-stream
  {::url        "wss://ws-feed.pro.coinbase.com"
   ;; Re-sent on every (re)connection so the subscription survives reconnects.
   ::on-connect (fn [s] (ms/put! s (json/write-str subscribe-msg)))
   ::in-stream  in
   ::out-stream out})


;; Drain `out`, logging a sample of the messages (probability 0.002 each)
;; at info level.
(let [log-sampled (fn [msg]
                    (log/sometimes 0.002
                      (log/info msg)))]
  (ms/consume log-sampled out))

;;; To close: closing the output stream also closes the current websocket
;;; connection and prevents any further reconnection.
(ms/close! out)