core-async

Jakub Holý 2020-11-05T15:19:24.213200Z

Hello! I am trying to write a fn that repeatedly reads a "page" of 20 posts from a paginated API as long as there are more pages OR until the destination channel is closed (i.e. the reader has lost interest). I'm struggling to find a good way to do the second part, as the only way to see whether a channel is closed is to write to it. What I have now is this:

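(defn async-fetch-all-posts [dst] ; dst is a channel
  (a/thread
    ;; try wraps the whole loop: recur may not cross a try,
    ;; and dst should be closed once, at the very end
    (try
      (loop [href "/api/posts?offset=0"]
        (let [{:keys [posts next-href]} (parse-body (fetch-posts-raw href))
              [post & more-posts]       posts            ; first post + the rest
              dst-open?                 (a/>!! dst post)] ; false if dst is closed
          ;; onto-chan! copies the rest asynchronously and does not report
          ;; whether dst was still open; on the last page, finally can close
          ;; dst while it is still copying, so later posts may be dropped
          (a/onto-chan! dst more-posts false)
          (if (and dst-open? next-href)
            (recur next-href)
            :success)))
      (catch Exception e e)
      (finally (a/close! dst)))))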

Ideally I would only do (a/onto-chan! dst posts false), but it doesn't tell me whether dst was still open. So I either have to do it that ☝️ way (or is there a nicer one?), or I'm thinking about making my own variant of onto-chan! that would e.g. return a channel with a single value, true/false, depending on whether it did/didn't copy the items. What do you think? There must be a nicer way?
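A minimal sketch of the onto-chan! variant described above, assuming core.async 1.x and the a alias used in the question. onto-chan-reporting! is a hypothetical name, not part of core.async: it copies items from a go loop the way onto-chan! does, but the returned channel yields true if every item was put and false if ch was closed first.

```clojure
(require '[clojure.core.async :as a])

;; Hypothetical helper, not part of core.async. Like onto-chan!, it copies
;; coll onto ch from a go loop, but the returned channel yields true if every
;; item was put and false if ch was closed before the copy finished.
(defn onto-chan-reporting!
  ([ch coll] (onto-chan-reporting! ch coll true))
  ([ch coll close?]
   (a/go-loop [vs (seq coll)]
     (if vs
       (if (a/>! ch (first vs))
         (recur (next vs))
         false)                      ; ch is closed, stop copying
       (do
         (when close? (a/close! ch)) ; every item was put
         true)))))
```

Blocking on the result inside the thread, e.g. (a/<!! (onto-chan-reporting! dst posts false)), would then both wait for the consumer (backpressure) and tell the loop whether to fetch the next page.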

dpsutton 2020-11-05T15:32:00.214200Z

Use the backpressure from onto-chan!. If the consumer keeps pulling from the destination, then it's this loop's responsibility to keep filling it up. If you're never able to put all of the posts onto that channel, then you don't need to keep working.

dpsutton 2020-11-05T15:33:49.214600Z

Also, it feels like a smell to me that there are two blocks that can independently close the channel.

2020-11-05T16:39:28.215700Z

Using onto-chan! like that is kind of a waste; you could just put a cat transducer on dst.
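A minimal sketch of the cat suggestion, assuming core.async 1.x: with cat as the channel's transducer, one blocking >!! puts a whole page of posts and returns false once dst is closed, so neither the separate first-post put nor onto-chan! is needed. The buffer size of 20 (one page) is an assumption.

```clojure
(require '[clojure.core.async :as a])

;; The cat transducer splits each batch into individual items on the way in.
;; An expanding transducer may temporarily overfill the fixed buffer.
(def dst (a/chan 20 cat))

;; One blocking put per page; the result tells whether dst is still open.
(def put-ok? (a/>!! dst [:post-1 :post-2 :post-3]))

;; The consumer sees individual posts, not batches.
(def first-post (a/<!! dst))

;; Once the consumer closes dst, the next put reports false.
(a/close! dst)
(def put-after-close? (a/>!! dst [:post-4]))
```

In the fetch loop, the body would then reduce to something like (if (and (a/>!! dst posts) next-href) (recur next-href) :success). Posts already in the buffer stay takeable after dst is closed.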

Jan K 2020-11-05T17:18:23.217400Z

I think I'd just use something like this instead of onto-chan!: (reduce #(or (a/>!! ch %2) (reduced :ch-is-closed)) nil posts)
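Jan K's one-liner, sketched inside the fetch loop from the question. put-all!! is a hypothetical name for the one-liner, and fetch-page is a hypothetical parameter standing in for (comp parse-body fetch-posts-raw), stubbed here with a map of two fake pages. Because every post is put with a blocking >!!, the thread only fetches the next page once dst has accepted every post of the current one.

```clojure
(require '[clojure.core.async :as a])

;; Jan K's one-liner as a named (hypothetical) helper: puts each item with a
;; blocking >!! and stops at the first put that returns false (ch closed).
;; Returns :ch-is-closed if it stopped early, else true (nil for no items).
(defn put-all!! [ch items]
  (reduce #(or (a/>!! ch %2) (reduced :ch-is-closed)) nil items))

;; Hypothetical stub for (comp parse-body fetch-posts-raw): href -> page.
(def fake-pages
  {"/api/posts?offset=0" {:posts [1 2] :next-href "/api/posts?offset=2"}
   "/api/posts?offset=2" {:posts [3]   :next-href nil}})

(defn async-fetch-all-posts [dst fetch-page]
  (a/thread
    (try
      (loop [href "/api/posts?offset=0"]
        (let [{:keys [posts next-href]} (fetch-page href)
              result                    (put-all!! dst posts)]
          (cond
            (= result :ch-is-closed) :consumer-closed
            next-href                (recur next-href)
            :else                    :success)))
      (catch Exception e e)
      (finally (a/close! dst)))))
```

With a fresh unbuffered dst, (a/<!! (a/into [] dst)) then yields [1 2 3], and the channel returned by the thread yields :success.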

Jakub Holý 2020-11-05T18:10:09.217800Z

Thank you, that is an important point that I missed!

Jakub Holý 2020-11-05T18:12:06.218Z

I think it is a valid use case that I want the channel closed when there is no more input, but also when the (single) consumer is not interested in any more content? I could solve the latter by passing around a separate "stop signal"; maybe it is a misuse to use the channel for both...
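One way to pass a separate stop signal, as a sketch assuming core.async 1.x: the producer races each put against a stop channel with alts!!, and the consumer closes stop when it loses interest. put-or-stop!! and stop are hypothetical names. This also sidesteps a subtlety of closing dst itself: according to the close! docstring, puts that are already blocked remain blocked until a taker releases them, so closing dst may not wake a producer that is waiting on it.

```clojure
(require '[clojure.core.async :as a])

;; Hypothetical helper: blocking put of v onto dst that gives up as soon as
;; stop is closed (or receives a value). Returns true only if v was put.
;; stop is listed first and :priority true makes alts!! check it first.
(defn put-or-stop!! [dst stop v]
  (let [[val port] (a/alts!! [stop [dst v]] :priority true)]
    (and (= port dst) (true? val))))
```

The producer loop would call put-or-stop!! for each post and stop fetching when it returns false, while the consumer signals with (a/close! stop) instead of closing dst, leaving the producer as the only one that closes dst.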

2020-11-05T18:22:46.219100Z

the structure of this process is an iteration

2020-11-05T18:23:52.219300Z

like, if you were constructing a lazy seq with a pure function, it would be something like (iterate f init)

Jakub Holý 2020-11-05T18:24:26.219500Z

That is correct.

2020-11-05T18:24:39.219700Z

where init is some value containing all the posts, and f iterates slicing off posts
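For comparison, the pure lazy-seq shape of the same loop, as a sketch: fetch-page is hypothetical and stubbed with a map from href to page, next-page plays the role of f, and the first page is init. mapcat then flattens the pages into individual posts.

```clojure
;; Hypothetical stub for a synchronous, pure page fetch: href -> page.
(def fetch-page
  {"/api/posts?offset=0" {:posts [1 2] :next-href "/api/posts?offset=2"}
   "/api/posts?offset=2" {:posts [3]   :next-href nil}})

;; f for iterate: from one page to the next one, or nil after the last page.
(defn next-page [page]
  (some-> (:next-href page) fetch-page))

(defn all-posts [first-href]
  (->> (iterate next-page (fetch-page first-href)) ; init = first page
       (take-while some?)                          ; stop at the first nil
       (mapcat :posts)))                           ; pages -> individual posts
```

iterate's docstring requires f to be free of side effects, so this shape only fits a pure f; a real HTTP fetch is not one.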

Jakub Holý 2020-11-05T18:25:35.219900Z

I guess this leads to a deep insight on how to restructure and improve the code?

2020-11-05T18:26:55.220100Z

so I would write a channel version of iterate

2020-11-05T18:28:09.220300Z

yeah, the idea is you can decouple the looping logic from the rest of it
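One way to decouple the looping logic in plain Clojure, assuming Clojure 1.11 or later: clojure.core/iteration takes a step function (here a hypothetical fetch-page, stubbed with a map) and keeps how to extract each value (:vf) and the next continuation (:kf) separate from the step itself. It is synchronous, so it is only an analogue of the channel version being discussed.

```clojure
;; Requires Clojure 1.11+ for clojure.core/iteration.
;; Hypothetical stub for a synchronous page fetch: href -> page.
(def fetch-page
  {"/api/posts?offset=0" {:posts [1 2] :next-href "/api/posts?offset=2"}
   "/api/posts?offset=2" {:posts [3]   :next-href nil}})

(defn all-posts [first-href]
  (into []
        cat                              ; pages of posts -> posts
        (iteration fetch-page            ; step: continuation -> page
                   :initk first-href     ; first continuation
                   :vf    :posts         ; value contributed by each page
                   :kf    :next-href)))  ; next continuation, nil ends it
```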

Jakub Holý 2020-11-05T18:29:55.220500Z

that sounds good ❤️ Let me think about it... Though I still need to solve the problem that I have batches of posts while I want individual posts.

2020-11-05T18:32:08.220700Z

I was discussing this with someone else and ended up with

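(defn async-iteration [from-process
                       to-process
                       output-channel
                       & {:keys [values-selector some? init]
                          :or {values-selector vector
                               some? some?}}]
  ;; <! and >! only work inside a go block
  (async/go
    ;; seed the process with the initial (non-nil) value
    (async/>! to-process init)
    (loop [value (async/<! from-process)]
      (if (some? value)
        (do
          ;; publish every item contained in this step's result
          (doseq [item (values-selector value)]
            (async/>! output-channel item))
          ;; feed the result back in as the input of the next step
          (async/>! to-process value)
          (recur (async/<! from-process)))
        (async/close! output-channel)))))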
but I don't recall if that was the final version or if there was something I didn't like about it

2020-11-05T18:34:37.221Z

Another thing to think about: iteration is a way to turn a function into a process. In this case, that would mean taking a function and making it into a go/async thing that consumes from a channel and outputs to a channel. But what if you already have a process and want to iterate it?
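As a sketch of turning a function into a process, assuming core.async 1.x: pipeline-blocking wraps an ordinary (possibly blocking) function as a process that takes inputs from one channel and puts results on another. fn->process is a hypothetical name; a pair of channels wired this way is the kind of process that to-process and from-process in async-iteration connect to.

```clojure
(require '[clojure.core.async :as a])

;; Hypothetical helper: values taken from `in` are passed through f on a
;; dedicated thread and the results are put on `out`. pipeline-blocking
;; closes `out` once `in` is closed and drained. f must not return nil.
(defn fn->process [f in out]
  (a/pipeline-blocking 1 out (map f) in)
  out)
```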

Jakub Holý 2020-11-05T19:13:15.221500Z

I do not understand how the function is supposed to work 😢 / how to fit it to my problem. So it consumes from the from-process channel, which in my case produces batches of posts, then splits those into individual items and puts them onto the output channel. What is the purpose of the to-process?

2020-11-05T19:21:41.221700Z

that is where your function breaks with iteration

2020-11-05T19:22:52.222Z

your function decides whether to continue iterating based entirely on whether it can publish results to the channel, not on what it previously generated

2020-11-05T19:23:40.222200Z

an iteration is a feedback loop, where the previous output of the process is fed back into the next step of the process

2020-11-05T19:25:41.222400Z

so I assume the consumer of dst is reading from dst, and when it gets some output that signals there are no more posts to fetch, it closes dst, which signals your function to stop fetching

2020-11-05T19:26:54.222600Z

ah, I misread your function, it doesn't break with iteration

2020-11-05T19:27:18.222800Z

the to-process in my async-iteration is equivalent to the recur with next-href in your function
