V
Vel·ToolKit
Simple · Fast · Ready to use
EN
Chapter 19 of 20

Concurrency

Thread, synchronized, volatile, ExecutorService, CompletableFuture

Where Concurrency Is Used

A web service handling multiple requests per second, batch-processing tens of thousands of records, scheduled tasks, firing concurrent RPCs to several downstreams, crawling multiple URLs, cache refresh — all need several things running at once in one process. Java provides a full toolchain from low-level (Thread) to high-level (thread pools, CompletableFuture).

  • Low-level: Thread / Runnable — rarely used directly in production code
  • Thread-safety primitives: synchronized / volatile / java.util.concurrent.locks.*
  • Concurrent collections: ConcurrentHashMap / CopyOnWriteArrayList
  • Thread pools: ExecutorService (the workhorse in production code)
  • Async orchestration: CompletableFuture (chaining + composition)
  • Atomic classes: AtomicInteger / AtomicReference (lock-free counting)

Thread and Runnable

The lowest-level way to create a thread. Production code almost never calls new Thread directly — it uses a thread pool to manage thread lifecycles. This section is just a conceptual introduction.

// ThreadBasic.java
/**
 * Smallest possible example of starting a thread by hand and waiting for it to finish.
 */
public class ThreadBasic {

    /** Worker body: prints three ticks, pausing 100 ms after each; stops early if interrupted. */
    private static void tick() {
        int tick = 0;
        while (tick < 3) {
            System.out.println("worker tick " + tick);
            try {
                Thread.sleep(100);
            } catch (InterruptedException interrupted) {
                // Put the interrupt flag back before bailing out so it is not silently lost.
                Thread.currentThread().interrupt();
                return;
            }
            tick++;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Runnable is a functional interface, so a method reference can serve as the thread body.
        Runnable body = ThreadBasic::tick;
        Thread worker = new Thread(body, "worker-1");

        // start() launches the new thread; the body runs there, not on main.
        worker.start();
        // join() parks main until the worker has finished.
        worker.join();

        System.out.println("main done");
    }
}

synchronized and Visibility

Reads and writes of a shared mutable variable from multiple threads must be synchronized. synchronized is the simplest lock — it ensures only one thread at a time enters a critical section guarded by the same lock, and that changes made inside it are visible to the next thread that acquires that lock. Without it, concurrent count++ calls lose updates and the final total usually comes out too low.

// SyncCounter.java
/**
 * Integer counter whose state is guarded by the instance's own monitor.
 */
public class SyncCounter {
    private int count = 0;

    /** Adds one to the counter; the method-level synchronized makes the read-modify-write exclusive. */
    public synchronized void inc() {
        count += 1;
    }

    /** Returns the current value, taking the same lock as {@link #inc()}. */
    public synchronized int get() {
        return count;
    }

    /** Runs ten threads that each increment a shared counter 1000 times, then prints the total. */
    public static void main(String[] args) throws InterruptedException {
        SyncCounter counter = new SyncCounter();

        // Every worker runs the same job against the same counter.
        Runnable bump = () -> {
            int remaining = 1000;
            while (remaining-- > 0) {
                counter.inc();
            }
        };

        Thread[] workers = new Thread[10];
        int slot = 0;
        while (slot < workers.length) {
            Thread worker = new Thread(bump);
            worker.start();
            workers[slot++] = worker;
        }

        for (Thread worker : workers) {
            worker.join();
        }

        // With the lock in place every increment is counted: 10 * 1000.
        System.out.println(counter.get());
    }
}

volatile: Visibility Only

volatile guarantees that field reads/writes are immediately visible to all threads, but **does not guarantee atomicity** (i++ is not atomic). Commonly used for a "flag" — one thread changes the state, others observe it.

// VolatileFlag.java
/**
 * Shows a volatile boolean used as a stop flag between two threads.
 */
public class VolatileFlag {
    // Written by main, read by the worker; volatile makes the write visible to the reader.
    static volatile boolean running = true;

    /** Spins until {@link #running} is cleared, then reports how many iterations ran. */
    private static void spin() {
        int loops = 0;
        // Without volatile the JIT is allowed to read the flag once and keep looping forever.
        while (running) {
            loops++;
        }
        System.out.println("worker stopped after " + loops + " loops");
    }

