并发基础
Thread、synchronized、volatile、ExecutorService、CompletableFuture
并发的应用场景
Web 服务每秒处理多个请求、批量处理几万条数据、定时任务、向多个下游服务并发发起 RPC、抓取多个 URL、缓存刷新——这些都需要在一个进程内同时跑多件事。Java 提供了从底层(Thread)到高层(线程池、CompletableFuture)的完整工具链。
- 底层:Thread / Runnable — 已不直接使用
- 线程安全原语:synchronized / volatile / java.util.concurrent.locks.*
- 并发集合:ConcurrentHashMap / CopyOnWriteArrayList
- 线程池:ExecutorService(生产代码主力)
- 异步编排:CompletableFuture(链式 + 组合)
- 原子类:AtomicInteger / AtomicReference(无锁计数)
Thread 与 Runnable
最底层的创建线程方式。实际生产里几乎不直接 new Thread——用线程池来管理生命周期。这里仅作为概念入门。
// ThreadBasic.java
public class ThreadBasic {
public static void main(String[] args) throws InterruptedException {
// Runnable 是函数式接口,可以用 lambda
Thread t = new Thread(() -> {
for (int i = 0; i < 3; i++) {
System.out.println("worker tick " + i);
try { Thread.sleep(100); } catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
}
}, "worker-1");
t.start(); // 启动线程,run() 在新线程跑
t.join(); // main 等 t 跑完再继续
System.out.println("main done");
}
}synchronized 与可见性
多线程读写共享变量必须加锁。synchronized 是最简单的锁——保证同时只有一个线程进入临界区,且对共享变量的修改对其它线程立即可见。没锁的话,结果通常是错的。
// SyncCounter.java
public class SyncCounter {
private int count = 0;
public synchronized void inc() { // 方法级别 synchronized
count++;
}
public synchronized int get() {
return count;
}
public static void main(String[] args) throws InterruptedException {
SyncCounter c = new SyncCounter();
Thread[] ts = new Thread[10];
for (int i = 0; i < ts.length; i++) {
ts[i] = new Thread(() -> {
for (int j = 0; j < 1000; j++) c.inc();
});
ts[i].start();
}
for (Thread t : ts) t.join();
System.out.println(c.get()); // 10_000(加了锁)
}
}volatile:仅保证可见性
volatile 保证字段的读写对所有线程立即可见,但**不保证原子性**(i++ 不是原子操作)。常用于"标志位"——一个线程改状态,其它线程感知。
// VolatileFlag.java
public class VolatileFlag {
static volatile boolean running = true;
public static void main(String[] args) throws InterruptedException {
Thread worker = new Thread(() -> {
int i = 0;
while (running) { // 没 volatile 时编译器可能把这个值缓存到寄存器,永不退出
i++;
}
System.out.println("worker stopped after " + i + " loops");
});
worker.start();
Thread.sleep(200);
running = false; // 让 worker 退出
worker.join();
}
}原子类:AtomicInteger
应用场景:并发计数器(请求数、活跃用户数)。比 synchronized 更快——底层用 CPU 的 CAS 指令实现无锁递增。
// AtomicCounter.java
import java.util.concurrent.atomic.AtomicInteger;
public class AtomicCounter {
public static void main(String[] args) throws InterruptedException {
AtomicInteger count = new AtomicInteger();
Thread[] ts = new Thread[10];
for (int i = 0; i < ts.length; i++) {
ts[i] = new Thread(() -> {
for (int j = 0; j < 1000; j++) count.incrementAndGet();
});
ts[i].start();
}
for (Thread t : ts) t.join();
System.out.println(count.get()); // 10_000
}
}ExecutorService 线程池(生产主力)
应用场景:处理一批独立任务(爬虫、批量计算、并发 RPC)。不要每个任务 new 一个 Thread——线程创建开销大、上限失控。用线程池统一调度。Executors 工厂提供常用变体;newFixedThreadPool 是 90% 场景的首选。
// PoolDemo.java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
public class PoolDemo {
public static void main(String[] args) throws Exception {
ExecutorService pool = Executors.newFixedThreadPool(4);
List<Future<Integer>> results = new ArrayList<>();
for (int i = 1; i <= 8; i++) {
final int n = i;
results.add(pool.submit(() -> {
Thread.sleep(100);
return n * n;
}));
}
for (Future<Integer> f : results) {
System.out.println(f.get()); // 阻塞等结果
}
// 优雅关停:不再接新任务,等存量跑完
pool.shutdown();
if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
pool.shutdownNow();
}
}
}CompletableFuture:异步组合(最常用)
应用场景:一个接口要并发查 3 个下游,合并结果再返回——典型的 "async / await" 风格。CompletableFuture 让你像写同步代码一样写异步:supplyAsync 启动任务,thenApply 转换,thenCompose 串联,allOf 并行等多个。
// AsyncDemo.java
import java.util.concurrent.CompletableFuture;
public class AsyncDemo {
static int slowQuery(String key) {
try { Thread.sleep(200); } catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return key.length();
}
public static void main(String[] args) throws Exception {
long t0 = System.currentTimeMillis();
// 并发跑三个慢查询
CompletableFuture<Integer> a = CompletableFuture.supplyAsync(() -> slowQuery("alpha"));
CompletableFuture<Integer> b = CompletableFuture.supplyAsync(() -> slowQuery("beta"));
CompletableFuture<Integer> c = CompletableFuture.supplyAsync(() -> slowQuery("gamma"));
// 合并
CompletableFuture<Integer> sum = a.thenCombine(b, Integer::sum)
.thenCombine(c, Integer::sum);
System.out.println("sum = " + sum.get());
System.out.println("elapsed: " + (System.currentTimeMillis() - t0) + "ms"); // ~200ms 而不是 600ms
// 链式:执行 + 转换 + 副作用
CompletableFuture.supplyAsync(() -> slowQuery("hello"))
.thenApply(n -> n * 10)
.thenAccept(r -> System.out.println("result = " + r))
.get();
}
}并发集合:ConcurrentHashMap
应用场景:多线程共享 map(缓存、计数器、去重表)。普通 HashMap 多线程下会数据错乱甚至死循环。ConcurrentHashMap 是"分段锁 / CAS 优化"的高效并发实现。常用方法:computeIfAbsent(懒初始化)、merge(计数累加)。
// ConcurrentMap.java
import java.util.concurrent.ConcurrentHashMap;
public class ConcurrentMap {
public static void main(String[] args) throws InterruptedException {
ConcurrentHashMap<String, Integer> counts = new ConcurrentHashMap<>();
Thread[] ts = new Thread[10];
for (int i = 0; i < ts.length; i++) {
ts[i] = new Thread(() -> {
for (int j = 0; j < 1000; j++) {
counts.merge("hit", 1, Integer::sum);
}
});
ts[i].start();
}
for (Thread t : ts) t.join();
System.out.println(counts.get("hit")); // 10_000
// computeIfAbsent:懒初始化
ConcurrentHashMap<String, java.util.List<Integer>> buckets = new ConcurrentHashMap<>();
buckets.computeIfAbsent("a", k -> new java.util.concurrent.CopyOnWriteArrayList<>()).add(1);
System.out.println(buckets);
}
}实战:并发 fetch 多个 URL
把 CompletableFuture + 线程池组合起来,构建"并发请求 + 汇总"的标准模式。
// ConcurrentFetch.java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
public class ConcurrentFetch {
// 模拟一个会阻塞 200ms 的 "HTTP 请求"
static String fetch(String url) {
try { Thread.sleep(200); } catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return url + " -> ok";
}
public static void main(String[] args) throws Exception {
List<String> urls = List.of("a.com", "b.com", "c.com", "d.com", "e.com");
ExecutorService pool = Executors.newFixedThreadPool(5);
long t0 = System.currentTimeMillis();
List<CompletableFuture<String>> futures = urls.stream()
.map(u -> CompletableFuture.supplyAsync(() -> fetch(u), pool))
.toList();
// 等所有完成
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
List<String> results = futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
results.forEach(System.out::println);
System.out.println("elapsed: " + (System.currentTimeMillis() - t0) + "ms"); // ~200ms
pool.shutdown();
}
}