Go concurrency patterns in core.async: pipelines and cancellation

I came across a post on the Go Blog describing Go patterns for task pipelining and cancellation. I haven't got a lot of experience with core.async, so I thought I'd have a go at translating the Go patterns to Clojure.

I won't rewrite all the prose from the original post. Instead I'll just present my Clojure versions of the various functions. Please have a look at the original for more discussion on the patterns etc. I'll use the same subheadings to make it easier to skip back and forth between the two posts.

Preliminaries

If you want to play along, create a new project with lein new go-concurrency-primitives and modify the core.clj namespace declaration to this:

(ns go-concurrency-primitives.core
  (:refer-clojure :exclude [merge])
  (:require [clojure.core.async :refer [<! <!! >! alts! chan close! go go-loop map< merge to-chan]]))

Squaring numbers

The first function presented in the post is gen, which puts the given arguments into a channel that is then returned to the caller:

(defn gen
  [& nums]
  (let [c (chan)]
    (go
      (doseq [n nums]
        (>! c n))
      (close! c))
    c))

As with the Go function, our function accepts a variable number of arguments and starts a go-block that puts all those arguments to a core.async channel one at a time. The channel is then closed in order to signal to the consumer that all items have been processed. Finally the channel is returned to the caller.

This seems like a common enough use case, and sure enough core.async provides a helper function, to-chan, that creates a channel with the contents of a provided collection, and closes that channel when it's exhausted. So we can simplify gen to:

(defn gen
  [& nums]
  (to-chan nums))

The next Go function is sq, which takes an integer from a channel, squares it, and puts it into a new channel. This is done in a loop until the input channel is exhausted:

(defn sq
  [cin]
  (let [cout (chan)]
    (go-loop [n (<! cin)]
      (if n
        (do
          (>! cout (* n n))
          (recur (<! cin)))
        (close! cout)))
    cout))

We use a go-loop to loop within the go-block until we've exhausted the input channel (<! returns nil if the channel is closed).

Again, it turns out that taking an item from a channel, applying a function to it, and putting the resulting value to another channel is quite common, and there's a helper function in core.async for it: map<. With the helper function our implementation of sq becomes almost trivial:

(defn sq
  [cin]
  (map< #(* % %) cin))
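
One caveat: map< is marked as deprecated in later core.async releases in favour of transducers. As a rough sketch of what a transducer-based equivalent could look like (sq-xf is a hypothetical name, not something from the Go post or from core.async), we can attach a map transducer to the output channel and pipe the input into it:

```clojure
(require '[clojure.core.async :as async :refer [<!! chan pipe to-chan]])

(defn sq-xf
  "Hypothetical transducer-based variant of sq: squares every item taken
  from cin. The returned channel closes when cin closes."
  [cin]
  ;; A channel with a transducer must have a buffer, hence the 1.
  ;; pipe copies items from cin to the new channel and returns that channel.
  (pipe cin (chan 1 (map #(* % %)))))

;; Collect the whole pipeline into a vector.
(<!! (async/into [] (sq-xf (sq-xf (to-chan [2 3])))))
;; => [16 81]
```

The behaviour should match the map< version, with the transformation living on the channel instead of in a hand-written loop.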

The final function that ties our pipeline together is main:

(defn main
  []
  (let [c (-> (gen 2 3) sq sq)]
    (loop [n (<!! c)]
      (when n
        (println n)
        (recur (<!! c))))))

We create a source channel with 2 and 3 in it, compose it with two squaring functions, and finally print the resulting items to stdout. We use the <!! take operation in this case to run main in the current thread.

The expected output is:

go-concurrency-primitives.core> (main)
16
81
nil

You'll notice that main looks very similar to our first version of sq with the exception that we're applying println to the items for side-effects rather than putting the return value to an output channel. I suspect that printing items to stdout is very common in example code like this, but not very common at all in actual core.async code. Hence there isn't a helper function in the library that would allow us to simplify our main.
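
If you do find yourself writing this loop often, it is easy enough to factor out. The following is only a sketch of such a helper. consume!! is a made-up name, not part of core.async, and it blocks the calling thread until the channel is closed:

```clojure
(require '[clojure.core.async :refer [<!! to-chan]])

(defn consume!!
  "Hypothetical helper: call f for side effects on every item taken from
  channel c, blocking the calling thread until c is closed."
  [f c]
  (loop []
    ;; <!! returns nil once c is closed and drained, which ends the loop.
    (when-some [v (<!! c)]
      (f v)
      (recur))))

;; Usage: print every item in the channel.
(consume!! println (to-chan [16 81]))
```

With it, the body of main would shrink to a single call such as (consume!! println c).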

Fan-out, fan-in

The next pattern in the Go post is to fan out items to multiple go-blocks for parallel execution and then collect the results back together again.

The Go post introduced a merge function which is able to take items from multiple input channels and put them all to a single output channel. My merge looks like this:

(defn merge*
  [& cs]
  (let [out (chan)]
    (go-loop [[n _] (alts! cs)]
      (if n
        (do
          (>! out n)
          (recur (alts! cs)))
        (close! out)))
    out))

We use the alts! function to pick a ready input channel from cs and take an item from it. If more than one channel is ready, the choice between them is non-deterministic.

Turns out that my merge* is broken: it exits the go-loop as soon as one of the input channels has been closed and alts! returns that channel.

Fortunately there's a merge function in core.async that does what we tried to do above and does it correctly! The difference is that core.async/merge filters out any closed channels on each go-loop iteration, thereby making sure that it'll consume all items from all channels before exiting.
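
For comparison, here is a sketch of how merge* could be repaired along the same lines. This is my reading of the approach and not a copy of the library source: a closed channel is dropped from the set being watched, and the output is closed only once no input channels remain.

```clojure
(require '[clojure.core.async :as async
           :refer [<!! >! alts! chan close! go-loop to-chan]])

(defn merge*
  "Sketch of a corrected merge*: keeps reading until every input
  channel has been closed, then closes the output channel."
  [& cs]
  (let [out (chan)]
    (go-loop [cs (vec cs)]
      (if (seq cs)
        (let [[n c] (alts! cs)]
          (if (nil? n)
            ;; c has been closed: stop watching it, keep the others.
            (recur (filterv #(not= c %) cs))
            (do
              (>! out n)
              (recur cs))))
        ;; No open inputs left, so we are done.
        (close! out)))
    out))

;; The order of items across channels is not guaranteed, hence the sort.
(sort (<!! (async/into [] (merge* (to-chan [1 2]) (to-chan [3])))))
;; => (1 2 3)
```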

Finally, the main for the parallel section looks like this:

(defn main
  []
  (let [in (gen 2 3)
        c1 (sq in)
        c2 (sq in)
        out (merge [c1 c2])]
    (loop [n (<!! out)]
      (when n
        (println n)
        (recur (<!! out))))))

The output looks like this:

go-concurrency-primitives.core> (main)
4
9
nil

Comparing the Clojure code with the Go code, you'll notice that we've managed to avoid the WaitGroup shenanigans that the Go code had to use to work out when merge can close its output channel. In core.async a take from a closed channel returns nil. This allows our producers to signal to their consumers that they're finished. I don't know enough Go to work out why merge has to use a WaitGroup. If you know why, please educate the rest of us in the comments!

Explicit cancellation

The last pattern section in the post is about cancellation. We'll only discuss the correct approach to cancellation in this post. Please read the original Go blog post for the discussion about various sub-optimal cancellation strategies.

For signalling cancellation we'll pass a channel to all go-blocks that should be cancellable. This channel is then closed when cancellation is requested. The go-blocks are responsible for checking if the cancellation channel has been closed or not and acting appropriately.

We need to modify all our functions to support cancellation: gen, sq, and main. Let's start with gen:

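(defn gen
  [done & nums]
  (let [out (chan)]
    (go-loop [[n & more] nums]
      (if (nil? n)
        ;; All items have been put into out.
        (close! out)
        ;; Either put n into out, or notice that done has been closed.
        (let [[_ c] (alts! [done [out n]])]
          (if (= c done)
            (close! out)
            (recur more)))))
    out))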

While this function is slightly more involved, it's still the same basic pattern that we used in my original versions of gen and sq: we spin in a loop until some exit condition and put items to an output channel. In this case there are two exit conditions: i) we run out of items in nums, i.e. we've put all our input items into the output channel; or ii) the caller has signaled cancellation by closing the done channel.

You'll notice that we're using the alts! function to check if done has been closed or to put n into the out channel. alts! takes a vector whose elements are either channels or [channel item] pairs. A bare channel is taken from, whereas a pair means the item is put into the channel in the pair. alts! performs exactly one of these operations, whichever is ready first, and returns a [value channel] vector. For a take from a closed channel the value is nil, so a result of [nil done] tells us that done has been closed. This last feature allows us to check for cancellation. Please read through the alts! documentation to understand all the ways in which alts! can be called.
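
To make the return values concrete, here is a small REPL-style sketch. It uses alts!!, the blocking counterpart of alts!, so that it can run outside a go-block. The alts-demo function and the buffer size of 1 are just choices made for this illustration:

```clojure
(require '[clojure.core.async :refer [alts!! chan close!]])

(defn alts-demo
  "Illustrative only: shows what alts!! returns for a completed put
  and for a take from a closed channel."
  []
  (let [done (chan)
        out  (chan 1)
        ;; done is open and empty, out has buffer space:
        ;; only the put can complete.
        r1   (alts!! [done [out :item]])
        _    (close! done)
        ;; out's buffer is now full, so the put cannot complete,
        ;; whereas a take from the closed done returns nil at once.
        r2   (alts!! [done [out :another]])]
    {:put-completed? (= r1 [true out])
     :cancelled?     (= r2 [nil done])}))

(alts-demo)
;; => {:put-completed? true, :cancelled? true}
```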

The sq function follows a similar pattern in order to support cancellation:

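(defn sq
  [done cin]
  (let [cout (chan)]
    (go-loop []
      ;; n is nil when either cin or done has been closed.
      (let [[n _] (alts! [cin done])]
        (if (nil? n)
          (close! cout)
          ;; Race the put against done as well, so the go-block cannot
          ;; stay parked on a consumer that has stopped reading.
          (let [[_ c] (alts! [[cout (* n n)] done])]
            (if (= c done)
              (close! cout)
              (recur))))))
    cout))

When n is nil, either cin or done has been closed. In both cases we want to close cout and exit the go-block. If n is not nil, we square it and try to put it to cout. That put is also raced against done, because a plain >! could park forever once the consumer has stopped taking items after cancellation.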

The final function is main, which has been modified to set up a done channel and to pass it on to all the other functions:

(defn main
  []
  (let [done (chan)
        in (apply gen done (range 100))
        c1 (sq done in)
        c2 (sq done in)
        out (merge [c1 c2])]
    (doseq [i (range 10)]
      (println (<!! out)))
    (close! done)))

Note: We set up gen to generate 100 integers, but print out only ten of them before canceling the rest of the computation by closing the done channel. This is just to highlight that the canceling works.

When you execute main a few times, you'll notice that it's non-deterministic: the order of the results changes. This is because the two sq go-blocks are executing in parallel.

Bounded parallelism

The latter part of the original post describes how to use the earlier patterns to do something useful. The example app calculates the MD5 sums for all the files in a directory tree.

I did implement the unbounded parallel version of the MD5 calculation, but this post is already getting quite long, so I'll just present the bounded version here.

(ns go-concurrency-primitives.md5
  (:refer-clojure :exclude [merge])
  (:require [clojure.core.async :refer [<!! >! alts! chan close! go-loop merge]]
            [clojure.java.io :refer [file]]
            [digest :refer [md5]]))

(defn sum-files
  "In a go-block, take File objects from the fs channel, calculate the
  MD5 sum of each, and put the results into an output channel, which is
  returned to the caller."
  [done fs]
  (let [out (chan)]
    (go-loop [[f _] (alts! [done fs])]
      ;; f is nil when either done or fs has been closed.
      (if f
        (do
          (>! out [(.getName f) (md5 f)])
          (recur (alts! [done fs])))
        (close! out)))
    out))

(defn walk-files
  "Walk down a directory structure from root and put all files into a
  channel that is returned to the caller."
  [done root]
  (let [fs (filter #(not (.isDirectory %)) (file-seq (file root)))
        out (chan)]
    (go-loop [[f & more] fs]
      (if (nil? f)
        ;; No files left to walk.
        (close! out)
        (let [[_ c] (alts! [[out f] done])]
          (if (= c done)
            (close! out)
            (recur more)))))
    out))

(defn main
  "Print out the MD5 sums for the first 10 files in the root directory
  and its subdirectories."
  [root]
  (let [done (chan)
        fs (walk-files done root)
        sums (map (fn [_] (sum-files done fs)) (range 10))
        out (merge sums)]
    (doseq [i (range 10)]
      (println (<!! out)))
    (close! done)))

Note: you need to add [digest "1.4.4"] to your :dependencies in your project.clj file to get access to the md5 function.

Conclusions

It was really easy to translate the Go examples to core.async. I managed to knock out all the examples in an evening. And it was fun!

I did feel a bit uncertain about what was happening when I ran the examples in a REPL. Was I leaving go-blocks running? Was I leaking memory? Who knows!

I also found myself using take (<!) a number of times when I meant to use put (>!). I'll put this down to inexperience on my part, but the effect of this mistake was that my program just sat there doing nothing. In Go (or Haskell) I would've gotten a compile time error, which would've saved me from some head scratching.

The helpers in core.async, e.g. go-loop, merge and to-chan, are awesome. I wish we had similar helpers for cancellable operations and for side-effecty loops.

All in all my core.async experience was really good. Highly recommended ;-)

For our regular readers...

Everything except the MD5 calculation should work on node with CLJS. You might have to fiddle with the requires to pull in macros correctly, but nothing in the code is JVM specific.

