Golang基于多线程、协程实现,与生俱来适合异步编程,当我们遇到那种需要批量处理且耗时的操作时,传统的线性执行就显得吃力,这时就会想到异步并行处理。下面介绍一些异步编程方式和技巧。

作者:zvalhu

一、使用方式

1.1、最简单的最常用的方式:使用go关键词

1
2
3
4
5
6
7
8
func main() {  
 go func() {  
  fmt.Println("hello world1")  
 }()  
 go func() {  
  fmt.Println("hello world2")  
 }()  
}

或者:

1
2
3
4
5
6
7
func main() {  
 go Announce("hello world1")  
 go Announce("hello world2")  
}  
func Announce(message string) {  
 fmt.Println(message)  
}

使用匿名函数传递参数

1
2
3
4
5
data := "Hello, World!"  
go func(msg string) {  
		// Use msg to perform asynchronous task logic processing
      fmt.Println(msg)  
}(data)

这种方式不需要考虑返回值问题,如果要考虑返回值,可以使用下面的方式。

1.2、通过goroutine和channel来实现

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
ch := make(chan int, 1) // 创建一个带缓冲的channel  
// ch := make(chan int, 0) // 创建一个无缓冲的channel  
  
go func() {  
    // 异步任务逻辑  
    ch <- result // 将结果发送到channel  
    // 异步任务逻辑  
    close(ch) // 关闭channel,表示任务完成  
}()  
// 在需要的时候从channel接收结果  
result := <-ch

1.3、使用sync.WaitGroup

sync.WaitGroup用于等待一组协程完成其任务。通过Add()方法增加等待的协程数量,Done()方法标记协程完成,Wait()方法阻塞直到所有协程完成。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
var wg sync.WaitGroup  
// 启动多个协程  
for i := 0; i < 5; i++ {  
    wg.Add(1)  
    go func(index int) {  
        defer wg.Done()  
        // 异步任务逻辑  
    }(i)  
}
// 等待所有协程完成  
wg.Wait()

1.4、使用errgroup实现协程组的错误处理

如果想简单获取协程返回的错误,errgroup包很适合,errgroup包是Go语言标准库中的一个实用工具,用于管理一组协程并处理它们的错误。可以使用errgroup.Group结构来跟踪和处理协程组的错误。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
var eg errgroup.Group  
for i := 0; i < 5; i++ {  
    eg.Go(func() error {  
     return errors.New("error")  
    })  
  
    eg.Go(func() error {  
     return nil  
    })  
}  
  
if err := eg.Wait(); err != nil {  
    // 处理错误  
}

二、一些使用技巧

2.1、使用channel的range和close操作

range操作可以在接收通道上迭代值,直到通道关闭。可以使用close函数关闭通道,以向接收方指示没有更多的值。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
ch := make(chan int)  
  
go func() {  
    for i := 0; i < 5; i++ {  
        ch <- i // 发送值到通道  
    }  
    close(ch) // 关闭通道  
}()  
  
// 使用range迭代接收通道的值  
for val := range ch {  
    // 处理接收到的值  
}

2.2、使用select语句实现多个异步操作的等待

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
ch1 := make(chan int)  
ch2 := make(chan string)  
  
go func() {  
    // 异步任务1逻辑  
    ch1 <- result1  
}()  
  
go func() {  
    // 异步任务2逻辑  
    ch2 <- result2  
}()  
  
// 在主goroutine中等待多个异步任务完成  
select {  
case res1 := <-ch1:  
    // 处理结果1  
case res2 := <-ch2:  
    // 处理结果2  
}

2.3、使用select和time.After()实现超时控制

如果需要在异步操作中设置超时,可以使用select语句结合time.After()函数实现。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
ch := make(chan int)  
  
go func() {  
    // 异步任务逻辑  
    time.Sleep(2 * time.Second)  
    ch <- result  
}()  
  
// 设置超时时间  
select {  
case res := <-ch:  
    // 处理结果  
case <-time.After(3 * time.Second):  
    // 超时处理  
}

2.4、使用select和time.After()实现超时控制

如果需要在异步操作中设置超时,可以使用select语句结合time.After()函数实现。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
ch := make(chan int)  
  
go func() {  
    // 异步任务逻辑  
    time.Sleep(2 * time.Second)  
    ch <- result  
}()  
  
