core-async

orestis 2019-11-27T15:55:20.124200Z

Is trying to model an SQS queue with a core.async channel a good idea?

orestis 2019-11-27T15:57:05.125600Z

The obvious difference I can see is that SQS requires the consumer to explicitly delete the message after processing, so that requires a second “channel” of communication (could be a callback function, or could be just a “processed” channel where the consumer puts the message).

orestis 2019-11-27T15:57:32.126100Z

The only reason I bring core.async into this is to get an abstraction of sorts for testing.

mccraigmccraig 2019-11-27T16:10:43.128100Z

we've done similar in the past and used a core.async channel to model a kafka topic for testing

orestis 2019-11-27T16:11:38.129100Z

Good to know — seems a little bit low level for my tastes, figuring out threads and errors etc. I wonder if there’s a library on top of core.async that has some good opinions on all this.

mccraigmccraig 2019-11-27T16:13:17.130Z

iirc we hid all the detail of interacting with the particular transport behind a protocol, and it was really happy-path testing rather than attempting to make the core.async transport fully equivalent

👍 1
orestis 2019-11-27T16:20:49.131Z

Ah — so instead of trying to actually pull items off a SQS queue and putting them into a core.async channel, make a protocol with a few relevant functions, and implement it once for SQS for production, once for core.async for testing?

mccraigmccraig 2019-11-27T16:21:50.131200Z

yes, exactly

orestis 2019-11-27T16:23:59.132400Z

OK, I guess this frees me from the mental gymnastics of trying to merge the two worlds together. Making a protocol is actually quite a bit easier than trying to wrap my head around all the possible ways I could screw up with core.async.

kenny 2019-11-27T17:46:40.134600Z

@orestis We have done something kind of similar to this as well. We have a generic multi-threaded "message processor". The message processor takes a function for fetching messages and a function for processing messages (and some other thread-related stuff). The SQS wrapper implementation simply polls SQS in its fetch-message fn. The process-message function is passed a map of the SQS message and a function to delete the current message.
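
A minimal sketch of that shape, with entirely made-up names (start-message-processor, fetch-fn, process-fn are not kenny's actual API): the generic part only knows about a fetch function and a process function, so the SQS polling and the delete callback live in whatever builds those functions.

(defn start-message-processor
  "Runs `n-threads` polling loops. `fetch-fn` returns a (possibly empty)
  seq of maps like {:message m, :delete-fn f}; `process-fn` is called
  with each such map and may call :delete-fn to ack. Returns a stop fn."
  [{:keys [n-threads fetch-fn process-fn]}]
  (let [running? (atom true)
        threads  (doall
                   (repeatedly n-threads
                               #(doto (Thread.
                                        (fn []
                                          (while @running?
                                            (run! process-fn (fetch-fn)))))
                                  (.start))))]
    (fn stop []
      (reset! running? false)
      (run! (fn [^Thread t] (.join t)) threads))))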

2019-11-27T18:10:17.137600Z

I think there is some interesting stuff to do in directly extending the ReadPort and WritePort (the two halves of a channel) protocols to things like message queues. https://gist.github.com/hiredman/f2f3f9968ccdfa7c8b9bae74622b2041 (off the dome, never actually run code). Most projects seem to avoid that kind of thing though and instead have something that reads from a message queue and writes to a channel.

alexmiller 2019-11-27T18:29:51.138100Z

implementing back pressure properly (without introducing unbounded queues) is the tricky bit

2019-11-27T18:31:55.139400Z

the bounds of sqs are pretty large

2019-11-27T18:36:25.141600Z

But yeah, there is a bit of subtlety, which is maybe part of the reason most people don't do it. Another thing is determining when to ack a message: the example code above acks the message when it is given to whoever is waiting for a message, but depending on your system you might want to withhold acks until some downstream process has completed
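
Here is a hedged sketch of that last pattern (read from the queue, write to a bounded channel) that also defers the ack to the consumer; receive-one and ack! are placeholder names rather than a real SQS API, and the blocking put on the bounded channel is what supplies the backpressure mentioned above.

(require '[clojure.core.async :as a])

(defn queue->chan
  "Polls `receive-one` on a dedicated thread and puts {:msg m :ack! f}
  maps onto a bounded channel; the consumer calls :ack! only after its
  downstream work succeeds. Close the returned channel to stop."
  [receive-one ack! buf-size]
  (let [ch (a/chan buf-size)]
    (a/thread
      (loop []
        (let [m (receive-one)]                        ; blocking poll with timeout
          (cond
            (nil? m) (recur)                          ; timed out, poll again
            (a/>!! ch {:msg m :ack! #(ack! m)}) (recur)
            :else nil))))                             ; channel closed, stop polling
    ch))

A consumer would then take from the channel, do its work, and call (:ack! item) only once that work (and anything downstream) has finished.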

orestis 2019-11-27T18:58:14.141900Z

(defprotocol Dispatcher
  (receive [this] "Receive a message, blocking.")
  (delete [this msg] "Delete a message, indicate successful processing.")
  (msg-body [this msg] "Given an opaque message, retrieve the message body."))

(defn SQSDispatcher [client queue-url]
  (reify Dispatcher
    (receive [_] (sqs/receive-one-message client queue-url {}))
    (delete [_ msg] (sqs/delete-message client queue-url msg))
    (msg-body [_ msg] (:Body msg))))


(defn observe-dispatcher
  "Blocks, observing a `Dispatcher` until the atom `stop` is reset to true.
  Every message is processed through `process-fn`; errors are passed to `err-fn`
  and otherwise ignored."
  [dispatcher process-fn err-fn stop]
  (loop []
    (when-not @stop
      (when-let [msg (receive dispatcher)]
        (try
          (process-fn (msg-body dispatcher msg))
          (delete dispatcher msg)
          (catch Exception e
            (err-fn e))))
      (recur))))

orestis 2019-11-27T18:58:47.142600Z

Here’s what I have so far. No core.async machinery at this level, but the receive call is blocking with timeout.
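
For the testing side of the protocol-swapping idea from earlier, one possible in-memory Dispatcher (just a sketch of how it could look, backed by a core.async channel so tests can feed messages in and assert on deletions):

(require '[clojure.core.async :as a])

(defn test-dispatcher
  "Returns {:dispatcher d, :ch ch, :deleted deleted}. Tests put maps like
  {:Body \"...\"} onto `ch`; `deleted` accumulates acked messages."
  []
  (let [ch      (a/chan 10)
        deleted (atom [])]
    {:ch      ch
     :deleted deleted
     :dispatcher
     (reify Dispatcher
       (receive  [_]     (first (a/alts!! [ch (a/timeout 100)]))) ; nil on timeout
       (delete   [_ msg] (swap! deleted conj msg))
       (msg-body [_ msg] (:Body msg)))}))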

orestis 2019-11-27T19:00:09.143500Z

process-fn should throw if it can’t process the message, otherwise the message is ack’ed. Not sure if this is generic enough, but for this use case (email sending) seems ok to me.

orestis 2019-11-27T19:01:03.144600Z

I need to figure out how to deal with threads etc, probably some ExecutorService — any pointers on resources there? Would like to have a small bounded threadpool to deal with more than one msg at a time.
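
One possible shape for that, assuming the observe-dispatcher above (only a sketch): a fixed-size ExecutorService running a handful of observer loops, with a stop function that flips the atom and shuts the pool down.

(import '[java.util.concurrent Executors ExecutorService TimeUnit])

(defn start-workers
  "Runs `n` observe-dispatcher loops on a bounded thread pool.
  Returns a zero-arg stop function."
  [n dispatcher process-fn err-fn]
  (let [stop (atom false)
        ^ExecutorService pool (Executors/newFixedThreadPool n)]
    (dotimes [_ n]
      (.execute pool (fn [] (observe-dispatcher dispatcher process-fn err-fn stop))))
    (fn stop-workers []
      (reset! stop true)
      (.shutdown pool)
      (.awaitTermination pool 30 TimeUnit/SECONDS))))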

orestis 2019-11-27T19:01:36.145300Z

Some other out-of-band system would scale up in the case of a big SQS backlog; this code doesn’t care much.

Joe Lane 2019-11-27T19:06:08.145700Z

@orestis Maybe check out https://github.com/TheClimateCorporation/squeedo for inspiration?

Travis Martin 2019-11-27T19:23:14.147600Z

Does anybody know the rationale for the core.async/pipeline behavior change in the following commit https://github.com/clojure/core.async/commit/3429e3e1f1d49403bf9608b36dbd6715ffe4dd4f? This seems to contradict the documentation for pipeline vs pipeline-blocking.

2019-11-27T19:25:29.148500Z

it sounds like @alexmiller's position is that the previous behavior was in error

Travis Martin 2019-11-27T19:30:43.149700Z

ok, thanks @hiredman! that link answered my implicit question as well, which was what in particular was erroneous about the old implementation

alexmiller 2019-11-27T19:37:44.153300Z

by the by, I am currently writing some core.async docs (better 6 years late, than never, right?). starting to look unlikely I'm going to finish pre-Thanksgiving, but something to look for soon.

👍 10
alexmiller 2019-11-27T19:53:14.154600Z

the first is from Rich's original announcement, the latter is something I wrote long ago that's embedded in the repo but hard to find (have not revisited the content, just reformatted). The reference is new.

2019-11-27T23:36:58.159300Z

So I'm having trouble reproducing a minimal example of this. This is in ClojureScript. The following code works fine, but I've been debugging a real, more complicated example, which seems to be the same code in essence, and I can't figure this out:

(def ch1 (a/chan 1))
(def m (a/mult ch1))
(def ch2 (a/tap m (a/chan 1 (comp (filter odd?)
                                  ;;(take 1)
                                  ))))

(doseq [x (range 10)]
  (a/go (a/>! ch1 x)))
(a/go
  (when-let [x (a/<! ch2)]
    (println "GOT" x)
    (a/close! ch2)))
This works fine and prints 1. In the real code, however, nothing prints as it is, unless I remove the a/close! line. Removing that line makes it work, but if I (equivalently?) uncomment the (take 1) transducer it fails again, while changing it to (take 2) works fine. Any tips? I've been staring at this wayyyy too long

2019-11-27T23:40:58.160300Z

I'm just confused how closing the chan after it's already been taken from could possibly prevent it from doing the take in the first place

2019-11-27T23:42:03.160700Z

the close of ch2 is allowed to happen before the put on ch1

2019-11-27T23:43:01.161100Z

I guess that doesn't really explain anything here

2019-11-27T23:44:27.162Z

wait so ch2 can close before the put on ch1, even though it's after the take in the go block?

2019-11-27T23:45:09.162600Z

if there was anything to read on ch2, sure

2019-11-27T23:45:36.163300Z

but in that example, something would first be printed then right?

2019-11-27T23:45:52.163900Z

also the put of 1 could happen before the put of 0 (not likely, but allowed)

2019-11-27T23:46:18.164500Z

yes, a print would happen before the close, so that alone doesn't explain anything

2019-11-27T23:47:38.165700Z

yeah. It's probably something else i'm doing that the example above doesn't include. Just feel the need to brainstorm. (the go block is in fact basically identical)

2019-11-27T23:48:27.166300Z

also, one should usually use put! instead of >! inside go

2019-11-27T23:48:38.166600Z

(if that is all the go is doing)
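
i.e. for the snippet above, something like (just illustrating the suggestion):

(doseq [x (range 10)]
  (a/put! ch1 x))   ; async put, no go block needed just to hand off a value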

2019-11-27T23:49:04.166800Z

ah yes

2019-11-27T23:50:01.167300Z

do you happen to know if it's possible for clojurescript go blocks to cause a deadlock?

2019-11-27T23:50:38.167900Z

Or is it the thread pool in clj that makes that a problem and cljs just "rewrites callbacks"?

2019-11-27T23:51:11.168700Z

in cljs the scheduling will be more predictable iirc, and a blocking op in a go block will block the entire vm

2019-11-27T23:51:42.169400Z

js has a threadpool, the pool has 1 thread, and everything happens on it

2019-11-27T23:52:29.170200Z

so if you are doing something on the js main thread, and never yielding, no core.async work (or much of anything else you would want js to do) will happen

2019-11-27T23:53:06.170500Z

ok hmmm

2019-11-27T23:53:45.171300Z

a channel is a meeting place, where two threads of control can meet and exchange values

2019-11-27T23:54:35.172400Z

if one thread of control is at a channel to exchange a value, but another never meets it, it will be there indefinitely, which looks similar to deadlock
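
A tiny illustration of that stranded state (not from the discussion above, just a minimal example): this go block parks on a take that nothing will ever satisfy, so its body never runs even though no thread is technically blocked.

(def lonely (a/chan))
(a/go
  (println "got" (a/<! lonely)))  ; parks forever unless someone puts or closes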