V
Vel·ToolKit
简洁 · 高效 · 即开即用
ZH
第 14 章 / 共 20 章

Goroutine

go 关键字、调度、WaitGroup、context 取消

启动 goroutine

go 关键字让函数在一个新 goroutine 中并发执行。goroutine 是 Go 运行时调度的“用户态线程”:初始栈仅 2KB 并按需增长,可以在单进程内同时跑数十万个,远比 OS 线程便宜。

package main

import (
    "fmt"
    "time"
)

func handle(id int) {
    fmt.Println("handling", id)
}

func main() {
    go func() {
        fmt.Println("running in another goroutine")
    }()
    go handle(42)

    // 给 goroutine 一点时间执行;下一节用 WaitGroup 替代
    time.Sleep(100 * time.Millisecond)
}

GMP 调度模型一句话

G(goroutine)由 M(OS 线程)执行,由 P(逻辑处理器)持有可运行队列。GOMAXPROCS 控制 P 的数量,默认等于 CPU 核数;当一个 G 阻塞在系统调用时,Go 会自动把 P 转交给另一个 M 继续跑剩下的 G,不会拖累其它 goroutine。

package main

import (
    "fmt"
    "runtime"
)

func main() {
    fmt.Println(runtime.NumCPU(), runtime.GOMAXPROCS(0))
    runtime.GOMAXPROCS(4) // 一般不用手动改
}

等待完成:sync.WaitGroup

main 退出后所有 goroutine 都会被强制终止。要等子任务完成,使用 WaitGroup:Add 在启动前调用,Done 用 defer 保证执行,Wait 阻塞直到计数归零。

package main

import (
    "fmt"
    "sync"
)

func main() {
    var wg sync.WaitGroup
    for i := 0; i < 3; i++ {
        wg.Add(1)
        go func(i int) {
            defer wg.Done()
            fmt.Println("task", i)
        }(i)
    }
    wg.Wait()
}

errgroup:带错误传播的并发组

golang.org/x/sync/errgroup 是 WaitGroup 的“可返回 error”增强版:任何一个 goroutine 报错都会自动取消整组,常用于并发 RPC、批量爬取。

package main

import (
    "context"
    "log"
    "net/http"

    "golang.org/x/sync/errgroup"
)

func fetchAll(ctx context.Context, urls []string) error {
    g, ctx := errgroup.WithContext(ctx)
    for _, url := range urls {
        url := url
        g.Go(func() error {
            req, _ := http.NewRequestWithContext(ctx, "GET", url, nil)
            resp, err := http.DefaultClient.Do(req)
            if err != nil {
                return err
            }
            resp.Body.Close()
            return nil
        })
    }
    return g.Wait()
}

func main() {
    err := fetchAll(context.Background(), []string{
        "https://example.com",
        "https://example.org",
    })
    if err != nil {
        log.Printf("group failed: %v", err)
    }
}

用 context 取消 goroutine

Go 没有“杀死 goroutine”的 API。终止子任务的标准方式是传入 context.Context,子任务自己监听 ctx.Done() 退出。这种合作式取消让资源清理可控。

package main

import (
    "context"
    "fmt"
    "time"
)

func worker(ctx context.Context, id int) {
    for {
        select {
        case <-ctx.Done():
            fmt.Printf("worker %d stop: %v\n", id, ctx.Err())
            return
        case <-time.After(200 * time.Millisecond):
            fmt.Printf("worker %d tick\n", id)
        }
    }
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    defer cancel()
    go worker(ctx, 1)
    <-ctx.Done()
    time.Sleep(50 * time.Millisecond) // 让 worker 打印完最后一行
}

panic 不会跨 goroutine

一个 goroutine 内部的 panic 如果不被 recover 捕获,会让整个进程崩掉,而不是只死掉这个 goroutine。生产中要在每个长期运行的 goroutine 入口处做兜底 recover。

package main

import (
    "log"
    "runtime/debug"
    "time"
)

func safeGo(fn func()) {
    go func() {
        defer func() {
            if r := recover(); r != nil {
                log.Printf("goroutine panic: %v\n%s", r, debug.Stack())
            }
        }()
        fn()
    }()
}

func main() {
    safeGo(func() {
        panic("boom in goroutine")
    })
    time.Sleep(100 * time.Millisecond)
}

竞态检测

并发写共享变量必须加锁或用 channel。开发期一定要打开 race detector,它能在运行时记录所有内存访问并发现没有 happens-before 关系的读写。

$ go run -race main.go
$ go test -race ./...
$ go build -race -o app   # 二进制带 race 跑灰度环境也可

sync.Mutex / RWMutex

Mutex 是互斥锁,RWMutex 允许多个读者并发但写者独占;读多写少且临界区不小时再上 RWMutex,否则普通 Mutex 更快。

package main

import (
    "fmt"
    "sync"
)

type Counter struct {
    mu sync.RWMutex
    m  map[string]int
}

func NewCounter() *Counter { return &Counter{m: map[string]int{}} }

func (c *Counter) Get(k string) int {
    c.mu.RLock()
    defer c.mu.RUnlock()
    return c.m[k]
}

func (c *Counter) Inc(k string) {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.m[k]++
}

func main() {
    c := NewCounter()
    var wg sync.WaitGroup
    for i := 0; i < 100; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            c.Inc("hits")
        }()
    }
    wg.Wait()
    fmt.Println(c.Get("hits")) // 100
}

sync.Once 与 atomic

sync.Once 保证某段初始化代码只跑一次(即使被多个 goroutine 同时触发);sync/atomic 提供无锁的整数 / 指针操作,用于计数、状态位等热点场景。

package main

import (
    "fmt"
    "net/http"
    "sync"
    "sync/atomic"
    "time"
)

var (
    once   sync.Once
    client *http.Client
)

func Client() *http.Client {
    once.Do(func() {
        client = &http.Client{Timeout: 5 * time.Second}
    })
    return client
}

func main() {
    fmt.Println(Client() == Client()) // true,同一个实例

    var hit atomic.Int64
    var wg sync.WaitGroup
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            hit.Add(1)
        }()
    }
    wg.Wait()
    fmt.Println(hit.Load()) // 1000
}