Java25 线程体系 与 异步编程
线程是 CPU 调度的最小执行单位
1 2 3 4 5 Thread t = new Thread(() -> { System.out.println("hello"); }); t.start();
1 2 3 4 5 6 7 Java Thread ↓ JVM ↓ (1v1绑定) OS Thread(Linux pthread) ↓ CPU 调度同步:程序逐步执行,上一行完成后才会执行下一行。
异步:程序多线程同时执行,上一行被其他线程运行,下一行无需等待上一行执行完成可继续执行。
内存模型
现代CPU每个核心通常有三级缓存。L1 cache为每个核心独自占用,L2 cache为某模块内所有核心共用,L3 cache为CPU内所有核心共用。对于Java,每一个载体线程(虚拟线程)、平台线程(传统线程)都会占用一个CPU核心。
线程种类
Java25中的线程分为 平台线程(传统线程)和 虚拟线程。
Java 的并发模型发展:传统线程(OS线程)→ 线程池(资源复用)→ 虚拟线程(Java 21+)
传统线程
Java 最原始的并发单元,操作系统线程,开销重、数量有限。
传统线程中,每个传统线程都对应一个操作系统内核线程,即(传统线程:OS线程=1:1)。当任务在线程中执行遇到堵塞操作时,这会同时堵塞传统线程和对应的操作系统内核线程,为了节省CPU资源,当某个操作系统内核线程被堵塞后,操作系统调度器会把这个内核线程从 CPU 上切走,放进阻塞队列。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 方式1:继承 Thread
class MyThread extends Thread {
@Override
public void run() {
System.out.println("Thread: " + Thread.currentThread().getName());
}
}
new MyThread().start();
// 方式2:实现 Runnable(推荐,解耦逻辑与线程)
Thread t = new Thread(() -> System.out.println("Runnable thread"));
t.start();
// 方式3:实现 Callable(有返回值 + 可抛异常)
Callable<String> callable = () -> "result";
FutureTask<String> task = new FutureTask<>(callable);
new Thread(task).start();
String result = task.get(); // 阻塞等待结果
线程状态:
缺点:创建一个线程约消耗 1MB 栈内存,操作系统上下文切换开销大,通常系统最多支撑数千个并发线程。频繁的创建、销毁传统线程造成大量消耗。
传统线程池
线程池将线程的”创建/销毁”与”任务执行”解耦,通过预先创建好的线程反复接收任务,大幅降低创建新线程和销毁线程的开销。
传统线程中,每个传统线程都对应一个操作系统内核线程,即(传统线程:OS线程=1:1)。当任务在线程中执行遇到堵塞操作时,这会同时堵塞传统线程和对应的操作系统内核线程,为了节省CPU资源,当某个操作系统内核线程被堵塞后,操作系统调度器会把这个内核线程从 CPU 上切走,放进阻塞队列。此时,线程池中的这个线程已经失去了执行能力,无法再为其调度线程池等待队列的其他任务。
线程池:
优点:节省传统线程为每个任务创建/销毁线程的开销。
缺点:没有解决传统线程绑定OS核心线程,当堵塞时切换线程发生在操作系统的内核层,导致线程切换开销巨大。
为了解决内核线程切换的开销,有两种方式:
- 在传统线程里执行 异步非堵塞 操作,使用操作系统提供的 epoll 机制,线程发起一个网络请求后立刻返回去干别的活,等到网络数据准备好了,操作系统再通知线程(不一定是之前调用的线程,从线程池中获取一个)回来执行回调函数。
- 使用虚拟线程,在用户态自己实现了一个JVM调度器,把 M 个虚拟线程(协程) 映射到 N 个操作系统的物理线程 上。
创建线程池:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// 1. 固定大小线程池 — CPU密集型任务
ExecutorService fixed = Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors()
);
// 2. 缓存线程池 — 大量短生命周期任务
ExecutorService cached = Executors.newCachedThreadPool();
// 线程空闲60s自动回收,按需创建
// 3. 单线程池 — 保证任务串行执行
ExecutorService single = Executors.newSingleThreadExecutor();
// 4. 定时线程池
ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(4);
scheduled.scheduleAtFixedRate(task, 0, 1, TimeUnit.SECONDS);
// 生产推荐:手动创建,明确每个参数
ExecutorService pool = new ThreadPoolExecutor(
4, // corePoolSize
16, // maximumPoolSize
60L, TimeUnit.SECONDS, // keepAliveTime
new LinkedBlockingQueue<>(1000), // 有界队列,避免OOM
new ThreadFactoryBuilder()
.setNameFormat("my-pool-%d")
.build(),
new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略:调用者执行
);
执行异步任务:
1
2
3
4
5
6
7
8
9
// execute:无返回值,异常会丢失
pool.execute(() -> System.out.println("fire and forget"));
// submit:返回 Future,可获取结果和捕获异常
Future<String> future = pool.submit(() -> {
Thread.sleep(1000);
return "done";
});
String result = future.get(2, TimeUnit.SECONDS); // 限时等待
虚拟线程
虚拟线程是 Java 21 通过 Project Loom 引入的革命性特性,底层是 M:N 调度:大量虚拟线程复用少量 载体线程(carrier thread),遇到 I/O 阻塞时自动 卸载(unmount) 当前虚拟线程,让出载体线程给其他任务。
(虚拟线程:载体线程=M:N)(载体线程:OS线程=1:1)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 1. 直接创建
Thread vt = Thread.ofVirtual().name("my-vt").start(() -> {
System.out.println("I'm a virtual thread!");
});
// 2. 虚拟线程池(每个任务用独立虚拟线程)
// 内部是对 1.直接创建 的简单封装,引入ExecutorService主要是为了向前兼容
ExecutorService vtPool = Executors.newVirtualThreadPerTaskExecutor();
vtPool.submit(() -> {
// 同步风格写阻塞代码,底层自动卸载不占OS线程
String body = httpClient.send(request, BodyHandlers.ofString()).body();
db.save(body); // 数据库操作,自动 park/unpark
});
// 3. 旧代码无缝迁移:只需改线程工厂
ExecutorService legacy = new ThreadPoolExecutor(
100, 100, 0, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
Thread.ofVirtual().factory() // ← 只改这一行
);
对于虚拟线程,无论是使用直接创建Thread.ofVirtual()还是通过类似线程池Executors.newVirtualThreadPerTaskExecutor()两者本质是一样的,他们对每个任务都创建一个新的JVM虚拟线程,最终都会映射到JVM共享的ForkJoinPool的载体线程池上。
比较
| 传统线程/线程池 | 虚拟线程 | |
|---|---|---|
| CPU 密集型 | ✅ 最佳 | 无收益(受限CPU核心数) |
| I/O 密集型 | 线程数受限 | ✅ 百万并发 |
| 内存占用 | ~1MB/线程 | ~几KB/虚拟线程 |
| 代码风格 | 同步阻塞 | 同步阻塞(外表一样) |
| 调试栈帧 | 清晰 | ✅ 同样清晰(优于回调) |
传统线程适合简单场景或底层控制,但高并发下内存和调度成本高,生产中几乎不直接裸用。
线程池是目前最主流的方案,CPU 密集型任务首选 FixedThreadPool(线程数 = CPU 核数),I/O 密集型可适当加大。生产中必须手动创建,指定有界队列和拒绝策略,避免 OOM。
虚拟线程(Java 21+)专为 I/O 密集型高并发设计,用同步代码写出异步的吞吐量,非常适合 Web 服务、数据库查询等场景。
同步/异步/堵塞/非堵塞(并发编程)
- 同步 vs 异步:关注的是调用方是否需要等待结果(行为模式)。
- 阻塞 vs 非阻塞:关注的是线程在等待时是否被挂起(线程状态)。
同步与异步 (Synchronous vs Asynchronous)
同步 (Synchronous) :调用方发出一个请求后,必须主动等待被调用方返回结果,才能继续执行后续的任务。整个过程是串行的。
1 2 3 4
// 主线程会在这里卡住,必须等 doTask() 执行完拿到结果,才会打印 "Main end" System.out.println("Main start"); String result = doTask(); System.out.println("Main end");
异步 (Asynchronous):调用方发出请求后,不需要原地等待结果,而是直接返回去执行其他任务。被调用方通常会在完成任务后,通过回调函数、事件通知或 Future 对象来告知调用方结果。
1 2 3 4 5 6 7
System.out.println("Main start"); // 提交异步任务到后台线程池执行,主线程不等待,直接往下走 CompletableFuture.runAsync(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) {} System.out.println("Async task completed"); }); System.out.println("Main continues immediately"); // 这句话会紧接着打印出来
同步、异步的主要区别:谁来等待数据,应用程序线程在等待(可能阻塞,也可能轮询)即同步;内核在等待,完成后回调,即异步)
- 同步IO:应用程序主动发起IO操作,并自己等待(阻塞或轮询)数据就绪。
- 异步IO:应用程序发起IO操作后立即返回,由操作系统内核完成数据准备和拷贝,然后通知应用程序。
阻塞与非阻塞 (Blocking vs Non-blocking)
阻塞 (Blocking) : 当线程发起一个操作(如读取文件、网络请求、获取锁)时,如果条件不满足,该线程会被操作系统挂起(进入 BLOCKED/WAITING 状态),暂停执行并释放 CPU 资源,直到条件满足后被唤醒。
1 2 3
ServerSocket serverSocket = new ServerSocket(8080); // 如果没有客户端连接,当前线程会一直阻塞在这里,无法执行后面的代码 Socket clientSocket = serverSocket.accept();
注:此段堵塞代码与上面同步代码的区别:同步代码可能因为堵塞而耗时(线程处于挂起),也可能因为计算复杂而耗时(线程处于执行中),而此段堵塞代码线程一定处于挂起状态。
非阻塞 (Non-blocking):当线程发起一个操作时,无论结果如何,它都会立即返回。如果条件不满足,它会马上得到一个错误码或空结果,而不会被挂起,可以继续去处理其他事情。通常会配合轮询(不断询问)或 I/O 多路复用(让内核通知)来使用。
1 2 3 4
// 将通道设置为非阻塞模式 socketChannel.configureBlocking(false); // 即使没有数据可读,read() 也会立刻返回(通常返回0或-1),线程不会停在这里 int bytesRead = socketChannel.read(buffer);
| 特性 | 阻塞任务 | 非阻塞任务 |
|---|---|---|
| 线程状态 | 线程进入 WAITING/TIMED_WAITING | 线程始终处于 RUNNABLE |
| 资源占用 | 占用线程但什么都不做 | 快速执行完释放线程(任务过重会占用大量时间) |
| 典型场景 | 网络IO、磁盘IO、数据库查询、Thread.sleep() | 计算、转换、过滤、组合 |
堵塞、非堵塞的核心区别:调用线程的状态
- 调用线程会被操作系统挂起(暂停),进入
WAITING状态,即为堵塞。 - 调用线程不会被挂起,会保持
RUNNING运行状态,即为非堵塞。
同步堵塞和异步非堵塞很好理解,下面给出同步非堵塞,异步堵塞的代码实例,方便理解:
| 组合模式 | 核心特点 | 谁来确认结果? | 线程状态 |
|---|---|---|---|
| 同步阻塞 | 傻等模式 | 调用方主动等 | 挂起(睡觉) |
| 同步非阻塞 | 忙等/轮询模式 | 调用方主动问 | 运行(不睡觉,一直问) |
| 异步阻塞 | 干等通知模式 | 被调用方通知 | 挂起(睡觉) |
| 异步非阻塞 | 完美模式 | 被调用方通知 | 运行(干别的,等通知) |
同步非堵塞:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 将通道设置为非阻塞模式
socketChannel.configureBlocking(false);
ByteBuffer buffer = ByteBuffer.allocate(1024);
// 同步非阻塞的核心:主动轮询(忙等)
while (true) {
// 调用 read() 时,如果没有数据,它会立刻返回 0,线程不会挂起,可以继续执行后面的代码
int bytesRead = socketChannel.read(buffer);
if (bytesRead > 0) {
System.out.println("读到数据了!");
break; // 拿到结果,跳出循环
} else {
// 没读到数据,线程没有傻等,而是去干点别的(比如处理其他连接或逻辑)
// 然后马上又回到这里,再次主动询问(轮询)
System.out.println("还没数据,我去干点别的,马上再回来看看...");
}
}
异步堵塞:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 1. 提交一个异步任务(这是异步的,任务会扔到别的线程去跑)
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// 模拟耗时操作
try { Thread.sleep(3000); } catch (InterruptedException e) {}
return "任务完成";
});
System.out.println("任务已提交,继续往下走...");
// 2. 但是!紧接着调用了 future.get()
// 此时,当前主线程会被强行卡在这里,一直阻塞等待异步任务的结果返回
String result = future.get(); // <--- 这里发生了阻塞!
System.out.println("拿到结果:" + result);
| 组合 | 示例 | 特点 |
|---|---|---|
| 同步阻塞 | 传统 Socket IO,BIO | 每个连接一个线程,read() 会阻塞等待数据 |
| 同步非阻塞 | Netty NIO,Java Selector + 非阻塞 Channel | 单线程轮询多个连接,读写立即返回 |
| 异步阻塞 | 几乎不存在(矛盾概念) | 极少场景 |
| 异步非阻塞 | Java AIO(Proactor 模式),Windows IOCP | 系统完成后回调,无需主动轮询 |
操作系统的支持
epoll(Linux)和 kqueue(BSD/macOS):属于 I/O 多路复用(multiplexing),通常配合 非阻塞 I/O(non-blocking I/O) 使用。属于同步 非堵塞 I/O 模型,不会阻塞线程等待单个 I/O 完成,但应用程序需主动查询I/O 是否完成并调用
read()/write()来完成数据传输。- IOCP (Window):真正的 异步 非堵塞 I/O(Asynchronous I/O)。应用发起 I/O 请求,立即返回。内核在 后台完成整个 I/O 操作(包括数据拷贝) 后,通过完成端口通知应用。应用无需主动调用
read,数据已就绪。 - io_uring (Linux):高性能异步 I/O 框架,也是内核完成全部操作后通知用户。
| 特性 | epoll (Linux) | kqueue (BSD/macOS) | IOCP (Windows) | io_uring (Linux) |
|---|---|---|---|---|
| 工作模式 | 就绪通知 | 就绪通知 | 完成通知 | 完成通知 (提交&收割队列) |
| 内核驱动 | 被动。调用epoll_wait时,内核检查并返回就绪事件。 | 被动。调用kevent时,内核返回事件。 | 主动。I/O完成后,内核主动将完成状态放入完成队列。 | 主动。I/O完成后,内核主动将完成事件放入CQ(完成队列)。 |
| 提供线程 | 不提供。用户线程调用epoll_wait时阻塞。 | 不提供。用户线程调用kevent时阻塞。 | 提供。 | 不提供。用户线程调用io_uring_wait_cqe时阻塞,但提供内核侧轮询线程查询SQE并开启IO任务 |
| 主流用法 | 一个线程循环调用epoll_wait,处理就绪事件。 | 一个线程循环调用kevent,处理就绪事件。 | 线程池 + 完成端口。通常启多个线程调用GetQueuedCompletionStatus,每个线程都可处理完成事件。 | 一个线程循环调用io_uring_wait_cqe() 阻塞。通过IORING_SETUP_SQPOLL启用内核侧轮询线程,可在提交SQE任务时,不陷入内核态。 |
Java异步框架 Netty
异步编排框架 CompletableFuture
CompletableFuture 是 Java 8 引入的异步编程框架,允许以链式调用的方式组合多个异步操作,避免回调地狱。
如果任务是同步堵塞的,那么CompletableFuture并不是真正的异步非堵塞,其只是将堵塞任务交给其他线程了(如ForkJoinPool中的线程),仅仅不堵塞主线程(主动调用线程)而已,ForkJoinPool中的线程依然被堵塞。但如果任务是支持异步非堵塞的,那么CompletableFuture就是真正的异步非堵塞模型,需要借助complete()方法主动完成任务。
注:此框架可防止编写有返回值的异步代码堵塞主线程(主动调用线程)
使用Future接受异步返回值,在主线程调用get()时会被堵塞。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import java.util.concurrent.*;
public class FutureDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 1. 创建传统线程池
ExecutorService executor = Executors.newFixedThreadPool(2);
System.out.println("【主线程】准备点外卖...");
// 2. 提交任务,返回一个 Future 占位符
Future<String> future = executor.submit(new Callable<String>() {
@Override
public String call() throws Exception {
// 模拟耗时
Thread.sleep(2000);
return "热腾腾的牛肉面";
}
});
// 3. 获取结果:如果尚未完成,主线程会在这里死死卡住(阻塞)
String result = future.get();
System.out.println("【主线程】终于拿到外卖啦: " + result);
// 5. 关闭线程池
executor.shutdown();
}
}
在第三步调用get()方法时,如果异步任务没有完成,主线程(调用线程)会被堵塞。为了解决这个问题,Java8引入了CompletableFuture,其允许把多个异步任务像流水线一样串联起来,且全程不需要阻塞主线程。
其中异步任务被交给了默认的ForkJoinPool线程池中的线程,其中thenAccept()中的回调函数默认也会交给ForkJoinPool线程池中的线程执行。thenApply()中的回调函数会交给ForkJoinPool中之前执行异步任务的线程执行(保证同一个线程,因为之前的线程会堵塞,堵塞结束接着执行回调函数),thenApplyAsync()中的回调函数交给ForkJoinPool中任意一个空闲的线程。
1
2
3
4
5
6
7
8
9
10
11
12
System.out.println("【主线程】点完外卖了,继续向下执行核心业务...");
CompletableFuture.supplyAsync(() -> {
// 异步任务
return "牛肉面";
}).thenAccept(food -> {
// 异步任务完成后的 执行代码(回调函数)
System.out.println("【工作线程】饭好了,我已经帮主人把 " + food + " 炫嘴里了。");
});
// 主线程会直接继续执行下面的代码
xxx();
对于thenAccept中的任务由哪个线程执行与线程竞争有关:
Java的CompletableFuture本质还是使用线程池模拟异步(并不是真正的异步) supplyAsync()是用于提交同步堵塞任务给特定线程池(如线程1)执行(最好不要在supplyAsync()方法中使用异步非堵塞IO,否则会出错)提交的任务会封装一个complete()方法用于激活处理后续的回调函数(如thenApply等)由线程1执行 thenApply()和thenApplyAsync()都是在complete()方法中被执行的。即线程1完成同步堵塞任务后被唤醒并执行thenApply()的任务 或 将thenApplyAsync()的任务分发到其他指定线程池(线程2)中执行。
强调一点:如果主线程在执行到thenApply()注册回调时,supplyAsync()任务已经完成返回,就会由主线程亲自执行thenApply()的任务 或 将thenApplyAsync()的任务分发到具体的其他线程池中执行。
如果要将CompletableFuture与netty结合,不能直接在supplyAsync()中编写netty异步任务,需要将CompletableFuture的回调触发(future.complete(buf);)封装到netty的回调函数中。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
// CompletableFuture异步 伪代码
CompletableFuture<ByteBuffer> future = new CompletableFuture<>();
ioUring.submitRead(fd, buf, (res, data) -> {
future.complete(buf); // 由 io_uring 回调线程执行CompletableFuture的complete()触发回调链
});
future.thenApplyAsync(data -> process(data), bizExecutor)
// CompletableFuture Netty代码
CompletableFuture<ByteBuffer> future = new CompletableFuture<>();
channel.writeAndFlush(req).addListener(f -> {
if (f.isSuccess()) future.complete(result);
else future.completeExceptionally(f.cause());
});
future.thenApplyAsync(data -> process(data), bizExecutor) // EventLoop 安全
CompletableFuture的使用:
创建异步任务
| 方法 | 用途 |
|---|---|
supplyAsync(Supplier) | 有返回值的异步任务 |
runAsync(Runnable) | 无返回值的异步任务 |
xxxAsync(..., Executor) | 指定自定义线程池(生产环境强烈推荐) |
结果转换与消费
| 方法 | 说明 | 同步/异步 |
|---|---|---|
thenApply(fn) | 转换结果(类似 Stream.map) | 当前线程/新线程 |
thenAccept(consumer) | 消费结果,无返回值 | 当前线程/新线程 |
thenRun(action) | 不关心上一步结果,仅执行动作 | 当前线程/新线程 |
handle(biFn) | 统一处理成功和异常,返回新结果 | 当前线程/新线程 |
whenComplete(biConsumer) | 观测结果或异常,不改变原结果 | 当前线程/新线程 |
多任务组合
| 方法 | 用途 |
|---|---|
thenCompose(fn) | 串行依赖:上一步结果作为下一步输入(flatMap) |
thenCombine(other, biFn) | 并行合并:两个独立任务都完成后合并结果 |
allOf(futures...) | 等待所有任务完成(返回 Void) |
anyOf(futures...) | 等待任一任务完成(返回最快结果) |
异常处理
| 方法 | 用途 |
|---|---|
exceptionally(fn) | 捕获异常并返回兜底值(类似 catch) |
handle(biFn) | 同时处理正常值和异常,返回统一类型 |
orTimeout(time, unit) | Java 9+ 超时自动抛 TimeoutException |
completeOnTimeout(val, time, unit) | Java 9+ 超时则用默认值完成 |
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// ① 基础:创建异步任务
CompletableFuture<String> cf = CompletableFuture.supplyAsync(
() -> fetchFromDB(), // 异步执行
customPool // 指定线程池(默认 ForkJoinPool)
);
// ② 转换结果:thenApply(同步映射)
CompletableFuture<Integer> cf2 = cf.thenApply(s -> s.length());
// ③ 继续异步:thenCompose(flatMap,避免嵌套)
CompletableFuture<Order> orderFuture = cf
.thenCompose(userId -> fetchOrderAsync(userId));
// ④ 合并两个任务:thenCombine
CompletableFuture<String> result =
CompletableFuture.supplyAsync(() -> "Hello")
.thenCombine(
CompletableFuture.supplyAsync(() -> " World"),
(a, b) -> a + b
);
// ⑤ 等待全部完成
CompletableFuture<Void> all = CompletableFuture.allOf(cf1, cf2, cf3);
all.join();
// ⑥ 任意一个完成
CompletableFuture<Object> any = CompletableFuture.anyOf(cf1, cf2, cf3);
// ⑦ 异常处理
cf.exceptionally(ex -> {
log.error("failed", ex);
return "default";
}).thenAccept(System.out::println);
// ⑧ 超时控制(Java 9+)
cf.orTimeout(3, TimeUnit.SECONDS)
.completeOnTimeout("fallback", 2, TimeUnit.SECONDS);
虚拟线程以同步代码实现异步性能
假设场景:并发执行 3 个独立的 IO 任务(查询用户信息、查询订单列表、查询积分),然后聚合结果返回。
传统 CompletableFuture 写法(异步编排)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public UserProfile getUserProfileAsync(Long userId) {
CompletableFuture<User> userFuture = CompletableFuture.supplyAsync(() -> userService.getById(userId));
CompletableFuture<List<Order>> orderFuture = CompletableFuture.supplyAsync(() -> orderService.listByUserId(userId));
CompletableFuture<Integer> pointsFuture = CompletableFuture.supplyAsync(() -> pointsService.getPoints(userId));
// 必须使用 allOf + thenApply 进行编排,调试困难,堆栈不连续
return CompletableFuture.allOf(userFuture, orderFuture, pointsFuture)
.thenApply(v -> {
User user = userFuture.join();
List<Order> orders = orderFuture.join();
Integer points = pointsFuture.join();
return new UserProfile(user, orders, points);
})
.join();
}
虚拟线程写法(同步编码,异步性能)
纯虚拟线程基础 API
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public UserProfile getUserProfileVtBasic(Long userId) throws Exception {
// 手动创建虚拟线程 + Future 模拟并发
var executor = Executors.newVirtualThreadPerTaskExecutor();
Future<User> userFuture = executor.submit(() -> userService.getById(userId));
Future<List<Order>> orderFuture = executor.submit(() -> orderService.listByUserId(userId));
Future<Integer> pointsFuture = executor.submit(() -> pointsService.getPoints(userId));
// 同步阻塞获取 —— 因为是虚拟线程,这里的 get() 不会占用平台线程
User user = userFuture.get();
List<Order> orders = orderFuture.get();
Integer points = pointsFuture.get();
executor.close();
return new UserProfile(user, orders, points);
}
使用 StructuredTaskScope
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// ==================== 核心编排逻辑 ====================
/**
* 使用 StructuredTaskScope 实现同步编码风格的并发 IO 编排
* - 三个 IO 任务真正并发执行
* - 任一子任务失败时,自动取消其余未完成的子任务(ShutdownOnFailure 语义)
* - 主虚拟线程在 join() 处挂起,不占用载体线程
*/
UserProfile getUserProfile(long userId) throws Exception {
// try-with-resources 确保 scope 关闭时所有子任务已完成或被取消
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
// fork 的每个 lambda 都在独立的虚拟线程中执行
Subtask<String> userTask = scope.fork(() -> new UserService().getById(userId));
Subtask<List<String>> orderTask = scope.fork(() -> new OrderService().listByUserId(userId));
Subtask<Integer> pointsTask = scope.fork(() -> new PointsService().getPoints(userId));
// 【关键】同步阻塞等待所有子任务完成
// 虚拟线程在此处 yield,载体线程可调度其他虚拟线程
scope.join();
// 如果任一子任务抛出异常,此处会将其包装为 ExecutionException 抛出
// 同时 ShutdownOnFailure 会自动向其余子任务发送中断信号
scope.throwIfFailed();
// 安全获取结果(此时子任务必定已成功完成)
return new UserProfile(
userTask.get(),
orderTask.get(),
pointsTask.get()
);
}
}
结构化并发框架 StructuredTaskScope
StructuredTaskScope强制要求所有并发子任务必须在父任务的代码块内完成,确保父任务结束前,其所有子任务也必然结束,杜绝了任务泄漏的风险,并通过内置的关闭策略(ShutdownOnFailure 等)简化了错误传播和取消操作。
StructuredTaskScope 和 CompletableFuture 的区别:
StructuredTaskScope 提供了类似 try-with-resources 的语法结构,确保:
- 生命周期绑定:退出 scope 时,所有未完成的子任务会被自动取消并等待结束。
- 异常传播:子任务的失败可以策略性地传播给父线程。
- 可观测性:线程转储(Thread Dump)能清晰展示父子任务关系。
| 维度 | CompletableFuture | StructuredTaskScope |
|---|---|---|
| 核心设计理念 | 基于回调的 响应式/声明式 异步编程 | 基于语法的 结构化并发,将并发任务视为一个工作单元 |
| API 风格 | 声明式 (链式调用, 如 thenApply, allOf) | 命令式 (使用 fork, join, close, 如 try-with-resources) |
| 生命周期管理 | 非结构化。父线程与子任务无强制从属关系,线程可能泄漏 | 结构化。子任务生命周期与作用域代码块绑定,由 try-with-resources 自动管理 |
| 线程模型/资源 | 默认使用 ForkJoinPool.commonPool() (平台线程) 也可指定使用虚拟线程 | 默认使用虚拟线程(Project Loom),资源效率更高 |
| 错误/取消策略 | 内聚。异常处理和取消逻辑分散在链式调用中,需要开发者手动实现(如 exceptionally) | 外置。提供 ShutdownOnFailure 和 ShutdownOnSuccess 等明确策略,统一传播异常并自动取消其他任务 |
| 可观测性 | 低。调用栈不直观,难以追踪任务间的父子关系 | 高。作用域作为层级结构,维护清晰的父子任务关系,便于调试和监控 |
| 适用场景 | 复杂的异步数据流编排,如微服务间的流水线操作、响应式系统 | I/O 密集型任务的风扇出/聚合模式,如调用多个下游 RPC 聚合数据 |
并行执行,任一失败则全部取消 (ShutdownOnFailure)
必须遵循 fork() → join() → resultNow()/throwIfFailed() 的顺序。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import java.util.concurrent.StructuredTaskScope;
import java.util.concurrent.Future;
public class PageRenderer {
record User(String name) {}
record Recommendations(List<String> items) {}
public String renderPage() throws InterruptedException, ExecutionException {
// try-with-resources 确保离开块时所有子任务都被清理
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
// fork 启动子任务,返回 Subtask(Future 的子接口)
Future<User> userFuture = scope.fork(() -> fetchUser());
Future<Recommendations> recFuture = scope.fork(() -> fetchRecommendations());
// join() 会阻塞直到所有子任务完成(成功或失败)
// 如果有子任务失败,join() 后会标记 scope 为失败状态
scope.join();
// throwIfFailed() 检查是否有子任务失败,如有则抛出异常
// 这会将子任务的异常包装后抛出
scope.throwIfFailed();
// 到达这里说明所有任务都成功了
User user = userFuture.resultNow();
Recommendations recs = recFuture.resultNow();
return buildHtml(user, recs);
}
}
}
竞速模式,取最快成功结果 (ShutdownOnSuccess)
适用于向多个镜像服务器发起请求,谁先返回就用谁的结果。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public String fetchFromFastestMirror() throws InterruptedException, ExecutionException {
try (var scope = new StructuredTaskScope.ShutdownOnSuccess<String>()) {
scope.fork(() -> fetchFromServerA());
scope.fork(() -> fetchFromServerB());
scope.fork(() -> fetchFromServerC());
// join() 会在第一个成功完成时返回(其余自动取消)
// 如果所有都失败,join() 仍然会返回,但 scope 处于失败状态
scope.join();
// resultNow() 返回第一个成功的结果
// 如果没有成功的,抛出异常
return scope.resultNow();
}
}
自定义 Scope —— 收集所有结果
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// 自定义 Scope:收集所有子任务的结果(无论成功失败)
public class CollectingScope<T> extends StructuredTaskScope<T> {
private final ConcurrentLinkedQueue<Subtask<? extends T>> results = new ConcurrentLinkedQueue<>();
@Override
protected void handleComplete(Subtask<? extends T> subtask) {
// 每个子任务完成时(无论成功/失败)都会回调此方法
results.add(subtask);
}
public List<Subtask<? extends T>> getResults() {
return List.copyOf(results);
}
}
// 使用
try (var scope = new CollectingScope<String>()) {
scope.fork(() -> sendEmail("user1@test.com"));
scope.fork(() -> sendEmail("user2@test.com"));
scope.fork(() -> sendEmail("invalid-email"));
scope.join(); // 等待所有完成
for (var subtask : scope.getResults()) {
switch (subtask.state()) {
case SUCCESS -> System.out.println("Sent: " + subtask.get());
case FAILED -> System.out.println("Failed: " + subtask.exception());
case UNAVAILABLE -> throw new IllegalStateException("Should not happen after join");
}
}
}








