V
Vel·ToolKit
简洁 · 高效 · 即开即用
ZH
第 19 章 / 共 20 章

并发基础

Thread、synchronized、volatile、ExecutorService、CompletableFuture

并发的应用场景

Web 服务每秒处理多个请求、批量处理几万条数据、定时任务、向多个下游服务并发发起 RPC、抓取多个 URL、缓存刷新——这些都需要在一个进程内同时跑多件事。Java 提供了从底层(Thread)到高层(线程池、CompletableFuture)的完整工具链。

  • 底层:Thread / Runnable — 已不直接使用
  • 线程安全原语:synchronized / volatile / java.util.concurrent.locks.*
  • 并发集合:ConcurrentHashMap / CopyOnWriteArrayList
  • 线程池:ExecutorService(生产代码主力)
  • 异步编排:CompletableFuture(链式 + 组合)
  • 原子类:AtomicInteger / AtomicReference(无锁计数)

Thread 与 Runnable

最底层的创建线程方式。实际生产里几乎不直接 new Thread——用线程池来管理生命周期。这里仅作为概念入门。

// ThreadBasic.java
public class ThreadBasic {
    public static void main(String[] args) throws InterruptedException {
        // Runnable 是函数式接口,可以用 lambda
        Thread t = new Thread(() -> {
            for (int i = 0; i < 3; i++) {
                System.out.println("worker tick " + i);
                try { Thread.sleep(100); } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }, "worker-1");

        t.start();         // 启动线程,run() 在新线程跑
        t.join();          // main 等 t 跑完再继续
        System.out.println("main done");
    }
}

synchronized 与可见性

多线程读写共享变量必须加锁。synchronized 是最简单的锁——保证同时只有一个线程进入临界区,且对共享变量的修改对其它线程立即可见。没锁的话,结果通常是错的。

// SyncCounter.java
public class SyncCounter {
    private int count = 0;

    public synchronized void inc() {  // 方法级别 synchronized
        count++;
    }

    public synchronized int get() {
        return count;
    }

    public static void main(String[] args) throws InterruptedException {
        SyncCounter c = new SyncCounter();
        Thread[] ts = new Thread[10];
        for (int i = 0; i < ts.length; i++) {
            ts[i] = new Thread(() -> {
                for (int j = 0; j < 1000; j++) c.inc();
            });
            ts[i].start();
        }
        for (Thread t : ts) t.join();
        System.out.println(c.get());  // 10_000(加了锁)
    }
}

volatile:仅保证可见性

volatile 保证字段的读写对所有线程立即可见,但**不保证原子性**(i++ 不是原子操作)。常用于"标志位"——一个线程改状态,其它线程感知。

// VolatileFlag.java
public class VolatileFlag {
    static volatile boolean running = true;

    public static void main(String[] args) throws InterruptedException {
        Thread worker = new Thread(() -> {
            int i = 0;
            while (running) {  // 没 volatile 时编译器可能把这个值缓存到寄存器,永不退出
                i++;
            }
            System.out.println("worker stopped after " + i + " loops");
        });
        worker.start();

        Thread.sleep(200);
        running = false;       // 让 worker 退出
        worker.join();
    }
}

原子类:AtomicInteger

应用场景:并发计数器(请求数、活跃用户数)。比 synchronized 更快——底层用 CPU 的 CAS 指令实现无锁递增。

// AtomicCounter.java
import java.util.concurrent.atomic.AtomicInteger;

public class AtomicCounter {
    public static void main(String[] args) throws InterruptedException {
        AtomicInteger count = new AtomicInteger();

        Thread[] ts = new Thread[10];
        for (int i = 0; i < ts.length; i++) {
            ts[i] = new Thread(() -> {
                for (int j = 0; j < 1000; j++) count.incrementAndGet();
            });
            ts[i].start();
        }
        for (Thread t : ts) t.join();
        System.out.println(count.get());  // 10_000
    }
}

ExecutorService 线程池(生产主力)

应用场景:处理一批独立任务(爬虫、批量计算、并发 RPC)。不要每个任务 new 一个 Thread——线程创建开销大、上限失控。用线程池统一调度。Executors 工厂提供常用变体;newFixedThreadPool 是 90% 场景的首选。

// PoolDemo.java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class PoolDemo {
    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);

        List<Future<Integer>> results = new ArrayList<>();
        for (int i = 1; i <= 8; i++) {
            final int n = i;
            results.add(pool.submit(() -> {
                Thread.sleep(100);
                return n * n;
            }));
        }

        for (Future<Integer> f : results) {
            System.out.println(f.get());   // 阻塞等结果
        }

        // 优雅关停:不再接新任务,等存量跑完
        pool.shutdown();
        if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
            pool.shutdownNow();
        }
    }
}