    public static void main(String[] args) throws InterruptedException {
        Thread worker = new Thread(VolatileFlag::spin);
        worker.start();

        // Let the worker spin for a while, then ask it to stop and wait for it.
        Thread.sleep(200);
        running = false;
        worker.join();
    }
}

Atomic Classes: AtomicInteger

Use case: a concurrent counter (request count, active users). Usually faster than synchronized for simple counters — implemented lock-free using the CPU's CAS (compare-and-swap) instruction.

// AtomicCounter.java
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Counts increments from several threads with an AtomicInteger instead of a lock.
 */
public class AtomicCounter {
    /** Runs ten threads that each bump a shared AtomicInteger 1000 times, then prints the total. */
    public static void main(String[] args) throws InterruptedException {
        final AtomicInteger hits = new AtomicInteger();

        // One shared job: incrementAndGet() is atomic, so no explicit locking is needed.
        final Runnable bump = () -> {
            int remaining = 1000;
            while (remaining-- > 0) {
                hits.incrementAndGet();
            }
        };

        Thread[] workers = new Thread[10];
        int slot = 0;
        while (slot < workers.length) {
            Thread worker = new Thread(bump);
            worker.start();
            workers[slot++] = worker;
        }

        for (Thread worker : workers) {
            worker.join();
        }

        // 10 threads * 1000 increments each.
        System.out.println(hits.get());
    }
}

ExecutorService Thread Pool (the Production Workhorse)

Use case: processing a batch of independent tasks (crawlers, batch computation, concurrent RPCs). Don't create a new Thread per task — thread creation is expensive and the thread count can grow out of control. Use a thread pool for unified scheduling. The Executors factory provides common variants; newFixedThreadPool is the first choice for 90% of cases.

// PoolDemo.java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/**
 * Submits eight small tasks to a fixed pool of four threads and prints each result in
 * submission order.
 */
public class PoolDemo {
    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<Future<Integer>> results = new ArrayList<>();
            for (int i = 1; i <= 8; i++) {
                final int n = i;
                results.add(pool.submit(() -> {
                    Thread.sleep(100);
                    return n * n;
                }));
            }

            for (Future<Integer> f : results) {
                System.out.println(f.get());   // block waiting for the result
            }
        } finally {
            // Always shut the pool down, even if a task failed and get() threw:
            // the pool's worker threads are non-daemon and would keep the JVM alive.
            shutdownAndAwait(pool);
        }
    }

    /**
     * Graceful shutdown: stop accepting new tasks, give running ones up to five seconds to
     * finish, then cancel whatever is left.
     *
     * @param pool the executor to shut down
     */
    private static void shutdownAndAwait(ExecutorService pool) {
        pool.shutdown();
        try {
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                pool.shutdownNow();
            }
        } catch (InterruptedException e) {
            // Interrupted while waiting: cancel outstanding tasks and keep the interrupt flag set.
            pool.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}

CompletableFuture: Async Composition (the Most Common)

Use case: one endpoint needs to concurrently query 3 downstreams and merge results before returning — classic "async / await" style. CompletableFuture lets you write async like sync code: supplyAsync starts a task, thenApply transforms, thenCompose chains, allOf waits for several in parallel.

// AsyncDemo.java
import java.util.concurrent.CompletableFuture;

/**
 * Runs several simulated slow queries concurrently with CompletableFuture and combines them.
 */
public class AsyncDemo {

    /**
     * Simulates a slow lookup by sleeping 200 ms.
     *
     * @param key the lookup key
     * @return the length of {@code key}
     */
    static int slowQuery(String key) {
        try {
            Thread.sleep(200);
        } catch (InterruptedException interrupted) {
            // Keep the interrupt flag set; the method still returns its normal result.
            Thread.currentThread().interrupt();
        }
        return key.length();
    }

