Is trying to model an SQS queue with a core.async channel a good idea?
The obvious difference I can see is that SQS requires the consumer to explicitly delete the message after processing, so that requires a second “channel” of communication (it could be a callback function, or just a “processed” channel where the consumer puts the message).
The only reason I bring core.async into this is to get an abstraction of sorts for testing.
we've done similar in the past and used a core.async channel to model a kafka topic for testing
Good to know — seems a little bit low level for my tastes, figuring out threads and errors etc. I wonder if there’s a library on top of core.async that has some good opinions on all this.
iirc we hid all the detail of interacting with the particular transport behind a protocol, and it was really happy-path testing rather than attempting to make the core.async transport fully equivalent
Ah — so instead of trying to actually pull items off a SQS queue and putting them into a core.async channel, make a protocol with a few relevant functions, and implement it once for SQS for production, once for core.async for testing?
yes, exactly
OK, I guess this frees me from the mental gymnastics of trying to merge the two worlds together. Making a protocol is actually quite a bit easier than trying to wrap my head around all the possible ways I could screw up with core.async.
@orestis We have done something kind of similar to this as well. We have a generic multi-threaded "message processor". The message processor takes a function for fetching messages and a function for processing messages (and some other thread related stuff). The SQS wrapper implementation simply polls sqs for the fetch message fn. The process message function is passed a map of the sqs message and a function to delete the current message.
I think there is some interesting stuff to do in directly extending the ReadPort and WritePort (the two halves of a channel) protocols to things like message queues. https://gist.github.com/hiredman/f2f3f9968ccdfa7c8b9bae74622b2041 (off the dome, never actually run code). Most projects seem to avoid that kind of thing though and instead have something that reads from a message queue and writes to a channel.
implementing back pressure properly (without introducing unbounded queues) is the tricky bit
the bounds of sqs are pretty large
But yeah, there is a bit of subtlety, which is maybe part of the reason most people don't do it. Another thing is determining when to ack a message: the example code above acks the message when it is given to whoever is waiting for a message, but depending on your system you might want to withhold acks until some downstream process has completed
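A minimal sketch of that deferred-ack idea (all names here are hypothetical, not a real SQS client API): instead of acking on receipt, hand the consumer a zero-arg ack thunk and let it, or something downstream of it, decide when to call it.

```clojure
;; Hypothetical sketch: the ack is a thunk the handler calls explicitly,
;; rather than something the receive loop does automatically.
(defn handle-with-deferred-ack
  "Calls `handler` with the message body and an ack thunk. `delete!` is
  whatever actually acks the message (e.g. an SQS delete call)."
  [msg delete! handler]
  (handler (:Body msg) #(delete! msg)))

;; Usage: the handler only acks after its own work succeeds.
(def acked (atom []))

(handle-with-deferred-ack {:Body "hello"}
                          #(swap! acked conj %)
                          (fn [body ack]
                            ;; ... do real work with body here ...
                            (ack)))
```

If the handler throws before calling `(ack)`, the message is never deleted and (in SQS terms) would become visible again after the visibility timeout.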
(defprotocol Dispatcher
  (receive [this] "Receive a message, blocking.")
  (delete [this msg] "Delete a message, indicating successful processing.")
  (msg-body [this msg] "Given an opaque message, retrieve the message body."))

(defn sqs-dispatcher [client queue-url]
  (reify Dispatcher
    (receive [_] (sqs/receive-one-message client queue-url {}))
    (delete [_ msg] (sqs/delete-message client queue-url msg))
    (msg-body [_ msg] (:Body msg))))
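For the testing side of that idea, a hypothetical in-memory implementation of the same protocol could look like the sketch below (the protocol is repeated without docstrings so the sketch runs standalone; `in-memory-dispatcher` is an invented name):

```clojure
;; Protocol repeated here so the sketch is self-contained.
(defprotocol Dispatcher
  (receive [this])
  (delete [this msg])
  (msg-body [this msg]))

(defn in-memory-dispatcher
  "A hypothetical test double: messages live in an atom instead of SQS."
  [msgs]
  (let [state (atom {:pending (vec msgs) :deleted []})]
    (reify Dispatcher
      (receive [_] (first (:pending @state)))
      (delete [_ msg]
        (swap! state (fn [{:keys [pending deleted]}]
                       {:pending (vec (remove #{msg} pending))
                        :deleted (conj deleted msg)})))
      (msg-body [_ msg] (:Body msg)))))

;; Usage in a test: drain everything without touching AWS.
(def d (in-memory-dispatcher [{:Body "a"} {:Body "b"}]))
(def seen (atom []))

(loop []
  (when-let [msg (receive d)]
    (swap! seen conj (msg-body d msg))
    (delete d msg)
    (recur)))
```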
(defn observe-dispatcher
  "Blocks, observing a `Dispatcher` until the atom `stop` is reset to true.
  Every message is processed through `process-fn`; errors are passed to `err-fn`
  and otherwise ignored."
  [dispatcher process-fn err-fn stop]
  (loop []
    (when-not @stop
      (when-let [msg (receive dispatcher)]
        (try
          (process-fn (msg-body dispatcher msg))
          (delete dispatcher msg)
          (catch Exception e
            (err-fn e))))
      (recur))))
Here’s what I have so far. No core.async machinery at this level, but the receive call is blocking with a timeout. process-fn should throw if it can’t process the message; otherwise the message is ack’ed. Not sure if this is generic enough, but for this use case (email sending) it seems OK to me.
I need to figure out how to deal with threads etc, probably some ExecutorService — any pointers on resources there? Would like to have a small bounded threadpool to deal with more than one msg at a time.
Some other out-of-band system would scale up in the case of a big SQS backlog; this code doesn’t care much.
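One hedged sketch of the ExecutorService route, using only java.util.concurrent: create a small fixed pool, submit the blocking observe loop once per thread, then flip the stop atom and drain the pool. The worker fn below is a stand-in for something like observe-dispatcher; all names are illustrative.

```clojure
(import '(java.util.concurrent Executors TimeUnit))

(defn start-workers
  "Run `worker-fn` (a no-arg blocking loop that watches a stop flag)
  on `n` threads. Returns the pool so the caller can shut it down."
  [n worker-fn]
  (let [pool (Executors/newFixedThreadPool n)]
    (dotimes [_ n]
      ;; Clojure fns implement both Runnable and Callable; the hint
      ;; picks the Runnable overload of submit.
      (.submit pool ^Runnable worker-fn))
    pool))

(defn stop-workers
  "Signal the workers to stop, then wait for the pool to drain."
  [pool stop]
  (reset! stop true)
  (.shutdown pool)
  (.awaitTermination pool 5 TimeUnit/SECONDS))

;; Usage with a stand-in worker that counts iterations until stopped:
(def stop (atom false))
(def counter (atom 0))
(def pool (start-workers 4 #(while (not @stop)
                              (swap! counter inc)
                              (Thread/sleep 10))))
(Thread/sleep 100)
(stop-workers pool stop)
```

A fixed pool of N threads means at most N messages in flight at once, which is the bounded concurrency you described.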
@orestis Maybe check out https://github.com/TheClimateCorporation/squeedo for inspiration?
Does anybody know the rationale for the core.async/pipeline behavior change in the following commit? https://github.com/clojure/core.async/commit/3429e3e1f1d49403bf9608b36dbd6715ffe4dd4f This seems to contradict the documentation for pipeline vs pipeline-blocking.
https://ask.clojure.org/index.php/8805/where-is-the-blocking-operation
it sounds like @alexmiller's position is that the previous behavior was in error
ok, thanks @hiredman! that link answered my implicit question as well, which was what in particular was erroneous about the old implementation
by the by, I am currently writing some core.async docs (better 6 years late, than never, right?). starting to look unlikely I'm going to finish pre-Thanksgiving, but something to look for soon.
This may change structure, but work in progress: • Rationale - https://github.com/clojure/clojure-site/blob/async/content/about/async.adoc • Reference - https://github.com/clojure/clojure-site/blob/async/content/reference/async.adoc • Walkthrough - https://github.com/clojure/clojure-site/blob/async/content/guides/async-walkthrough.adoc
the first is from Rich's original announcement, the latter is something I wrote long ago that's embedded in the repo but hard to find (have not revisited the content, just reformatted). The reference is new.
So I'm having trouble reproducing a minimal example of this. This is in ClojureScript. The following code works fine, but I've been debugging a real, more complicated example, which seems to be the same code in essence, and I can't figure this out:
(def ch1 (a/chan 1))
(def m (a/mult ch1))
(def ch2 (a/tap m (a/chan 1 (comp (filter odd?)
                                  ;; (take 1)
                                  ))))
(doseq [x (range 10)]
(a/go (a/>! ch1 x)))
(a/go
(when-let [x (a/<! ch2)]
(println "GOT" x)
(a/close! ch2)))
This works fine and prints 1. In the real code however, nothing prints as it is, unless I remove the a/close! line. Also, if I remove this line it works, but if I (equivalently?) uncomment the (take 1) transducer it fails again, while changing it to (take 2) works fine. Any tips? I've been staring at this wayyyy too long
I'm just confused how closing the chan after it's already been taken from could possibly prevent it from doing the take in the first place
the close of ch2 is allowed to happen before the put on ch1
I guess that doesn't really explain anything here
wait so ch2 can close before the put on ch1, even though it's after the take in the go block?
if there was anything to read on ch2, sure
but in that example, something would first be printed then right?
also the put of 1 could happen before the put of 0 (not likely, but allowed)
yes, a print would happen before the close, so that alone doesn't explain anything
yeah. It's probably something else i'm doing that the example above doesn't include. Just feel the need to brainstorm. (the go block is in fact basically identical)
also, one should usually use put! instead of >! inside go (if that is all the go is doing)
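For instance, the doseq in the earlier snippet could drop the per-item go blocks entirely (this sketch assumes the put really is all the go was doing):

```clojure
(require '[clojure.core.async :as a])

(def ch1 (a/chan 10))

;; put! enqueues the value asynchronously; no go block (and no go
;; state-machine overhead) is needed when the put is the whole body.
(doseq [x (range 10)]
  (a/put! ch1 x))
```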
ah yes
do you happen to know if it's possible for clojurescript go blocks to cause a deadlock?
Or is it the thread pool in clj that makes that a problem and cljs just "rewrites callbacks"?
in cljs the scheduling will be more predictable iirc, and a blocking op in a go block will block the entire vm
js has a threadpool, the pool has 1 thread, and everything happens on it
so if you are doing something on the js main thread, and never yielding, no core.async work (or much of anything else you would want js to do) will happen
ok hmmm
a channel is a meeting place, where two threads of control can meet and exchange values
if one thread of control is at a channel to exchange a value, but another never meets it, it will be there indefinitely, which looks similar to deadlock
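A tiny JVM-side illustration of that: the take below parks indefinitely because no other thread of control ever arrives at the channel, which from the outside looks just like deadlock (the promise here is only for observing the result).

```clojure
(require '[clojure.core.async :as a])

(def result (promise))
(def ch (a/chan))   ;; unbuffered: a take must meet a put (or a close)

;; This go block parks at ch. No OS thread is blocked, but until
;; another thread of control meets it at the channel, it waits
;; indefinitely, i.e. it looks deadlocked.
(a/go (deliver result (a/<! ch)))

;; Closing the channel is one way to "meet" it: the parked take is
;; released and yields nil.
(a/close! ch)
```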