CompletableFuture:异步组合(最常用)

应用场景:一个接口要并发查 3 个下游,合并结果再返回——典型的 "async / await" 风格。CompletableFuture 让你像写同步代码一样写异步:supplyAsync 启动任务,thenApply 转换,thenCompose 串联,allOf 并行等多个。

// AsyncDemo.java
import java.util.concurrent.CompletableFuture;

public class AsyncDemo {
    static int slowQuery(String key) {
        try { Thread.sleep(200); } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return key.length();
    }

    public static void main(String[] args) throws Exception {
        long t0 = System.currentTimeMillis();

        // 并发跑三个慢查询
        CompletableFuture<Integer> a = CompletableFuture.supplyAsync(() -> slowQuery("alpha"));
        CompletableFuture<Integer> b = CompletableFuture.supplyAsync(() -> slowQuery("beta"));
        CompletableFuture<Integer> c = CompletableFuture.supplyAsync(() -> slowQuery("gamma"));

        // 合并
        CompletableFuture<Integer> sum = a.thenCombine(b, Integer::sum)
                                          .thenCombine(c, Integer::sum);
        System.out.println("sum = " + sum.get());

        System.out.println("elapsed: " + (System.currentTimeMillis() - t0) + "ms"); // ~200ms 而不是 600ms

        // 链式:执行 + 转换 + 副作用
        CompletableFuture.supplyAsync(() -> slowQuery("hello"))
            .thenApply(n -> n * 10)
            .thenAccept(r -> System.out.println("result = " + r))
            .get();
    }
}

并发集合:ConcurrentHashMap

应用场景:多线程共享 map(缓存、计数器、去重表)。普通 HashMap 多线程下会数据错乱甚至死循环。ConcurrentHashMap 是"分段锁 / CAS 优化"的高效并发实现。常用方法:computeIfAbsent(懒初始化)、merge(计数累加)。

// ConcurrentMap.java
import java.util.concurrent.ConcurrentHashMap;

public class ConcurrentMap {
    public static void main(String[] args) throws InterruptedException {
        ConcurrentHashMap<String, Integer> counts = new ConcurrentHashMap<>();

        Thread[] ts = new Thread[10];
        for (int i = 0; i < ts.length; i++) {
            ts[i] = new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    counts.merge("hit", 1, Integer::sum);
                }
            });
            ts[i].start();
        }
        for (Thread t : ts) t.join();
        System.out.println(counts.get("hit")); // 10_000

        // computeIfAbsent:懒初始化
        ConcurrentHashMap<String, java.util.List<Integer>> buckets = new ConcurrentHashMap<>();
        buckets.computeIfAbsent("a", k -> new java.util.concurrent.CopyOnWriteArrayList<>()).add(1);
        System.out.println(buckets);
    }
}

实战:并发 fetch 多个 URL

把 CompletableFuture + 线程池组合起来,构建"并发请求 + 汇总"的标准模式。

// ConcurrentFetch.java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

public class ConcurrentFetch {
    // 模拟一个会阻塞 200ms 的 "HTTP 请求"
    static String fetch(String url) {
        try { Thread.sleep(200); } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return url + " -> ok";
    }

    public static void main(String[] args) throws Exception {
        List<String> urls = List.of("a.com", "b.com", "c.com", "d.com", "e.com");
        ExecutorService pool = Executors.newFixedThreadPool(5);
        long t0 = System.currentTimeMillis();

        List<CompletableFuture<String>> futures = urls.stream()
            .map(u -> CompletableFuture.supplyAsync(() -> fetch(u), pool))
            .toList();

        // 等所有完成
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();

        List<String> results = futures.stream()
            .map(CompletableFuture::join)
            .collect(Collectors.toList());
        results.forEach(System.out::println);

        System.out.println("elapsed: " + (System.currentTimeMillis() - t0) + "ms"); // ~200ms
        pool.shutdown();
    }
}