简介
> 官方文档
在Go语言中,errgroup
是一个非常实用的并发工具包,它允许你启动多个goroutine
,并提供错误传播和任务取消的功能。当你想实现一个“一荣俱荣,一损俱损”的场景,即一个任务失败时,其他正在执行或等待执行的任务也能立即终止,errgroup
就能派上大用场。
errgroup
的主要特点是:
- 错误收集: 它会收集所有
goroutine
返回的第一个非nil错误。
- 上下文取消: 当任何一个
goroutine
返回错误时,errgroup
内部维护的context.Context
会被取消。你可以通过监听这个Context
的Done
通道来感知取消事件,并及时终止当前goroutine
的执行。
用法
实现任务失败,其他任务终止
- 创建
errgroup.Group
和Context
:
1 2 3 4 5 6 7 8 9 10 11 12 13
| import ( "context" "fmt" "log" "sync/errgroup" "time" )
func main() { group, ctx := errgroup.WithContext(context.Background()) }
|
errgroup.WithContext(context.Background())
会返回一个*errgroup.Group
实例和一个派生自context.Background()
的Context
。这个Context
就是用于协同取消的关键。
- 启动
Goroutine
并传入Context
:
每个需要协同取消的goroutine
都应该接收这个Context
。在goroutine
内部,你需要定期检查ctx.Done()
通道。
1 2 3 4 5 6 7
| for i := 0; i < 5; i++ { taskID := i group.Go(func() error { return runTask(ctx, taskID) }) }
|
- 在
Goroutine
内部处理取消:
在runTask
函数(或其他实际任务函数)中,你需要通过select
语句监听ctx.Done()
。一旦ctx.Done()
通道有信号(表示上下文已被取消),就应该立即停止当前任务的执行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| func runTask(ctx context.Context, id int) error { log.Printf("任务 %d: 开始执行\n", id) for i := 0; i < 10; i++ { select { case <-ctx.Done(): log.Printf("任务 %d: 检测到取消,正在终止\n", id) return ctx.Err() default: time.Sleep(100 * time.Millisecond) if id == 2 && i == 3 { log.Printf("任务 %d: 发生错误!\n", id) return fmt.Errorf("任务 %d 失败", id) } log.Printf("任务 %d: 执行步骤 %d\n", id, i+1) } } log.Printf("任务 %d: 完成\n", id) return nil }
|
- 等待所有任务完成并获取错误:
最后,使用group.Wait()
等待所有goroutine
完成。Wait()
会阻塞直到所有Go
方法提交的goroutine
都返回。如果任何一个goroutine
返回了非nil
的错误,Wait()
也会返回这个错误(通常是第一个遇到的错误)。
1 2 3 4 5
| if err := group.Wait(); err != nil { log.Printf("所有任务完成,遇到错误: %v\n", err) } else { log.Println("所有任务成功完成") }
|
完整示例代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
| package main
import ( "context" "fmt" "log" "sync/errgroup" "time" )
func main() { log.SetFlags(log.Lmicroseconds) fmt.Println("--- 启动任务 ---")
group, ctx := errgroup.WithContext(context.Background())
for i := 0; i < 5; i++ { taskID := i group.Go(func() error { return runTask(ctx, taskID) }) }
if err := group.Wait(); err != nil { fmt.Printf("--- 所有任务完成,遇到错误: %v ---\n", err) } else { fmt.Println("--- 所有任务成功完成 ---") } }
func runTask(ctx context.Context, id int) error { log.Printf("任务 %d: 开始执行\n", id) for i := 0; i < 10; i++ { select { case <-ctx.Done(): log.Printf("任务 %d: 检测到取消信号,正在终止\n", id) return ctx.Err() default: time.Sleep(100 * time.Millisecond)
if id == 2 && i == 3 { log.Printf("任务 %d: 发生预期错误!\n", id) return fmt.Errorf("任务 %d 模拟失败", id) } log.Printf("任务 %d: 执行步骤 %d\n", id, i+1) } } log.Printf("任务 %d: 正常完成\n", id) return nil }
|
SetLimit
和 TryGo
的使用
在 golang.org/x/sync/errgroup
包中,SetLimit
和 TryGo
方法提供了对并发任务数量更精细的控制,这在资源受限或需要避免过度并发的场景中非常有用。
1. SetLimit(n int)
SetLimit
方法用于限制 errgroup
中同时运行的 goroutine 的最大数量。
- 功能: 设置并发限制。如果
n
为 0,则表示没有并发限制(默认行为)。如果 n
小于 0,SetLimit
会 panic。
- 使用场景:
- 控制资源使用: 例如,当你的任务需要访问有限的数据库连接、文件句柄或网络带宽时,可以通过限制并发数来避免资源耗尽。
- 避免系统过载: 防止启动过多的 goroutine 导致 CPU 争用、内存不足等问题。
- 流量控制: 在处理大量请求时,作为一种简单的流量控制机制。
示例:限制并发数为 2
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
| package main
import ( "context" "fmt" "log" "sync/errgroup" "time" )
func main() { log.SetFlags(log.Lmicroseconds) fmt.Println("--- 启动任务 (SetLimit = 2) ---")
group, ctx := errgroup.WithContext(context.Background()) group.SetLimit(2)
for i := 0; i < 5; i++ { taskID := i group.Go(func() error { return runLimitedTask(ctx, taskID) }) }
if err := group.Wait(); err != nil { fmt.Printf("--- 所有任务完成,遇到错误: %v ---\n", err) } else { fmt.Println("--- 所有任务成功完成 ---") } }
func runLimitedTask(ctx context.Context, id int) error { log.Printf("任务 %d: 开始执行\n", id) defer log.Printf("任务 %d: 结束执行\n", id)
select { case <-ctx.Done(): log.Printf("任务 %d: 检测到取消信号,正在终止\n", id) return ctx.Err() case <-time.After(time.Duration(2+id) * time.Second): }
if id == 3 { log.Printf("任务 %d: 模拟失败\n", id) return fmt.Errorf("任务 %d 失败", id) }
log.Printf("任务 %d: 正常完成\n", id) return nil }
|
2. TryGo(f func() error) bool
TryGo
方法尝试启动一个新的 goroutine。与 Go
不同的是,如果当前的并发数已经达到 SetLimit
设定的限制,TryGo
会立即返回 false
,而不会阻塞。
功能: 非阻塞地尝试启动一个 goroutine。
- 返回: 如果成功启动 goroutine,返回
true
;如果因达到并发限制而未能启动,返回 false
。
- 使用场景:
- 快速失败/跳过: 当你有一些非关键任务,或者希望在并发数已满时跳过某些任务而不是等待它们时。
- 结合其他调度策略: 可以与 channel 或其他同步原语结合,实现更复杂的任务调度逻辑,例如将未能启动的任务放入队列稍后重试。
- 避免死锁(在某些复杂场景下): 如果你的任务之间存在依赖,并且在并发数满时阻塞
Go
可能会导致死锁,那么 TryGo
可以提供一种非阻塞的选择。
示例:使用 TryGo
尝试启动任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
| package main
import ( "context" "fmt" "log" "sync/errgroup" "time" )
func main() { log.SetFlags(log.Lmicroseconds) fmt.Println("--- 启动任务 (SetLimit = 2, 使用 TryGo) ---")
group, ctx := errgroup.WithContext(context.Background()) group.SetLimit(2)
for i := 0; i < 5; i++ { taskID := i if ok := group.TryGo(func() error { return runTryGoTask(ctx, taskID) }); ok { log.Printf("任务 %d: 成功提交\n", taskID) } else { log.Printf("任务 %d: 提交失败,并发限制已满\n", taskID) } }
if err := group.Wait(); err != nil { fmt.Printf("--- 所有任务完成,遇到错误: %v ---\n", err) } else { fmt.Println("--- 所有任务成功完成 ---") } }
func runTryGoTask(ctx context.Context, id int) error { log.Printf("任务 %d: 开始执行\n", id) defer log.Printf("任务 %d: 结束执行\n", id)
select { case <-ctx.Done(): log.Printf("任务 %d: 检测到取消信号,正在终止\n", id) return ctx.Err() case <-time.After(time.Duration(2+id) * time.Second): }
if id == 3 { log.Printf("任务 %d: 模拟失败\n", id) return fmt.Errorf("任务 %d 失败", id) }
log.Printf("任务 %d: 正常完成\n", id) return nil }
|