I wanna move elements from channel1 to channel2 until I see a specific token, is there a way to do this?
I tried using pipe
to move from channel1 to channel2 with transducer take-while
but it drains 1 more element from the channel1 than it should
(let [cc  (async/to-chan [0 1 2 3])
      cc2 (async/chan 1000 (take-while #(not= 0 %)))
      _   (async/pipe cc cc2)]
  (println (async/<!! cc2))
  (println (async/<!! cc)))
this will print nil
and 2
respectively, but should print nil
and 1
Might be a good case for halt-when
halt-when?
http://clojure.github.io/clojure/clojure.core-api.html#clojure.core/halt-when
didn’t know that existed
at first glance it just looks like inverse of take-while
I guess you may still drain one more than desired
Not sure, not at a repl to try it
yeah
Could it be related to the time when transducers are executed, which could be either during take or put
still same problem
I see it's not related to when the transducer is executed, the code would work ok for any sequence not starting with 0, but when it does, it just eats the next value... would think this is a bug
This is just the nature of how transducers work
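One way around the extra take is to skip pipe and the transducer and do the copy by hand, so the check happens before anything else is taken. A minimal sketch, assuming the stop token should be consumed and dropped; pipe-until is a made-up helper name, not part of core.async:

```clojure
(require '[clojure.core.async :as async])

(defn pipe-until
  "Copies values from `from` to `to` until `stop?` returns true for a value
  or `from` closes, then closes `to`. The stop value is consumed and dropped;
  nothing after it is taken from `from`. Returns a channel that closes when
  copying ends."
  [from to stop?]
  (async/go-loop []
    (let [v (async/<! from)]
      (if (or (nil? v) (stop? v))
        (async/close! to)
        ;; >! returns false if `to` was closed by someone else, so stop then
        (when (async/>! to v)
          (recur))))))

(let [cc  (async/to-chan [0 1 2 3])
      cc2 (async/chan 1000)]
  (async/<!! (pipe-until cc cc2 zero?)) ; wait for the copy to finish
  (println (async/<!! cc2))  ; nil, cc2 was closed without receiving anything
  (println (async/<!! cc)))  ; 1, only the token 0 was consumed
```

The difference from pipe is that the loop inspects each value itself and stops taking as soon as it sees the token, instead of finding out one put later that the target channel has closed.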
I’m trying to create a non-blocking/async service that for each inbound connection it makes several outbound calls and then combines the results of all those calls into its ultimate return. I’m using Pedestal to handle the inbound and HTTP-Kit for the outbound. Here is the code:
(ns pedrepl.fetch
  (:require [org.httpkit.client :as http]
            [clojure.core.async :as async]))
(defn fetcher [options url]
  (let [c (async/chan 1)
        p (http/get url options
                    (fn [{:keys [status headers body error]}] ;; asynchronous response handling
                      (if error
                        (async/>!! c (str "Failed, exception is " error))
                        (async/>!! c body))
                      (async/close! c)))]
    c))
(defn fetch-bids []
  (let [urls ["http://localhost:8090/quick" "http://localhost:8090/slow"]
        options {:timeout 200 :as :text}
        results (mapv (partial fetcher options) urls)]
    (map async/<!! results)))
I would like for the “(map async/<!! results)” to be parking and not blocking, but I don’t see how to do this. Any advice would be appreciated.
can you just use a go loop to loop-recur through the urls instead?
oh i'm sorry I was looking at the wrong line
actually I'm having a hard time getting a deterministic result with the following code, sometimes I get cc=2
and other times cc=1
(defn thread-name []
  (.getName (Thread/currentThread)))
(defn p [& args]
  (locking *out*
    (apply println (thread-name) ":" args)))
(defn log [msg v]
  (p msg v)
  v)
(let [cc  (async/to-chan [-2 -1 0 1 2 3])
      cc2 (async/chan 1000 (take-while (fn [x] (log "transducing" x) (not= 0 x))))
      _   (async/pipe cc cc2)]
  (p "cc2=" (async/<!! cc2))
  (p "cc=" (async/<!! cc)))
parking ops only exist in go blocks
that code was inspired from https://stackoverflow.com/questions/34847872/how-are-transducers-executed-in-core-async-channels
yes, but putting the “(map async/<!! results)” in a block as in (go (map async/<! results)) does not work
you probably want something more like run! than map
or use alts!
With a parking op/go you're either gonna have to do something with each result as you get it or return a chan (which you could still do a blocking take from if you want)
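The reason (go (map async/<! results)) fails is that the go macro does not rewrite code inside nested functions, and map calls <! from inside one. A loop written directly in the go body works. A rough sketch, assuming one value per channel is wanted and results should stay in the same order as the channels; collect-in-order is a hypothetical name:

```clojure
(require '[clojure.core.async :as async])

(defn collect-in-order
  "Takes one value from each channel in `chans`, in order, using parking
  takes. Returns a channel that yields the vector of values."
  [chans]
  (async/go
    (loop [remaining (seq chans)
           acc       []]
      (if remaining
        ;; <! is legal here because it sits directly in the go body
        (recur (next remaining) (conj acc (async/<! (first remaining))))
        acc))))
```

The caller gets a channel back, so it can either take from it in another go block or hand it to something that accepts channels.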
I’ve tried using doseq, but can’t figure out how to get the result back as the return of the “go” that the pedestal route is waiting on. The whole thing is started with:
(def my-interceptor
  {:name ::my-interceptor
   :enter (fn [context]
            (async/go
              (assoc context :response (ring-resp/response (fetch/fetch-bids)))))})
;; Tabular routes
(def routes #{["/" :get (conj common-interceptors my-interceptor)]
              ["/about" :get (conj common-interceptors `about-page)]})
(<!!
 (go-loop [results (set results)
           x []]
   ;; alts! needs a vector of channels, so convert the set on each pass
   (if-let [[v ch] (and (not-empty results)
                        (alts! (vec results)))]
     (recur (disj results ch) (conj x v))
     x)))
something like that, or you can do something stateful with each result in the loop
not sure I understand the “<!!” at the beginning. Isn’t that blocking? Also, I don’t see what channel it is taking from?
go-loop returns a chan that contains the result of the go loop
there's no way to do a parking op without returning a chan so you have to either do something with the values or block somewhere
so there is still a thread blocking waiting on the go-loop…doesn’t that result in 1 thread per connection/request?
there is due to the <!!
but you could omit it and do something like this instead:
(go-loop [results (set results)
          x []]
  ;; alts! needs a vector of channels, so convert the set on each pass
  (when-let [[v ch] (and (not-empty results)
                         (alts! (vec results)))]
    (save-result-to-database! v)
    (recur (disj results ch) (conj x v))))
although i guess that's not ideal either bc you're not really supposed to do blocking ops like database calls in the go-loop
I feel like there should be a way to have a pool of threads handling inbound requests and parking (what pedestal does), a pool of threads handling non-blocking outbound calls (what http-kit does), and to connect the two with a thread pool in the middle that is also non-blocking.
you do not understand
unless the outbound driver supports parking, you are always going to use threads
just because you ditch threads and are fully “async” doesn’t mean some library (like http-kit client) down the stack doesn’t use a threadpool
you just move the bottleneck from your code into the library that’s doing the calls
the only real way to get massive concurrency in terms of number of connections without bloating thread counts is to use IO that uses epoll, like java.nio
but the problem is that your libraries have to use it too, otherwise it doesn’t matter
I’ve seen dozens of people boast about making their code fully “async”, but then they use a standard JDBC driver which just eats a thread until query is completed, and then they have to make connection pool larger or the queues in front of connection pool get impossibly long
The reality of situation is that “async” is just snake oil 95% of the time. Most of the time even a fairly medium sized thread pool and synchronous processing saturates your CPU, GC or IO bandwidth, so making it async won’t magically make it faster, in fact it will decrease throughput
and code is simpler in synchronous
http-kit uses a threadpool, it works via callbacks and NIO.
http-kit uses http-client, which uses apache http client
regardless, it does not matter…I would like to connect its callbacks (either actual or apparent async) to pedestal
pedestal has a callback right?
that is, it has a function you call with the result
pedestal expects a channel which will contain a “context”
you can reduce a sequence of channels in a go block into a result
each http-kit success callback pushes a value into result channel
you can create aggregate channel with async/into
so each callback still has its own channel, and I would combine them into one with async/into?
there’s a lot of ways to do it
http-kit is based on https://github.com/cch1/http.async.client (at least according to its project.clj)
if your http-kit code returns channel
then use async/merge
to merge all channels into a new channel that has all the results
does async/merge block the underlying thread?
no, it uses go-loop
it returns a channel
which yields each result as it arrives; wrap it as (async/into [] merged-chan) to get a channel with a single collection value
i’ll give it a try, thanks!
and if you need to modify that
you can do (async/go {:body (async/<! merged-chan) :status 200})
which returns another chan that you can give to pedestal
this is all nonblocking
thanks!
go and go-loop blocks return channel that has 1 value and that is the return/last value of the block
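Putting the merge suggestion together, roughly. This is only a sketch: fake-fetch stands in for the http-kit based fetcher so the snippet runs on its own, and fetch-all and respond are made-up names. Note that async/merge emits values one at a time, so async/into is what collects them into a single vector:

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical stand-in for the http-kit based `fetcher`: returns a channel
;; that receives one value and is then closed.
(defn fake-fetch [url]
  (let [c (async/chan 1)]
    (async/put! c (str "body of " url))
    (async/close! c)
    c))

(defn fetch-all
  "Returns a channel that yields a single vector with one result per url.
  Results arrive in completion order, not in url order."
  [urls]
  (async/into [] (async/merge (mapv fake-fetch urls))))

(defn respond
  "Returns a channel holding `context` with a :response map added.
  Only parking takes are used, so no thread blocks while waiting."
  [context urls]
  (async/go
    (assoc context :response {:status 200
                              :body   (async/<! (fetch-all urls))})))
```

The channel returned by respond is the kind of value an :enter function can return when pedestal expects a channel containing a context, as described above.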
👋
Hey, if you create a (chan)
and don’t close it, but it’s no longer referenced anywhere; will that cause problems in the runtime or is it automatically garbage collected?
(let [my-chan (chan)]
  (close! my-chan)
  nil)
;; vs
(let [my-chan (chan)]
  nil)
It will be gc'ed
Nice, so if I don’t save it to an atom, or global definition, I don’t have to manually close it?
You don't have to
All right, thank you 🙂