aleph

kachayev 2019-03-28T05:52:37.003600Z

Well, technically anything that you run on JVM “consumes” thread in some meaning of the word “consume”. Even “non-blocking” calls. @igrishaev the code you’ve showed is actually doing some redundant work moving computation to manifold.executor/execute-pool. If you just remove d/future it should give you the same final result.

igrishaev 2019-03-28T09:46:27.005600Z

one more example: when a URL is nil, I get an exception immediately, like this:

(-> (http/get nil) 
    (d/chain 
     (fn [result] :good)))
;; throws Execution error (URISyntaxException) at java.net.URI$Parser/fail
But with the following approach, I’ll capture any exception with d/catch:
(-> nil 
    
    (d/chain 
     #(http/get %) 
     (fn [result] :good)) 
    
    (d/catch 
        (fn [e] :error)))

igrishaev 2019-03-28T09:46:57.006Z

I wonder if the second variant is alright?

igrishaev 2019-03-28T09:56:45.006700Z

so the idea is, I’d like to have only one entry point for all the the exceptions.

mccraigmccraig 2019-03-28T10:24:02.008300Z

@igrishaev we do that with a macro - our real code would be confusing, 'cos it's got a lot of cross-platform stuff in, but something like this should do it

mccraigmccraig 2019-03-28T10:24:07.008500Z

