V
Vel·ToolKit
简洁 · 高效 · 即开即用
ZH
第 15 章 / 共 20 章

Channel

无缓冲/有缓冲、select、关闭、worker pool

Channel 基础

channel 是带类型的同步队列,用于 goroutine 之间安全地传递值。无缓冲 channel 是“会合点”:发送方与接收方都必须到达才能继续。

package main

import "fmt"

func main() {
    ch := make(chan int)
    go func() { ch <- 42 }()
    v := <-ch
    fmt.Println(v) // 42
}

有缓冲 channel

make(chan T, n) 创建容量为 n 的缓冲 channel。发送在缓冲未满前不阻塞,接收在缓冲非空前不阻塞。缓冲不是用来“提速”的,而是用来削峰填谷或解耦生产/消费节奏。

package main

import "fmt"

func main() {
    ch := make(chan int, 3)
    ch <- 1
    ch <- 2
    ch <- 3
    // ch <- 4 // 缓冲已满,下一次发送会阻塞
    fmt.Println(len(ch), cap(ch)) // 3 3
}

关闭与 range

close(ch) 表示“不会再有新值”。接收方可用第二返回值 ok 区分“拿到值”和“channel 已关且空”;range 会持续读直到 channel 关闭并清空。

package main

import "fmt"

func main() {
    ch := make(chan int)
    go func() {
        for i := 0; i < 3; i++ {
            ch <- i
        }
        close(ch)
    }()
    for v := range ch {
        fmt.Println(v)
    }

    // 显式判断
    v, ok := <-ch // 已 close 且空,ok=false
    fmt.Println(v, ok)
}

单向 channel

在函数签名里把 channel 限定为只发或只收,可以明确职责,编译期防止误用。双向 channel 可隐式转换为单向,反之不行。

package main

import "fmt"

func producer(out chan<- int) {
    for i := 0; i < 3; i++ {
        out <- i
    }
    close(out)
}

func consumer(in <-chan int) {
    for v := range in {
        fmt.Println(v)
    }
}

func main() {
    ch := make(chan int)
    go producer(ch)
    consumer(ch)
}

select 多路复用

select 随机选择一个就绪的 case 执行;都不就绪则阻塞,存在 default 则立刻走 default。是写超时、取消、多源合并的核心工具。

package main

import (
    "context"
    "fmt"
    "time"
)

func race(ctx context.Context) error {
    ch1 := make(chan int, 1)
    ch2 := make(chan int, 1)
    go func() { ch1 <- 1 }()

    select {
    case v := <-ch1:
        fmt.Println("from ch1:", v)
    case ch2 <- 100:
        fmt.Println("sent to ch2")
    case <-time.After(time.Second):
        fmt.Println("timeout")
    case <-ctx.Done():
        return ctx.Err()
    }
    return nil
}

func main() {
    _ = race(context.Background())
}

nil channel 的妙用

对 nil channel 的发送和接收都永远阻塞,因此在 select 里可以把某个 case 的 channel 置 nil,相当于“临时禁用这一分支”。常用于关闭后停止重复触发某个 case。

package main

import (
    "context"
    "fmt"
    "time"
)

func run(ctx context.Context, resume <-chan struct{}) {
    ticker := time.NewTicker(200 * time.Millisecond)
    defer ticker.Stop()
    tickC := ticker.C // 想暂停时设为 nil
    paused := false
    for {
        select {
        case <-tickC:
            fmt.Println("tick")
            if !paused {
                paused = true
                tickC = nil // 屏蔽这条分支
            }
        case <-resume:
            paused = false
            tickC = ticker.C
        case <-ctx.Done():
            return
        }
    }
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    defer cancel()
    run(ctx, make(chan struct{}))
}

worker pool(扇出 / 扇入)

多个 goroutine 消费同一个 jobs channel(扇出),结果汇总到一个 results channel(扇入),是 Go 最经典的并发模型。下面是完整可运行版本:

func worker(id int, jobs <-chan int, results chan<- int) {
    for j := range jobs {
        time.Sleep(50 * time.Millisecond)
        results <- j * j
    }
}

func main() {
    jobs := make(chan int, 5)
    results := make(chan int, 5)

    var wg sync.WaitGroup
    for w := 1; w <= 3; w++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            worker(id, jobs, results)
        }(w)
    }

    // 发任务
    for j := 1; j <= 5; j++ { jobs <- j }
    close(jobs)

    // 等所有 worker 退出后再关 results
    go func() { wg.Wait(); close(results) }()

    for r := range results { fmt.Println(r) }
}

done 通道 vs context

早期 Go 习惯用 done := make(chan struct{}) + close(done) 广播取消;新代码更推荐 context.Context——它把取消、超时、值传递统一在一个标准接口里,并能跨 RPC/HTTP 自动传播。

package main

import (
    "fmt"
    "time"
)

func main() {
    done := make(chan struct{})
    for i := 0; i < 3; i++ {
        i := i
        go func() {
            <-done
            fmt.Println("bye", i)
        }()
    }
    close(done) // 一次关,所有等待者同时收到
    time.Sleep(100 * time.Millisecond)
}

常见陷阱

  • 向未初始化的 channel(var ch chan int)发送/接收:永远阻塞
  • 多个 goroutine 写同一个 channel 又都想关闭它:用 sync.Once 包一下 close
  • 想做无界队列:用 channel + slice 缓冲,或直接用切片 + Mutex,不要无限增大 channel buffer
  • select 里多 case 同时就绪是随机的,不要依赖顺序