go-pool for maximizing concurrency
I want to talk a little bit about concurrency in Go, and a couple different approaches to it.
We’re going to do some pointless busy work in a few different ways, and compare.
Our program will iterate 100 times, and for each iteration it will do one thing that takes 100ms and two things that each take 500ms. Let’s say these are not CPU-bound tasks.
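All of the examples below share a few constants; given the numbers above, they'd look something like this:

const (
	NUM_ITERATIONS = 100                    // iterations of the outer loop
	FAST_DURATION  = 100 * time.Millisecond // the quick task
	SLOW_DURATION  = 500 * time.Millisecond // each of the two slower tasks
)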
sync.WaitGroup
A WaitGroup waits for a collection of goroutines to finish. On your main goroutine, you Add to the WaitGroup and then call Wait to wait until all the goroutines have finished. Within each goroutine, you call Done to declare that the goroutine has finished its work; once the number of Done calls adds up to the sum of the Add calls, Wait will complete.
In this program, we set up a separate WaitGroup for each iteration of the loop, with three goroutines (one for each piece of work we're doing). That limits our concurrency, but it also limits the number of goroutines; if we put a single WaitGroup outside the loop, we would have no way to limit the number of goroutines operating at any one time. Maybe that'd be okay for your use case, and maybe it would not; a sketch of that unbounded variant follows.
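That unbounded version would look something like this (hypothetical; it isn't part of the benchmark):

func wgUnboundedTest() {
	startTime := time.Now()
	// One WaitGroup for the whole run: nothing ever limits
	// how many goroutines are in flight at once.
	var wg sync.WaitGroup
	for i := 0; i < NUM_ITERATIONS; i++ {
		wg.Add(3)
		go func() {
			defer wg.Done()
			time.Sleep(FAST_DURATION)
		}()
		go func() {
			defer wg.Done()
			time.Sleep(SLOW_DURATION)
		}()
		go func() {
			defer wg.Done()
			time.Sleep(SLOW_DURATION)
		}()
	}
	wg.Wait()
	log.Println("unbounded", time.Since(startTime))
}

The version we're actually benchmarking scopes the WaitGroup to each iteration instead: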
func wgTest() {
	startTime := time.Now()
	for i := 0; i < NUM_ITERATIONS; i++ {
		var wg sync.WaitGroup
		wg.Add(3) // one for each piece of work
		go func(wg *sync.WaitGroup) {
			defer wg.Done()
			time.Sleep(FAST_DURATION)
		}(&wg)
		go func(wg *sync.WaitGroup) {
			defer wg.Done()
			time.Sleep(SLOW_DURATION)
		}(&wg)
		go func(wg *sync.WaitGroup) {
			defer wg.Done()
			time.Sleep(SLOW_DURATION)
		}(&wg)
		wg.Wait() // block until all three have finished
	}
	log.Println("sync.WaitGroup", time.Since(startTime))
}
Simple enough, and we're working concurrently! But there's a lot of boilerplate around using the WaitGroup, and we're repeating ourselves a lot ... and that's before we get into the fact that the actual "work" we're doing here is as simple as it can possibly be. Real work will quickly get harder to read.
And because concurrency is capped at three goroutines, with each iteration waiting on its slowest task before the next can begin, there's a fairly strict limit on how performant this can be.
channel + sync.WaitGroup
One way to take control of that concurrency is to use a channel to orchestrate the work.
In this program, we create a buffered channel: up to 10 items can sit in the channel waiting to be consumed before a send from the main goroutine blocks. We then start the goroutines we're going to use to do the work (in this case, 10 of them).
Each goroutine consumes from the channel as long as the channel is open; once the channel closes, the goroutine ends and calls Done on the WaitGroup.
Now each iteration of the loop simply sends to the channel. Some of these sends may block if the channel's buffer is full, but we generally won't have to worry about that unless the work being performed stalls entirely.
At the end, we close the channel, which tells the goroutines to stop consuming it once it's empty. Note that we also need to Wait; if we didn't, there might be goroutines in flight that don't finish before our process exits.
func chanTest() {
	startTime := time.Now()
	ch := make(chan time.Duration, 10)
	var wg sync.WaitGroup
	// Start 10 worker goroutines, each consuming from the channel.
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for d := range ch {
				time.Sleep(d)
			}
		}()
	}
	for i := 0; i < NUM_ITERATIONS; i++ {
		ch <- FAST_DURATION
		ch <- SLOW_DURATION
		ch <- SLOW_DURATION
	}
	close(ch) // tell the workers no more work is coming
	wg.Wait() // wait for in-flight work to finish
	log.Println("channel", time.Since(startTime))
}
We've improved our concurrency, and we can change how many goroutines we're running in parallel, which is nice. And there's also less boilerplate and repetition.
But we have to be concerned with the mechanics of both the channel and the WaitGroup. And perhaps worst of all, our logic is more annoying to test; it doesn't matter here, but when real work is happening, and we're deciding what to do based on what was sent to the channel in an if statement or something, we're going to regret it.
go-pool
Enter go-pool. It takes the channel + WaitGroup pattern and hides the internal mechanics, so you can focus on your code.
This program starts a pool with a buffer size of 10 and 10 goroutines; that’s the same as before, but with the benefit of putting those numbers in the same place. You’ll typically want to tweak them at the same time.
You define what work you're going to do by creating "work units". There's a pool.WorkUnit interface, which you implement by defining a Perform method where you do your work. Now your logic is separated such that it will remain easier to read and test.
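Going by that description, the interface itself is presumably as small as it gets (a sketch based on the description above, not go-pool's actual source):

// WorkUnit, as described above: anything that can Perform itself.
type WorkUnit interface {
	Perform()
}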
In each iteration of the loop, we create our work units and add them to the pool. (Note that a pool can execute different kinds of work units, as long as they implement pool.WorkUnit.) Calls to Add can block just like manually sending to the channel did, if the internal channel is full.
Once we're done adding work units, we Close the pool, which closes the channel and waits for all the work to finish; but we don't have to care about the internal mechanics of both the channel and the WaitGroup like we did before. All we need to know is that Close will block until all the work is done.
type FastWork struct{}
type SlowWork struct{}

func (u FastWork) Perform() {
	time.Sleep(FAST_DURATION)
}

func (u SlowWork) Perform() {
	time.Sleep(SLOW_DURATION)
}

func poolTest() {
	startTime := time.Now()
	p := pool.NewPool(10, 10) // buffer size 10, 10 worker goroutines
	p.Start()
	for i := 0; i < NUM_ITERATIONS; i++ {
		p.Add(FastWork{})
		p.Add(SlowWork{})
		p.Add(SlowWork{})
	}
	p.Close() // blocks until all work units have finished
	log.Println("go-pool", time.Since(startTime))
}
We have the same concurrency options as the previous solution, but they are more easily read and changed. Our separation of "defining what work needs to be done" and "executing the work" is at least as good, maybe better. Our code doesn't have to manually touch channels or goroutines or WaitGroups at all. And we get to keep our logic nicely isolated in each implementation of pool.WorkUnit.
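For intuition, here's roughly the machinery a pool like this wraps up. This is a sketch of the channel + WaitGroup pattern from the previous section hidden behind a small API, not go-pool's actual source:

// A sketch of what a pool hides: the same channel + WaitGroup
// mechanics as chanTest, behind Start/Add/Close.
type Pool struct {
	ch      chan WorkUnit
	wg      sync.WaitGroup
	workers int
}

func NewPool(bufferSize, workers int) *Pool {
	return &Pool{
		ch:      make(chan WorkUnit, bufferSize),
		workers: workers,
	}
}

// Start launches the worker goroutines; each consumes work units
// until the channel is closed.
func (p *Pool) Start() {
	for i := 0; i < p.workers; i++ {
		p.wg.Add(1)
		go func() {
			defer p.wg.Done()
			for u := range p.ch {
				u.Perform()
			}
		}()
	}
}

// Add sends a work unit to the pool, blocking if the buffer is full.
func (p *Pool) Add(u WorkUnit) {
	p.ch <- u
}

// Close stops accepting work and blocks until in-flight work finishes.
func (p *Pool) Close() {
	close(p.ch)
	p.wg.Wait()
}

Everything chanTest did by hand happens in there; the caller only ever sees Start, Add, and Close.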
Benchmark
The results are unsurprising.
sync.WaitGroup 50.097073561s
channel 11.217436307s
go-pool 11.23290081s
The original solution, with its more strictly limited concurrency and waiting for each iteration of the loop, takes longer to complete: each iteration lasts as long as its slowest task (500ms), so 100 iterations come to about 50 seconds. The go-pool solution is effectively the same as the channel solution (110 seconds of total sleep spread across 10 goroutines comes to about 11 seconds), plus about 0.1% of overhead for allocating the WorkUnit structs.