go-pool for maximizing concurrency

Sean Schulte
5 min read · Sep 1, 2017


I want to talk a little bit about concurrency in Go, and a couple different approaches to it.

We’re going to do some pointless busy work in a few different ways, and compare.

Our program will iterate 100 times, and for each iteration it will do one thing that takes 100ms and two things that each take 500ms. Let’s say these are not CPU-bound tasks.
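All three examples share the same setup. Here’s a minimal sketch of it; the constant names are taken from the code below, and the values come from the description above:

// Shared setup for the examples below. Each iteration does one "fast"
// piece of work (100ms) and two "slow" pieces (500ms); since none of it
// is CPU-bound, we simulate the work with time.Sleep.
const (
    NUM_ITERATIONS = 100
    FAST_DURATION  = 100 * time.Millisecond
    SLOW_DURATION  = 500 * time.Millisecond
)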

sync.WaitGroup

A WaitGroup waits for a collection of goroutines to finish. On your main goroutine, you Add to the WaitGroup and then call Wait to wait until all the goroutines have finished. Within each goroutine, you call Done to declare that the goroutine has finished its work; once the number of Done calls adds up to the sum of the Add calls, Wait will return.
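In its minimal form, the lifecycle looks like this (a stripped-down sketch before we get to the real example below):

// Minimal WaitGroup lifecycle: Add on the main goroutine before starting
// the worker, Done inside the worker, Wait to block until it finishes.
var wg sync.WaitGroup
wg.Add(1)
go func() {
    defer wg.Done()
    // ... do the work ...
}()
wg.Wait()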

In this program, we set up a separate WaitGroup for each iteration of the loop, with three goroutines (one for each piece of work we're doing). That caps our concurrency at three, but it also caps the number of goroutines in flight; if we put a single WaitGroup outside the loop, we would have no way to limit the number of goroutines operating at once. Maybe that'd be okay for your use case, and maybe it would not.

func wgTest() {
    startTime := time.Now()
    for i := 0; i < NUM_ITERATIONS; i++ {
        // One WaitGroup per iteration: three goroutines start, and we
        // block until all three have called Done.
        var wg sync.WaitGroup
        wg.Add(3)
        go func(wg *sync.WaitGroup) {
            defer wg.Done()
            time.Sleep(FAST_DURATION)
        }(&wg)
        go func(wg *sync.WaitGroup) {
            defer wg.Done()
            time.Sleep(SLOW_DURATION)
        }(&wg)
        go func(wg *sync.WaitGroup) {
            defer wg.Done()
            time.Sleep(SLOW_DURATION)
        }(&wg)
        wg.Wait()
    }
    log.Println("sync.WaitGroup", time.Since(startTime))
}

Simple enough, and we’re working concurrently! But there’s a lot of boilerplate around using the WaitGroup and we're repeating ourselves a lot ... and that's before we get into the fact that the actual "work" we're doing here is as simple as it can possibly be. Real work will quickly get harder to read.

And because each iteration waits for its slowest task before starting the next, with at most three goroutines running at a time, there’s a fairly strict limit on how performant this can be.

channel + sync.WaitGroup

One way to control that is to use a channel to orchestrate it.

In this program, we create a buffered channel (up to 10 things can be in the channel waiting to be consumed before the main goroutine blocks). We then start the number of goroutines we’re going to use to do work (in this case, 10 goroutines).

Each goroutine consumes from the channel as long as the channel is open; once the channel is closed and drained, the range loop ends and the goroutine calls Done on the WaitGroup.

Now each iteration of the loop simply sends to the channel. Some of these sends may block if the channel’s buffer is full, but that’s just backpressure; we generally won’t have to worry about it unless the work being performed stalls entirely.

At the end, we close the channel, which tells the goroutines to stop consuming the channel once it's empty. Note that we also need to Wait; if we didn't, there might be goroutines in flight that don't finish before our process finishes.

func chanTest() {
    startTime := time.Now()
    // Buffered channel: up to 10 pending work items before sends block.
    ch := make(chan time.Duration, 10)
    var wg sync.WaitGroup
    // Start 10 worker goroutines, each consuming from the channel until
    // it is closed and drained.
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for d := range ch {
                time.Sleep(d)
            }
        }()
    }
    for i := 0; i < NUM_ITERATIONS; i++ {
        ch <- FAST_DURATION
        ch <- SLOW_DURATION
        ch <- SLOW_DURATION
    }
    close(ch)
    wg.Wait()
    log.Println("channel", time.Since(startTime))
}

We’ve improved our concurrency, and we can change how many goroutines we’re running in parallel, which is nice. And there’s also less boilerplate and repetition.

But we have to be concerned with the mechanics of both the channel and the WaitGroup. And perhaps worst of all, our logic is more annoying to test; it doesn't matter here, but when real work is happening, and we're deciding what to do based on what was sent to the channel in an if statement or something, we're going to regret it.
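To make that concrete, here’s a hypothetical sketch of where the hand-rolled worker loop tends to end up once the channel carries real jobs instead of plain durations (the job type and its fields are invented for illustration):

// Hypothetical sketch: dispatch logic ends up inside the worker goroutine,
// so the only way to exercise it is through the channel and WaitGroup
// machinery rather than as a plain function call.
type job struct {
    kind string // invented field: "fast" or "slow"
}

func startWorkers(jobs <-chan job, wg *sync.WaitGroup) {
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for j := range jobs {
                // This branch is what you'd want to unit test in isolation.
                if j.kind == "fast" {
                    time.Sleep(FAST_DURATION)
                } else {
                    time.Sleep(SLOW_DURATION)
                }
            }
        }()
    }
}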

go-pool

Enter go-pool. It takes the channel + WaitGroup pattern and hides the internal mechanics, so you can focus on your code.
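If you want to follow along, you’ll need the package imported; the import path below is an assumption based on the author’s GitHub handle, so check the project’s README for the real one:

import (
    // Assumed import path for go-pool; verify against the project's README.
    "github.com/sirsean/go-pool"
)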

[Image: go where the go-pool goes]

This program starts a pool with a buffer size of 10 and 10 goroutines; that’s the same as before, but with the benefit of putting those numbers in the same place. You’ll typically want to tweak them at the same time.

You define what work you’re going to do by creating “work units”. There’s a pool.WorkUnit interface, which you implement by defining a Perform method where you do your work. Now your logic is separated such that it will remain easier to read and test.
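Based on that description, the interface is presumably as small as an interface can get (a sketch; see the go-pool source for the authoritative definition):

// A single method, no arguments, no return value: anything that knows
// how to Perform itself can go into the pool.
type WorkUnit interface {
    Perform()
}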

In each iteration of the loop, we create our work units and add them to the pool. (Note that a pool can execute different kinds of work units, as long as they implement pool.WorkUnit.) Calls to Add can block just like manually adding to the channel did, if the internal channel is full.

Once we’re done adding work units, we Close the pool, which closes the channel and waits for all the work to finish; but we don't have to care about the internal mechanics of both the channel and the WaitGroup like we did before. All we need to know is that Close will block until all the work is done.

type FastWork struct{}
type SlowWork struct{}

// Perform implements pool.WorkUnit.
func (u FastWork) Perform() {
    time.Sleep(FAST_DURATION)
}

func (u SlowWork) Perform() {
    time.Sleep(SLOW_DURATION)
}

func poolTest() {
    startTime := time.Now()
    // 10 is the buffer size, 10 is the number of worker goroutines.
    p := pool.NewPool(10, 10)
    p.Start()
    for i := 0; i < NUM_ITERATIONS; i++ {
        p.Add(FastWork{})
        p.Add(SlowWork{})
        p.Add(SlowWork{})
    }
    // Close blocks until every queued work unit has finished.
    p.Close()
    log.Println("go-pool", time.Since(startTime))
}

We have the same concurrency options as the previous solution, but they are more easily read and changed. Our separation of “defining what work needs to be done” and “executing the work” is at least as good, maybe better. Our code doesn’t have to manually touch channels or goroutines or WaitGroup at all. And we get to keep our logic nicely isolated in each implementation of pool.WorkUnit.
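That isolation pays off in tests: you can call Perform directly, with no pool, channel, or goroutines involved. A sketch, using an invented SquareWork unit that actually computes something:

// In a _test.go file. The unit under test is just a struct with a method;
// no pool machinery is needed to exercise its logic.
type SquareWork struct {
    In  int
    Out int
}

func (u *SquareWork) Perform() {
    u.Out = u.In * u.In
}

func TestSquareWork(t *testing.T) {
    u := &SquareWork{In: 7}
    u.Perform()
    if u.Out != 49 {
        t.Fatalf("expected 49, got %d", u.Out)
    }
}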

Benchmark

The results are unsurprising.

sync.WaitGroup    50.097073561s
channel           11.217436307s
go-pool           11.23290081s

The original solution, with its more strictly limited concurrency and its wait at the end of every iteration, takes longer to complete: each of the 100 iterations is gated on its slowest 500ms task, so it can’t finish much under 50 seconds. The channel version spreads the same 110 seconds of total sleep (100 × (100ms + 500ms + 500ms)) across 10 workers, landing near 11 seconds. The go-pool solution matches the channel solution, plus about 0.1% overhead of allocating the WorkUnit structs.
