构建不泄漏的并发流水线:Fan-out、Fan-in 与取消
并发流水线把一项工作拆成多个 stage,每个 stage 从上游接收数据、处理后发给下游。它适合流式处理和阶段吞吐差异明显的任务,但也有一个经典故障:下游提前返回,上游永远阻塞在发送。
一个可用的流水线必须同时设计数据流和取消流。
flowchart LR
Source[Source] --> Parse1[Parse]
Source --> Parse2[Parse]
Parse1 --> Merge[Fan-in]
Parse2 --> Merge
Merge --> Store[Store]
Cancel[Context cancel] -.-> Source
Cancel -.-> Parse1
Cancel -.-> Parse2
Cancel -.-> Store
Stage 的基本契约
用泛型可以写出一个简单 stage:
func Map[T, R any](
ctx context.Context,
input <-chan T,
transform func(context.Context, T) (R, error),
) (<-chan R, <-chan error) {
output := make(chan R)
errorsCh := make(chan error, 1)
go func() {
defer close(output)
defer close(errorsCh)
for {
var value T
var ok bool
select {
case <-ctx.Done():
return
case value, ok = <-input:
if !ok {
return
}
}
result, err := transform(ctx, value)
if err != nil {
errorsCh <- err
return
}
select {
case output <- result:
case <-ctx.Done():
return
}
}
}()
return output, errorsCh
}它体现了几个契约:
- stage 自己创建 output,所以自己负责关闭;
- 接收 input 的 stage 不关闭 input;
- 向下游发送时同时监听取消;
- error channel 容量为 1,避免报告错误时因无人立即接收而阻塞。
实际 API 往往把值与错误合并成 Result[T],或用 errgroup 管理整条流水线,避免每个 stage 再配一条错误 channel。
Source 也必须能取消
func Generate[T any](ctx context.Context, values ...T) <-chan T {
output := make(chan T)
go func() {
defer close(output)
for _, value := range values {
select {
case output <- value:
case <-ctx.Done():
return
}
}
}()
return output
}如果只在 stage 接收时检查 Context,却在发送时直接 output <- value,下游停止读取后仍会泄漏。每个潜在阻塞点都要能观察取消。
Fan-out:多个 Worker 读取同一输入
多个 goroutine 同时 range 同一个 channel,channel 会把每个值交给其中一个接收者:
func Workers(
ctx context.Context,
count int,
jobs <-chan Job,
) []<-chan Result {
outputs := make([]<-chan Result, count)
for i := range count {
outputs[i] = worker(ctx, jobs)
}
return outputs
}这适合每个任务相互独立、顺序不重要、单个任务成本相近的场景。它不保证输出顺序。需要恢复顺序时,可以给输入附带序号并在下游重排,但重排缓冲会重新引入内存上界问题。
如果少数任务特别慢,静态地给每个 worker 分一批数据容易拖尾;共享输入 channel 可以让空闲 worker 自然领取下一项。
Fan-in:合并多个输出
func Merge[T any](
ctx context.Context,
inputs ...<-chan T,
) <-chan T {
output := make(chan T)
var wg sync.WaitGroup
wg.Add(len(inputs))
for _, input := range inputs {
input := input
go func() {
defer wg.Done()
for {
var value T
var ok bool
select {
case <-ctx.Done():
return
case value, ok = <-input:
if !ok {
return
}
}
select {
case output <- value:
case <-ctx.Done():
return
}
}
}()
}
go func() {
wg.Wait()
close(output)
}()
return output
}不能让每个转发 goroutine各自 close(output),因为多个发送方中任意一个先结束都不代表其他发送方结束。单独的协调 goroutine 等全部转发完成后关闭。
提前退出是泄漏高发点
for result := range results {
if result.Match {
return result // 上游可能还在发送
}
}调用方一旦提前返回,必须先取消整条流水线:
ctx, cancel := context.WithCancel(parent)
defer cancel()
for result := range results {
if result.Match {
return result
}
}defer cancel() 会在函数返回时执行。前提是所有 stage 在发送、接收或外部 I/O 时都传播 ctx。只在最外层创建 Context,而 stage 仍无条件发送,取消不会自动打断 channel 操作。
用 errgroup 管理整条流水线
当 stage 数量固定、错误需要首错取消时,errgroup 比“函数返回 channel”更容易建立清楚的生命周期:
func RunPipeline(ctx context.Context, input <-chan Item) error {
group, ctx := errgroup.WithContext(ctx)
parsed := make(chan Parsed)
group.Go(func() error {
defer close(parsed)
for item := range input {
value, err := parse(item)
if err != nil {
return err
}
select {
case parsed <- value:
case <-ctx.Done():
return ctx.Err()
}
}
return nil
})
group.Go(func() error {
for value := range parsed {
if err := store(ctx, value); err != nil {
return err
}
}
return nil
})
return group.Wait()
}任一 stage 失败会取消 ctx,其他 stage 在阻塞点退出。Wait 确保函数返回时所有 stage 已收敛。
外部 input 的所有权
示例没有关闭 input,因为它不是本函数创建的。如果某个 stage 直接使用 for range input,而 input 在取消后没有关闭,该 stage 仍可能阻塞,无法观察 ctx。
需要改成显式 select 接收:
for {
select {
case <-ctx.Done():
return ctx.Err()
case item, ok := <-input:
if !ok {
return nil
}
// process item
}
}这是流水线代码里很容易漏掉的一层:接收本身也是阻塞点。
缓冲放在哪里
无缓冲 channel 提供最强背压:下游没有接收,上游就不能继续。缓冲可以吸收阶段之间的短暂速度差,但每个缓冲都增加在途数据和取消后的清理量。
优先从无缓冲开始,只在 profile 或吞吐实验表明阶段之间存在可被缓冲吸收的抖动时增加容量。把每段都设成 1000,不会让最慢 stage 变快,只会让问题晚一点暴露。
什么时候不用流水线
- 只有少量固定并发调用:errgroup 更简单;
- 任务需要可靠持久化:使用外部消息系统;
- 阶段之间共享复杂可变状态:锁和明确服务对象可能更清楚;
- 整批数据很小:顺序循环的维护成本最低。
流水线不是“高级 Go”的展示品。只有当流式处理、阶段隔离或背压确实属于问题本身时,它才会让设计更简单。