Jay's blog

Batching Messages in Async Clojure

I read an interesting blog post, "Scalar Select Anti-Pattern" by Alex Kladov, about the advantages of batching message processing when you're working with an event-driven asynchronous loop. I recommend reading it.

It came at just the right time, too, because I was recently working with just such a pattern in Clojure using the core.async library. Here's the basic outline of what I had before I read Alex's post.

(require '[clojure.core.async :refer [<! chan go-loop]])

(defonce running? (atom true))
(defonce ch (chan))

(go-loop []
  (when @running?
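    ;; when-some (unlike when-let) only stops on nil, i.e. a closed channel,
    ;; so a message whose value is false would still be processed.
    (when-some [message (<! ch)]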
      ;; Process a message
      ;; When I want to quit:
      ;; (reset! running? false)
      (recur))))

My go-loop block is a state machine that parks until a value arrives on the channel ch and then processes it. It repeats this until the running? atom is set to false or ch is closed, since a take from a closed channel returns nil.
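Here's a minimal sketch of driving that loop from a REPL: ordinary (non-go) code feeds it with the blocking put >!! and stops it by closing the channel. start-loop! and the processed atom are hypothetical names for illustration; the atom just stands in for real message processing.

```clojure
(require '[clojure.core.async :refer [<! <!! >!! chan close! go-loop]])

;; Hypothetical sink standing in for real message processing.
(def processed (atom []))

(defn start-loop!
  "Starts the single-message loop on ch. Returns the go-loop's channel,
  which closes once the loop exits."
  [running? ch]
  (go-loop []
    (when @running?
      (when-some [message (<! ch)]
        (swap! processed conj message)
        (recur)))))

(def running? (atom true))
(def ch (chan))
(def done (start-loop! running? ch))

(>!! ch :a)   ; blocks until the loop takes the value
(>!! ch :b)
(close! ch)   ; the next take returns nil, so the loop exits
(<!! done)    ; wait for the loop to finish
@processed    ; => [:a :b]
```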

In order to process messages in batches, I need a function that takes whatever values are immediately available on a channel, without blocking, and returns them in a vector.

(require '[clojure.core.async :refer [poll!]])

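(defn collect-available!
  "Takes every value that is available on ch right now, without blocking,
  and returns them in a vector in arrival order."
  [ch]
  (loop [acc []]
    ;; poll! returns nil immediately when nothing is available
    (if-some [msg (poll! ch)]
      (recur (conj acc msg))
      acc)))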
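Here's a quick sketch of collect-available! on a buffered channel, where producers can enqueue values without a waiting taker. The buffered name and the buffer size of 10 are arbitrary assumptions. With an unbuffered (chan) like the one above, poll! only succeeds when some producer is already parked on a put, so batches tend to form more readily with a buffer.

```clojure
(require '[clojure.core.async :refer [>!! chan poll!]])

(defn collect-available! [ch]
  ;; Take values with poll! (never blocks) until nothing is immediately available.
  (loop [acc []]
    (if-some [msg (poll! ch)]
      (recur (conj acc msg))
      acc)))

;; A buffered channel lets >!! return immediately while there is room.
(def buffered (chan 10))
(>!! buffered 1)
(>!! buffered 2)
(>!! buffered 3)

(collect-available! buffered) ; => [1 2 3]
(collect-available! buffered) ; => [] (nothing left to take)
```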

Okay, I'm halfway there. Next, I want to wrap this in a function that parks inside a go block until the first value arrives and then drains whatever else is already waiting on the channel.

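(require '[clojure.core.async :refer [<! go]])

(defn collect-batch! [ch]
  (go
    ;; Park until the first message arrives. nil means ch is closed and drained;
    ;; the go block then returns nil, so a take from its channel yields nil.
    (when-some [first-msg (<! ch)]
      ;; Keep first-msg at the front so the batch stays in arrival order.
      (into [first-msg] (collect-available! ch)))))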
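A quick check of collect-batch! shows the two properties that matter: the batch comes back in arrival order, and a closed, drained channel yields nil. This sketch repeats the helpers so it runs on its own; the buffered channel of size 10 is an assumption for the demo.

```clojure
(require '[clojure.core.async :refer [<! <!! >!! chan close! go poll!]])

(defn collect-available! [ch]
  ;; Non-blocking drain of whatever is available right now.
  (loop [acc []]
    (if-some [msg (poll! ch)]
      (recur (conj acc msg))
      acc)))

(defn collect-batch! [ch]
  (go
    ;; Park for the first value, then put it at the front of the drained rest.
    (when-some [first-msg (<! ch)]
      (into [first-msg] (collect-available! ch)))))

(def ch (chan 10))
(doseq [n [1 2 3]] (>!! ch n))
(<!! (collect-batch! ch)) ; => [1 2 3], in arrival order
(close! ch)
(<!! (collect-batch! ch)) ; => nil, since the channel is closed and drained
```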

Cool! Now I can process messages in batches with a little tweak to my go-loop.

(go-loop []
  (when @running?
    (if-some [messages (<! (collect-batch! ch))]
      (do
        (doseq [message messages]
          ;; Process a message
          )
        (recur))
      (reset! running? false))))
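Here's a self-contained, runnable version of the batching loop under a few assumptions: start-batch-loop! and the batches atom are hypothetical, and processing a batch is reduced to recording it. The messages are enqueued (and the channel closed) before the loop starts, which makes the result deterministic; in real use, where one batch ends and the next begins depends on timing.

```clojure
(require '[clojure.core.async :refer [<! <!! >!! chan close! go go-loop poll!]])

(defn collect-available! [ch]
  (loop [acc []]
    (if-some [msg (poll! ch)]
      (recur (conj acc msg))
      acc)))

(defn collect-batch! [ch]
  (go
    (when-some [first-msg (<! ch)]
      ;; First message at the front preserves arrival order.
      (into [first-msg] (collect-available! ch)))))

;; Hypothetical sink: records each batch instead of really processing it.
(def batches (atom []))

(defn start-batch-loop! [running? ch]
  (go-loop []
    (when @running?
      (if-some [messages (<! (collect-batch! ch))]
        (do
          (swap! batches conj messages) ; handle the whole batch at once
          (recur))
        ;; collect-batch! yielded nil: ch is closed and drained
        (reset! running? false)))))

(def running? (atom true))
(def ch (chan 100))
(doseq [n (range 5)] (>!! ch n)) ; enqueue before the loop starts
(close! ch)
(<!! (start-batch-loop! running? ch)) ; wait for the loop to exit
@batches  ; => [[0 1 2 3 4]]
@running? ; => false
```

Because the first take parks and the rest are non-blocking polls, all five queued messages come back as a single batch.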

I've also added a safeguard: when ch is closed and drained, collect-batch! yields nil, so the go-loop resets running? to false and exits.

This is already an improvement: the loop parks once per batch of messages rather than once per individual message, because everything after the first message is taken with the non-blocking poll!. It also establishes the pattern, which will make it easier to apply other optimizations, like the ones Alex discusses in his post, when I find I need them.
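One knob you might add later is a cap on batch size, so a burst of messages can't turn into one arbitrarily large batch. Here's a minimal sketch; collect-available-up-to!, collect-batch-up-to!, and the max-n parameter are hypothetical names, and the idea is an illustration rather than something taken from Alex's post. It assumes max-n is at least 1.

```clojure
(require '[clojure.core.async :refer [<! <!! >!! chan go poll!]])

(defn collect-available-up-to!
  "Hypothetical variant of collect-available!: polls onto acc without
  blocking, but stops once acc holds max-n values."
  [ch acc max-n]
  (loop [acc acc]
    (if (< (count acc) max-n)
      (if-some [msg (poll! ch)]
        (recur (conj acc msg))
        acc)
      acc)))

(defn collect-batch-up-to!
  "Parks for one value, then takes at most (dec max-n) more without blocking.
  Assumes max-n >= 1."
  [ch max-n]
  (go
    (when-some [first-msg (<! ch)]
      (collect-available-up-to! ch [first-msg] max-n))))

(def ch (chan 10))
(doseq [n (range 5)] (>!! ch n))
(<!! (collect-batch-up-to! ch 3)) ; => [0 1 2]
(<!! (collect-batch-up-to! ch 3)) ; => [3 4]
```

Whatever doesn't fit in one batch simply stays on the channel for the next pass of the loop.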