Golang工具库:errgroup

简介

> 官方文档

在Go语言中,errgroup是一个非常实用的并发工具包,它允许你启动多个goroutine,并提供错误传播和任务取消的功能。当你想实现一个“一荣俱荣,一损俱损”的场景,即一个任务失败时,其他正在执行或等待执行的任务也能立即终止,errgroup就能派上大用场。

errgroup的主要特点是:

  • 错误收集: 它会收集所有goroutine返回的第一个非nil错误。
  • 上下文取消: 当任何一个goroutine返回错误时,errgroup内部维护的context.Context会被取消。你可以通过监听这个ContextDone通道来感知取消事件,并及时终止当前goroutine的执行。

用法

实现任务失败,其他任务终止

  1. 创建errgroup.GroupContext
1
2
3
4
5
6
7
8
9
10
11
12
13
import (
"context"
"fmt"
"log"
"sync/errgroup"
"time"
)

func main() {
group, ctx := errgroup.WithContext(context.Background())
// ...
}

errgroup.WithContext(context.Background()) 会返回一个*errgroup.Group实例和一个派生自context.Background()Context。这个Context就是用于协同取消的关键。

  1. 启动Goroutine并传入Context

每个需要协同取消的goroutine都应该接收这个Context。在goroutine内部,你需要定期检查ctx.Done()通道。

1
2
3
4
5
6
7

for i := 0; i < 5; i++ {
taskID := i
group.Go(func() error {
return runTask(ctx, taskID)
})
}
  1. Goroutine 内部处理取消:

runTask函数(或其他实际任务函数)中,你需要通过select语句监听ctx.Done()。一旦ctx.Done()通道有信号(表示上下文已被取消),就应该立即停止当前任务的执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func runTask(ctx context.Context, id int) error {
log.Printf("任务 %d: 开始执行\n", id)
for i := 0; i < 10; i++ {
select {
case <-ctx.Done():
log.Printf("任务 %d: 检测到取消,正在终止\n", id)
return ctx.Err() // 返回上下文取消的错误
default:
// 模拟任务执行
time.Sleep(100 * time.Millisecond)
if id == 2 && i == 3 { // 模拟任务2在第三步失败
log.Printf("任务 %d: 发生错误!\n", id)
return fmt.Errorf("任务 %d 失败", id)
}
log.Printf("任务 %d: 执行步骤 %d\n", id, i+1)
}
}
log.Printf("任务 %d: 完成\n", id)
return nil
}
  1. 等待所有任务完成并获取错误:

最后,使用group.Wait()等待所有goroutine完成。Wait()会阻塞直到所有Go方法提交的goroutine都返回。如果任何一个goroutine返回了非nil的错误,Wait()也会返回这个错误(通常是第一个遇到的错误)。

1
2
3
4
5
if err := group.Wait(); err != nil {
log.Printf("所有任务完成,遇到错误: %v\n", err)
} else {
log.Println("所有任务成功完成")
}

完整示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
package main

import (
"context"
"fmt"
"log"
"sync/errgroup"
"time"
)

func main() {
log.SetFlags(log.Lmicroseconds) // 设置日志显示微秒,便于观察
fmt.Println("--- 启动任务 ---")

group, ctx := errgroup.WithContext(context.Background())

// 启动多个并发任务
for i := 0; i < 5; i++ {
taskID := i
group.Go(func() error {
return runTask(ctx, taskID)
})
}

// 等待所有任务完成
if err := group.Wait(); err != nil {
fmt.Printf("--- 所有任务完成,遇到错误: %v ---\n", err)
} else {
fmt.Println("--- 所有任务成功完成 ---")
}
}

// runTask 模拟一个需要执行的任务
func runTask(ctx context.Context, id int) error {
log.Printf("任务 %d: 开始执行\n", id)
for i := 0; i < 10; i++ {
select {
case <-ctx.Done():
// 如果上下文被取消,立即返回错误
log.Printf("任务 %d: 检测到取消信号,正在终止\n", id)
return ctx.Err() // 返回上下文取消的错误,例如 context.Canceled
default:
// 模拟任务执行中的一些耗时操作
time.Sleep(100 * time.Millisecond)

// 模拟特定任务失败
if id == 2 && i == 3 { // 模拟任务2在执行到第四步时失败
log.Printf("任务 %d: 发生预期错误!\n", id)
return fmt.Errorf("任务 %d 模拟失败", id)
}
log.Printf("任务 %d: 执行步骤 %d\n", id, i+1)
}
}
log.Printf("任务 %d: 正常完成\n", id)
return nil
}

SetLimitTryGo 的使用

golang.org/x/sync/errgroup 包中,SetLimitTryGo 方法提供了对并发任务数量更精细的控制,这在资源受限或需要避免过度并发的场景中非常有用。

