core-async

roklenarcic 2019-12-13T12:37:40.002300Z

I wanna move elements from channel1 to channel2 until I see a specific token, is there a way to do this?

roklenarcic 2019-12-13T12:39:14.003100Z

I tried using pipe to move from channel1 to channel2 with transducer take-while but it drains 1 more element from the channel1 than it should

roklenarcic 2019-12-13T12:41:02.003400Z

(let [cc (async/to-chan [0 1 2 3])
      cc2 (async/chan 1000 (take-while #(not= 0 %)))
      _ (async/pipe cc cc2)]
  (println (async/<!! cc2))
  (println (async/<!! cc)))

roklenarcic 2019-12-13T12:41:29.003900Z

this will print nil and 2 respectively, but should print nil and 1

alexmiller 2019-12-13T12:45:03.004500Z

Might be a good case for halt-when

roklenarcic 2019-12-13T12:45:21.005100Z

halt-when?

roklenarcic 2019-12-13T12:47:22.005500Z

didn’t know that existed

roklenarcic 2019-12-13T12:47:59.006500Z

at first glance it just looks like inverse of take-while

alexmiller 2019-12-13T12:48:25.007200Z

I guess you may still drain one more than desired

alexmiller 2019-12-13T12:49:12.008100Z

Not sure, not at a repl to try it

roklenarcic 2019-12-13T12:49:23.008500Z

yeah

fmjrey 2019-12-13T12:49:24.008600Z

Could it be related to the time when transducers are executed, which could be either during take or put

roklenarcic 2019-12-13T12:49:28.008800Z

still same problem

fmjrey 2019-12-13T13:16:00.010500Z

I see it's not related to when the transducer is executed, the code would work ok for any sequence not starting with 0, but when it does, it just eats the next value... would think this is a bug

alexmiller 2019-12-13T13:21:45.010900Z

This is just the nature of how transducers work
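
One way to read the exchange above: `pipe` appears to learn that the destination has closed only when its next put fails, and by then it has already taken one more value from the source. Below is a minimal sketch of a hand-rolled alternative that checks each value before forwarding it. The helper name `pipe-until` is hypothetical and not part of core.async. The stop token itself is still consumed, since a channel offers no way to peek.

```clojure
(require '[clojure.core.async :as async])

(defn pipe-until
  "Hypothetical helper: forwards values from `from` to `to` until `stop?`
  returns true for a value or `from` closes, then closes `to`.
  Returns the go-loop's channel, which closes when forwarding is done."
  [stop? from to]
  (async/go-loop []
    (let [v (async/<! from)]
      (cond
        (nil? v)  (async/close! to)            ; source exhausted
        (stop? v) (async/close! to)            ; token seen: stop without forwarding it
        :else     (when (async/>! to v)        ; stop if `to` was closed elsewhere
                    (recur))))))

(let [from (async/to-chan [-2 -1 0 1 2 3])
      to   (async/chan 10)]
  (async/<!! (pipe-until zero? from to))       ; wait until forwarding has stopped
  [(async/<!! (async/into [] to))
   (async/<!! from)])
;; => [[-2 -1] 1]
```

Because the check happens before the put, the value after the token stays in the source channel.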

Steven Katz 2019-12-13T14:32:29.017400Z

I’m trying to create a non-blocking/async service that for each inbound connection it makes several outbound calls and then combines the results of all those calls into its ultimate return. I’m using Pedestal to handle the inbound and HTTP-Kit for the outbound. Here is the code:

(ns pedrepl.fetch
  (:require [org.httpkit.client :as http]
            [clojure.core.async :as async]))
          

(defn fetcher [options url] 
  (let [c (async/chan 1)
        p (http/get url options
                     (fn [{:keys [status headers body error]}] ;; asynchronous response handling
                       (if error
                         (async/>!! c (str "Failed, exception is " error))
                         (async/>!! c body))
                       (async/close! c)))]
    c))

(defn fetch-bids [] 
  (let [urls ["http://localhost:8090/quick" "http://localhost:8090/slow"]
        options {:timeout 200 :as :text}
        results (mapv (partial fetcher options) urls)]
    (map async/<!! results)))

Steven Katz 2019-12-13T14:34:41.018900Z

I would like for the “(map async/<!! results)” to be parking and not blocking, but I don’t see how to do this. Any advice would be appreciated.
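
For callback-style clients like the one in the fetcher above, a common alternative to `>!!` inside the callback is the non-blocking `async/put!`. The sketch below assumes a hypothetical `fake-get` function standing in for the real HTTP call, so it runs without any HTTP library. Only the channel handling is the point.

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical stand-in for a callback-based HTTP client call.
(defn fake-get [url callback]
  (future (callback {:body (str "response from " url)})))

(defn fetcher
  "Returns a channel that yields one value (body or error text) and then closes."
  [url]
  (let [c (async/chan 1)]
    (fake-get url
              (fn [{:keys [body error]}]
                ;; put! never blocks the callback thread; the buffer of 1 holds the value
                (async/put! c (if error
                                (str "Failed, exception is " error)
                                body))
                (async/close! c)))
    c))

(async/<!! (fetcher "/quick"))
;; => "response from /quick"
```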

2019-12-13T14:35:31.019500Z

can you just use a go loop to loop-recur through the urls instead?

2019-12-13T14:36:05.019800Z

oh i'm sorry I was looking at the wrong line

fmjrey 2019-12-13T14:36:33.020Z

actually I'm having a hard time getting a deterministic result with the following code, sometimes I get cc=2 and other times cc=1

(defn thread-name []
  (.getName (Thread/currentThread)))

(defn p [& args]
  (locking *out*
    (apply println (thread-name) ":" args)))

(defn log [msg v]
  (p msg v)
  v)

(let [cc (async/to-chan [-2 -1 0 1 2 3])
      cc2 (async/chan 1000 (take-while (fn [x] (log "transducing" x) (not= 0 x))))
      _ (async/pipe cc cc2)]
  (p "cc2=" (async/<!! cc2))
  (p "cc=" (async/<!! cc)))

alexmiller 2019-12-13T14:36:59.020500Z

parking ops only exist in go blocks

Steven Katz 2019-12-13T14:38:05.022600Z

yes, but putting the “(map async/<!! results)” in a block as in (go (map async/<! results)) does not work

alexmiller 2019-12-13T14:38:53.023100Z

you probably want something more like run! than map

alexmiller 2019-12-13T14:39:33.023500Z

or use alts!

2019-12-13T14:40:15.024600Z

With a parking op/go you're either gonna have to do something with each result as you get it or return a chan (which you could still do a blocking take from if you want)
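
`(go (map async/<! results))` fails because `<!` has to appear directly in the body of the `go` block; the macro does not rewrite calls made inside other functions such as `map`. Below is a minimal sketch of the "return a chan" option, using a hypothetical helper named `collect-in-order` that parks on each channel in turn.

```clojure
(require '[clojure.core.async :as async])

(defn collect-in-order
  "Hypothetical helper: returns a channel yielding a vector with one value
  taken from each channel in `chans`, in the order the channels are given."
  [chans]
  (async/go-loop [remaining (seq chans)
                  acc []]
    (if-let [c (first remaining)]
      (recur (next remaining) (conj acc (async/<! c))) ; parking take, inside the go body
      acc)))

(let [chans (mapv #(async/to-chan [%]) [:a :b :c])]
  (async/<!! (collect-in-order chans)))
;; => [:a :b :c]
```

The final `<!!` is only there to show the result at a REPL. A caller that is itself inside a `go` block can use `<!` on the returned channel instead.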

Steven Katz 2019-12-13T14:42:43.025900Z

I’ve tried using doseq, but can’t figure out how to get the result back as the return of the “go” that the pedestal route is waiting on. The whole thing is started with:

(def my-interceptor
  {:name ::my-interceptor
   :enter (fn [context]
            (async/go
              (assoc context :response (ring-resp/response (fetch/fetch-bids)))))})

;; Tabular routes
(def routes #{["/" :get (conj common-interceptors my-interceptor)]
              ["/about" :get (conj common-interceptors `about-page)]})

2019-12-13T14:45:20.026300Z

(<!!
 (go-loop [results (set results)
           x []]
   (if-let [[v ch] (and (not-empty results)
                        (alts! (vec results)))]
     (recur (disj results ch) (conj x v))
     x)))

2019-12-13T14:45:49.026800Z

something like that, or you can do something stateful with each result in the loop

Steven Katz 2019-12-13T14:48:41.028400Z

not sure I understand the “<!!” at the beginning. Isn’t that blocking? Also, I don’t see what channel it is taking from?

2019-12-13T14:49:18.029Z

go-loop returns a chan that contains the result of the go loop

2019-12-13T14:49:48.030Z

there's no way to do a parking op without returning a chan so you have to either do something with the values or block somewhere

Steven Katz 2019-12-13T14:50:17.030500Z

so there is still a thread blocking waiting on the go-loop…doesn’t that result in 1 thread per connection/request?

2019-12-13T14:51:48.031500Z

there is due to the <!! but you could omit it and do something like this instead:

(go-loop [results (set results)
          x []]
  (when-let [[v ch] (and (not-empty results)
                         (alts! (vec results)))]
    (save-result-to-database! v)
    (recur (disj results ch) (conj x v))))

2019-12-13T14:52:59.032500Z

although i guess that's not ideal either bc you're not really supposed to do blocking ops like database calls in the go-loop
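
One common way around that concern is to run the blocking call on a separate thread with `async/thread` and park on the channel it returns. In this sketch `save-result!` is a hypothetical stand-in for a blocking database call, and `save-all` is an illustrative name rather than anything from the discussion.

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical stand-in for a blocking call such as a database write.
(defn save-result! [v]
  (Thread/sleep 10)
  v)

(defn save-all
  "Takes one value from each channel as it arrives, saves it off the go
  thread pool, and returns a channel yielding the vector of saved values."
  [chans]
  (async/go-loop [pending (set chans)
                  saved []]
    (if (seq pending)
      (let [[v ch] (async/alts! (vec pending))                 ; alts! expects a vector
            r      (async/<! (async/thread (save-result! v)))] ; blocking work runs on its own thread
        (recur (disj pending ch) (conj saved r)))
      saved)))

(let [chans (mapv #(async/to-chan [%]) [1 2 3])]
  (set (async/<!! (save-all chans))))
;; => #{1 2 3}
```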

Steven Katz 2019-12-13T14:54:56.034400Z

I feel like there should be a way to have a pool of threads handling inbound requests and parking (what pedestal does), a pool of threads handling non-blocking outbound calls (what http-kit does), and a thread pool in the middle, also non-blocking, that connects the two.

roklenarcic 2019-12-13T15:08:07.035300Z

you do not understand

roklenarcic 2019-12-13T15:08:23.035800Z

unless the outbound driver supports parking, you are always going to use threads

roklenarcic 2019-12-13T15:09:00.036600Z

just because you ditch threads and are fully “async” doesn’t mean some library (like http-kit client) down the stack doesn’t use a threadpool

roklenarcic 2019-12-13T15:09:26.037300Z

you just move the bottleneck from your code into the library that’s doing the calls

roklenarcic 2019-12-13T15:10:34.038400Z

the only real way to get massive concurrency in terms of number of connections without bloating thread counts is to use IO that uses epoll, like java.nio

roklenarcic 2019-12-13T15:11:00.039Z

but the problem is that your libraries have to use it too, otherwise it doesn’t matter

roklenarcic 2019-12-13T15:12:57.040900Z

I’ve seen dozens of people boast about making their code fully “async”, but then they use a standard JDBC driver which just eats a thread until query is completed, and then they have to make connection pool larger or the queues in front of connection pool get impossibly long

roklenarcic 2019-12-13T15:15:29.043200Z

The reality of situation is that “async” is just snake oil 95% of the time. Most of the time even a fairly medium sized thread pool and synchronous processing saturates your CPU, GC or IO bandwidth, so making it async won’t magically make it faster, in fact it will decrease throughput

roklenarcic 2019-12-13T15:15:43.043700Z

and code is simpler in synchronous

Steven Katz 2019-12-13T15:16:20.044100Z

http-kit uses a threadpool, it works via callbacks and NIO.

roklenarcic 2019-12-13T15:16:52.044800Z

http-kit uses http-client, which uses apache http client

Steven Katz 2019-12-13T15:18:30.046600Z

regardless, it does not matter…I would like to connect its callbacks (either actual or apparent async) to pedestal

roklenarcic 2019-12-13T15:20:02.048100Z

pedestal has a callback right?

roklenarcic 2019-12-13T15:20:28.048600Z

that is, it has a function you call with the result

Steven Katz 2019-12-13T15:20:56.049100Z

pedestal expects a channel which will contain a “context”

roklenarcic 2019-12-13T15:23:13.049800Z

you can reduce a sequence of channels in a go block into a result

roklenarcic 2019-12-13T15:24:49.050900Z

each http-kit success callback pushes a value into result channel

roklenarcic 2019-12-13T15:25:26.051600Z

you can create aggregate channel with async/into

Steven Katz 2019-12-13T15:29:52.053600Z

so each callback still has its own channel, and I would combine them into one with async/into?

roklenarcic 2019-12-13T15:30:08.054100Z

there’s a lot of ways to do it

Steven Katz 2019-12-13T15:30:17.054500Z

http-kit is based on https://github.com/cch1/http.async.client (at least according to its project.clj)

roklenarcic 2019-12-13T15:30:21.054800Z

if your http-kit code returns channel

roklenarcic 2019-12-13T15:30:37.055100Z

then use async/merge

roklenarcic 2019-12-13T15:30:51.055600Z

to merge all channels into a new channel that has all the results

Steven Katz 2019-12-13T15:31:15.056200Z

does async/merge block the underlying thread?

roklenarcic 2019-12-13T15:31:34.056400Z

no, it uses go-loop

roklenarcic 2019-12-13T15:31:42.056600Z

it returns a channel

roklenarcic 2019-12-13T15:31:53.057Z

which then has a single collection value
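
A small sketch of how the two functions mentioned above fit together. As far as the core.async docs describe it, `async/merge` yields the individual values from all source channels, and `async/into` collects a channel's values into one collection. The helper name `gather` is hypothetical.

```clojure
(require '[clojure.core.async :as async])

(defn gather
  "Hypothetical helper: returns a channel that yields a single vector holding
  every value from `chans`, in arrival order, once all of them have closed."
  [chans]
  (async/into [] (async/merge chans)))

(let [chans [(async/to-chan [:quick]) (async/to-chan [:slow])]]
  (set (async/<!! (gather chans))))
;; => #{:quick :slow}
```

Arrival order depends on which source delivers first, so the usage above compares the result as a set.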

Steven Katz 2019-12-13T15:31:59.057300Z

i’ll give it a try, thanks!

roklenarcic 2019-12-13T15:32:13.057600Z

and if you need to modify that

roklenarcic 2019-12-13T15:32:49.058400Z

you can do (async/go {:body (async/<! merged-chan) :status 200})

roklenarcic 2019-12-13T15:33:02.058700Z

which returns another chan that you can give to pedestal

roklenarcic 2019-12-13T15:33:07.059Z

this is all nonblocking
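
A sketch of that last step, assuming the interceptor receives a context map and already has a channel that will deliver the combined result. `respond-with` is a hypothetical name, and the response is a plain map rather than one built with the ring helpers.

```clojure
(require '[clojure.core.async :as async])

(defn respond-with
  "Hypothetical helper: returns a channel that yields `context` with a
  :response attached once `result-chan` delivers its value."
  [context result-chan]
  (async/go
    (assoc context :response {:status 200
                              :body   (async/<! result-chan)})))

(let [result-chan (async/to-chan [["quick body" "slow body"]])
      context     {:request {:uri "/"}}]
  (:response (async/<!! (respond-with context result-chan))))
;; => {:status 200, :body ["quick body" "slow body"]}
```

An `:enter` function could return the channel from `respond-with` directly. The `<!!` here only exists to show the value at a REPL.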

Steven Katz 2019-12-13T15:42:03.059200Z

thanks!

roklenarcic 2019-12-13T15:42:52.060Z

go and go-loop blocks return channel that has 1 value and that is the return/last value of the block
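
A short illustration of that point. One detail worth knowing: a block whose body evaluates to nil puts nothing on its channel and simply closes it, so a take from it returns nil.

```clojure
(require '[clojure.core.async :as async])

;; The channel returned by go yields the value of the block's last expression.
(def sum-chan (async/go (+ 1 2)))

;; go-loop behaves the same way: the value of the final, non-recur branch is delivered.
(def loop-chan
  (async/go-loop [i 0
                  acc []]
    (if (< i 3)
      (recur (inc i) (conj acc i))
      acc)))

;; A nil result closes the channel without delivering a value.
(def nil-chan (async/go nil))

[(async/<!! sum-chan) (async/<!! loop-chan) (async/<!! nil-chan)]
;; => [3 [0 1 2] nil]
```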

2019-12-13T16:57:36.060400Z

👋

kwrooijen 2019-12-13T20:36:43.062900Z

Hey, if you create a (chan) and don’t close it, but it’s no longer referenced anywhere; will that cause problems in the runtime or is it automatically garbage collected?

(let [my-chan (chan)]
  (close! my-chan)
  nil)

;; vs

(let [my-chan (chan)]
  nil)

2019-12-13T20:38:25.063300Z

It will be gc'ed

kwrooijen 2019-12-13T20:39:02.063800Z

Nice, so if I don’t save it to an atom, or global definition, I don’t have to manually close it?

2019-12-13T20:39:39.064100Z

You don't have to

kwrooijen 2019-12-13T20:40:52.064400Z

All right, thank you 🙂