core-async

Sal Kadam 2019-07-24T02:25:46.000600Z

Caused by: java.lang.ClassNotFoundException: clojure.core.async.Mutex issue showing up with :dependencies [[org.clojure/clojure "1.9.0"] [org.clojure/core.async "0.4.500"] [me.raynes/conch "0.8.0"] [com.draines/postal "2.0.3"] [propertea "1.3.1"] [instaparse "1.4.10"] [com.taoensso/timbre "4.10.0"]] :source-paths ["src" "src/main/clojure"])

petr.mensik 2019-07-24T20:38:01.003200Z

Hello guys, I am experimenting with pipelines in core.async and I ran into the following problem: the call to (<!! (async/into [] output-channel)) (last line) just blocks indefinitely and never returns. The idea is to put passwords from a file onto a channel and then pass them to an http-get function, which performs an API call and puts the result onto the channel. Then the output channel has a filter transducer to filter the results.

(defn- create-passwords-channel
  "Creates a channel based on a lazy sequence from a file"
  []
  (let [c (chan 50)]
    (go
      (with-open [lines (-> (io/reader "passwords.txt")
                            line-seq)]
        (doseq [ln lines]
          (>! c ln))))
    c))

(defn http-get
  "Tries to call a login API with a password and saves result into the channel"
  [password channel]
  (http/get "http://localhost:5000/auth/login"
            {:body {:timeout 200
                    :username "admin@temify.com"
                    :password password}}
            (fn [{:keys [status]}] ; callback
              (put! channel {:status status
                             :password password})
              (close! channel)))
  channel)

(defn dictionary-attack
  []
  (let [input-channel (create-passwords-channel)
        output-channel (chan 10 (filter #(= (:status %) 200)))]
    (pipeline-async 10 output-channel http-get input-channel)
    (<!! (async/into [] output-channel))))
So are there any obvious problems with this code? Thanks a lot for help

alexmiller 2019-07-24T21:01:17.003800Z

you should never block a go thread

alexmiller 2019-07-24T21:01:27.004200Z

which means, don't do i/o in it

2019-07-24T21:01:28.004300Z

using pipeline-async correctly is subtle

2019-07-24T21:01:56.004900Z

in this case you are not actually running the http/get in an async way

2019-07-24T21:02:03.005100Z

so you will break things

2019-07-24T21:02:31.005600Z

pipeline-blocking is somewhat harder to get wrong

2019-07-24T21:03:33.006500Z

pipeline-blocking is like, "I have this blocking process and I would like to run it as a pipeline", pipeline-async is "I have this async process and I would like to run it as a pipeline"

2019-07-24T21:04:15.007200Z

they don't inherently change the nature of your process, using pipeline-async doesn't make your code asynchronous
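
The distinction drawn above can be sketched as follows. This is a minimal illustration, not code from the thread: `slow-inc` and `async-inc` are hypothetical stand-ins for a blocking call and for a callback-style async call, and `to-chan` is used only to build a small input channel (newer core.async releases prefer `to-chan!`).

```clojure
(require '[clojure.core.async :as async
           :refer [chan <!! >!! close! thread pipeline-blocking pipeline-async]])

;; Hypothetical stand-in for a blocking call, e.g. a synchronous HTTP request.
(defn slow-inc [x]
  (Thread/sleep 10)
  (inc x))

;; pipeline-blocking: the transducer is allowed to block,
;; because the pipeline runs it on dedicated threads.
(def blocking-out (chan 10))
(pipeline-blocking 4 blocking-out (map slow-inc) (async/to-chan [1 2 3]))
(def blocking-result (<!! (async/into [] blocking-out)))

;; pipeline-async: the function must return immediately and hand the work
;; to something that is already asynchronous (a separate thread plays the
;; role of the callback here). It must close the result channel when done.
(defn async-inc [x result]
  (thread
    (>!! result (slow-inc x))
    (close! result)))

(def async-out (chan 10))
(pipeline-async 4 async-out async-inc (async/to-chan [1 2 3]))
(def async-result (<!! (async/into [] async-out)))
```

Both `blocking-result` and `async-result` end up as `[2 3 4]`. In neither case does the pipeline change how the work itself runs; it only coordinates it.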

petr.mensik 2019-07-24T21:07:55.007800Z

Thanks for the response. Actually I thought I could use the http-kit client in an async manner, as described in the docs. https://www.http-kit.org/client.html#async

petr.mensik 2019-07-24T21:08:12.008300Z

So I have to use pipeline-blocking, right?

2019-07-24T21:10:13.009300Z

that might be ok then, as long as they do all the io stuff on their own threadpool

markmarkmark 2019-07-24T21:13:37.010600Z

async/into doesn't do anything until the input channel is closed
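
A small sketch of that behaviour, using only core.async calls. The channel and var names are made up for illustration, and the 100 ms timeout is an arbitrary choice.

```clojure
(require '[clojure.core.async :as async :refer [chan >!! <!! close!]])

(def c (chan 10))
(def result (async/into [] c)) ; returns a channel right away

(>!! c 1)
(>!! c 2)

;; `c` is still open, so nothing arrives on `result`; the timeout wins
;; and alts!! yields nil as the value.
(def before-close (first (async/alts!! [result (async/timeout 100)])))

(close! c)

;; Once `c` is closed, `into` delivers the collected vector.
(def after-close (<!! result))
```

Here `before-close` is `nil` and `after-close` is `[1 2]`, which is why the original `(<!! (async/into [] output-channel))` never returns while the input side of the pipeline stays open.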

2019-07-24T21:14:59.011500Z

and I guess the io alex was referring to was reading the password file, not the http requests

petr.mensik 2019-07-24T21:15:38.012200Z

So is there actually a better way to put the contents of a file onto a channel? 😕

ghadi 2019-07-24T21:16:10.012700Z

sure, use thread

markmarkmark 2019-07-24T21:16:13.012800Z

use thread instead of go

ghadi 2019-07-24T21:17:07.013800Z

but create-passwords-channel is missing async/close!, as @markmarkmark suggested

petr.mensik 2019-07-24T21:17:47.014300Z

But that should come after calling pipeline-async, right? So that the channel stays open for processing

ghadi 2019-07-24T21:18:17.014900Z

close the producer after submitting all the items to the channel

ghadi 2019-07-24T21:19:23.016300Z

(defn- create-passwords-channel
  "Creates a channel based on a lazy sequence from a file"
  []
  (let [c (chan 50)]
    (thread      (with-open [lines (-> (io/reader "passwords.txt")
                            line-seq)]
        (doseq [ln lines]
          (>!! c ln))
         (close! c)))
    c))

ghadi 2019-07-24T21:19:58.016900Z

sorry I messed up the formatting, but you get the gist
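
For reference, a tidied sketch of the same idea. Two things here are assumptions rather than part of the thread: the `path` parameter (the snippets above hard-code "passwords.txt"), and binding the reader itself in `with-open`. The versions above bind the result of `line-seq`, a lazy sequence with no `close` method, so `with-open` cannot actually close the file.

```clojure
(require '[clojure.core.async :refer [chan >!! <!! close! thread]]
         '[clojure.java.io :as io])

(defn create-passwords-channel
  "Returns a channel that yields each line of the file at `path`, then closes."
  [path]
  (let [c (chan 50)]
    (thread
      (try
        ;; Bind the reader so with-open has something it can close.
        (with-open [rdr (io/reader path)]
          (doseq [ln (line-seq rdr)]
            ;; A blocking put is fine here: this is a real thread, not a go block.
            (>!! c ln)))
        (finally
          ;; Always close the channel, even if reading fails,
          ;; so downstream consumers can finish.
          (close! c))))
    c))
```

Closing in `finally` is a design choice: a consumer such as `async/into` will then complete even when the file is missing or unreadable.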

ghadi 2019-07-24T21:20:17.017400Z

there may still be pending vals on a channel after it closes
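
A quick sketch of what that means in practice (the names are illustrative only): values already buffered when the channel is closed can still be taken, and only then do takes return nil.

```clojure
(require '[clojure.core.async :refer [chan >!! <!! close!]])

(def c (chan 3))
(>!! c :a)
(>!! c :b)
(close! c)

;; Buffered values survive the close; the third take sees the closed, empty channel.
(def drained [(<!! c) (<!! c) (<!! c)])

;; Puts on a closed channel are rejected and return false.
(def put-after-close (>!! c :c))
```

`drained` is `[:a :b nil]` and `put-after-close` is `false`, so closing the producer right after the last put does not lose any items.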

petr.mensik 2019-07-24T21:22:18.018100Z

ah, now it works perfectly, even with the pipeline-async. Thanks a lot, great!