Go高性能编程 EP6: 异步编程的技巧

 

Golang最大的使命就是简化异步编程,当我们遇到那种需要批量处理且耗时的操作时,传统的单线程执行就显得吃力,这时就会想到异步并行处理。本篇文章介绍一些Golang异步编程的技巧。

This article was first published in the Medium MPP plan. If you are a Medium user, please follow me on Medium. Thank you very much.

首先介绍一个简化并发编程的的库conc 里面封装了很多实用的工具,比如WaitGroupiter.Map等。我们并不一定要在生产代码中使用conc,但是学习他的一些思路还是可以的。

使用方式

go

最简单的最常用的方式:使用go关键词

1
2
3
4
5
6
7
8
func main() {  
 go func() {  
  fmt.Println("hello world1")  
 }()  
 go func() {  
  fmt.Println("hello world2")  
 }()  
}

或者:

1
2
3
4
5
6
7
func main() {  
 go Announce("hello world1")  
 go Announce("hello world2")  
}  
func Announce(message string) {  
 fmt.Println(message)  
}

使用匿名函数传递参数

1
2
3
4
5
data := "Hello, World!"  
go func(msg string) {  
		// Use msg to perform asynchronous task logic processing
      fmt.Println(msg)  
}(data)

这种方式不需要考虑返回值问题,如果要考虑返回值,可以使用下面的方式。

通过goroutine和channel来实现超时控制

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
ch := make(chan int, 1)  
timer := time.NewTimer(time.Second)  
go func() {  
    time.Sleep(2 * time.Second)  
    ch <- 1  
    close(ch)  
}()  
select {  
case <-timer.C:  
    fmt.Println("timeout")  
case result := <-ch:  
    fmt.Println(result)  
}

使用sync.WaitGroup

sync.WaitGroup用于等待一组协程完成其任务。通过Add()方法增加等待的协程数量,Done()方法标记协程完成,Wait()方法阻塞直到所有协程完成。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
    var wg sync.WaitGroup  
    // 启动多个协程  
    for i := 0; i < 5; i++ {  
       wg.Add(1)  
       go func(index int) {  
          defer wg.Done()  
          // 异步任务逻辑  
       }(i)  
    }  
    wg.Wait()  
}

1.4、使用errgroup实现goroutine group的错误处理

如果想简单获取协程返回的错误,errgroup包很适合,errgroup包是Go语言标准库中的一个实用工具,用于管理一组协程并处理它们的错误。可以使用errgroup.Group结构来跟踪和处理协程组的错误。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
var eg errgroup.Group  
for i := 0; i < 5; i++ {  
    eg.Go(func() error {  
     return errors.New("error")  
    })  
  
    eg.Go(func() error {  
     return nil  
    })  
}  
  
if err := eg.Wait(); err != nil {  
    // 处理错误  
}

一些使用技巧

使用channel的range和close操作

range操作可以在接收通道上迭代值,直到通道关闭。可以使用close函数关闭通道,以向接收方指示没有更多的值。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
ch := make(chan int)  
  
go func() {  
    for i := 0; i < 5; i++ {  
        ch <- i // 发送值到通道  
    }  
    close(ch) // 关闭通道  
}()  
  
// 使用range迭代接收通道的值  
for val := range ch {  
    // 处理接收到的值  
}
// do somethings

使用select语句实现多个异步操作的等待

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
ch1 := make(chan int)  
ch2 := make(chan string)  
  
go func() {  
    // 异步任务1逻辑  
    ch1 <- result1  
}()  
  
go func() {  
    // 异步任务2逻辑  
    ch2 <- result2  
}()  
  
// 在主goroutine中等待多个异步任务完成  
select {  
case res1 := <-ch1:  
    // 处理结果1  
case res2 := <-ch2:  
    // 处理结果2  
}

使用select和time.After()实现超时控制

如果需要在异步操作中设置超时,可以使用select语句结合time.After()函数实现。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
ch := make(chan int)  
  