    public static void main(String[] args) throws Exception {
        long startedAt = System.currentTimeMillis();

        // Kick off all three queries before waiting on any of them.
        CompletableFuture<Integer> alpha = CompletableFuture.supplyAsync(() -> slowQuery("alpha"));
        CompletableFuture<Integer> beta = CompletableFuture.supplyAsync(() -> slowQuery("beta"));
        CompletableFuture<Integer> gamma = CompletableFuture.supplyAsync(() -> slowQuery("gamma"));

        // Fold the three results together pairwise.
        CompletableFuture<Integer> firstTwo = alpha.thenCombine(beta, (x, y) -> x + y);
        CompletableFuture<Integer> total = firstTwo.thenCombine(gamma, (x, y) -> x + y);
        System.out.println("sum = " + total.get());

        // Roughly 200ms rather than 600ms when all three queries run at the same time;
        // supplyAsync without an executor uses the common pool, so this depends on the CPU count.
        long elapsed = System.currentTimeMillis() - startedAt;
        System.out.println("elapsed: " + elapsed + "ms");

        // Pipeline: run the query, transform the value, then consume it.
        CompletableFuture<Integer> raw = CompletableFuture.supplyAsync(() -> slowQuery("hello"));
        CompletableFuture<Integer> scaled = raw.thenApply(n -> n * 10);
        CompletableFuture<Void> printed = scaled.thenAccept(r -> System.out.println("result = " + r));
        printed.get();
    }
}

Concurrent Collections: ConcurrentHashMap

Use case: a map shared across threads (cache, counter, dedup table). A plain HashMap can corrupt data or even loop forever when modified from several threads. ConcurrentHashMap is an efficient thread-safe implementation (segment locks in older JDKs; fine-grained per-bucket locking plus CAS since Java 8). Common methods: computeIfAbsent (lazy init), merge (count accumulation).

// ConcurrentMap.java
import java.util.concurrent.ConcurrentHashMap;

/**
 * Demonstrates atomic per-key updates on a ConcurrentHashMap shared between threads.
 */
public class ConcurrentMap {
    public static void main(String[] args) throws InterruptedException {
        ConcurrentHashMap<String, Integer> hits = new ConcurrentHashMap<>();

        // merge() inserts 1 for a missing key, otherwise adds 1 to the existing value.
        Runnable recordHits = () -> {
            int remaining = 1000;
            while (remaining-- > 0) {
                hits.merge("hit", 1, (current, delta) -> current + delta);
            }
        };

        Thread[] workers = new Thread[10];
        int slot = 0;
        while (slot < workers.length) {
            Thread worker = new Thread(recordHits);
            worker.start();
            workers[slot++] = worker;
        }
        for (Thread worker : workers) {
            worker.join();
        }

        // 10 threads * 1000 merges each.
        System.out.println(hits.get("hit"));

        // computeIfAbsent: create the bucket on first use, then append to it.
        ConcurrentHashMap<String, java.util.List<Integer>> buckets = new ConcurrentHashMap<>();
        java.util.List<Integer> bucket =
            buckets.computeIfAbsent("a", key -> new java.util.concurrent.CopyOnWriteArrayList<>());
        bucket.add(1);
        System.out.println(buckets);
    }
}

In Practice: Concurrently Fetch Multiple URLs

Combine CompletableFuture + a thread pool to build the standard "concurrent requests + aggregation" pattern.

// ConcurrentFetch.java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

/**
 * Fetches several URLs concurrently on a dedicated thread pool and prints the aggregated results.
 */
public class ConcurrentFetch {
    /**
     * Simulates an "HTTP request" that blocks for 200ms.
     *
     * @param url the URL to "fetch"
     * @return {@code url} followed by {@code " -> ok"}
     */
    static String fetch(String url) {
        try { Thread.sleep(200); } catch (InterruptedException e) {
            // Keep the interrupt flag set; the simulated result is still returned.
            Thread.currentThread().interrupt();
        }
        return url + " -> ok";
    }

    public static void main(String[] args) throws Exception {
        List<String> urls = List.of("a.com", "b.com", "c.com", "d.com", "e.com");
        ExecutorService pool = Executors.newFixedThreadPool(5);
        try {
            long t0 = System.currentTimeMillis();

            // Blocking work goes to our own pool rather than the shared common pool.
            List<CompletableFuture<String>> futures = urls.stream()
                .map(u -> CompletableFuture.supplyAsync(() -> fetch(u), pool))
                .toList();

            // wait for all to complete (wildcard array avoids a raw-type warning)
            CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0])).join();

            // Every future is already done here, so join() returns immediately.
            List<String> results = futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
            results.forEach(System.out::println);

            System.out.println("elapsed: " + (System.currentTimeMillis() - t0) + "ms"); // ~200ms
        } finally {
            // Shut down even when a fetch failed and join() threw; otherwise the pool's
            // non-daemon threads would keep the JVM from exiting.
            pool.shutdown();
        }
    }
}