// 设置超时时间  
select {  
case res := <-ch:  
    // 处理结果  
case <-time.After(3 * time.Second):  
    // 超时处理  
}

2.5、使用time.Tick()和time.After()进行定时操作

time.Tick()函数返回一个通道,定期发送时间值,可以用于执行定时操作。time.After()函数返回一个通道,在指定的时间后发送一个时间值。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
tick := time.Tick(1 * time.Second) // 每秒执行一次操作  
  
for {  
    select {  
    case <-tick:  
        // 执行定时操作  
    }  
}  
  
select {  
case <-time.After(5 * time.Second):  
    // 在5秒后执行操作  
}

2.6、使用sync.Mutex或sync.RWMutex进行并发安全访问

当多个协程并发访问共享数据时,需要确保数据访问的安全性。sync.Mutex和sync.RWMutex提供了互斥锁和读写锁,用于在访问共享资源之前进行锁定,以避免数据竞争。sync.RWMutex是一种读写锁,可以在多个协程之间提供对共享资源的并发访问控制。多个协程可以同时获取读锁,但只有一个协程可以获取写锁。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
var mutex sync.Mutex  
var data int  
  
// 写操作,使用互斥锁保护数据  
mutex.Lock()  
data = 123  
mutex.Unlock()  
  
// 读操作,使用读锁保护数据  
//RLock()加读锁时,如果存在写锁,则无法加读锁;当只有读锁或者没有锁时,可以加读锁,读锁可以加载多个  
mutex.RLock()  
value := data  
mutex.RUnlock()  
  
var rwMutex sync.RWMutex  
var sharedData map[string]string  
  
// 读操作,使用rwMutex.RLock读锁保护数据  
func readData(key string) string {  
    rwMutex.RLock()  
    defer rwMutex.RUnlock()  
    return sharedData[key]  
}  
  
// 写操作,使用rwMutex.Lock写锁保护数据  
func writeData(key, value string) {  
    rwMutex.Lock()  
    defer rwMutex.Unlock()  
    sharedData[key] = value  
}

注意:sync.Mutex 的锁是不可以嵌套使用的 sync.RWMutex 的 RLock()是可以嵌套使用的 sync.RWMutex 的 mu.Lock() 是不可以嵌套的 sync.RWMutex 的 mu.Lock() 中不可以嵌套 mu.RLock()

2.7、使用sync.Cond进行条件变量控制

sync.Cond是一个条件变量,用于在协程之间进行通信和同步。它可以在指定的条件满足之前阻塞等待,并在条件满足时唤醒等待的协程。

2.8、使用sync.Pool管理对象池

sync.Pool是一个对象池,用于缓存和复用临时对象,可以提高对象的分配和回收效率。