1. SetLimit(n int)

SetLimit 方法用于限制 errgroup 中同时运行的 goroutine 的最大数量

  • 功能: 设置并发限制。如果 n 为 0,则表示没有并发限制(默认行为)。如果 n 小于 0,SetLimit 会 panic。
  • 使用场景:
    • 控制资源使用: 例如,当你的任务需要访问有限的数据库连接、文件句柄或网络带宽时,可以通过限制并发数来避免资源耗尽。
    • 避免系统过载: 防止启动过多的 goroutine 导致 CPU 争用、内存不足等问题。
    • 流量控制: 在处理大量请求时,作为一种简单的流量控制机制。

示例:限制并发数为 2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
package main

import (
"context"
"fmt"
"log"
"sync/errgroup"
"time"
)

func main() {
log.SetFlags(log.Lmicroseconds)
fmt.Println("--- 启动任务 (SetLimit = 2) ---")

group, ctx := errgroup.WithContext(context.Background())
group.SetLimit(2) // 设置最大并发数为 2

for i := 0; i < 5; i++ {
taskID := i
group.Go(func() error {
return runLimitedTask(ctx, taskID)
})
}

if err := group.Wait(); err != nil {
fmt.Printf("--- 所有任务完成,遇到错误: %v ---\n", err)
} else {
fmt.Println("--- 所有任务成功完成 ---")
}
}

func runLimitedTask(ctx context.Context, id int) error {
log.Printf("任务 %d: 开始执行\n", id)
defer log.Printf("任务 %d: 结束执行\n", id) // 确保在任务结束时打印

select {
case <-ctx.Done():
log.Printf("任务 %d: 检测到取消信号,正在终止\n", id)
return ctx.Err()
case <-time.After(time.Duration(2+id) * time.Second): // 模拟不同耗时
// 模拟任务执行,这里故意让任务0和任务1先完成,以便任务2、3、4能开始
// 实际应用中,这里是你的业务逻辑
}

// 模拟一个任务失败
if id == 3 {
log.Printf("任务 %d: 模拟失败\n", id)
return fmt.Errorf("任务 %d 失败", id)
}

log.Printf("任务 %d: 正常完成\n", id)
return nil
}

2. TryGo(f func() error) bool

TryGo 方法尝试启动一个新的 goroutine。与 Go 不同的是,如果当前的并发数已经达到 SetLimit 设定的限制,TryGo 会立即返回 false,而不会阻塞。

功能: 非阻塞地尝试启动一个 goroutine。

  • 返回: 如果成功启动 goroutine,返回 true;如果因达到并发限制而未能启动,返回 false
  • 使用场景:
    • 快速失败/跳过: 当你有一些非关键任务,或者希望在并发数已满时跳过某些任务而不是等待它们时。
    • 结合其他调度策略: 可以与 channel 或其他同步原语结合,实现更复杂的任务调度逻辑,例如将未能启动的任务放入队列稍后重试。
    • 避免死锁(在某些复杂场景下): 如果你的任务之间存在依赖,并且在并发数满时阻塞 Go 可能会导致死锁,那么 TryGo 可以提供一种非阻塞的选择。

示例:使用 TryGo 尝试启动任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
package main

import (
"context"
"fmt"
"log"
"sync/errgroup"
"time"
)

func main() {
log.SetFlags(log.Lmicroseconds)
fmt.Println("--- 启动任务 (SetLimit = 2, 使用 TryGo) ---")

group, ctx := errgroup.WithContext(context.Background())
group.SetLimit(2) // 设置最大并发数为 2

for i := 0; i < 5; i++ {
taskID := i
// 尝试启动任务
if ok := group.TryGo(func() error {
return runTryGoTask(ctx, taskID)
}); ok {
log.Printf("任务 %d: 成功提交\n", taskID)
} else {
log.Printf("任务 %d: 提交失败,并发限制已满\n", taskID)
// 这里可以处理提交失败的情况,例如将任务放入队列稍后重试
}
}

if err := group.Wait(); err != nil {
fmt.Printf("--- 所有任务完成,遇到错误: %v ---\n", err)
} else {
fmt.Println("--- 所有任务成功完成 ---")
}
}

func runTryGoTask(ctx context.Context, id int) error {
log.Printf("任务 %d: 开始执行\n", id)
defer log.Printf("任务 %d: 结束执行\n", id)

select {
case <-ctx.Done():
log.Printf("任务 %d: 检测到取消信号,正在终止\n", id)
return ctx.Err()
case <-time.After(time.Duration(2+id) * time.Second): // 模拟不同耗时
}

if id == 3 {
log.Printf("任务 %d: 模拟失败\n", id)
return fmt.Errorf("任务 %d 失败", id)
}

log.Printf("任务 %d: 正常完成\n", id)
return nil
}