I came across a post on the Go Blog describing Go patterns for task pipelining and cancellation. I haven't got a lot of experience with
core.async, so I thought I'd have a go at translating the Go patterns to Clojure.
I won't rewrite all the prose from the original post. Instead I'll just present my Clojure versions of the various functions. Please have a look at the original for more discussion on the patterns etc. I'll use the same subheadings to make it easier to skip back and forth between the two posts.
If you want to play along, create a new project with
lein new go-concurrency-primitives and modify the
core.clj namespace declaration to this:
(ns go-concurrency-primitives.core
  (:refer-clojure :exclude [merge])
  (:require [clojure.core.async :refer [<! <!! >! alts! chan close! go go-loop map< merge to-chan]]))
The first function presented in the post is
gen, which puts the given arguments into a channel that is then returned to the caller:
(defn gen [& nums]
  (let [c (chan)]
    (go
      (doseq [n nums]
        (>! c n))
      (close! c))
    c))
As with the Go function, our function accepts a variable number of arguments and starts a go-block that puts all those arguments to a
core.async channel one at a time. The channel is then closed in order to signal to the consumer that all items have been processed. Finally the channel is returned to the caller.
This seems like a common enough use case, and sure enough
core.async provides a helper function,
to-chan, that creates a channel with the contents of a provided collection, and closes that channel when it's exhausted. So we can simplify gen to:
(defn gen [& nums] (to-chan nums))
The next Go function is
sq, which takes an integer from a channel, squares it, and puts it into a new channel. This is done in a loop until the input channel is exhausted:
(defn sq [cin]
  (let [cout (chan)]
    (go-loop [n (<! cin)]
      (if n
        (do (>! cout (* n n))
            (recur (<! cin)))
        (close! cout)))
    cout))
We use a
go-loop to loop within the go-block until we've exhausted the input channel (a take returns
nil once the channel is closed and empty).
Again, it turns out that taking an item from a channel, applying a function to it, and putting the resulting value to another channel is quite common, and there's a helper function in
core.async for it:
map<. With the helper function our implementation of
sq becomes almost trivial:
(defn sq [cin] (map< #(* % %) cin))
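In more recent core.async releases map< is marked as deprecated in favour of transducers. The following is a minimal sketch of the same squaring step written that way, assuming a core.async version with transducer support (and Clojure 1.7 or later). The name sq-xf is hypothetical and is only used to avoid clashing with sq above.

```clojure
(require '[clojure.core.async :as async :refer [<!! chan pipe to-chan]])

;; sq-xf is a hypothetical name, not something from the original post.
(defn sq-xf
  "Returns a channel of the squares of the values taken from cin."
  [cin]
  ;; The (map ...) transducer is applied to each value put on the new
  ;; channel. pipe copies cin into that channel, closes it when cin
  ;; closes, and returns it.
  (pipe cin (chan 1 (map #(* % %)))))

(<!! (async/into [] (sq-xf (to-chan [2 3]))))
;; => [4 9]
```

async/into collects everything from a channel into a collection, which is handy for checking a pipeline stage at the REPL.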
The final function that ties our pipeline together is main:
(defn main []
  (let [c (-> (gen 2 3) sq sq)]
    (loop [n (<!! c)]
      (when n
        (println n)
        (recur (<!! c))))))
We create a source channel with
2 and 3 in it, compose it with two squaring functions, and finally print the resulting items to stdout. We use the
<!! take operation in this case to run
main in the current thread.
The expected output is:
go-concurrency-primitives.core> (main)
16
81
nil
You'll notice that
main looks very similar to our first version of
sq with the exception that we're applying
println to the items for side-effects rather than putting the return value to an output channel. I suspect that printing items to stdout is very common in example code like this, but not very common at all in actual
core.async code. Hence there isn't a helper function in the library that would allow us to simplify our main.
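A side-effecting consumer loop like this is easy enough to factor out by hand. Here is a rough sketch of what such a helper might look like. consume!! is a hypothetical name and is not part of core.async.

```clojure
(require '[clojure.core.async :refer [<!! to-chan]])

;; consume!! is a hypothetical helper, not part of core.async.
(defn consume!!
  "Blocks the calling thread, calling f on every value taken from c
  until c is closed. Returns the number of values consumed."
  [f c]
  (loop [n 0]
    ;; if-some only stops on nil, which is what a closed channel yields
    (if-some [v (<!! c)]
      (do (f v)
          (recur (inc n)))
      n)))

;; Prints 16 and 81, then returns 2.
(consume!! println (to-chan [16 81]))
```

With a helper along these lines, the body of a printing main reduces to a single call such as (consume!! println c).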
The next pattern in the Go post was to fan-out items to multiple go-blocks for parallel execution and then collecting the results back together again.
The Go post introduced a
merge function which is able to take items from multiple input channels and put them all to a single output channel. My
merge looks like this:
(defn merge* [& cs]
  (let [out (chan)]
    (go-loop [[n _] (alts! cs)]
      (if n
        (do (>! out n)
            (recur (alts! cs)))
        (close! out)))
    out))
We use the
alts! function to pick a ready input channel from
cs and take an item from it. If more than one channel is ready, the choice between them is non-deterministic.
Turns out that my
merge* is broken: it exits the
go-loop as soon as one of the input channels has been closed and
alts! returns that channel.
Fortunately there's a
merge function in
core.async that does what we tried to do above and does it correctly! The difference is that
core.async/merge filters out any closed channels on each
go-loop iteration, thereby making sure that it'll consume all items from all channels before exiting.
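To make the difference concrete, here is a sketch of how merge* could be repaired along those lines. It follows the behaviour described above rather than reproducing the library source, and merge-all is a hypothetical name.

```clojure
(require '[clojure.core.async :as async
           :refer [<!! >! alts! chan close! go-loop to-chan]])

;; merge-all is a hypothetical name; in real code prefer core.async/merge.
(defn merge-all [& cs]
  (let [out (chan)]
    (go-loop [cs (vec cs)]
      (if (seq cs)
        (let [[v c] (alts! cs)]
          (if (nil? v)
            ;; c is closed and drained: drop it and keep reading the rest
            (recur (filterv #(not= c %) cs))
            (do (>! out v)
                (recur cs))))
        ;; every input channel has been closed
        (close! out)))
    out))

;; Both inputs are fully consumed; sort hides the non-deterministic order.
(sort (<!! (async/into [] (merge-all (to-chan [1 2]) (to-chan [3 4])))))
;; => (1 2 3 4)
```

The only real change from merge* is that a nil from alts! removes one channel from the set instead of ending the loop.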
main for the parallel section looks like this:
(defn main []
  (let [in  (gen 2 3)
        c1  (sq in)
        c2  (sq in)
        out (merge [c1 c2])]
    (loop [n (<!! out)]
      (when n
        (println n)
        (recur (<!! out))))))
The output looks like this:
go-concurrency-primitives.core> (main)
4
9
nil
Comparing the Clojure code with the Go code, you'll notice that we've managed to avoid the
WaitGroup shenanigans that the Go code had to use to work out when
merge can close its output channel. In
core.async a take from a closed channel returns
nil. This allows our producers to signal to their consumers that they're finished. I don't know enough Go to work out why
merge has to use a
WaitGroup. If you know why, please educate the rest of us in the comments!
The last pattern section in the post is about cancellation. We'll only discuss the correct approach to cancellation in this post. Please read the original Go blog post for the discussion about various sub-optimal cancellation strategies.
For signalling cancellation we'll pass a channel to all go-blocks that should be cancellable. This channel is then closed when cancellation is requested. The go-blocks are responsible for checking if the cancellation channel has been closed or not and acting appropriately.
We need to modify all our functions to support cancellation: gen, sq, and
main. Let's start with gen:
(defn gen [done & nums]
  (let [out (chan)]
    (go-loop [[n & nums] nums]
      (if n
        (let [[_ c] (alts! [done [out n]])]
          (when (not= c done)
            (recur nums)))
        (close! out)))
    out))
While this function is slightly more involved, it's still the same basic pattern that we used in my original versions of gen and
sq: we spin in a loop until some exit condition is met and put items to an output channel. In this case there are two exit conditions: i) we run out of items in
nums, i.e. we've put all our input items into the output channel; or ii) the caller has signaled cancellation by closing the done channel.
You'll notice that we're using the
alts! function to check if
done has been closed or to put
n into the out channel.
alts! takes a single vector as its argument, each element of which is either a channel or a [channel item] pair. A bare channel is taken from, whereas a pair results in the item being put into the channel in the pair.
alts! will perform either a take or a put depending on which channel is ready first. If an input channel is closed,
alts! will return
nil and the channel that was closed. This last feature allows us to check for cancellation. Please read through the
alts! documentation to understand all the ways in which
alts! can be called.
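As a small illustration of the two outcomes, the sketch below calls alts! twice with the same shape of argument: once while done is still open, and once after it has been closed. alts-demo is a hypothetical function, and the buffer of size 1 on out is only an assumption that keeps the example deterministic.

```clojure
(require '[clojure.core.async :refer [<!! alts! chan close! go]])

;; alts-demo is a hypothetical function written for this illustration.
(defn alts-demo []
  (let [done (chan)
        out  (chan 1)
        ;; done is open and empty, out has room: the put wins
        first-op  (<!! (go (alts! [done [out :x]])))
        _         (close! done)
        ;; out is now full and done is closed: the take wins with nil
        second-op (<!! (go (alts! [done [out :y]])))]
    {:put-succeeded? (= [true out] first-op)
     :cancelled?     (= [nil done] second-op)}))

(alts-demo)
;; => {:put-succeeded? true, :cancelled? true}
```

A successful put is reported as true paired with the channel that accepted the item, while a take from a closed channel is reported as nil paired with that channel, which is why comparing the returned channel with done works as a cancellation check.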
The sq function follows a similar pattern in order to support cancellation:
(defn sq [done cin]
  (let [cout (chan)]
    (go-loop [[n c] (alts! [cin done])]
      (if n
        (do (>! cout (* n n))
            (recur (alts! [cin done])))
        (close! cout)))
    cout))
n is nil if either cin or
done has been closed. In both cases we want to close
cout and exit the go-block. If
n is not
nil, we want to square it and put it to cout.
The final function is
main, which has been modified to set up a
done channel and to pass it on to all the other functions:
(defn main []
  (let [done (chan)
        in   (apply gen done (range 100))
        c1   (sq done in)
        c2   (sq done in)
        out  (merge [c1 c2])]
    (doseq [i (range 10)]
      (println (<!! out)))
    (close! done)))
Note: We set up
gen to generate 100 integers, but print out only ten of them before canceling the rest of the computation by closing the
done channel. This is just to highlight that the canceling works.
When you execute
main a few times, you'll notice that it's non-deterministic: the order of the results changes. This is because the two
sq go-blocks are executing in parallel.
The latter part of the original post describes how to use the earlier patterns to do something useful. The example app calculates the MD5 sums for all the files in a directory tree.
I did implement the unbounded parallel version of the MD5 calculation, but this post is already getting quite long, so I'll just present the bounded version here.
(ns go-concurrency-primitives.md5
  (:refer-clojure :exclude [merge])
  (:require [clojure.core.async :refer [<!! >! alts! chan close! go-loop merge]]
            [clojure.java.io :refer [file]]
            [digest :refer [md5]]))

(defn sum-files
  "In a go-block, take File objects from the fs channel, calculate the MD5
  for each, and put the results into an output channel, which is returned
  to the caller."
  [done fs]
  (let [out (chan)]
    (go-loop [[f _] (alts! [done fs])]
      ;; f is nil when done has been closed or fs is exhausted
      (if f
        (do (>! out [(.getName f) (md5 f)])
            (recur (alts! [done fs])))
        (close! out)))
    out))

(defn walk-files
  "Walk down a directory structure from root and put all files into a
  channel that is returned to the caller."
  [done root]
  (let [fs  (filter #(not (.isDirectory %)) (file-seq (file root)))
        out (chan)]
    (go-loop [[f & fs] fs]
      (if f
        (let [[_ c] (alts! [[out f] done])]
          ;; keep going unless cancellation was signalled
          (if (= c done)
            (close! out)
            (recur fs)))
        ;; no files left
        (close! out)))
    out))

(defn main
  "Print out the MD5 sums for the first 10 files in the root directory and
  its subdirectories."
  [root]
  (let [done (chan)
        fs   (walk-files done root)
        sums (map (fn [_] (sum-files done fs)) (range 10))
        out  (merge sums)]
    (doseq [i (range 10)]
      (println (<!! out)))
    (close! done)))
Note: you need to add
[digest "1.4.4"] to your
:dependencies in your
project.clj file to get access to the md5 function.
It was really easy to translate the Go examples to
core.async. I managed to knock out all the examples in an evening. And it was fun!
I did feel a bit uncertain about what was happening when I ran the examples in a REPL. Was I leaving go-blocks running? Was I leaking memory? Who knows!
I also found myself using take (
<!) a number of times when I meant to use put (
>!). I'll put this down to inexperience on my part, but the effect of this mistake was that my program just sat there doing nothing. In Go (or Haskell) I would've gotten a compile time error, which would've saved me from some head scratching.
The helpers in core.async, such as
to-chan, are awesome. I wish we had similar helpers for cancellable operations and for side-effecty loops.
All in all my
core.async experience was really good. Highly recommended ;-)
For our regular readers...
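Everything except the MD5 calculation should work on node with CLJS, with one caveat: the blocking <!! take used in main does not exist in ClojureScript, so those loops have to use <! inside a go-block instead. You might also have to fiddle with the
requires to pull in macros correctly, but nothing else in the code is JVM specific.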