Hello everyone. Does someone already tried to handle erros while your using stream/consume
??
what do you want to happen when there are errors in your callback @ramonp.rios ?
Let me give some code
and then explain
(let [handler (fn [msg]
(log/info (str "creating customer into " http-url " : " msg))
(->> @(add-customer-ror http-url msg)
(log/info "returned from server ")))
stream (m/topic->stream conn topic)]
(ms/consume handler stream))
I'm connecting to activemq and getting messages from topics. Every time i send a message , i get this message and save into a http endpoint
I test that, when i do not connect or got some 404 or another error in http, my function will handle it
Sending a message to another topic and/or restarting to consume
I read that catch
is my function, but not useful because it's with deferred and not stream.
if you use consume
then the only error will be in your callback, not on the stream - you can use deferred/catch
or just plain catch
there (since you are derefing what i presume is a deferred from add-customer-ror
)
i would generally take a different approach - rather than using consume
i would map
the source stream with a fn which calls add-customer-ror
, catches any error and stashes it in a marker record or variant etc... then you can [1] buffer the mapped stream to control your concurrency hitting the API, [2] get backpressure and [3] process the mapped stream to do anything you want with normal and error responses
(the final piece of the puzzle being to stream/reduce
the mapped stream, to force consumption)
oh, and of course stream/realize-each
to turn any deferred values on a stream into plain values
Thank you @mccraigmccraig
I use it and really supply my needs
I put a catch in my handler and now it works as expected