go func() {  
    // 异步任务逻辑  
    time.Sleep(2 * time.Second)  
    ch <- result  
}()  
  
// 设置超时时间  
select {  
case res := <-ch:  
    // 处理结果  
case <-time.After(3 * time.Second):  
    // 超时处理  
}

使用time.Tick()和time.After()进行定时操作

time.Tick()函数返回一个通道,定期发送时间值,可以用于执行定时操作。time.After()函数返回一个通道,在指定的时间后发送一个时间值。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
tick := time.Tick(1 * time.Second) // 每秒执行一次操作  
  
for {  
    select {  
    case <-tick:  
        // 执行定时操作  
    }  
}  
  
select {  
case <-time.After(5 * time.Second):  
    // 在5秒后执行操作  
}

使用sync.Mutex或sync.RWMutex进行并发安全访问

当多个协程并发访问共享数据时,需要确保数据访问的安全性。sync.Mutex和sync.RWMutex提供了互斥锁和读写锁,用于在访问共享资源之前进行锁定,以避免数据竞争。sync.RWMutex是一种读写锁,可以在多个协程之间提供对共享资源的并发访问控制。多个协程可以同时获取读锁,但只有一个协程可以获取写锁。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
var mutex sync.Mutex  
var data int  
  
// 写操作,使用互斥锁保护数据  
mutex.Lock()  
data = 123  
mutex.Unlock()  
  
// 读操作,使用读锁保护数据  
//RLock()加读锁时,如果存在写锁,则无法加读锁;当只有读锁或者没有锁时,可以加读锁,读锁可以加载多个  
mutex.RLock()  
value := data  
mutex.RUnlock()  
  
var rwMutex sync.RWMutex  
var sharedData map[string]string  
  
// 读操作,使用rwMutex.RLock读锁保护数据  
func readData(key string) string {  
    rwMutex.RLock()  
    defer rwMutex.RUnlock()  
    return sharedData[key]  
}  
  
// 写操作,使用rwMutex.Lock写锁保护数据  
func writeData(key, value string) {  
    rwMutex.Lock()  
    defer rwMutex.Unlock()  
    sharedData[key] = value  
}

注意:sync.Mutex 的锁是不可以嵌套使用的 sync.RWMutex 的 RLock()是可以嵌套使用的 sync.RWMutex 的 mu.Lock() 是不可以嵌套的 sync.RWMutex 的 mu.Lock() 中不可以嵌套 mu.RLock()

使用sync.Cond进行条件变量控制

sync.Cond是一个条件变量,用于在协程之间进行通信和同步。它可以在指定的条件满足之前阻塞等待,并在条件满足时唤醒等待的协程。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
var cond = sync.NewCond(&sync.Mutex{})  
var ready bool  
  
go func() {  
    // 异步任务逻辑  
    ready = true  
  
    // 通知等待的协程条件已满足  
    cond.Broadcast()  
}()  
  
// 在某个地方等待条件满足  
cond.L.Lock()  
for !ready {  
    cond.Wait()  
}  
cond.L.Unlock()

使用sync.Pool管理对象池

sync.Pool是一个对象池,用于缓存和复用临时对象,可以提高对象的分配和回收效率。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
type MyObject struct {  
    // 对象结构  
}  
  
var objectPool = sync.Pool{  
    New: func() interface{} {  
        // 创建新对象  
        return &MyObject{}  
    },  
}  
  
// 从对象池获取对象  
obj := objectPool.Get().(*MyObject)  
  
// 使用对象  
  
// 将对象放回对象池  
objectPool.Put(obj)

使用sync.Once实现只执行一次的操作

sync.Once用于确保某个操作只执行一次,无论有多少个协程尝试执行它,常用于初始化或加载资源等场景。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
var once sync.Once  
var resource *Resource  
  
func getResource() *Resource {  
    once.Do(func() {  
        // 执行初始化资源的操作,仅执行一次  
        resource = initResource()  
    })  
    return resource  
}  
  
