Goroutine
go 关键字、调度、WaitGroup、context 取消
启动 goroutine
go 关键字让函数在一个新 goroutine 中并发执行。goroutine 是 Go 运行时调度的“用户态线程”:初始栈仅 2KB 并按需增长,可以在单进程内同时跑数十万个,远比 OS 线程便宜。
package main
import (
"fmt"
"time"
)
func handle(id int) {
fmt.Println("handling", id)
}
func main() {
go func() {
fmt.Println("running in another goroutine")
}()
go handle(42)
// 给 goroutine 一点时间执行;下一节用 WaitGroup 替代
time.Sleep(100 * time.Millisecond)
}GMP 调度模型一句话
G(goroutine)由 M(OS 线程)执行,由 P(逻辑处理器)持有可运行队列。GOMAXPROCS 控制 P 的数量,默认等于 CPU 核数;当一个 G 阻塞在系统调用时,Go 会自动把 P 转交给另一个 M 继续跑剩下的 G,不会拖累其它 goroutine。
package main
import (
"fmt"
"runtime"
)
func main() {
fmt.Println(runtime.NumCPU(), runtime.GOMAXPROCS(0))
runtime.GOMAXPROCS(4) // 一般不用手动改
}等待完成:sync.WaitGroup
main 退出后所有 goroutine 都会被强制终止。要等子任务完成,使用 WaitGroup:Add 在启动前调用,Done 用 defer 保证执行,Wait 阻塞直到计数归零。
package main
import (
"fmt"
"sync"
)
func main() {
var wg sync.WaitGroup
for i := 0; i < 3; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
fmt.Println("task", i)
}(i)
}
wg.Wait()
}errgroup:带错误传播的并发组
golang.org/x/sync/errgroup 是 WaitGroup 的“可返回 error”增强版:任何一个 goroutine 报错都会自动取消整组,常用于并发 RPC、批量爬取。
package main
import (
"context"
"log"
"net/http"
"golang.org/x/sync/errgroup"
)
func fetchAll(ctx context.Context, urls []string) error {
g, ctx := errgroup.WithContext(ctx)
for _, url := range urls {
url := url
g.Go(func() error {
req, _ := http.NewRequestWithContext(ctx, "GET", url, nil)
resp, err := http.DefaultClient.Do(req)
if err != nil {
return err
}
resp.Body.Close()
return nil
})
}
return g.Wait()
}
func main() {
err := fetchAll(context.Background(), []string{
"https://example.com",
"https://example.org",
})
if err != nil {
log.Printf("group failed: %v", err)
}
}用 context 取消 goroutine
Go 没有“杀死 goroutine”的 API。终止子任务的标准方式是传入 context.Context,子任务自己监听 ctx.Done() 退出。这种合作式取消让资源清理可控。
package main
import (
"context"
"fmt"
"time"
)
func worker(ctx context.Context, id int) {
for {
select {
case <-ctx.Done():
fmt.Printf("worker %d stop: %v\n", id, ctx.Err())
return
case <-time.After(200 * time.Millisecond):
fmt.Printf("worker %d tick\n", id)
}
}
}
func main() {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
go worker(ctx, 1)
<-ctx.Done()
time.Sleep(50 * time.Millisecond) // 让 worker 打印完最后一行
}panic 不会跨 goroutine
一个 goroutine 内部的 panic 如果不被 recover 捕获,会让整个进程崩掉,而不是只死掉这个 goroutine。生产中要在每个长期运行的 goroutine 入口处做兜底 recover。
package main
import (
"log"
"runtime/debug"
"time"
)
func safeGo(fn func()) {
go func() {
defer func() {
if r := recover(); r != nil {
log.Printf("goroutine panic: %v\n%s", r, debug.Stack())
}
}()
fn()
}()
}
func main() {
safeGo(func() {
panic("boom in goroutine")
})
time.Sleep(100 * time.Millisecond)
}竞态检测
并发写共享变量必须加锁或用 channel。开发期一定要打开 race detector,它能在运行时记录所有内存访问并发现没有 happens-before 关系的读写。
$ go run -race main.go
$ go test -race ./...
$ go build -race -o app # 二进制带 race 跑灰度环境也可sync.Mutex / RWMutex
Mutex 是互斥锁,RWMutex 允许多个读者并发但写者独占;读多写少且临界区不小时再上 RWMutex,否则普通 Mutex 更快。
package main
import (
"fmt"
"sync"
)
type Counter struct {
mu sync.RWMutex
m map[string]int
}
func NewCounter() *Counter { return &Counter{m: map[string]int{}} }
func (c *Counter) Get(k string) int {
c.mu.RLock()
defer c.mu.RUnlock()
return c.m[k]
}
func (c *Counter) Inc(k string) {
c.mu.Lock()
defer c.mu.Unlock()
c.m[k]++
}
func main() {
c := NewCounter()
var wg sync.WaitGroup
for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
defer wg.Done()
c.Inc("hits")
}()
}
wg.Wait()
fmt.Println(c.Get("hits")) // 100
}sync.Once 与 atomic
sync.Once 保证某段初始化代码只跑一次(即使被多个 goroutine 同时触发);sync/atomic 提供无锁的整数 / 指针操作,用于计数、状态位等热点场景。
package main
import (
"fmt"
"net/http"
"sync"
"sync/atomic"
"time"
)
var (
once sync.Once
client *http.Client
)
func Client() *http.Client {
once.Do(func() {
client = &http.Client{Timeout: 5 * time.Second}
})
return client
}
func main() {
fmt.Println(Client() == Client()) // true,同一个实例
var hit atomic.Int64
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
hit.Add(1)
}()
}
wg.Wait()
fmt.Println(hit.Load()) // 1000
}