1
<span></span><code>type&nbsp;MyObject&nbsp;struct&nbsp;{<br>&nbsp;&nbsp;&nbsp;&nbsp;<span>//&nbsp;对象结构</span><br>}<br><br><span>var</span>&nbsp;objectPool&nbsp;=&nbsp;sync.Pool{<br>&nbsp;&nbsp;&nbsp;&nbsp;<span>New</span>:&nbsp;func()&nbsp;interface{}&nbsp;{<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<span>//&nbsp;创建新对象</span><br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<span>return</span>&nbsp;&amp;MyObject{}<br>&nbsp;&nbsp;&nbsp;&nbsp;},<br>}<br><br><span>//&nbsp;从对象池获取对象</span><br><span>obj</span>&nbsp;:=&nbsp;objectPool.Get().(*MyObject)<br><br><span>//&nbsp;使用对象</span><br><br><span>//&nbsp;将对象放回对象池</span><br>objectPool.Put(obj)<br></code>

2.9、使用sync.Once实现只执行一次的操作

sync.Once用于确保某个操作只执行一次,无论有多少个协程尝试执行它,常用于初始化或加载资源等场景。

1
<span></span><code><span>var</span>&nbsp;once&nbsp;sync.Once<br><span>var</span>&nbsp;resource&nbsp;*Resource<br><br>func&nbsp;getResource()&nbsp;*Resource&nbsp;{<br>&nbsp;&nbsp;&nbsp;&nbsp;once.Do(func()&nbsp;{<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<span>//&nbsp;执行初始化资源的操作,仅执行一次</span><br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;resource&nbsp;=&nbsp;initResource()<br>&nbsp;&nbsp;&nbsp;&nbsp;})<br>&nbsp;&nbsp;&nbsp;&nbsp;<span>return</span>&nbsp;resource<br>}<br><br><span>//&nbsp;在多个协程中获取资源</span><br>go&nbsp;func()&nbsp;{<br>&nbsp;&nbsp;&nbsp;&nbsp;<span>res</span>&nbsp;:=&nbsp;getResource()<br>&nbsp;&nbsp;&nbsp;&nbsp;<span>//&nbsp;使用资源</span><br>}()<br><br>go&nbsp;func()&nbsp;{<br>&nbsp;&nbsp;&nbsp;&nbsp;<span>res</span>&nbsp;:=&nbsp;getResource()<br>&nbsp;&nbsp;&nbsp;&nbsp;<span>//&nbsp;使用资源</span><br>}()<br></code>

2.10、使用sync.Once和context.Context实现资源清理

可以结合使用sync.Once和context.Context来确保在多个协程之间只执行一次资源清理操作,并在取消或超时时进行清理。

1
<span></span><code><span>var</span>&nbsp;once&nbsp;sync.Once<br><br>func&nbsp;cleanup()&nbsp;{<br>&nbsp;&nbsp;&nbsp;&nbsp;<span>//&nbsp;执行资源清理操作</span><br>}<br><br>func&nbsp;doTask(ctx&nbsp;context.Context)&nbsp;{<br>&nbsp;&nbsp;&nbsp;&nbsp;go&nbsp;func()&nbsp;{<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;select&nbsp;{<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<span>case</span>&nbsp;<span>&lt;<span>-ctx.Done():</span><br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<span>once.Do</span>(<span>cleanup</span>)&nbsp;//&nbsp;只执行一次资源清理<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}<br>&nbsp;&nbsp;&nbsp;&nbsp;}()<br><br>&nbsp;&nbsp;&nbsp;&nbsp;//&nbsp;异步任务逻辑<br>}<br></span></code>

2.11、使用sync.Map实现并发安全的映射

sync.Map是Go语言标准库中提供的并发安全的映射类型,可在多个协程之间安全地进行读写操作。

1
<span></span><code><span>var</span>&nbsp;m&nbsp;sync.Map<br><br><span>//&nbsp;存储键值对</span><br>m.Store(<span>"key"</span>,&nbsp;<span>"value"</span>)<br><br><span>//&nbsp;获取值</span><br><span>if</span>&nbsp;val,&nbsp;<span>ok</span>&nbsp;:=&nbsp;m.Load(<span>"key"</span>);&nbsp;ok&nbsp;{<br>&nbsp;&nbsp;&nbsp;&nbsp;<span>//&nbsp;使用值</span><br>}<br><br><span>//&nbsp;删除键</span><br>m.Delete(<span>"key"</span>)<br></code>

2.12、使用context.Context进行协程管理和取消

context.Context用于在协程之间传递上下文信息,并可用于取消或超时控制。可以使用context.WithCancel()创建一个可取消的上下文,并使用context.WithTimeout()创建一个带有超时的上下文。

1
<span></span><code>ctx,&nbsp;<span>cancel</span>&nbsp;:=&nbsp;context.WithCancel(context.Background())<br><br>go&nbsp;func()&nbsp;{<br>&nbsp;&nbsp;&nbsp;&nbsp;<span>//&nbsp;异步任务逻辑</span><br>&nbsp;&nbsp;&nbsp;&nbsp;<span>if</span>&nbsp;someCondition&nbsp;{<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;cancel()&nbsp;<span>//&nbsp;取消任务</span><br>&nbsp;&nbsp;&nbsp;&nbsp;}<br>}()<br><br><span>//&nbsp;等待任务完成或取消</span><br>select&nbsp;{<br><span>case</span>&nbsp;<span>&lt;<span>-ctx.Done():</span><br>&nbsp;&nbsp;&nbsp;&nbsp;//&nbsp;任务被取消或超时<br>}<br></span></code>

2.13、使用context.WithDeadline()和context.WithTimeout()设置截止时间

context.WithDeadline()和context.WithTimeout()函数可以用于创建带有截止时间的上下文,以限制异步任务的执行时间。

1
<span></span><code>func&nbsp;doTask(ctx&nbsp;context.Context)&nbsp;{<br>&nbsp;&nbsp;&nbsp;&nbsp;<span>//&nbsp;异步任务逻辑</span><br><br>&nbsp;&nbsp;&nbsp;&nbsp;select&nbsp;{<br>&nbsp;&nbsp;&nbsp;&nbsp;<span>case</span>&nbsp;&lt;-time.After(5&nbsp;*&nbsp;time.Second):<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;//&nbsp;超时处理<br>&nbsp;&nbsp;&nbsp;&nbsp;case&nbsp;&lt;-ctx.Done():<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;//&nbsp;上下文取消处理<br>&nbsp;&nbsp;&nbsp;&nbsp;}<br>}<br><br>func&nbsp;main()&nbsp;{<br>&nbsp;&nbsp;&nbsp;&nbsp;ctx&nbsp;:=&nbsp;context.Background()<br>&nbsp;&nbsp;&nbsp;&nbsp;ctx,&nbsp;cancel&nbsp;:=&nbsp;context.WithTimeout(ctx,&nbsp;3*time.Second)<br>&nbsp;&nbsp;&nbsp;&nbsp;defer&nbsp;cancel()<br><br>&nbsp;&nbsp;&nbsp;&nbsp;go&nbsp;doTask(ctx)<br><br>&nbsp;&nbsp;&nbsp;&nbsp;//&nbsp;继续其他操作<br>}<br></code>

2.14、使用context.WithValue()传递上下文值

context.WithValue()函数可用于在上下文中传递键值对,以在协程之间共享和传递上下文相关的值。

1
<span></span><code>type&nbsp;keyContextValue&nbsp;string<br><br>func&nbsp;doTask(ctx&nbsp;context.Context)&nbsp;{<br>&nbsp;&nbsp;&nbsp;&nbsp;<span>if</span>&nbsp;val&nbsp;:=&nbsp;ctx.Value(keyContextValue(<span>"key"</span>));&nbsp;val&nbsp;!=&nbsp;nil&nbsp;{<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<span>//&nbsp;使用上下文值</span><br>&nbsp;&nbsp;&nbsp;&nbsp;}<br>}<br><br>func&nbsp;main()&nbsp;{<br>&nbsp;&nbsp;&nbsp;&nbsp;<span>ctx</span>&nbsp;:=&nbsp;context.WithValue(context.Background(),&nbsp;keyContextValue(<span>"key"</span>),&nbsp;<span>"value"</span>)<br>&nbsp;&nbsp;&nbsp;&nbsp;go&nbsp;doTask(ctx)<br><br>&nbsp;&nbsp;&nbsp;&nbsp;<span>//&nbsp;继续其他操作</span><br>}<br></code>

2.15、使用atomic包进行原子操作

atomic包提供了一组函数,用于实现原子操作,以确保在并发环境中对共享变量的读写操作是原子的。

1
<span></span><code><span>var</span>&nbsp;counter&nbsp;int64<br><br>func&nbsp;increment()&nbsp;{<br>&nbsp;&nbsp;&nbsp;&nbsp;atomic.AddInt64(&amp;counter,&nbsp;<span>1</span>)<br>}<br><br>func&nbsp;main()&nbsp;{<br>&nbsp;&nbsp;&nbsp;&nbsp;<span>var</span>&nbsp;wg&nbsp;sync.WaitGroup<br><br>&nbsp;&nbsp;&nbsp;&nbsp;<span>for</span>&nbsp;i&nbsp;:=&nbsp;<span>0</span>;&nbsp;i&nbsp;&lt;&nbsp;<span>100</span>;&nbsp;i++&nbsp;{<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;wg.Add(<span>1</span>)<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;go&nbsp;func()&nbsp;{<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;defer&nbsp;wg.Done()<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;increment()<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}()<br>&nbsp;&nbsp;&nbsp;&nbsp;}<br><br>&nbsp;&nbsp;&nbsp;&nbsp;wg.Wait()<br>&nbsp;&nbsp;&nbsp;&nbsp;fmt.Println(<span>"Counter:"</span>,&nbsp;counter)<br>}<br></code>
Licensed under CC BY-NC-SA 4.0
Built with Hugo
Theme Stack designed by Jimmy