Hello! I am trying to write a fn that repeatedly reads a "page" of 20 posts from a paginated API for as long as there are more pages, OR until the destination channel is closed (i.e. the reader lost interest). I am struggling to find a good way to do the second part, as the only way to see whether a channel is closed is to write to it. What I have now is this:
(defn async-fetch-all-posts [dst] ; dst is a channel
  (a/thread
    ;; try wraps the loop, because recur is not allowed across a try boundary
    (try
      (loop [href "/api/posts?offset=0"]
        (let [{:keys [posts next-href]} (parse-body (fetch-posts-raw href))
              [post & more-posts] posts ; first post, then the rest of the page
              dst-open? (a/>!! dst post)]
          (a/onto-chan! dst more-posts false)
          (if (and dst-open? next-href)
            (recur next-href)
            :success)))
      (catch Exception e e)
      (finally (a/close! dst)))))
Ideally I would do only (a/onto-chan! dst posts false), but it doesn't tell me whether dst was open or not. So I either have to do it that ☝️ way (or is there a nicer one?), or I am thinking about making my own variant of onto-chan that would e.g. return a channel with a single true/false value depending on whether it copied the items. What do you think? There must be a nicer way?
use the backpressure from onto-chan!. if the consumer keeps pulling from the destination then it's this loop's responsibility to keep filling it up. if you're never able to put all of the posts onto that channel then you don't need to keep working
it also feels like a smell to me that there are two blocks that can independently close the channel.
Using onto-chan like that is kind of a waste, you could just put a cat transducer on dst
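A minimal sketch of the cat suggestion, assuming dst is created (by whoever owns it) with a buffer and the cat transducer. The buffer size of 20 and the keyword "posts" are made up for illustration. Each batch then goes on with a single put, and the return value of >!! says whether dst was still open:

```clojure
(require '[clojure.core.async :as a])

;; A channel with a transducer needs a buffer; cat unpacks each batch
;; that is put on the channel into its individual items.
(def dst (a/chan 20 cat))

;; One put per batch; true means dst was still open.
(def put-open (a/>!! dst [:p1 :p2 :p3]))          ; => true

;; The consumer sees individual posts, not the batch.
(def received [(a/<!! dst) (a/<!! dst) (a/<!! dst)]) ; => [:p1 :p2 :p3]

;; Once the consumer closes dst, the next put reports it.
(a/close! dst)
(def put-closed (a/>!! dst [:p4 :p5]))            ; => false
```

This only works if the transducer is attached when the channel is created, so in the original function the caller would have to build dst this way.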
I think I'd just use something like this instead of onto-chan:
(reduce #(or (a/>!! ch %2) (reduced :ch-is-closed)) nil posts)
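The same reduce, wrapped in a function so its return values are easier to see. This is only a sketch: the name put-all!! is made up here, and both channels are buffered so that the demo never blocks.

```clojure
(require '[clojure.core.async :as a])

(defn put-all!!
  "Blocking-puts each item onto ch in order. Stops at the first put that
  fails and returns :ch-is-closed. Otherwise returns the result of the
  last put (true), or nil when items is empty."
  [ch items]
  (reduce (fn [_ item]
            (or (a/>!! ch item)
                (reduced :ch-is-closed)))
          nil
          items))

;; Open channel with room for every item: all puts succeed.
(def open-ch (a/chan 10))
(def open-result (put-all!! open-ch [1 2 3]))       ; => true

;; Channel that was closed before the producer got to it.
(def closed-ch (doto (a/chan 10) a/close!))
(def closed-result (put-all!! closed-ch [4 5 6]))   ; => :ch-is-closed
```

One caveat from the close! docstring: a put that is already blocked when the channel gets closed stays blocked until a taker releases it, so the producer only notices the close on its next put.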
Thank you, that is an important point that I missed!
I think it is a valid use case, that I want the channel closed if there is no more input but also if the (single) consumer is not interested in any more content? I could solve the latter by passing around another "stop signal", maybe it is a misuse to use the channel for both..
the structure of this process is an iteration
like if you were constructing a lazy seq with a pure function it would be something like (iterate f init)
That is correct.
where init is some value containing all the posts, and f iterates slicing off posts
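For comparison, here is a rough sketch of that lazy-seq shape applied to the paginated case. It assumes an in-memory map standing in for the API (the pages map and fetch-page are hypothetical, not part of the original code), with init being the first page and f fetching the page that the previous one points to:

```clojure
;; Hypothetical stand-in for the paginated API: href -> parsed page.
(def pages
  {"/api/posts?offset=0" {:posts [:p1 :p2] :next-href "/api/posts?offset=2"}
   "/api/posts?offset=2" {:posts [:p3 :p4] :next-href "/api/posts?offset=4"}
   "/api/posts?offset=4" {:posts [:p5]     :next-href nil}})

(defn fetch-page [href]
  (get pages href))

(defn all-posts
  "Lazy seq of individual posts, starting from the page at first-href."
  [first-href]
  (->> (fetch-page first-href)
       ;; f: previous page -> next page, or nil when there is no next-href
       (iterate (fn [page] (some-> (:next-href page) fetch-page)))
       (take-while some?)
       ;; flatten the batches into individual posts
       (mapcat :posts)))

(all-posts "/api/posts?offset=0") ; => (:p1 :p2 :p3 :p4 :p5)
```

The looping logic (iterate plus take-while) is separate from what happens to each page (mapcat :posts), which is the decoupling a channel version of iterate would aim for.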
I guess this leads to a deep insight on how to restructure and improve the code?
so I would write a channel version of iterate
yeah, the idea is you can decouple the looping logic from the rest of it
that sounds good ❤️ Let me think about it... Though I still need to solve the problem that I have batches of posts while I want individual posts.
I was discussing this with someone else and ended up with
(defn async-iteration [from-process
                       to-process
                       output-channel
                       & {:keys [values-selector some? init]
                          :or {values-selector vector
                               some? some?}}]
  ;; >! and <! only work inside a go block
  (async/go
    (async/>! to-process init)
    (loop [value (async/<! from-process)]
      (if (some? value)
        (do
          (doseq [item (values-selector value)]
            (async/>! output-channel item))
          ;; feed the value back to the process, then wait for its next output
          (async/>! to-process value)
          (recur (async/<! from-process)))
        (async/close! output-channel)))))
but I don't recall if that was the final version or if there was something I didn't like about it
another thing to think about is, iteration is a way to turn a function into a process, in this case that would mean taking a function and making it into a go async thing that consumes from a channel and outputs to a channel, but what if you already have a process, and want to iterate it
I do not understand how the function is supposed to work 😢 / how to fit it to my problem. So it consumes from the from-process channel, which in my case produces batches of posts, then splits those into individual items and puts them onto the output channel. What is the purpose of the to-process?
that is where your function breaks with iteration
your function entirely makes the decision to continue iterating or not based on if it publishes results to the channel, not based on what it previously generated
an iteration is a feedback loop, where the previous output of the process is fed back into the next step of the process
so I assume the consumer of dst is reading from dst and when it gets some output from it that signals there are no more posts to fetch, it closes dst, which signals your function to stop fetching
ah, I misread your function, it doesn't break with iteration
the to-process in my async-iteration is equivalent to the recur with next-href in your function
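To make the role of to-process concrete, here is one possible wiring for the posts case. It is a sketch under several assumptions: async-iteration is the function quoted earlier, reproduced inside a go block and with a recur so that it runs on its own; the pages map and start-page-fetcher are hypothetical stand-ins for the real API; and the lookup is an in-memory get, whereas a real HTTP call should not block inside a go block.

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical stand-in for the paginated API: href -> parsed page.
(def pages
  {"/api/posts?offset=0" {:posts [:p1 :p2] :next-href "/api/posts?offset=2"}
   "/api/posts?offset=2" {:posts [:p3 :p4] :next-href "/api/posts?offset=4"}
   "/api/posts?offset=4" {:posts [:p5]     :next-href nil}})

(defn async-iteration [from-process to-process output-channel
                       & {:keys [values-selector some? init]
                          :or {values-selector vector
                               some? some?}}]
  (async/go
    (async/>! to-process init)
    (loop [value (async/<! from-process)]
      (if (some? value)
        (do
          (doseq [item (values-selector value)]
            (async/>! output-channel item))
          ;; the feedback step: previous output becomes the next input
          (async/>! to-process value)
          (recur (async/<! from-process)))
        (async/close! output-channel)))))

(defn start-page-fetcher
  "The 'process': takes the previous page from to-process and puts the
  page it links to on from-process. Closes from-process when there is
  no next-href, which ends the iteration."
  [to-process from-process]
  (async/go-loop []
    (let [prev (async/<! to-process)
          href (:next-href prev)]
      (if href
        (do (async/>! from-process (get pages href))
            (recur))
        (async/close! from-process)))))

(def to-process (async/chan))
(def from-process (async/chan))
(def output (async/chan 10))

(start-page-fetcher to-process from-process)
(async-iteration from-process to-process output
                 :init {:next-href "/api/posts?offset=0"} ; seed "page"
                 :values-selector :posts)                 ; batch -> posts

(def received (async/<!! (async/into [] output)))
;; => [:p1 :p2 :p3 :p4 :p5]
```

Note that this sketch ignores the result of the puts on output-channel, so it does not yet stop early when the consumer closes it. That part would still need the check on the put's return value discussed above.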