Well, technically anything that you run on the JVM “consumes” a thread in some sense of the word “consume”. Even “non-blocking” calls. @igrishaev the code you’ve shown is actually doing some redundant work by moving the computation to manifold.executor/execute-pool. If you just remove d/future it should give you the same final result.
one more example: when a URL is nil, I get an exception immediately, like this:
(-> (http/get nil)
    (d/chain
      (fn [result] :good)))
;; throws Execution error (URISyntaxException) at java.net.URI$Parser/fail
But with the following approach, I’ll capture any exception with d/catch:
(-> nil
    (d/chain
      #(http/get %)
      (fn [result] :good))
    (d/catch
      (fn [e] :error)))
I wonder if the second variant is alright?
so the idea is, I’d like to have only one entry point for all the exceptions.
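For reference, a minimal sketch of why the second variant funnels everything into d/catch: a function passed to d/chain runs inside the chain, so an exception it throws comes back as an errored deferred instead of escaping to the caller. Here `fetch` and `fetch-safely` are hypothetical names, and `fetch` only stands in for http/get so the snippet runs without a network.

```clojure
(require '[manifold.deferred :as d])

;; Hypothetical stand-in for a call that throws synchronously on bad input,
;; the way http/get does for a nil URL in the example above.
(defn fetch [url]
  (when (nil? url)
    (throw (IllegalArgumentException. "url must not be nil")))
  (d/success-deferred {:status 200 :url url}))

(defn fetch-safely [url]
  (-> url
      (d/chain
        fetch              ; called inside the chain, so a throw is captured
        (fn [_] :good))
      (d/catch
        (fn [_] :error))))

@(fetch-safely nil)                   ; => :error
@(fetch-safely "http://example.com")  ; => :good
```

The trade-off is that every chain has to start from a plain value, which is easy to forget when composing onto a deferred that some other code already created.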
@igrishaev we do that with a macro - our real code would be confusing, 'cos it's got a lot of cross-platform stuff in, but something like this should do it
(defmacro safe-catch
  [pr-form error-handler]
  `(manifold.deferred/catch
     (try
       ~pr-form
       (catch Exception x#
         (manifold.deferred/error-deferred x#)))
     ~error-handler))
then you have
(def r (-> (manifold.deferred/success-deferred 1)
           (manifold.deferred/chain inc)
           (manifold.deferred/chain
             (fn [& args] (throw (ex-info "boo" {}))))
           (safe-catch identity)))
or
(def r (-> (throw (ex-info "boo" {}))
           (safe-catch identity)))
and you get a promise out the end even if the exception happens before the first callback
yes, that approach is clear to me. I’m wondering what might be wrong in the second block of code I posted above?
when you just pass a value and the main function is under a chain
Sorry, missed the threading.
@alexyakushev no, it throws immediately, but the error is caught and returned as an errored promise
yes, so would be great to hear your thoughts on that
@igrishaev i like the "always use safe-catch rather than catch" approach because it's easy to find violations with ag... but chaining a simple value should also work fine
and when you get into more complex scenarios with some other promise which you need to compose on to... having a catch which works in all situations is nice
ok I made my own version of your macro:
(defmacro with-catch
  [& body]
  `(try
     ~@body
     (catch Throwable e#
       (d/error-deferred e#))))
so the chain body looks like:
(->
  (with-catch
    (http/get url http-options))
  (d/chain
    (fn [response]
      (let [{:keys [body headers]} response
            content-length (get headers "content-length")]
        ...
well that will work - but you will have an uncaught errored promise at the end... so you'll perhaps want a d/catch too? i like the "you don't have to think about it anymore" aspect of the safe-catch macro approach, and the way it has the same shape as the d/catch fn and plays the same way with threading
There is a d/catch at the bottom, I didn’t put it here
I always finish the chain like that, with a common d/chain handler
with safe-catch you can then just replace the d/catch with the safe-catch and you get everything
the whole pipeline in my example would be:
(->
  (with-catch
    (http/get url http-options))
  (d/chain
    (fn [response]
      (:body response))
    (fn [body]
      (process-the-body body)))
  (d/catch
    (fn [^Exception e]
      (report-error e))))
And also, can anyone help me with d/loop? I’m a bit confused by it. What I’m going to do is poll Kafka and feed the messages from it into a stream. That stream should be processed by several workers. I’m not sure how to build the d/loop logic.
Inside d/loop, I’ve got a d/chain clause that polls a consumer. However, this approach is blocking.
(->
  nil
  (d/chain
    (fn [& _]
      (consumer/poll! consumer poll-timeout))
    (fn [messages]
      (when-not (empty? messages)
        (s/put-all! stream messages))))
  (d/catch
    (fn [^Throwable e]
      (report-exeption e {:topic topic})))
  (d/chain
    (fn [& _]
      (d/recur))))
But when I move polling into d/future, that works:
(->
  (d/future
    (consumer/poll! consumer poll-timeout))
  (d/chain
    (fn [messages]
      (when-not (empty? messages)
        (s/put-all! stream messages))))
  (d/catch
    (fn [^Throwable e]
      (report-exeption e {:topic topic})))
  (d/chain
    (fn [& _]
      (d/recur))))
So my question is, why does the first variant block the main thread?
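A likely explanation, as far as I understand manifold: callbacks chained onto a value that is already realized (here, nil) run right away on the thread that called d/chain, so the blocking poll happens on the caller's thread on every iteration. d/future hands the poll to manifold's executor and returns an unrealized deferred immediately. A minimal sketch of the d/future shape inside d/loop, where `blocking-poll` and `poll-n-times` are hypothetical names and `blocking-poll` only stands in for consumer/poll!:

```clojure
(require '[manifold.deferred :as d])

;; Hypothetical blocking call standing in for consumer/poll!.
(defn blocking-poll [n]
  (Thread/sleep 50)
  [n])

(defn poll-n-times
  "Polls `total` times, collecting the results into a vector."
  [total]
  (d/loop [i 0, acc []]
    (-> (d/future (blocking-poll i))   ; blocking work runs on manifold's executor
        (d/chain
          (fn [messages]
            (let [acc (into acc messages)]
              (if (< (inc i) total)
                (d/recur (inc i) acc)  ; ask d/loop for another iteration
                acc)))))))             ; final value of the loop

(def result (poll-n-times 3))  ; a deferred, realized once all polls finish
@result                        ; => [0 1 2]
```

A real polling loop would call (d/recur) unconditionally, as in the snippets above; the counter is only there so the sketch terminates.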
btw @mccraigmccraig thank you for sharing your macro. Only now I’ve got what’s behind it, I’m a bit slow.
anyone familiar with aleph-http stuff, could you take a look please? https://github.com/ztellman/aleph/issues/500
(get!)
@igrishaev hmm - i've not seen this, and we often upload mobile video files which can get quite large - i wonder why your connection is being closed ?
so am I, really. I even managed to reproduce that in repl
first, get a stream from http/get, then wrap it with an AWS stream that counts the number of bytes consumed
then read that wrapped stream in a loop into a byte-array
and in time the exception will arise
one thing i remember from way back... we don't stream directly to S3, we go via a tmpfile first - and we had terrible trouble (with the sort of random truncation errors you are seeing) until we started using the :raw-stream? option and doing things async https://github.com/ztellman/aleph/blob/master/examples/src/aleph/examples/http.clj#L177
but we've got #yada in between us and raw #aleph so it may have been some other problem
if you haven't considered/are not aware of multipart uploads to s3, I would suggest checking those out. instead of having to deal with figuring out the size from a stream you can upload fixed size chunks
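To illustrate the fixed-size-chunk idea, here is a rough sketch that splits an InputStream into byte arrays of at most n bytes using only JVM classes. `read-chunk` and `chunk-seq` are hypothetical helpers, not part of any AWS library, and the actual multipart upload call for each chunk is deliberately left out.

```clojure
(import '(java.io InputStream ByteArrayInputStream)
        '(java.util Arrays))

(defn read-chunk
  "Reads up to n bytes from in, blocking until n bytes are read or the
  stream ends. Returns a byte array (shorter than n for the last chunk),
  or nil when the stream is already exhausted."
  [^InputStream in n]
  (let [^bytes buf (byte-array n)]
    (loop [off 0]
      (if (< off n)
        (let [c (.read in buf (int off) (int (- n off)))]
          (if (neg? c)
            (when (pos? off) (Arrays/copyOf buf (int off)))
            (recur (+ off c))))
        buf))))

(defn chunk-seq
  "Lazy sequence of chunks of at most n bytes read from in."
  [in n]
  (lazy-seq
    (when-let [chunk (read-chunk in n)]
      (cons chunk (chunk-seq in n)))))

;; 10 bytes split into chunks of 4:
(map count (chunk-seq (ByteArrayInputStream. (byte-array 10)) 4))
;; => (4 4 2)
```

Each chunk has a known length, so it could be handed to an upload-part call without knowing the total size of the stream up front.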
@mccraigmccraig I’m a bit confused about how to turn a raw-stream into an InputStream that AWS needs?
@igrishaev i haven't tried - we just wrote the bytes from the raw-stream to a tmpfile and then gave the whole tmpfile to the s3 uploader
@mccraigmccraig I see, but it won’t apply in my case. I’ve got to do the whole stuff in memory
but I put a comment in the issue, I figured it out with clj-http: when you pass {:as :stream}, the connection won’t close until you consume it to the end
also, I am pretty sure using multipart uploading is the only way to get true streaming, I think (I am not 100% sure) the cognitect aws-api clients turn inputstreams into byte arrays (read them all in as one big one) when passed
https://github.com/cognitect-labs/aws-api/blob/master/src/cognitect/aws/client.clj#L75 and https://github.com/cognitect-labs/aws-api/blob/master/src/cognitect/aws/util.clj#L135 (but I have just started using the library myself so I am not sure)