第 15 章 / 共 20 章
Channel
无缓冲/有缓冲、select、关闭、worker pool
Channel 基础
channel 是带类型的同步队列,用于 goroutine 之间安全地传递值。无缓冲 channel 是“会合点”:发送方与接收方都必须到达才能继续。
package main
import "fmt"
func main() {
ch := make(chan int)
go func() { ch <- 42 }()
v := <-ch
fmt.Println(v) // 42
}有缓冲 channel
make(chan T, n) 创建容量为 n 的缓冲 channel。发送在缓冲未满前不阻塞,接收在缓冲非空前不阻塞。缓冲不是用来“提速”的,而是用来削峰填谷或解耦生产/消费节奏。
package main
import "fmt"
func main() {
ch := make(chan int, 3)
ch <- 1
ch <- 2
ch <- 3
// ch <- 4 // 缓冲已满,下一次发送会阻塞
fmt.Println(len(ch), cap(ch)) // 3 3
}关闭与 range
close(ch) 表示“不会再有新值”。接收方可用第二返回值 ok 区分“拿到值”和“channel 已关且空”;range 会持续读直到 channel 关闭并清空。
package main
import "fmt"
func main() {
ch := make(chan int)
go func() {
for i := 0; i < 3; i++ {
ch <- i
}
close(ch)
}()
for v := range ch {
fmt.Println(v)
}
// 显式判断
v, ok := <-ch // 已 close 且空,ok=false
fmt.Println(v, ok)
}单向 channel
在函数签名里把 channel 限定为只发或只收,可以明确职责,编译期防止误用。双向 channel 可隐式转换为单向,反之不行。
package main
import "fmt"
func producer(out chan<- int) {
for i := 0; i < 3; i++ {
out <- i
}
close(out)
}
func consumer(in <-chan int) {
for v := range in {
fmt.Println(v)
}
}
func main() {
ch := make(chan int)
go producer(ch)
consumer(ch)
}select 多路复用
select 随机选择一个就绪的 case 执行;都不就绪则阻塞,存在 default 则立刻走 default。是写超时、取消、多源合并的核心工具。
package main
import (
"context"
"fmt"
"time"
)
func race(ctx context.Context) error {
ch1 := make(chan int, 1)
ch2 := make(chan int, 1)
go func() { ch1 <- 1 }()
select {
case v := <-ch1:
fmt.Println("from ch1:", v)
case ch2 <- 100:
fmt.Println("sent to ch2")
case <-time.After(time.Second):
fmt.Println("timeout")
case <-ctx.Done():
return ctx.Err()
}
return nil
}
func main() {
_ = race(context.Background())
}nil channel 的妙用
对 nil channel 的发送和接收都永远阻塞,因此在 select 里可以把某个 case 的 channel 置 nil,相当于“临时禁用这一分支”。常用于关闭后停止重复触发某个 case。
package main
import (
"context"
"fmt"
"time"
)
func run(ctx context.Context, resume <-chan struct{}) {
ticker := time.NewTicker(200 * time.Millisecond)
defer ticker.Stop()
tickC := ticker.C // 想暂停时设为 nil
paused := false
for {
select {
case <-tickC:
fmt.Println("tick")
if !paused {
paused = true
tickC = nil // 屏蔽这条分支
}
case <-resume:
paused = false
tickC = ticker.C
case <-ctx.Done():
return
}
}
}
func main() {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
run(ctx, make(chan struct{}))
}worker pool(扇出 / 扇入)
多个 goroutine 消费同一个 jobs channel(扇出),结果汇总到一个 results channel(扇入),是 Go 最经典的并发模型。下面是完整可运行版本:
func worker(id int, jobs <-chan int, results chan<- int) {
for j := range jobs {
time.Sleep(50 * time.Millisecond)
results <- j * j
}
}
func main() {
jobs := make(chan int, 5)
results := make(chan int, 5)
var wg sync.WaitGroup
for w := 1; w <= 3; w++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
worker(id, jobs, results)
}(w)
}
// 发任务
for j := 1; j <= 5; j++ { jobs <- j }
close(jobs)
// 等所有 worker 退出后再关 results
go func() { wg.Wait(); close(results) }()
for r := range results { fmt.Println(r) }
}done 通道 vs context
早期 Go 习惯用 done := make(chan struct{}) + close(done) 广播取消;新代码更推荐 context.Context——它把取消、超时、值传递统一在一个标准接口里,并能跨 RPC/HTTP 自动传播。
package main
import (
"fmt"
"time"
)
func main() {
done := make(chan struct{})
for i := 0; i < 3; i++ {
i := i
go func() {
<-done
fmt.Println("bye", i)
}()
}
close(done) // 一次关,所有等待者同时收到
time.Sleep(100 * time.Millisecond)
}常见陷阱
- 向未初始化的 channel(var ch chan int)发送/接收:永远阻塞
- 多个 goroutine 写同一个 channel 又都想关闭它:用 sync.Once 包一下 close
- 想做无界队列:用 channel + slice 缓冲,或直接用切片 + Mutex,不要无限增大 channel buffer
- select 里多 case 同时就绪是随机的,不要依赖顺序