core-async

stephenmhopper 2020-05-30T14:05:28.389300Z

I have a system where I’m pulling data from an API with 5 concurrent connections. Each connection produces results to a channel using (async/onto-chan! out-channel data false) and then calls async/close! once data (the sequence of entries returned by the API) is empty. These channels are all merged using async/merge. I then create a lazy-seq of results using this snippet:

(defn seq!!
  "Returns a (blocking!) lazy sequence read from a channel."
  [c]
  (lazy-seq
   (when-let [v (<!! c)]
     (cons v (seq!! c)))))

Running count on the returned lazy-seq should return about 170 records, but it gives me 10 (though I’ve seen 30 come back before too). The page size for results from the API is 20. I’m not explicitly setting any buffer values. Is async defaulting to a buffer size of 10 somewhere? Any ideas on why this isn’t working properly?

ghadi 2020-06-01T13:28:54.394500Z

seq!! seems prone to misuse

stephenmhopper 2020-05-30T14:39:23.389400Z

I resolved the issue. The problem was that I was creating my channels for my API workers using (async/chan) and wasn’t specifying buffer size. I’m guessing that caused most of the results to just get discarded? Is that accurate?

ghadi 2020-05-30T15:38:25.391100Z

A bufferless channel (async/chan) is a “rendezvous” channel, where the put into it doesn’t succeed until there is a take on the other side
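
To illustrate the rendezvous behaviour described above, here is a minimal sketch. The var names (c, b, put-done and so on) are made up for illustration and are not from the thread. It suggests that a put on an unbuffered channel is not dropped; it just stays pending until a take arrives.

```clojure
(require '[clojure.core.async :as async])

;; Unbuffered ("rendezvous") channel, as created by (async/chan).
(def c (async/chan))

;; offer! never blocks: with no buffer and no waiting taker it gives up
;; and returns nil instead of true.
(def offered-without-taker (async/offer! c :x))

;; The same offer! succeeds when the channel has room in a buffer.
(def b (async/chan 1))
(def offered-with-buffer (async/offer! b :x))

;; A put! on the unbuffered channel is queued, not discarded.
;; Its callback only fires once a taker shows up.
(def put-done (promise))
(async/put! c :v (fn [ok] (deliver put-done ok)))
(def pending-before-take (not (realized? put-done)))

;; The take completes the rendezvous and receives the queued value.
(def taken (async/<!! c))
(def put-result (deref put-done 1000 :timeout))
```

So a missing buffer on its own should not lose values; closing the channel while puts are still in flight is a more likely cause of the short count.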

Jan K 2020-05-30T16:29:05.393700Z

@stephenmhopper async/onto-chan works asynchronously - you have to block on the returned channel, otherwise the c channel gets closed while onto-chan is still working

stephenmhopper 2020-05-30T16:32:22.393900Z

Oh, so should I do <! on the result of onto-chan before trying to process the next iteration?

Jan K 2020-05-30T16:32:32.394100Z

yes
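
A rough sketch of what that could look like for one API worker, under some assumptions: fetch-page is a hypothetical stand-in for the real API call (here it fakes 3 pages of 20 entries followed by an empty page), and worker is an invented name. The worker runs in async/thread because a real fetch would block, so it waits with <!! rather than <!.

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical stand-in for the API: pages 0-2 hold 20 entries each,
;; every later page is empty.
(defn fetch-page [n]
  (if (< n 3)
    (range (* n 20) (* (inc n) 20))
    []))

(defn worker
  "Returns an unbuffered channel that receives every entry of every page.
  The channel is closed only after the last page has been fully copied."
  []
  (let [out (async/chan)]
    (async/thread
      (loop [n 0]
        (let [data (fetch-page n)]
          (if (seq data)
            (do
              ;; onto-chan! copies asynchronously and returns a channel that
              ;; closes when copying is done; block on it before moving on.
              (async/<!! (async/onto-chan! out data false))
              (recur (inc n)))
            ;; No more data: all earlier pages are already delivered.
            (async/close! out)))))
    out))

;; Two workers merged into one channel, drained into a vector.
(def total
  (count (async/<!! (async/into [] (async/merge [(worker) (worker)])))))
```

With this ordering the close! cannot overtake a copy that is still running, so the worker channels can stay unbuffered. Here total comes out as 120 (2 workers, 3 pages of 20 each).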

stephenmhopper 2020-05-30T16:34:31.394300Z

That makes sense. Thank you!