结构化并发:WaitGroup、errgroup 与并发任务收敛
并发最难维护的形态不是 goroutine 多,而是任务离开了调用栈:函数已经返回,子任务还在运行;某个子任务失败,兄弟任务毫不知情;调用方不知道什么时候资源才真正释放。
结构化并发要求并发任务形成清楚的父子范围:父调用启动子任务,在离开这个范围前等待它们结束;取消和错误沿范围传播。
flowchart TD
P[父调用] --> A[任务 A]
P --> B[任务 B]
P --> C[任务 C]
A --> J[Wait / 收敛点]
B --> J
C --> J
J --> R[父调用返回]
WaitGroup 只负责等待
经典模式:
var wg sync.WaitGroup
for _, item := range items {
item := item
wg.Add(1)
go func() {
defer wg.Done()
process(item)
}()
}
wg.Wait()Add 要在启动 goroutine 之前执行,否则新 goroutine 可能还没 Add,主 goroutine 的 Wait 已经返回。
Go 1.25 起可以使用 WaitGroup.Go 消除 Add/Done 配对:
var wg sync.WaitGroup
for _, item := range items {
item := item
wg.Go(func() {
process(item)
})
}
wg.Wait()传给 wg.Go 的函数不应 panic。WaitGroup 不收集错误,也不自动取消兄弟任务。如果任务可能失败,需要自己增加错误通道、锁或其他协调,这通常就是应该考虑 errgroup 的信号。
手写错误收集为什么容易失控
errorsCh := make(chan error, len(items))
var wg sync.WaitGroup
for _, item := range items {
item := item
wg.Add(1)
go func() {
defer wg.Done()
errorsCh <- process(item)
}()
}
wg.Wait()
close(errorsCh)这段代码还要决定:
- nil error 是否发送;
- 返回第一个错误还是合并全部错误;
- 一个任务失败后,其他任务是否继续;
- 调用方取消时,任务如何停止;
- channel 容量是否可能导致发送阻塞。
不是不能手写,而是这些决定已经构成一个并发协议。对“任一失败则取消全部”的任务组,golang.org/x/sync/errgroup 已经提供了成熟实现。
errgroup 把错误和取消绑在一起
group, ctx := errgroup.WithContext(parent)
for _, id := range orderIDs {
id := id
group.Go(func() error {
return rebuildIndex(ctx, id)
})
}
if err := group.Wait(); err != nil {
return fmt.Errorf("rebuild indexes: %w", err)
}WithContext 返回的 Context 会在第一个任务返回非 nil error 时取消,也会在 Wait 返回时取消。任务必须真正使用这个 ctx,取消才有意义。
func rebuildIndex(ctx context.Context, id string) error {
rows, err := db.QueryContext(ctx, query, id)
// ...
}errgroup 返回第一个非 nil 错误。它适合“一个失败,整个组合操作就失败”的语义;批处理若要求记录每个条目的结果,不应把所有错误都压成第一个。
结果收集要避免并发 append
var results []Result
group.Go(func() error {
result := compute()
results = append(results, result) // 数据竞争
return nil
})如果输入数量固定,预分配并让每个任务写唯一索引最简单:
results := make([]Result, len(inputs))
group, ctx := errgroup.WithContext(parent)
for i, input := range inputs {
i, input := i, input
group.Go(func() error {
result, err := compute(ctx, input)
if err != nil {
return err
}
results[i] = result
return nil
})
}
if err := group.Wait(); err != nil {
return nil, err
}
return results, nil不同 goroutine 写不同数组元素可以避免对同一内存位置的竞争;切片长度和底层数组不能在期间改变。返回结果前等待全部任务,确保写入已经完成。
SetLimit 把并发上限放进任务组
group, ctx := errgroup.WithContext(parent)
group.SetLimit(8)
for _, item := range items {
item := item
group.Go(func() error {
return process(ctx, item)
})
}
return group.Wait()当已有 8 个任务运行时,后续 Go 调用会阻塞,直到出现空位。限制值不应在任务仍活跃时修改。
注意这个阻塞发生在提交任务的 goroutine。如果提交本身也必须响应 Context,可以在循环中先检查 ctx,或者使用更显式的队列/worker pool。
TryGo 在没有空位时返回 false,适合调用方拥有清晰降级策略的场景。不要把 false 静默当作成功,否则任务会无声丢失。
什么时候不该取消兄弟任务
假设批量发送 100 封通知,某一封失败不代表其他 99 封应该取消。此时可以让每个任务写入独立结果,WaitGroup 等待全部完成:
type Outcome struct {
ID string
Err error
}
outcomes := make([]Outcome, len(messages))
var wg sync.WaitGroup
for i, message := range messages {
i, message := i, message
wg.Add(1)
go func() {
defer wg.Done()
outcomes[i] = Outcome{
ID: message.ID,
Err: send(parent, message),
}
}()
}
wg.Wait()结构化并发不等于“第一个失败就取消”,而是父调用明确拥有任务范围和收敛点。错误策略由业务决定。
panic 如何处理
普通 goroutine 中未恢复的 panic 会让整个进程崩溃。不要期待 WaitGroup 或 errgroup 自动把 panic 变成 error。
如果任务来自不可信插件或必须隔离的作业,可以在任务边界 recover、记录堆栈并转换错误;普通内部代码更适合让 panic 暴露程序错误,而不是悄悄吞掉。
结构化并发的代码形状
一段容易维护的并发代码通常具备这些特征:
Go和Wait出现在可见的同一范围;- 子任务继承父 Context;
- 错误策略明确:首错取消、全部收集或允许部分成功;
- 结果写入没有共享 append;
- 并发上限是配置,不由输入规模决定;
- 父函数返回时,子任务已经结束。
这比“到处启动 goroutine,再用日志观察它们”可靠得多。