I have a system where I’m pulling data from an API with 5 concurrent connections. Each connection puts its results onto a channel using (async/onto-chan! out-channel data false) and then calls async/close! once data (the sequence of entries returned by the API) is empty. These channels are all merged using async/merge. I then create a lazy-seq of results using this snippet:
```clojure
(defn seq!!
  "Returns a (blocking!) lazy sequence read from a channel."
  [c]
  (lazy-seq
    (when-let [v (<!! c)]
      (cons v (seq!! c)))))
```
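A quick way to check seq!! in isolation, as a sketch assuming core.async 1.2 or later (where to-chan! and onto-chan! exist): to-chan! puts every element of a collection onto a new channel and then closes it, so the lazy seq ends after the last element. Because when-let treats false as "stop", a channel that carries false values would also end the sequence early; that is one general pitfall of this helper, not necessarily what the reply below had in mind.

```clojure
(require '[clojure.core.async :as async :refer [<!!]])

;; seq!! exactly as in the question
(defn seq!!
  "Returns a (blocking!) lazy sequence read from a channel."
  [c]
  (lazy-seq
    (when-let [v (<!! c)]
      (cons v (seq!! c)))))

;; to-chan! copies the collection onto a fresh channel and closes it afterwards,
;; so every element is read before <!! returns nil and the seq ends.
(count (seq!! (async/to-chan! (range 170))))
;; => 170

;; A false value is also falsey for when-let, so reading stops there.
(vec (seq!! (async/to-chan! [1 false 3])))
;; => [1]
```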
Calling count on the returned lazy-seq should give about 170 records, but it returns 10 (though I’ve seen 30 come back before too). The page size for results from the API is 20. I’m not explicitly setting any buffer sizes. Is async defaulting to a buffer size of 10 somewhere? Any ideas on why this isn’t working properly?
seq!! seems prone to misuse
I resolved the issue. The problem was that I was creating the channels for my API workers with (async/chan) and wasn’t specifying a buffer size. I’m guessing that caused most of the results to just get discarded? Is that accurate?
A bufferless channel (async/chan) is a “rendezvous” channel: a put into it doesn’t complete until there is a take on the other side.
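A small REPL sketch of that rendezvous behaviour, and of what happens to a put once the channel has been closed (the values :a, :b and :c are purely illustrative; offer!, go, >!, put! and close! are standard core.async functions):

```clojure
(require '[clojure.core.async :as async])

(def c (async/chan)) ; no buffer: a rendezvous channel

;; offer! only succeeds if the put can complete immediately. With no buffer and
;; no taker waiting it cannot, so it returns nil and nothing is put.
(async/offer! c :a)
;; => nil

;; A put parked in a go block completes as soon as a take arrives.
(async/go (async/>! c :b))
(async/<!! c)
;; => :b

;; After close!, puts are rejected: put! returns false and the value is dropped.
(async/close! c)
(async/put! c :c)
;; => false
```

The last step is the part that matters for the question: any put that is still pending when the worker calls close! is lost rather than delivered.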
@stephenmhopper async/onto-chan works asynchronously: you have to block on the channel it returns, otherwise the c channel gets closed while onto-chan is still working.
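A minimal sketch of that fix, under stated assumptions: fetch-page is a hypothetical stand-in for the real API call (20 entries per page, 34 entries per worker so that 5 workers yield 170), and each worker runs on async/thread because the real calls are presumably blocking I/O. The worker blocks on the channel returned by onto-chan! before fetching the next page, so close! can never race the pending puts.

```clojure
(require '[clojure.core.async :as async :refer [<!!]])

;; HYPOTHETICAL stand-in for the API (not from the original thread): page n of a
;; worker's entries, 20 per page, and an empty seq once its 34 entries run out.
(defn fetch-page [worker-id n]
  (let [total 34
        start (* n 20)]
    (map #(vector worker-id %) (range start (min total (+ start 20))))))

(defn start-worker
  "Starts a worker on its own thread and returns its output channel.
  Each page is pushed with onto-chan!, and the worker blocks on the channel
  onto-chan! returns, so out is only closed after every put has completed."
  [worker-id]
  (let [out (async/chan)] ; unbuffered is fine once the puts are awaited
    (async/thread
      (loop [n 0]
        (let [data (fetch-page worker-id n)]
          (if (seq data)
            (do
              ;; Wait until every entry of this page has been put onto out.
              (<!! (async/onto-chan! out data false))
              (recur (inc n)))
            ;; Empty page: no puts are pending, so closing cannot drop anything.
            (async/close! out)))))
    out))

;; seq!! as in the question, repeated so this sketch runs on its own.
(defn seq!!
  "Returns a (blocking!) lazy sequence read from a channel."
  [c]
  (lazy-seq
    (when-let [v (<!! c)]
      (cons v (seq!! c)))))

;; merge closes its output channel once all five worker channels are closed.
(def results (seq!! (async/merge (mapv start-worker (range 5)))))

(count results)
;; => 170
```

Inside a go block instead of a thread, the parking <! would take the place of <!!. Giving the worker channels a buffer without awaiting onto-chan! does not remove the race: if the worker reaches close! before onto-chan!'s go block has finished its puts, the remaining entries are still dropped.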
Oh, so should I do <! on the result of onto-chan before trying to process the next iteration?
yes
That makes sense. Thank you!