// 在多个协程中获取资源  
go func() {  
    res := getResource()  
    // 使用资源  
}()  
  
go func() {  
    res := getResource()  
    // 使用资源  
}()

使用sync.Once和context.Context实现资源清理

可以结合使用sync.Once和context.Context来确保在多个协程之间只执行一次资源清理操作,并在取消或超时时进行清理。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
var once sync.Once  
  
func cleanup() {  
    // 执行资源清理操作  
}  
  
func doTask(ctx context.Context) {  
    go func() {  
        select {  
        case <-ctx.Done():  
            once.Do(cleanup) // 只执行一次资源清理  
        }  
    }()  
  
    // 异步任务逻辑  
}

使用sync.Map实现并发安全的Map

sync.Map是Go语言标准库中提供的并发安全的映射类型,可在多个协程之间安全地进行读写操作。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
var m sync.Map  
  
// 存储键值对  
m.Store("key", "value")  
  
// 获取值  
if val, ok := m.Load("key"); ok {  
    // 使用值  
}  
  
// 删除键  
m.Delete("key")

使用context.Context进行协程管理和取消

context.Context用于在协程之间传递上下文信息,并可用于取消或超时控制。可以使用context.WithCancel()创建一个可取消的上下文,并使用context.WithTimeout()创建一个带有超时的上下文。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
ctx, cancel := context.WithCancel(context.Background())  
  
go func() {  
    // 异步任务逻辑  
    if someCondition {  
        cancel() // 取消任务  
    }  
}()  
  
// 等待任务完成或取消  
select {  
case <-ctx.Done():  
    // 任务被取消或超时  
}

使用context.WithDeadline()和context.WithTimeout()设置截止时间

context.WithDeadline()和context.WithTimeout()函数可以用于创建带有截止时间的上下文,以限制异步任务的执行时间。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
func doTask(ctx context.Context) {  
    // 异步任务逻辑  
  
    select {  
    case <-time.After(5 * time.Second):  
        // 超时处理  
    case <-ctx.Done():  
        // 上下文取消处理  
    }  
}  
  
func main() {  
    ctx := context.Background()  
    ctx, cancel := context.WithTimeout(ctx, 3*time.Second)  
    defer cancel()  
  
    go doTask(ctx)  
  
    // 继续其他操作  
}

使用context.WithValue()传递上下文值

context.WithValue()函数可用于在上下文中传递键值对,以在协程之间共享和传递上下文相关的值。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
type keyContextValue string  
  
func doTask(ctx context.Context) {  
    if val := ctx.Value(keyContextValue("key")); val != nil {  
        // 使用上下文值  
    }  
}  
  
func main() {  
    ctx := context.WithValue(context.Background(), keyContextValue("key"), "value")  
    go doTask(ctx)  
  
    // 继续其他操作  
}

使用atomic包进行原子操作

atomic包提供了一组函数,用于实现原子操作,以确保在并发环境中对共享变量的读写操作是原子的。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
var counter int64
func increment() {
	atomic.AddInt64(&counter, 1)
}
func main() {
var wg sync.WaitGroup
for i := 0; i < 100; i++ {
	wg.Add(1)
	go func() {
		defer wg.Done()
			increment()
		}()
	}
	wg.Wait()
	fmt.Println("Counter:", counter)
}

总结

本篇文章,我们介绍的是一些常用的关键字,掌握这些,应对常用的并发编程已经问题没有什么问题了。相信您也已经发现了,go源码中没有提供javac# 中常用的 Barrier 功能,虽然Barrier 的功能我们使用基本的并发语句也能实现,但是不如调用现成的API接口方便。其实在 golang.org/x中有SingleFlight 以及是第三方[CyclicBarrier](https://github.com/marusama/cyclicbarrier)。下一篇文章中,我们将介绍这两个并发原语。

true
最后更新于 Jul 08, 2024 17:38 CST
使用 Hugo 构建
主题 StackJimmy 设计