Concurrency
Thread, synchronized, volatile, ExecutorService, CompletableFuture
Where Concurrency Is Used
A web service handling multiple requests per second, batch-processing tens of thousands of records, scheduled tasks, firing concurrent RPCs to several downstreams, crawling multiple URLs, cache refresh — all need several things running at once in one process. Java provides a full toolchain from low-level (Thread) to high-level (thread pools, CompletableFuture).
- Low-level: Thread / Runnable (rarely used directly in production code)
- Thread-safety primitives: synchronized / volatile / java.util.concurrent.locks.*
- Concurrent collections: ConcurrentHashMap / CopyOnWriteArrayList
- Thread pools: ExecutorService (the workhorse in production code)
- Async orchestration: CompletableFuture (chaining + composition)
- Atomic classes: AtomicInteger / AtomicReference (lock-free counting)
Thread and Runnable
The lowest-level way to create a thread. Production code almost never calls new Thread directly; it uses a thread pool to manage thread lifecycles. This section is only a conceptual introduction.
// ThreadBasic.java
public class ThreadBasic {
    public static void main(String[] args) throws InterruptedException {
        // Runnable is a functional interface, you can use a lambda
        Thread t = new Thread(() -> {
            for (int i = 0; i < 3; i++) {
                System.out.println("worker tick " + i);
                try { Thread.sleep(100); } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }, "worker-1");
        t.start(); // start the thread; run() executes on the new thread
        t.join(); // main waits for t to finish before continuing
        System.out.println("main done");
    }
}
synchronized and Visibility
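When several threads read and write a shared variable, the accesses must be synchronized. synchronized is the simplest lock: only one thread at a time can enter a critical section guarded by the same monitor, and everything a thread wrote before releasing the lock is visible to the next thread that acquires it. Without synchronization, count++ (a read, an add, and a write) can interleave across threads and lose updates, so the final count often comes out below the expected total.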
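To make the lost-update problem concrete, here is a minimal sketch that increments two counters side by side, one without a lock and one inside a synchronized block. The class name LostUpdateDemo, the private lock object, and the helper run(...) are illustrative assumptions, not part of the original examples.

```java
// LostUpdateDemo.java
class LostUpdateDemo {
    private int unsafeCount = 0;                 // incremented without any lock
    private int safeCount = 0;                   // incremented only while holding the lock
    private final Object lock = new Object();    // private lock object (illustrative choice)

    void unsafeInc() {
        unsafeCount++;                           // read, add, write: the steps can interleave
    }

    void safeInc() {
        synchronized (lock) {                    // block-level synchronized
            safeCount++;
        }
    }

    // Starts `threads` threads; each calls both increment methods `perThread` times.
    // Returns {unsafeCount, safeCount}.
    static int[] run(int threads, int perThread) throws InterruptedException {
        LostUpdateDemo d = new LostUpdateDemo();
        Thread[] ts = new Thread[threads];
        for (int i = 0; i < ts.length; i++) {
            ts[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    d.unsafeInc();
                    d.safeInc();
                }
            });
            ts[i].start();
        }
        for (Thread t : ts) t.join();            // join makes the workers' writes visible here
        return new int[] { d.unsafeCount, d.safeCount };
    }

    public static void main(String[] args) throws InterruptedException {
        int[] r = run(10, 100_000);
        System.out.println("unsafe = " + r[0] + " (can be less than 1000000)");
        System.out.println("safe   = " + r[1]);
    }
}
```

The unsynchronized total varies from run to run and may occasionally happen to be correct, which is exactly what makes this class of bug hard to catch in testing.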
// SyncCounter.java
public class SyncCounter {
    private int count = 0;
    public synchronized void inc() { // method-level synchronized
        count++;
    }
    public synchronized int get() {
        return count;
    }
    public static void main(String[] args) throws InterruptedException {
        SyncCounter c = new SyncCounter();
        Thread[] ts = new Thread[10];
        for (int i = 0; i < ts.length; i++) {
            ts[i] = new Thread(() -> {
                for (int j = 0; j < 1000; j++) c.inc();
            });
            ts[i].start();
        }
        for (Thread t : ts) t.join();
        System.out.println(c.get()); // 10_000 (locked)
    }
}
volatile: Visibility Only
volatile guarantees that a write to the field is visible to any thread that subsequently reads it, but **does not guarantee atomicity** (i++ on a volatile field is still a separate read and write). It is commonly used for a "flag": one thread changes the state, others observe it.
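A volatile flag also publishes the ordinary writes made before it: under the Java memory model, a volatile write happens-before any later read that observes it. The sketch below illustrates this with a plain payload field guarded by a volatile ready flag. The class and method names are hypothetical, and Thread.onSpinWait() assumes Java 9 or later.

```java
// VolatilePublish.java
class VolatilePublish {
    static int payload = 0;                  // plain field, written before the flag
    static volatile boolean ready = false;   // volatile flag that publishes the payload

    // Starts a reader that spins on the flag, then returns the payload it observed.
    static int publishAndRead(int value) throws InterruptedException {
        payload = 0;
        ready = false;
        int[] seen = new int[1];
        Thread reader = new Thread(() -> {
            while (!ready) {
                Thread.onSpinWait();         // hint to the runtime that this is a busy-wait loop
            }
            seen[0] = payload;               // sees the value written before ready = true
        });
        reader.start();
        payload = value;                     // 1. ordinary write
        ready = true;                        // 2. volatile write publishes step 1
        reader.join();
        return seen[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(publishAndRead(42)); // 42
    }
}
```

If ready were not volatile, the reader could spin forever or see a stale payload; the guarantee comes from the flag, not from the payload field itself.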
// VolatileFlag.java
public class VolatileFlag {
    static volatile boolean running = true;
    public static void main(String[] args) throws InterruptedException {
        Thread worker = new Thread(() -> {
            int i = 0;
            while (running) { // without volatile the JIT compiler may hoist this read out of the loop, so the worker might never exit
                i++;
            }
            System.out.println("worker stopped after " + i + " loops");
        });
        worker.start();
        Thread.sleep(200);
        running = false; // make the worker exit
        worker.join();
    }
}
Atomic Classes: AtomicInteger
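Use case: a concurrent counter (request count, active users). Usually faster than synchronized for a single counter, because it is implemented lock-free with the CPU's compare-and-swap (CAS) instruction: an update is retried if another thread changed the value in between.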
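incrementAndGet hides the CAS retry loop. To show what that loop looks like, here is a rough sketch of a lock-free "track the maximum" update built on compareAndSet. The class CasMax and its helpers are illustrative names only.

```java
// CasMax.java
import java.util.concurrent.atomic.AtomicInteger;

class CasMax {
    // Raises `max` to `candidate` if candidate is larger, retrying when another thread wins the race.
    static void updateMax(AtomicInteger max, int candidate) {
        while (true) {
            int current = max.get();
            if (candidate <= current) return;                   // nothing to update
            if (max.compareAndSet(current, candidate)) return;  // CAS succeeded
            // CAS failed: another thread changed the value, so re-read and try again
        }
    }

    // Each thread offers the values base .. base + perThread - 1.
    static int maxOf(int threads, int perThread) throws InterruptedException {
        AtomicInteger max = new AtomicInteger(Integer.MIN_VALUE);
        Thread[] ts = new Thread[threads];
        for (int i = 0; i < ts.length; i++) {
            final int base = i * perThread;
            ts[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) updateMax(max, base + j);
            });
            ts[i].start();
        }
        for (Thread t : ts) t.join();
        return max.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(maxOf(10, 1000)); // 9999
    }
}
```

The same read, compute, compareAndSet, retry shape works for any single-variable update; AtomicInteger.accumulateAndGet packages it for you when the update is a pure function.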
// AtomicCounter.java
import java.util.concurrent.atomic.AtomicInteger;
public class AtomicCounter {
    public static void main(String[] args) throws InterruptedException {
        AtomicInteger count = new AtomicInteger();
        Thread[] ts = new Thread[10];
        for (int i = 0; i < ts.length; i++) {
            ts[i] = new Thread(() -> {
                for (int j = 0; j < 1000; j++) count.incrementAndGet();
            });
            ts[i].start();
        }
        for (Thread t : ts) t.join();
        System.out.println(count.get()); // 10_000
    }
}
ExecutorService Thread Pool (the Production Workhorse)
Use case: processing a batch of independent tasks (crawlers, batch computation, concurrent RPCs). Don't create a new Thread per task: thread creation is expensive and the thread count can grow without bound. Use a thread pool for unified scheduling. The Executors factory provides common variants; newFixedThreadPool is a reasonable default for most workloads, though its task queue is unbounded, so submissions that outpace the workers pile up in memory.
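When the unbounded queue is a concern, one option is to construct the pool directly. The sketch below builds a ThreadPoolExecutor with the same 4 fixed threads but a bounded queue and a caller-runs rejection policy, then uses invokeAll to run a batch. The queue capacity of 16 and the names BoundedPoolDemo and sumOfSquares are assumptions made for illustration.

```java
// BoundedPoolDemo.java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class BoundedPoolDemo {
    // Computes 1^2 + 2^2 + ... + n^2, one task per term.
    static int sumOfSquares(int n) throws Exception {
        ExecutorService pool = new ThreadPoolExecutor(
                4, 4,                                        // core and maximum pool size
                0L, TimeUnit.MILLISECONDS,                   // keep-alive for surplus threads (none here)
                new ArrayBlockingQueue<>(16),                // bounded queue (capacity is an assumed value)
                new ThreadPoolExecutor.CallerRunsPolicy());  // when full, the submitting thread runs the task
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (int i = 1; i <= n; i++) {
                final int k = i;
                tasks.add(() -> k * k);
            }
            int sum = 0;
            for (Future<Integer> f : pool.invokeAll(tasks)) { // blocks until every task has finished
                sum += f.get();
            }
            return sum;
        } finally {
            pool.shutdown();                                 // always release the worker threads
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(sumOfSquares(8)); // 204
    }
}
```

CallerRunsPolicy slows the submitter down instead of dropping work, which acts as simple back-pressure; other policies (abort, discard) may fit better depending on the workload.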
// PoolDemo.java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
public class PoolDemo {
    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        List<Future<Integer>> results = new ArrayList<>();
        for (int i = 1; i <= 8; i++) {
            final int n = i;
            results.add(pool.submit(() -> {
                Thread.sleep(100);
                return n * n;
            }));
        }
        for (Future<Integer> f : results) {
            System.out.println(f.get()); // block waiting for the result
        }
        // graceful shutdown: stop accepting new tasks, let existing ones finish
        pool.shutdown();
        if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
            pool.shutdownNow();
        }
    }
}
CompletableFuture: Async Composition (the Most Common)
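Use case: one endpoint needs to concurrently query 3 downstreams and merge the results before returning, the classic "async / await" style. CompletableFuture lets you write asynchronous pipelines that read like sequential code: supplyAsync starts a task (on ForkJoinPool.commonPool() unless you pass an executor), thenApply transforms a result, thenCompose chains a dependent async step, thenCombine merges two results, and allOf waits for several futures.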
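thenCompose and allOf are easiest to understand side by side. The following sketch chains two dependent async lookups with thenCompose, then fans out over a list and gathers the results with allOf. findUserId and loadProfile are hypothetical stand-ins for remote calls, not real APIs.

```java
// ComposeDemo.java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class ComposeDemo {
    // Hypothetical first lookup: the "user id" is just the name length.
    static CompletableFuture<Integer> findUserId(String name) {
        return CompletableFuture.supplyAsync(() -> name.length());
    }

    // Hypothetical second lookup that needs the result of the first.
    static CompletableFuture<String> loadProfile(int userId) {
        return CompletableFuture.supplyAsync(() -> "profile#" + userId);
    }

    // thenCompose: the next async step depends on the previous result.
    static String profileOf(String name) {
        return findUserId(name)
                .thenCompose(ComposeDemo::loadProfile)
                .join();
    }

    // allOf: wait for a whole list of futures, then collect their values in order.
    static List<String> profilesOf(List<String> names) {
        List<CompletableFuture<String>> futures = names.stream()
                .map(n -> findUserId(n).thenCompose(ComposeDemo::loadProfile))
                .collect(Collectors.toList());
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                .thenApply(v -> futures.stream()
                        .map(CompletableFuture::join)   // already complete, so join does not block
                        .collect(Collectors.toList()))
                .join();
    }

    public static void main(String[] args) {
        System.out.println(profileOf("alice"));                 // profile#5
        System.out.println(profilesOf(List.of("al", "bob")));   // [profile#2, profile#3]
    }
}
```

Using thenApply with a function that itself returns a future would give a nested CompletableFuture<CompletableFuture<String>>; thenCompose flattens that, much like flatMap on a stream.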
// AsyncDemo.java
import java.util.concurrent.CompletableFuture;
public class AsyncDemo {
    static int slowQuery(String key) {
        try { Thread.sleep(200); } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return key.length();
    }
    public static void main(String[] args) throws Exception {
        long t0 = System.currentTimeMillis();
        // run three slow queries concurrently
        CompletableFuture<Integer> a = CompletableFuture.supplyAsync(() -> slowQuery("alpha"));
        CompletableFuture<Integer> b = CompletableFuture.supplyAsync(() -> slowQuery("beta"));
        CompletableFuture<Integer> c = CompletableFuture.supplyAsync(() -> slowQuery("gamma"));
        // merge
        CompletableFuture<Integer> sum = a.thenCombine(b, Integer::sum)
                .thenCombine(c, Integer::sum);
        System.out.println("sum = " + sum.get());
        System.out.println("elapsed: " + (System.currentTimeMillis() - t0) + "ms"); // ~200ms instead of 600ms
        // chained: execute + transform + side effect
        CompletableFuture.supplyAsync(() -> slowQuery("hello"))
                .thenApply(n -> n * 10)
                .thenAccept(r -> System.out.println("result = " + r))
                .get();
    }
}
Concurrent Collections: ConcurrentHashMap
Use case: a map shared across threads (cache, counter, dedup table). A plain HashMap can lose or corrupt entries under concurrent writes, and on JDK 7 and earlier a concurrent resize could even send a thread into an infinite loop. ConcurrentHashMap is an efficient concurrent implementation: segment locks in JDK 7, CAS plus per-bucket locking since JDK 8. Common methods: computeIfAbsent (lazy init), merge (count accumulation).
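For the dedup-table use case, the key point is that "check, then insert" must be a single atomic step. A minimal sketch, assuming a hypothetical scenario where several workers race to claim the same ids, uses putIfAbsent so that exactly one worker wins each id.

```java
// DedupDemo.java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

class DedupDemo {
    // Every thread tries to claim ids 0 .. ids-1; returns the total number of successful claims.
    static int claimAll(int threads, int ids) throws InterruptedException {
        ConcurrentHashMap<Integer, String> owners = new ConcurrentHashMap<>();
        AtomicInteger wins = new AtomicInteger();
        Thread[] ts = new Thread[threads];
        for (int i = 0; i < ts.length; i++) {
            final String me = "worker-" + i;
            ts[i] = new Thread(() -> {
                for (int id = 0; id < ids; id++) {
                    // putIfAbsent is atomic: it returns null only for the thread that inserted the key
                    if (owners.putIfAbsent(id, me) == null) {
                        wins.incrementAndGet();
                    }
                }
            });
            ts[i].start();
        }
        for (Thread t : ts) t.join();
        return wins.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(claimAll(10, 1000)); // 1000: each id is claimed exactly once
    }
}
```

Writing the same logic as containsKey followed by put would be two separate operations, so two threads could both pass the check and both believe they claimed the id, even on a ConcurrentHashMap.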
// ConcurrentMap.java
import java.util.concurrent.ConcurrentHashMap;
public class ConcurrentMap {
    public static void main(String[] args) throws InterruptedException {
        ConcurrentHashMap<String, Integer> counts = new ConcurrentHashMap<>();
        Thread[] ts = new Thread[10];
        for (int i = 0; i < ts.length; i++) {
            ts[i] = new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    counts.merge("hit", 1, Integer::sum);
                }
            });
            ts[i].start();
        }
        for (Thread t : ts) t.join();
        System.out.println(counts.get("hit")); // 10_000
        // computeIfAbsent: lazy init
        ConcurrentHashMap<String, java.util.List<Integer>> buckets = new ConcurrentHashMap<>();
        buckets.computeIfAbsent("a", k -> new java.util.concurrent.CopyOnWriteArrayList<>()).add(1);
        System.out.println(buckets);
    }
}
In Practice: Concurrently Fetch Multiple URLs
Combine CompletableFuture + a thread pool to build the standard "concurrent requests + aggregation" pattern.
// ConcurrentFetch.java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
public class ConcurrentFetch {
    // simulate an "HTTP request" that blocks for 200ms
    static String fetch(String url) {
        try { Thread.sleep(200); } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return url + " -> ok";
    }
    public static void main(String[] args) throws Exception {
        List<String> urls = List.of("a.com", "b.com", "c.com", "d.com", "e.com");
        ExecutorService pool = Executors.newFixedThreadPool(5);
        long t0 = System.currentTimeMillis();
        // Collectors.toList() keeps this compiling on Java 9+ (Stream.toList() needs Java 16)
        List<CompletableFuture<String>> futures = urls.stream()
                .map(u -> CompletableFuture.supplyAsync(() -> fetch(u), pool))
                .collect(Collectors.toList());
        // wait for all to complete
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        List<String> results = futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
        results.forEach(System.out::println);
        System.out.println("elapsed: " + (System.currentTimeMillis() - t0) + "ms"); // ~200ms
        pool.shutdown();
    }
}
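Real requests fail, and a single failed future makes allOf(...).join() throw. One way to keep the aggregation going is to attach a per-request fallback with exceptionally. The sketch below assumes a hypothetical fetch that fails for any URL containing "bad"; the fallback string format is also just an illustration.

```java
// FetchWithFallback.java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

class FetchWithFallback {
    // Hypothetical fetch: URLs containing "bad" simulate a failed request.
    static String fetch(String url) {
        if (url.contains("bad")) {
            throw new IllegalStateException("cannot reach " + url);
        }
        return url + " -> ok";
    }

    // Fetches all URLs concurrently; a failed request yields a fallback value instead of an exception.
    static List<String> fetchAll(List<String> urls) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<CompletableFuture<String>> futures = urls.stream()
                    .map(u -> CompletableFuture.supplyAsync(() -> fetch(u), pool)
                            .exceptionally(ex -> u + " -> failed"))  // per-request fallback
                    .collect(Collectors.toList());
            return futures.stream()
                    .map(CompletableFuture::join)                    // never throws: failures were mapped above
                    .collect(Collectors.toList());
        } finally {
            pool.shutdown();                                         // release the pool even on unexpected errors
        }
    }

    public static void main(String[] args) {
        fetchAll(List.of("a.com", "bad.com", "c.com")).forEach(System.out::println);
        // a.com -> ok
        // bad.com -> failed
        // c.com -> ok
    }
}
```

Whether to substitute a fallback, skip the entry, or fail the whole batch is a design decision for the caller; the point here is only that the decision is made per future, before the results are joined.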