I'm using Aleph's HTTP client to connect to a push feed. The feed uses chunked transfer encoding to push out JSON objects. I'm using manifold.stream/stream->seq
to convert the chunks into a sequence. Effectively, something like this:
(require '[manifold.stream :as ms])
(require '[manifold.deferred :as m-d])
(require '[aleph.http :as http])
(defn connect-to-push-stream
  [{:keys [::process-fn ::push-url]}]
  (m-d/chain (http/get push-url
                       {:pool (http/connection-pool
                               {:connection-options {:raw-stream? true}})})
             :body
             ms/stream->seq
             ;; TODO transform to seq of maps from json objects
             ;; Apply process-fn to each map created from the json object
             #(map process-fn %)))
Of course, the items in the sequence are not complete JSON payloads, because chunk boundaries need not line up with object boundaries. I can iterate over the sequence and look for the CRLF endings to combine the pieces.
However, I'm not sure whether Aleph provides anything that does this out of the box. I looked at the coerce-json-response
middleware, but it doesn't look like it works with a push feed.
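
To make the manual approach concrete, here is a minimal sketch of the CRLF reassembly I have in mind. It assumes each chunk has already been decoded to a string and that every JSON object ends with CRLF; chunks->lines is a name I made up for illustration, not something Aleph or Manifold provides.

```clojure
(require '[clojure.string :as str])

(defn chunks->lines
  "Lazily reassembles a seq of string chunks into complete CRLF-terminated
  lines. Hypothetical helper, not part of Aleph or Manifold. Any trailing
  data that never receives a CRLF is dropped when the input ends."
  ([chunks] (chunks->lines "" chunks))
  ([pending chunks]
   (lazy-seq
     (when-let [s (seq chunks)]
       ;; Prepend the unfinished tail of the previous chunk, then split.
       ;; The -1 limit keeps a trailing empty string, so the last part is
       ;; always the (possibly empty) unfinished remainder.
       (let [parts (str/split (str pending (first s)) #"\r\n" -1)]
         (concat (remove str/blank? (butlast parts))
                 (chunks->lines (last parts) (rest s))))))))
```

Each resulting line could then be handed to a JSON parser before process-fn is applied. Using first and rest instead of destructuring keeps the function from pulling the next chunk before the current one's lines are emitted, which matters when the underlying seq blocks on the network.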