(defmacro safe-catch
  [pr-form error-handler]
  `(manifold.deferred/catch
       (try
         ~pr-form
         (catch Exception x#
             (manifold.deferred/error-deferred x#)))
       ~error-handler))

mccraigmccraig 2019-03-28T10:25:27.008900Z

then you have

mccraigmccraig 2019-03-28T10:25:33.009200Z

(def r (-> (manifold.deferred/success-deferred 1)
           (manifold.deferred/chain inc)
           (manifold.deferred/chain
            (fn [& args] (throw (ex-info "boo" {}))))
           (safe-catch identity)))

mccraigmccraig 2019-03-28T10:25:35.009400Z

or

mccraigmccraig 2019-03-28T10:25:43.009600Z

(def r (-> (throw (ex-info "boo" {}))
           (safe-catch identity)))

mccraigmccraig 2019-03-28T10:26:14.010200Z

and you get a promise out the end even if the exception happens before the first callback

igrishaev 2019-03-28T10:27:07.011Z

yes, that’s approach is clear to me. I’m wondering what might be wrong in the second block of code I posted above?

igrishaev 2019-03-28T10:27:37.011500Z

when you just pass a value and the main function is under a chain

alexyakushev 2019-03-28T10:28:26.012500Z

Sorry, missed the threading.

mccraigmccraig 2019-03-28T10:29:08.013400Z

@alexyakushev no, it throws immediately, but the error is caught and returned as an errored promise

igrishaev 2019-03-28T10:29:29.013800Z

yes, so would be great to hear your thoughts on that

mccraigmccraig 2019-03-28T10:30:48.015Z

@igrishaev i like the "always use safe-catch rather than catch" approach because it's easy to find violations with ag... but chaining a simple value should also work fine

mccraigmccraig 2019-03-28T10:31:46.015700Z

and when you get into more complex scenarios with some other promise which you need to compose on to... having a catch which works in all situations is nice

igrishaev 2019-03-28T10:42:40.016600Z

ok I made my own version of your macro:

(defmacro with-catch
  [& body]
  `(try
     ~@body
     (catch Throwable e#
       (d/error-deferred e#))))
so the chain body looks like:
(->

       (with-catch
         (http/get url http-options))

       (d/chain

        (fn [response]

          (let [{:keys [body headers]} response
                content-length (get headers "content-length")]
...

mccraigmccraig 2019-03-28T10:46:30.018600Z

well that will work - but you will have an uncaught errored promise at the end... so you'll perhaps want a d/catch too ? i like the "you don't have to think about it anymore" aspect of the safe-catch macro approach, and the way it has the same shape as the d/catch fn and plays the same way with threading

igrishaev 2019-03-28T10:47:42.019100Z

There is a d/catch at the bottom, I didn’t put it here

igrishaev 2019-03-28T10:48:09.019600Z

I always finish the chain like with a common d/chain handler

mccraigmccraig 2019-03-28T10:50:19.020600Z

with safe-catch then you can just replace the d/catch with the safe-catch and you get everything

igrishaev 2019-03-28T10:50:40.021Z

the whole pipeline in my example would be:

(->

 (with-catch
     (http/get url http-options))

 (d/chain

  (fn [response]
      (:body response)))
 
  (fn [body]
    (process-the-body body))

 (d/catch
  (fn [^Exception e]
    (reporn-error e))))

igrishaev 2019-03-28T10:56:21.022800Z

And also, can anyone help me with d/loop? I’m a bit confused with it. What I’m going to do is to poll Kafka and feed messages from it to a stream. That stream should be processes by several workers. I’m not sure about how to build a d/loop logic.

igrishaev 2019-03-28T10:57:08.023800Z

Inside d/loop, I’ve got a d/chain clause that polls a consumer. However, this approach is blocking.

igrishaev 2019-03-28T10:58:04.024Z

(->

 nil

 (d/chain

  (fn [& _]
      (consumer/poll! consumer poll-timeout))

  (fn [messages]
      (when-not (empty? messages)
                (s/put-all! stream messages))))

 (d/catch
  (fn [^Throwable e]
      (report-exeption e {:topic topic})))

 (d/chain
  (fn [& _]
      (d/recur))))

igrishaev 2019-03-28T10:58:39.024600Z

But when I move polling into d/future, that works:

igrishaev 2019-03-28T10:59:05.024800Z

(->

 (d/future
   (consumer/poll! consumer poll-timeout))

 (d/chain

  (fn [messages]
      (when-not (empty? messages)
                (s/put-all! stream messages))))

 (d/catch
  (fn [^Throwable e]
      (report-exeption e {:topic topic})))

 (d/chain
  (fn [& _]
      (d/recur))))

igrishaev 2019-03-28T10:59:58.025600Z

So my question is, why does the first variant block the main thread?

igrishaev 2019-03-28T11:24:44.026500Z

btw @mccraigmccraig thank you fore sharing your macro. Only now I’ve got what’s behind it, I’m a bit slow.

igrishaev 2019-03-28T14:12:10.027300Z

anyone familiar with aleph-http stuff, could you take a look please? https://github.com/ztellman/aleph/issues/500

igrishaev 2019-03-28T14:12:18.027600Z

(get!)

mccraigmccraig 2019-03-28T16:16:59.029300Z

@igrishaev hmm - i've not seen this, and we often upload mobile video files which can get quite large - i wonder why your connection is being closed ?

igrishaev 2019-03-28T16:17:45.029800Z

so am I, really. I even managed to reproduce that in repl

igrishaev 2019-03-28T16:18:58.031200Z

first, get stream from http/get, then wrap it with AWS stream what counts the number of bytes consumed

igrishaev 2019-03-28T16:19:30.032100Z

then read that wrapped stream in a loop into a byte-array

igrishaev 2019-03-28T16:19:50.032500Z

and it time the exception will araise

mccraigmccraig 2019-03-28T16:24:08.034400Z

one thing i remember from way back... we don't stream directly to S3, we go via a tmpfile first - and we had terrible trouble (with the sort of random truncation errors you are seeing) until we started using the :raw-stream? option and doing things async https://github.com/ztellman/aleph/blob/master/examples/src/aleph/examples/http.clj#L177

mccraigmccraig 2019-03-28T16:24:29.035Z

but we've got #yada in between us and raw #aleph so it may have been some other problem

2019-03-28T16:47:27.036800Z

if you haven't considered/are not aware of multipart uploads to s3, I would suggest checking those out. instead of having to deal with figuring out the size from a stream you can upload fixed size chunks

1☝️
igrishaev 2019-03-28T17:00:23.037600Z

@mccraigmccraig I’m a bit confused about how to turn a raw-stream into a intputStream that AWS needs?

mccraigmccraig 2019-03-28T17:01:40.038600Z

@igrishaev i haven't tried - we just wrote the bytes from the raw-stream to a tmpfile and then gave the whole tmpfile to the s3 uploader

igrishaev 2019-03-28T17:02:30.039400Z

@mccraigmccraig I see, but it won’t apply in my case. I’ve got to do the whole stuff in memory

igrishaev 2019-03-28T17:03:01.040Z

but I put a comment in the issue, I figured out with clj-http

igrishaev 2019-03-28T17:03:50.041100Z

when you pass {:as :stream}, the connection won’t close until you consume it till the end

2019-03-28T17:34:28.042900Z

also, I am pretty sure using multipart uploading is the only way to get true streaming, I think (I am not 100% sure) the cognitect aws-api clients turn inputstreams into byte arrays (read them all in as one big one) when passed