Hi, I want to know if I'm doing this correctly.
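(require '[clojure.core.async :as a]
         '[clojure.java.shell :refer [sh]])

(defn long-running-shell-command [x]
  (a/thread
    (:out (sh "foo" x))))

(defn handler [val res-ch]
  (a/go
    ;; call the function first, then take from the channel it returns
    (a/>! res-ch (a/<! (long-running-shell-command val)))
    (a/close! res-ch)))

(let [out-ch (a/chan)]
  (a/pipeline-async 5 out-ch handler (a/to-chan (range 10))))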
Since long-running-shell-command is blocking IO, it shouldn't be run in a go block. Instead I run it on a core.async thread and take from that thread's channel inside the go block in handler. Is this the correct approach, or is there a better way?
I would recommend using pipeline-blocking
pipeline-async has some odd behavior that can be very unintuitive. For example, if I recall correctly, your example will happily run your shell command with more than 5 concurrent executions
Yep, that's correct, it does up to n + 2 concurrent executions.
I'm fine with that behaviour.
What I really want to know is: is taking from a thread inside a go block okay?
For the other pipelines it is n+2
For pipeline-async it is more complicated (again, it has been a while), but basically the n doesn't limit anything
Basically your xf will have at most n concurrent calls and returns, but because xf is assumed to be asynchronous, it continues to run even after it has returned, so you can effectively have unlimited overlap of asynchronous executions
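For reference, the pipeline-blocking suggestion above might look roughly like the sketch below. The names slow-op and run-blocking-pipeline are hypothetical, and slow-op is only a stand-in for the real shell call so that the snippet runs without an external command. It assumes a core.async version that provides a/to-chan! (older versions use a/to-chan, as in the question).

```clojure
(require '[clojure.core.async :as a])

(defn slow-op
  "Hypothetical stand-in for a blocking call such as (:out (sh \"foo\" x))."
  [x]
  (Thread/sleep 50)
  (* 10 x))

(defn run-blocking-pipeline
  "Runs slow-op over inputs with parallelism n and returns the results as a vector."
  [n inputs]
  (let [out-ch (a/chan)]
    ;; pipeline-blocking takes a transducer and runs it on dedicated threads,
    ;; so blocking inside slow-op does not tie up the go-block thread pool.
    (a/pipeline-blocking n out-ch (map slow-op) (a/to-chan! inputs))
    ;; out-ch is closed once the input channel is exhausted, so a/into completes.
    (a/<!! (a/into [] out-ch))))

(run-blocking-pipeline 5 (range 10))
;; => [0 10 20 30 40 50 60 70 80 90]
```

Results come out in input order, and no wrapper go block or a/thread is needed around the blocking call.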
I need to write transformed events on a channel out to a file using transit-clj. I presume I just wrap a with-open in an async/thread, do a synchronous take from the channel, and write each object to the file? (this is in a batch system)
I suppose just doing a normal loop/recur in the async/thread will keep things going. I just haven't thought my way around terminating the loop yet (I suppose on the nil from the channel)
nil from the channel is the correct "pattern". If you take nil it means the channel is closed, which means work is surely done. Use channel closure to signal process stoppage instead of "special" messages. See https://github.com/bsless/more.async/blob/master/src/main/clojure/clojure/more/async.clj#L108 for an example
@ben.sless that looks like a useful example and exactly what I am after. Thx!
Note the blocking produce/consume versions are agnostic of your threading model. You can run them in a future, async/thread or something else.
but don't use them inside a go block :D
Yup. Until we get virtual threads never do blocking operations in go blocks.
Yeah, don't block the thread pool
I'm just wondering if I'd want it to return anything as it is just a side effect to write out a file. Possibly the path to the file so a future might be easier for that. Idk
Thread returns a channel iirc
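Pulling the points above together, a minimal sketch of the file-writing consumer could look like this. The names write-events! and write-edn-line! are hypothetical. To stay dependency-free it writes plain EDN lines rather than transit; with transit-clj the write! argument would presumably wrap a transit writer over an output stream instead, which is an assumption and not shown here.

```clojure
(require '[clojure.core.async :as a]
         '[clojure.java.io :as io])

(defn write-events!
  "Drains ch on a dedicated thread, passing each event to (write! w event).
  Returns the channel from a/thread, which yields path once the file is closed."
  [ch path write!]
  (a/thread
    (with-open [w (io/writer path)]
      (loop []
        ;; A nil take means ch is closed, which ends the loop.
        (when-some [event (a/<!! ch)]
          (write! w event)
          (recur))))
    path))

(defn write-edn-line!
  "Hypothetical writer fn: one EDN form per line."
  [^java.io.Writer w event]
  (.write w (prn-str event)))

(let [ch   (a/to-chan! [{:id 1} {:id 2}])
      path (.getPath (java.io.File/createTempFile "events" ".edn"))
      done (write-events! ch path write-edn-line!)]
  ;; Blocks until the writer thread has finished, then returns the path.
  (a/<!! done))
```

Because a/thread returns a channel carrying the body's result, the caller can wait for completion and get the path back without a separate future.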
Hello, I’m trying to build a chat-like feature using websockets, and I need two users to send some initial data to each other. This is an example of what I’ve done so far, but I’m missing something:
(def signals (chan))
(def publication (pub signals :source))

(defn user-handler [signals publication room user_id other_id data]
  (let [user-chan (chan)
        topic (keyword (str room "_" user_id))]
    (sub publication topic user-chan)
    (go
      (>! signals {:source (keyword (str room "_" other_id))
                   :room room
                   :user_id user_id
                   :other_id other_id
                   :data data}))
    (go
      (let [timeout-chan (timeout 15000)]
        (alt!
          timeout-chan :timeout
          user-chan ([received-data] (println received-data)))))))

(comment
  (user-handler signals publication "room1" 1 2 "Hello from user1")
  (user-handler signals publication "room1" 2 1 "Hello from user2")
  )
;; expect to print:
{:source :room1_2, :room "room1", :user_id 1, :other_id 2, :data "Hello from user1"}
{:source :room1_1, :room "room1", :user_id 2, :other_id 1, :data "Hello from user2"}
;; got:
{:source :room1_1, :room "room1", :user_id 2, :other_id 1, :data "Hello from user2"}
I’ve posted the question on stackoverflow https://stackoverflow.com/questions/64988697/how-to-communicate-through-channels-in-clojures-core-async
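A likely cause, going by the pub docstring ("Items received when there are no matching subs get dropped"): user 1's message is addressed to :room1_2 before user 2 has subscribed to that topic, so it is discarded. The sketch below separates subscribing from sending so that both users are subscribed before anything is published. The helper names join-room, send-signal, and receive-within are hypothetical, and the buffer size of 1 is an assumption made so that delivery does not wait on a reader.

```clojure
(require '[clojure.core.async :as a])

(defn join-room
  "Subscribes user-id to its own topic and returns the subscribed channel."
  [publication room user-id]
  (let [user-chan (a/chan 1)]
    (a/sub publication (keyword (str room "_" user-id)) user-chan)
    user-chan))

(defn send-signal
  "Publishes data addressed to other-id's topic."
  [signals room user-id other-id data]
  (a/>!! signals {:source   (keyword (str room "_" other-id))
                  :room     room
                  :user_id  user-id
                  :other_id other-id
                  :data     data}))

(defn receive-within
  "Takes one message from user-chan, or returns :timeout after ms milliseconds."
  [user-chan ms]
  (let [timeout-chan (a/timeout ms)]
    (a/alt!!
      timeout-chan :timeout
      user-chan ([received-data] received-data))))

(def signals (a/chan))
(def publication (a/pub signals :source))

;; Both users subscribe first, then both send.
(def user1-chan (join-room publication "room1" 1))
(def user2-chan (join-room publication "room1" 2))
(send-signal signals "room1" 1 2 "Hello from user1")
(send-signal signals "room1" 2 1 "Hello from user2")

(:data (receive-within user2-chan 1000)) ;; => "Hello from user1"
(:data (receive-within user1-chan 1000)) ;; => "Hello from user2"
```

In a real websocket setup the two users will not connect at the same instant, so some form of buffering or a "both joined" handshake would still be needed. This only illustrates the ordering issue.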
Interesting library, thanks!