core-async

wimomisterx 2020-11-23T00:26:35.248700Z

Hi, I want to know if I'm doing this correctly.

(defn long-running-shell-command [x]
  (a/thread 
    (:out (sh "foo" x))))

(defn handler [val res-ch]
  (a/go
    (a/>! res-ch (a/<! long-running-shell-command val))
    (a/close! res-ch)))

(let [out-ch (a/chan)]
  (a/pipeline-async 5 out-ch handler (a/to-chan (range 10))))
Since the long-running-shell-command is blocking io, it shouldn't be run on a go block instead I run it on a core async thread and pull from inside the go block in handler. Is this the correct approach on doing this, or is there a better way?

2020-11-23T01:23:31.249700Z

I would recommend using pipeline-blocking

2020-11-23T01:28:22.252900Z

pipeline-async has some odd behavior that can be very intuitive, like if I recall correctly your example will happily run your shell command with more than 5 concurrent executions

wimomisterx 2020-11-23T02:26:12.253Z

Yep, thats correct it does upto n + 2 concurrent executions. I'm fine with that behaviour. What I really want to know is pulling from a thread in a go block is that okay?

2020-11-23T03:12:43.253200Z

For the other pipelines it is n+2

2020-11-23T03:14:45.253400Z

For pipeline-async, again it has been a while, but it is more complicated, but basically the n doesn't limit anything

2020-11-23T03:16:40.253600Z

Basically your xf will have at most n concurrent calls and returns, but because xf is assumed to be asynchronous, it continues to run even after it has returned, so you can effectively unlimited overlap of asynchronous executions

2020-11-23T09:13:17.255100Z

I need to write transformed events on a channel out to a file using transit-clj. I presume I just wrap a with-open in a async/thread and do a synchronous take from the channel and write each object to the file? (this is in a batch system)

2020-11-23T09:14:09.256100Z

I suppose just doing a normal loop/recur in the async/thread will keep things going. I just haven't thought my way around terminating the loop yet (I suppose on the nil from the channel)

Ben Sless 2020-11-23T09:34:49.256200Z

nil from the channel is the correct "pattern". If you take nil it means the channel is closed, which means work is surely done. Use channel closure to signal process stoppage instead of "special" messages. See https://github.com/bsless/more.async/blob/master/src/main/clojure/clojure/more/async.clj#L108 example

2020-11-23T10:54:13.256500Z

@ben.sless that looks like a useful example and exactly what I am after. Thx!

👍 1
Ben Sless 2020-11-23T10:58:06.256800Z

Note the blocking produce/consume versions are agnostic of your threading model. You can run them in a future, async/thread or something else.

2020-11-23T19:50:49.257Z

but don't use them inside a go block :D

☝️ 2
Ben Sless 2020-11-23T21:06:21.257300Z

Yup. Until we get virtual threads never do blocking operations in go blocks.

2020-11-23T21:16:02.257600Z

Yeah, don't block the thread pool

2020-11-23T21:17:23.257800Z

I'm just wondering if I'd want it to return anything as it is just a side effect to write out a file. Possibly the path to the file so a future might be easier for that. Idk

2020-11-23T21:17:39.258Z

Thread returns a channel iirc

elarouss 2020-11-23T22:51:54.260600Z

Hello, I’m trying to build a chat-like feature using websockets, and i need users (only 2) to send some initial data to each other. this is an example of what I’ve done so far, but i’m missing something:

(def signals (chan))
(def publication (pub signals :source))

(defn user-handler [signals publication room user_id other_id data]
  (let [user-chan (chan)
        topic (keyword (str room "_" user_id))]
    (sub publication topic user-chan)
    (go
      (>! signals {:source   (keyword (str room "_" other_id))
                   :room     room
                   :user_id  user_id
                   :other_id other_id
                   :data     data}))
    (go
      (let [timeout-chan (timeout 15000)]
        (alt!
          timeout-chan :timeout
          user-chan ([received-data] (println received-data)))))))


(comment
  (user-handler signals publication "room1" 1 2 "Hello from user1")
  (user-handler signals publication "room1" 2 1 "Hello from user2")
  )

;; expect to print:
{:source :room1_2, :room "room1", :user_id 1, :other_id 2, :data "Hello from user1"}
{:source :room1_1, :room "room1", :user_id 2, :other_id 1, :data "Hello from user2"}

;; got:
{:source :room1_1, :room "room1", :user_id 2, :other_id 1, :data "Hello from user2"} 

elarouss 2020-11-24T14:38:39.261Z

I’ve posted the question on stackoverflow https://stackoverflow.com/questions/64988697/how-to-communicate-through-channels-in-clojures-core-async

Jakub Holý 2020-11-23T23:05:44.260800Z

Interesting library, thanks!