
Decryption Go:WaitGroup

 

Every gopher has probably used sync.WaitGroup. It is a concurrency primitive in the sync package used for task coordination. It solves the problem of waiting on concurrent work: a goroutine A blocks at a checkpoint until a group of goroutines has completed. Without this synchronization primitive, how could we achieve the same thing? There are many ways; let’s first try implementing it with channels.

This article was first published in the Medium MPP plan. If you are a Medium user, please follow me on Medium. Thank you very much.

package main

import (
	"fmt"
	"time"
)

func main() {
	channels := make(chan struct{}, 10)
	for i := 0; i < 10; i++ {
		go func() {
			println("hello,world")
			time.Sleep(time.Second * 10)
			channels <- struct{}{}
		}()
	}
	for i := 0; i < 10; i++ {
		<-channels
		fmt.Printf("done:%d\n", i)
	}
	fmt.Println("all done")
}

Although this code does the job, it is cumbersome. We have to create a channel, size it correctly, and receive from it exactly once per goroutine. Moreover, if the number of goroutines to wait for is not known in advance, this approach is not very suitable. This is where sync.WaitGroup comes into play.

Now, let’s implement the same functionality using sync.WaitGroup.

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	wg := sync.WaitGroup{}
	wg.Add(10)
	for i := 0; i < 10; i++ {
		go func() {
			defer wg.Done()
			println("hello,world")
			time.Sleep(time.Second * 10)
		}()
	}
	for i := 0; i < 10; i++ {
		// Pass i as an argument: before Go 1.22 all closures would share one loop variable.
		go func(i int) {
			wg.Wait()
			fmt.Printf("done,%d\n", i)
		}(i)
	}
	wg.Wait()
	fmt.Println("all done")
	time.Sleep(time.Second * 1)
}

In fact, many operating systems and programming languages provide similar concurrency primitives: for example, the barrier in Linux, the barrier in Pthreads (POSIX threads), std::barrier in C++, and CyclicBarrier and CountDownLatch in Java. This kind of primitive is therefore a fundamental building block of concurrent programming.

Basic Usage of WaitGroup

The basic usage of WaitGroup is very simple, with only three methods:

func (wg *WaitGroup) Add(delta int)
func (wg *WaitGroup) Done()
func (wg *WaitGroup) Wait()

The Add method adds delta, which may be negative, to the counter of tasks to wait for. The Done method decrements that counter by one. The Wait method blocks the caller until the counter reaches zero, that is, until all tasks have completed.
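As a minimal sketch of how the three methods fit together, the example below starts a number of goroutines and waits for all of them. The helper name runWorkers and the atomic counter are assumptions made for illustration; they are not part of the sync package.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// runWorkers is a hypothetical helper: it starts n goroutines, waits for all
// of them with a WaitGroup, and returns how many of them ran.
func runWorkers(n int) int64 {
	var wg sync.WaitGroup
	var finished atomic.Int64

	for i := 0; i < n; i++ {
		wg.Add(1) // register the task before starting its goroutine
		go func() {
			defer wg.Done() // mark the task as finished, even on early return
			finished.Add(1)
		}()
	}

	wg.Wait() // block until the counter drops back to zero
	return finished.Load()
}

func main() {
	fmt.Println("finished workers:", runWorkers(10))
}
```

Calling Add before the go statement, rather than inside the new goroutine, ensures the counter is already incremented by the time Wait runs.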

Implementation of WaitGroup

This article is based on go1.21.4

First, let’s take a look at the data structure of WaitGroup:

type WaitGroup struct {
	noCopy noCopy
	state atomic.Uint64 // high 32 bits are counter, low 32 bits are waiter count.
	sema  uint32
}

The data structure of WaitGroup is very simple. Besides the noCopy marker, which lets go vet report copies, it has only two fields: state and sema. state is an atomic.Uint64 that stores the counter in its high 32 bits and the number of waiting goroutines in its low 32 bits. sema is a semaphore on which the waiters block.
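To make the layout of state concrete, here is a small sketch that packs a counter and a waiter count into one uint64 and splits them again. The helpers pack and unpack are hypothetical names for illustration; the real WaitGroup does this arithmetic inline.

```go
package main

import "fmt"

// pack is a hypothetical helper that mirrors the layout of WaitGroup.state:
// the counter goes into the high 32 bits, the waiter count into the low 32 bits.
func pack(counter int32, waiters uint32) uint64 {
	return uint64(uint32(counter))<<32 | uint64(waiters)
}

// unpack splits a packed state the same way Add and Wait do.
func unpack(state uint64) (counter int32, waiters uint32) {
	return int32(state >> 32), uint32(state)
}

func main() {
	state := pack(3, 2)
	counter, waiters := unpack(state)
	fmt.Printf("state=%#x counter=%d waiters=%d\n", state, counter, waiters)
}
```

Keeping both numbers in one word is what allows a single atomic operation to read or update them together.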

Add Method

Let’s take a look at the implementation of the Add method, with the race-detection code removed.

func (wg *WaitGroup) Add(delta int) {
	state := wg.state.Add(uint64(delta) << 32)
	v := int32(state >> 32) // counter
	w := uint32(state)      // waiter count
	if v < 0 {
		panic("sync: negative WaitGroup counter")
	}
	if w != 0 && delta > 0 && v == int32(delta) {
		panic("sync: WaitGroup misuse: Add called concurrently with Wait")
	}
	if v > 0 || w == 0 {
		return
	}
	// This goroutine has set counter to 0 when waiters > 0.
	// Now there can't be concurrent mutations of state:
	// - Adds must not happen concurrently with Wait,
	// - Wait does not increment waiters if it sees counter == 0.
	// Still do a cheap sanity check to detect WaitGroup misuse.
	if wg.state.Load() != state {
		panic("sync: WaitGroup misuse: Add called concurrently with Wait")
	}
	// Reset waiters count to 0. Wake up all waiters.
	wg.state.Store(0)
	for ; w != 0; w-- {
		runtime_Semrelease(&wg.sema, false, 0)
	}
}

The implementation of the Add method is short. It shifts delta left by 32 bits and atomically adds it to state, which changes only the counter. If the resulting counter is negative, it panics. If there are already waiters, delta is positive, and the counter equals delta (that is, the counter was 0 before this call), then Add has been called concurrently with Wait, so it panics. If the counter is still greater than 0 or there are no waiters, there is nothing more to do and it returns. Otherwise the counter has just dropped to 0 while goroutines are waiting: after a sanity check that state has not been modified by another goroutine (it panics if it has), it resets state to 0 and releases the semaphore once per waiter to wake them all.
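The arithmetic in the first step can be modeled without any concurrency. The sketch below is a non-atomic model, and applyAdd is a hypothetical name; it only shows that adding a shifted negative delta lowers the counter while leaving the waiter count untouched.

```go
package main

import "fmt"

// applyAdd is a hypothetical, non-atomic model of the arithmetic in Add.
// Converting a negative delta to uint64 yields its two's-complement form, so
// the addition wraps around and effectively subtracts from the high 32 bits.
func applyAdd(state uint64, delta int) (next uint64, counter int32, waiters uint32) {
	next = state + uint64(delta)<<32
	return next, int32(next >> 32), uint32(next)
}

func main() {
	state := uint64(2)<<32 | 3 // counter = 2, waiters = 3
	for _, delta := range []int{-1, -1} {
		var c int32
		var w uint32
		state, c, w = applyAdd(state, delta)
		// Add wakes the waiters only when the counter hits 0 and someone waits.
		fmt.Printf("delta=%d counter=%d waiters=%d wake=%v\n", delta, c, w, c == 0 && w > 0)
	}
}
```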

Done Method

The implementation of the Done method is trivial: it calls the Add method with a delta of -1.
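Because of this, Done and Add(-1) are interchangeable. A minimal sketch, where finishTwo is a hypothetical helper name:

```go
package main

import (
	"fmt"
	"sync"
)

// finishTwo is a hypothetical helper: it registers two tasks and completes
// one with Done and the other with Add(-1).
func finishTwo() bool {
	var wg sync.WaitGroup
	wg.Add(2)
	wg.Done()  // same effect as wg.Add(-1)
	wg.Add(-1) // same effect as wg.Done()
	wg.Wait()  // the counter is already 0, so this returns immediately
	return true
}

func main() {
	fmt.Println("returned from Wait:", finishTwo())
}
```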

Wait Method

The logic of the Wait method is as follows: it loops, loading state on each iteration. If the counter is 0, all tasks have completed and the caller returns immediately. If the counter is greater than 0, tasks are still pending, so the caller tries to register itself as a waiter by incrementing the waiter count with a compare-and-swap. If the swap succeeds, the caller blocks on the semaphore; if it fails because state changed in the meantime, the caller retries the loop.

func (wg *WaitGroup) Wait() {
	for {
		state := wg.state.Load()
		v := int32(state >> 32)
		// The waiter count, uint32(state), is used only by the omitted race-detection code.
		if v == 0 {
			return
		}
		// Increment waiters count.
		if wg.state.CompareAndSwap(state, state+1) {
			runtime_Semacquire(&wg.sema) // block until signalled
			if wg.state.Load() != 0 {
				panic("sync: WaitGroup is reused before previous Wait has returned")
			}
			return
		}
	}
}
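Since Add releases the semaphore once per registered waiter, several goroutines may call Wait on the same WaitGroup and all of them are released together. The sketch below illustrates this; releaseWaiters and the second WaitGroup used to collect the waiters are assumptions made for the example.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// releaseWaiters is a hypothetical helper: n goroutines wait on the same
// WaitGroup, and a single Done call lets all of them proceed.
func releaseWaiters(n int) int64 {
	var tasks, waiters sync.WaitGroup
	var released atomic.Int64

	tasks.Add(1) // one pending task keeps the counter above zero
	waiters.Add(n)
	for i := 0; i < n; i++ {
		go func() {
			defer waiters.Done()
			tasks.Wait() // blocks, or returns at once if the task already finished
			released.Add(1)
		}()
	}

	tasks.Done()   // the counter reaches zero: every blocked waiter is woken
	waiters.Wait() // make sure all waiters have returned before counting
	return released.Load()
}

func main() {
	fmt.Println("released waiters:", releaseWaiters(5))
}
```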

Common Errors when Using WaitGroup

  1. Counter overflow. The high 32 bits of state hold the counter and the low 32 bits hold the waiter count. If the counter exceeds 2^31-1, it wraps to a negative value and Add panics. This rarely happens in practice because the value is so large.
  2. Add and Done are not paired. If you add n in total, Done must be called exactly n times. With fewer than n calls, Wait never returns and the program deadlocks; with more than n calls, the counter becomes negative and Done panics.
  3. Add is called at the wrong time. An Add that raises the counter from 0 must happen before Wait. If it runs concurrently with Wait or after it, Wait may return too early or the WaitGroup may panic with a misuse error.
  4. Reusing a WaitGroup before the previous Wait has returned. A new round of Add calls must start only after all earlier Wait calls have returned; otherwise Wait can panic.
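The unpaired case with too many Done calls is easy to reproduce. The sketch below recovers the panic so that its message can be printed; doneTooOften is a hypothetical helper name.

```go
package main

import (
	"fmt"
	"sync"
)

// doneTooOften is a hypothetical helper that calls Done once more than Add
// allows and returns the message of the resulting panic.
func doneTooOften() (msg string) {
	defer func() {
		if r := recover(); r != nil {
			msg = fmt.Sprint(r)
		}
	}()

	var wg sync.WaitGroup
	wg.Add(1)
	wg.Done()
	wg.Done() // one Done too many: the counter would become -1
	return "no panic"
}

func main() {
	fmt.Println(doneTooOften())
}
```

The opposite mistake, too few Done calls, does not panic inside the WaitGroup; Wait simply never returns.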

Bugs in WaitGroup usage in real projects

  1. In Go issue 28123, the biggest problem in the reported code is that line 9 copies the WaitGroup instance w. Although the code runs successfully, it violates the rule that a WaitGroup must not be copied after first use. In projects, we can use the vet tool to detect such errors.
  2. Docker issue 28161 and issue 27011 are both errors caused by reusing a WaitGroup: Add is called without waiting for the previous Wait to finish.
  3. Etcd issue 6534 is also a WaitGroup reuse bug, where Add is called without waiting for the previous Wait to finish.
  4. Kubernetes issue 59574 is a bug in which the code forgot to increase the count before calling Wait. We would usually consider this kind of bug almost impossible to write.
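A common way to avoid the copy problem from the first item is to pass the WaitGroup to helper functions by pointer. This is a minimal sketch with hypothetical names (record, runAll), not the code from any of the issues above.

```go
package main

import (
	"fmt"
	"sync"
)

// record is a hypothetical worker. It takes *sync.WaitGroup so that Done acts
// on the caller's WaitGroup rather than on a copy of it.
func record(wg *sync.WaitGroup, mu *sync.Mutex, seen map[int]bool, id int) {
	defer wg.Done()
	mu.Lock()
	seen[id] = true
	mu.Unlock()
}

// runAll starts n workers and returns how many distinct ids were recorded.
func runAll(n int) int {
	var (
		wg   sync.WaitGroup
		mu   sync.Mutex
		seen = make(map[int]bool)
	)
	wg.Add(n)
	for i := 0; i < n; i++ {
		go record(&wg, &mu, seen, i) // share one WaitGroup through its address
	}
	wg.Wait()
	return len(seen)
}

func main() {
	fmt.Println("workers recorded:", runAll(5))
}
```

If record took a sync.WaitGroup by value, each call would decrement its own copy, the Wait in runAll would never return, and go vet would report that a lock is passed by value.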

Conclusion

sync.WaitGroup is a very basic concurrency primitive. Its implementation is short, but it has many pitfalls. When using it, take care not to copy the WaitGroup instance, not to reuse it before the previous Wait has returned, not to call Add concurrently with or after Wait, and to pair every Add with a matching number of Done calls. In real projects, the go vet tool can catch copies of a WaitGroup; the other mistakes are better caught with code review and tests.

Last updated on Jun 28, 2024 19:35 CST