Code review wanted for this automatic/reconnecting websocket client. The idea is basically to have something which acts exactly as the aleph websocket client except it uses the source/sink streams you provide it, and it reconnects to those streams when the connection is closed unintentionally. I'm mainly wondering if the stream splicing/connection logic can be better.
;;;goal: pass input/output streams which connect to a websocket client duplex
;;;stream. when the inner websocket stream is closed unintentionally by server,
;;;make a new one and reconnect the input/output streams. close the websocket
;;;stream when the output stream is closed.
(require '[manifold.stream :as ms]
'[manifold.deferred :as md]
'[alep.http :as http]
'[taoensso.timbre :as log]
'[clojure.data.json :as json])
(defn ws-conn [url]
(http/websocket-client url
{:max-frame-payload 1e7 :max-frame-size 1e7}))
(defn reconnecting-websocket-stream [{::keys [url on-connect
in-stream
out-stream]
:or {in-stream (ms/stream)
out-stream (ms/stream)}}]
(let [in* (ms/stream* {:permanent? true :buffer-size 1000})
_ (ms/connect in-stream in*)
s (ms/splice in-stream out-stream)
renew-connection
(fn this
([] (this 0 (quot (System/currentTimeMillis) 1000)))
([n last-reset-second]
(md/chain (ws-conn url)
(fn [conn]
(log/debug "Starting connection. Restart count:" n)
(ms/on-closed out-stream (fn [] (.close conn)))
(ms/connect in* conn)
(ms/connect conn out-stream {:downstream? false})
(ms/on-closed conn
(fn []
(log/info "connection closed")
(when (not (ms/closed? out-stream))
(when (= last-reset-second (quot (System/currentTimeMillis) 1000))
(log/info "attempted to reset more than once per second, waiting")
(Thread/sleep 1000))
(log/info "restarting after connectiong closed closed")
(this (inc n) (quot (System/currentTimeMillis) 1000)))))
(when on-connect (on-connect s))))))]
(renew-connection)
s))
usage:
(def subscribe-msg {:type :subscribe
:product_ids ["BTC-USD"]
:channels [:heartbeat :full]})
(def in (ms/stream))
(def out (ms/stream))
(reconnecting-websocket-stream
{::url "<wss://ws-feed.pro.coinbase.com>"
::on-connect (fn [s] (ms/put! s (json/write-str subscribe-msg)))
::in-stream in
::out-stream out})
(ms/consume
(fn [msg]
(log/sometimes 0.002
(log/info msg)))
out)
;;; to close
(ms/close! out)