aleph

Ramon Rios 2020-03-13T08:02:45.023200Z

Hello everyone. Has anyone already tried to handle errors while using stream/consume?

mccraigmccraig 2020-03-13T08:07:02.024100Z

what do you want to happen when there are errors in your callback @ramonp.rios ?

Ramon Rios 2020-03-13T08:07:17.024400Z

Let me give some code

Ramon Rios 2020-03-13T08:07:22.024600Z

and then explain

Ramon Rios 2020-03-13T08:08:16.024800Z

(let [handler (fn [msg]
                (log/info (str "creating customer into " http-url " : " msg))
                (->> @(add-customer-ror http-url msg)
                     (log/info "returned from server ")))
      stream  (m/topic->stream conn topic)]
  (ms/consume handler stream))

Ramon Rios 2020-03-13T08:09:16.026200Z

I'm connecting to ActiveMQ and getting messages from topics. Every time a message is sent, I receive it and save it to an HTTP endpoint

Ramon Rios 2020-03-13T08:09:51.026900Z

I want to make sure that when I can't connect, or get a 404 or another HTTP error, my function handles it

Ramon Rios 2020-03-13T08:10:13.027500Z

For example by sending a message to another topic and/or restarting the consumer

Ramon Rios 2020-03-13T08:11:03.028500Z

I read that catch is the function I need, but it doesn't seem useful here because it works with deferreds and not streams.

mccraigmccraig 2020-03-13T08:14:08.030500Z

if you use consume then the only error will be in your callback, not on the stream - you can use deferred/catch or just plain catch there (since you are derefing what i presume is a deferred from add-customer-ror)
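
A minimal sketch of both options mentioned above, assuming Manifold is on the classpath. `add-customer` is a hypothetical stand-in for `add-customer-ror` (whose real behaviour is not shown in this thread), and the shape of the `{:ok ...}` / `{:error ...}` results is only an illustration:

```clojure
(ns example.consume-errors
  (:require [manifold.deferred :as d]
            [manifold.stream :as s]))

;; Hypothetical stand-in for add-customer-ror: returns a deferred that
;; fails for messages marked :bad and succeeds otherwise.
(defn add-customer [http-url msg]
  (if (:bad msg)
    (d/error-deferred (ex-info "HTTP 404" {:url http-url :msg msg}))
    (d/success-deferred {:status 201 :msg msg})))

;; Option 1: plain try/catch. Dereferencing a failed deferred rethrows
;; its error, so an ordinary catch inside the callback sees it.
(defn handler-try [http-url msg]
  (try
    {:ok @(add-customer http-url msg)}
    (catch Exception e
      {:error (.getMessage e)})))

;; Option 2: d/catch on the deferred itself, before dereferencing.
(defn handler-catch [http-url msg]
  @(-> (add-customer http-url msg)
       (d/chain (fn [resp] {:ok resp}))
       (d/catch Exception (fn [e] {:error (.getMessage e)}))))

;; Consume a small in-memory source and collect what the handler returned.
(def results (atom []))

@(s/consume #(swap! results conj (handler-try "http://example.invalid" %))
            (s/->source [{:id 1} {:id 2 :bad true}]))
```

Either way the error stays inside the callback, so the stream keeps being consumed. What to do in the catch branch (log, publish to another topic, retry) is up to the application.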

mccraigmccraig 2020-03-13T08:21:50.033300Z

i would generally take a different approach - rather than using consume i would map the source stream with a fn which calls add-customer-ror , catches any error and stashes it in a marker record or variant etc... then you can [1] buffer the mapped stream to control your concurrency hitting the API, [2] get backpressure and [3] process the mapped stream to do anything you want with normal and error responses

mccraigmccraig 2020-03-13T08:24:03.034Z

(the final piece of the puzzle being to stream/reduce the mapped stream, to force consumption)

mccraigmccraig 2020-03-13T08:45:59.034600Z

oh, and of course stream/realize-each to turn any deferred values on a stream into plain values
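
The map / buffer / realize-each / reduce approach described above could look roughly like this. It is a sketch under assumptions: `add-customer` is again a hypothetical stand-in for `add-customer-ror`, the `[:ok ...]` / `[:error ...]` vectors play the role of the "marker record or variant", and the buffer size of 4 is an arbitrary choice:

```clojure
(ns example.mapped-stream
  (:require [manifold.deferred :as d]
            [manifold.stream :as s]))

;; Hypothetical stand-in for add-customer-ror.
(defn add-customer [http-url msg]
  (if (:bad msg)
    (d/error-deferred (ex-info "HTTP 404" {:url http-url :msg msg}))
    (d/success-deferred {:status 201 :msg msg})))

;; Call the API and turn both outcomes into a tagged value, so that no
;; error ever escapes onto the stream. Only errors delivered through
;; the deferred are handled here; a synchronous throw would need its
;; own try/catch.
(defn call-api [http-url msg]
  (-> (add-customer http-url msg)
      (d/chain (fn [resp] [:ok resp]))
      (d/catch (fn [e] [:error msg e]))))

(defn process
  "Returns a deferred yielding a map of results grouped by tag."
  [http-url source]
  (->> source
       (s/map #(call-api http-url %)) ; stream of deferreds
       (s/buffer 4)                   ; a few calls ahead (assumed limit)
       (s/realize-each)               ; deferreds -> plain values
       (s/reduce (fn [acc result]     ; forces consumption
                   (update acc (first result) (fnil conj []) result))
                 {})))

(def summary
  @(process "http://example.invalid"
            (s/->source [{:id 1} {:id 2 :bad true} {:id 3}])))
```

With a real topic stream the reduce never finishes until the source closes, so in practice the reducing function would act on each result (log it, forward failures to another topic) rather than accumulate everything.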

Ramon Rios 2020-03-13T10:37:44.034800Z

Thank you @mccraigmccraig

Ramon Rios 2020-03-13T10:37:54.035100Z

I used it and it really meets my needs

Ramon Rios 2020-03-13T10:38:14.035600Z

I put a catch in my handler and now it works as expected