Project Reactor 与 Spring WebFlux:响应式编程避坑指南

字数:约4500字 | 阅读时间:18分钟
“异步非阻塞不是银弹,用错了反而是灾难——但用对了,它是一把利器。”


响应式编程喊了好几年,Spring WebFlux 也稳定了,但真正在生产环境里用的人还是少数。原因很简单:心智模型完全不一样,踩坑成本太高。一个不小心就把”高性能”变成了”高压崩溃”。

这篇文章不聊概念普及,直接从实战角度出发,讲清楚 Project Reactor 的核心类型与操作符、Spring WebFlux 函数式端点的正确打开方式、R2DBC 数据库访问的坑点,以及最容易被忽视的几个致命陷阱。目标:让你看完就能上手,避掉我踩过的坑。


1. 响应式编程核心概念:Backpressure 与信号类型

1.1 背压(Backpressure):消费者说了算

传统异步回调里,生产者发送数据的速度如果超过消费者处理能力,要么积压内存爆掉,要么丢事件。响应式流(Reactive Streams)的核心设计之一就是背压:消费者通过信号告诉生产者自己能处理多少数据,生产者必须遵守这个约束。

这听起来很简单,但实际理解上有两个常见误区:

误区一:背压是”推”模型的反压机制。 实际上,Reactive Streams 默认是拉取(Pull)模型:下游通过 request(n) 主动向上游拉取 n 个元素,上游只在下游请求时才发送。上游的 onNext 调用是被下沉的 request 驱动的,不是上游主动推送的。

误区二:背压能自动解决所有积压问题。 背压只是流量控制手段,不是无限缓冲。上游如果产生数据的速度永远快于下游消费的速度,即便有背压机制,最终也会 OOM。正确的做法是在源头限流,或者使用 limitRate、窗口操作等手段控制数据产生速度。

背压信号通过 Subscription.request(long n) 传递,记住这个原则:谁持有 Subscription,谁决定什么时候 request

1.2 信号类型与生命周期

Reactive Streams 定义了四种信号,严格遵守才能写出正确的响应式程序:

信号 方法 语义
onNext onNext(T) 发射一个数据元素
onError onError(Throwable) 异常终止,不可恢复
onComplete onComplete() 正常终止,无更多数据
onSubscribe onSubscribe(Subscription) 订阅建立,传递 Subscription

这里有个致命规范:任何信号之后,都不允许再调用其他信号。一旦 onNextonErroronComplete 被调用,该订阅就结束了,Subscriptioncancel() 会被自动调用。

1
2
3
4
5
6
flux.take(5) // 只取5个元素
.subscribe(
System.out::println,
Throwable::printStackTrace,
() -> System.out.println("完成")
);

2. Project Reactor 核心类型与操作符

2.1 Mono vs Flux:到底什么时候用哪个

Flux:发射 0 到 N 个元素,有可能有错误和完成信号。
Mono:发射最多 1 个元素(T 或 Empty),或者是错误。

记住一个原则:能用 Mono 的地方坚决不要用 Flux。Flux 的开销比 Mono 高,因为它需要处理可变数量的元素和更复杂的订阅逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
// 查一个用户 — Mono
Mono<User> findById(Long id);

// 查所有用户 — Flux
Flux<User> findAll();

// 错误示范:查单个用户用 Flux
Flux<User> findById(Long id); // 语义错误,且浪费资源

// 正确示范
Mono<User> findById(Long id);
Mono<Optional<User>> findByIdOpt(Long id); // 需要区分"不存在"和"NPE"时

2.2 常用操作符分类与避坑

2.2.1 转换类操作符

map 和 flatMap 是最常用的两个,但行为差异巨大:

1
2
3
4
5
6
// map:同步转换,不改变发布-订阅链路
flux.map(user -> user.getName().toUpperCase())

// flatMap:异步转换,返回 Mono/Flux,最终被压平合并
flux.flatMap(user -> userService.findByName(user.getName()))
// flatMap 内部的 Mono 被"订阅"了,数据最终打平

flatMap 的坑:flatMap 内部的异步操作是并发执行的(不等一个完成就启动下一个),如果你需要顺序执行,用 concatMapflatMapSequential

1
2
3
4
5
6
7
8
// 并发执行(谁先完成谁先输出,不保证顺序)
flux.flatMap(this::asyncProcess)

// 顺序执行(严格按顺序,等一个完成再处理下一个)
flux.concatMap(this::asyncProcess)

// flatMapSequential:并发执行,但输出保持原顺序
flux.flatMapSequential(this::asyncProcess)

2.2.2 过滤类操作符

1
2
3
4
flux.filter(User::isActive)
.take(10) // 只取前10个
.distinct() // 去重
.skip(5) // 跳过前5个

take(n) 会在收到第 n 个元素后调用 cancel(),这是正确的背压行为。但要小心:如果在 take 之前做了缓存类操作符(如 collectList),take 不会触发取消,内存会先爆

2.2.3 错误处理操作符

1
2
3
4
5
6
7
8
9
10
11
12
// 1. onErrorReturn:出错时返回默认值
mono.onErrorReturn("DEFAULT")

// 2. onErrorResume:出错时切换到另一个 Mono
mono.onErrorResume(e -> Mono.just(fallbackValue))

// 3. onErrorMap:转换异常类型
mono.onErrorMap(e -> new BusinessException("处理失败", e))

// 4. retry / retryBackoff:重试
mono.retry(3)
mono.retryBackoff(3, Duration.ofSeconds(1))

retry 的坑:retry 会重新订阅上游,如果上游是 HTTP 调用,会重新发请求;如果上游是数据库查询,会重新执行 SQL。这些幂等操作才能 retry,非幂等的操作(如扣款)绝对不能 retry。

2.2.4 时间相关操作符

1
2
3
flux.take(Duration.ofSeconds(5))  // 5秒后自动cancel
flux.timeout(Duration.ofSeconds(3)) // 3秒没收到数据就报TimeoutException
flux.delayElements(Duration.ofMillis(100)) // 每个元素之间延迟100ms

delayElements 的坑:它是基于数据流内部时钟的,如果上游数据产生本身就慢,delayElements 不会帮你”加速”消费,只是增加了额外延迟。它不会跨元素累积等待时间。

2.2.5 组合类操作符

1
2
3
4
5
6
7
8
9
// merge vs concat:并发 vs 顺序
Flux.merge(source1, source2); // 并发合并,交替发射
Flux.concat(source1, source2); // 串联合并,先source1完成再source2

// zip:一一对应组合
Mono.zip(mono1, mono2, (a, b) -> a + b)

// combineLatest:任意一个变就输出最新组合
Flux.combineLatest(mono1, mono2, (a, b) -> a + b)

3. Spring WebFlux 实战:函数式端点开发

3.1 两种编程模型

Spring WebFlux 支持两套并存的编程模型:

  1. 基于注解的响应式模型@RestController + @GetMapping 等,标注驱动)
  2. 函数式路由模型RouterFunction + HandlerFunction,代码驱动)

函数式模型是趋势,原因:它是声明式的,路由规则是数据,可以被动态修改,适合构建 API Gateway 类的场景。

1
2
3
4
5
6
7
8
9
10
11
12
13
@Configuration
public class UserRouter {

@Bean
public RouterFunction<ServerResponse> route(UserHandler handler) {
return RouterFunctions.route(
GET("/users"), handler::listUsers)
.andRoute(GET("/users/{id}"), handler::getUser)
.andRoute(POST("/users"), handler::createUser)
.andRoute(PUT("/users/{id}"), handler::updateUser)
.andRoute(DELETE("/users/{id}"), handler::deleteUser);
}
}

3.2 Handler 正确返回类型

Handler 的每个方法接收 ServerRequest 返回 Mono<ServerResponse>

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Component
public class UserHandler {

private final UserRepository userRepository;

public Mono<ServerResponse> listUsers(ServerRequest request) {
Flux<User> users = userRepository.findAll();
return ServerResponse.ok().body(users, User.class);
}

public Mono<ServerResponse> getUser(ServerRequest request) {
String id = request.pathVariable("id");
return userRepository.findById(Long.valueOf(id))
.flatMap(user -> ServerResponse.ok().bodyValue(user))
.switchIfEmpty(ServerResponse.notFound().build());
}

public Mono<ServerResponse> createUser(ServerRequest request) {
return request.bodyToMono(User.class)
.flatMap(user -> userRepository.save(user))
.flatMap(saved -> ServerResponse
.status(HttpStatus.CREATED)
.bodyValue(saved));
}
}

注意ServerResponse.ok().body(users, User.class) 会自动调用 users.collectList(),因为 body() 方法签名是 Publisher<?> body(Publisher<?> publisher, Class<?> elementClass)

3.3 过滤器的坑:WebFilter 执行顺序

函数式端点使用 WebFilter 做横切关注点,过滤器的顺序非常关键

1
2
3
4
5
6
7
8
9
@Component
@Order(Ordered.HIGHEST_PRECEDENCE) // 数字越小越先执行
public class AuthenticationFilter implements WebFilter {
@Override
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
// 认证逻辑
return chain.filter(exchange);
}
}

常见错误是假设过滤器执行顺序无关紧要。在响应式链路上,如果认证失败,应该直接 return ServerResponse.status(401).build().then(...),而不是调用 chain.filter(exchange) 后再判断——后者会导致未认证的请求继续流向下游。


4. 响应式数据库访问:R2DBC 与事务管理

4.1 R2DBC 是什么

R2DBC(Reactive Relational Database Connectivity)是 JDBC 的响应式替代品。它的设计目标:数据库交互全链路异步非阻塞,不阻塞任何线程

Spring Data R2DBC 提供了响应式的 Repository:

1
2
3
4
5
@Repository
public interface UserRepository extends ReactiveCrudRepository<User, Long> {
Mono<User> findByUsername(String username);
Flux<User> findByRole(String role);
}

R2DBC 的实现目前主流是 r2dbc-postgresqlr2dbc-mysql。注意:MySQL 的 R2DBC 驱动对批量操作支持不如 PostgreSQL 完善,如果吞吐量是核心指标,优先选 PostgreSQL。

4.2 事务管理

Spring WebFlux 的事务管理比 Spring MVC 更复杂,因为事务管理器本身需要支持响应式。最常用的是 ReactorTransactionalOperator

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Service
public class UserService {

private final TransactionalOperator txOperator;

public UserService(TransactionManager r2dbcTransactionManager) {
this.txOperator = TransactionalOperator.from(r2dbcTransactionManager);
}

public Mono<Void> transfer(Long fromId, Long toId, BigDecimal amount) {
return userRepository.findByIdForUpdate(fromId) // SELECT ... FOR UPDATE
.flatMap(from -> userRepository.findByIdForUpdate(toId)
.flatMap(to -> {
if (from.getBalance().compareTo(amount) < 0) {
return Mono.error(new InsufficientBalanceException());
}
from.setBalance(from.getBalance().subtract(amount));
to.setBalance(to.getBalance().add(amount));
return userRepository.save(from)
.then(userRepository.save(to));
}))
.then() // 忘记 .then() 导致事务不提交
.as(txOperator::transactional);
}
}

这里有个大坑.as(txOperator::transactional) 是把当前 Mono 包装成事务的正确方式。但很多人忘了在链的最后加 .then()then() 的作用是等待前一步的 save 操作完成后再提交事务。如果不用 .then(),事务会在数据还没写进去之前就提交。

4.3 常见查询问题

R2DBC 不支持懒加载,关联查询需要手动做 join 或者分两次查询:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 错误:懒加载的关联对象永远是 null
user.findById(1L)
.map(user -> user.getOrders()) // 永远是 null,R2DBC没有懒加载

// 正确:用 join 或两次查询
// 方案1:JOIN
@Query("SELECT u.*, o.* FROM users u LEFT JOIN orders o ON u.id = o.user_id WHERE u.id = :id")
Flux<UserOrder> findUserWithOrders(Long id);

// 方案2:两次查询
userRepository.findById(1L)
.flatMap(user ->
orderRepository.findByUserId(user.getId())
.collectList()
.map(orders -> { user.setOrders(orders); return user; })
);

5. 常见坑:阻塞调用、热数据、心智模型

5.1 坑一:阻塞调用——最常见的性能杀手

响应式链路上,一个阻塞调用可以拖垮整个事件循环线程。

Spring WebFlux 默认使用 Netty 的事件循环线程(通常是 EventLoop 线程),如果在其中执行 Thread.sleep() 或者同步阻塞 I/O,整个事件循环会被卡住,导致该线程上所有其他请求全部 hang 住。

1
2
3
4
5
6
7
// 错误:在响应式链路里调用阻塞方法
public Mono<User> findById(Long id) {
return Mono.fromCallable(() -> {
// 这是阻塞的!绝对不能这样做
return jdbcTemplate.queryForObject(...); // JDBC 是阻塞的!
}).subscribeOn(Schedulers.boundedElastic()); // 虽然用了boundedElastic,但依然是阻塞
}

正确做法:所有数据库调用必须通过 R2DBC这类响应式驱动。如果必须桥接阻塞代码,用 publishOn(Schedulers.boundedElastic()) 显式地把阻塞调用放到专门的线程池:

1
2
Mono.fromCallable(() -> blockingOperation())
.publishOn(Schedulers.boundedElastic())

但这只是”不阻塞事件循环”,不是真正的非阻塞。如果可能,用响应式驱动替代所有阻塞调用

5.2 坑二:热数据 vs 冷数据

冷数据(Cold Publisher):每次订阅都重新执行,产生完整的数据序列。Flux.just()Mono.just()webClient.get() 都是冷数据。

热数据(Hot Publisher):没有订阅者时不产生数据,订阅时从当前时间开始接收数据,不重复历史数据。ReplayProcessorBehaviorProcessorSinks.Many 属于热数据。

1
2
3
4
5
6
7
8
9
10
11
// 冷数据:每次订阅都重新发HTTP请求
webClient.get().uri("/api/users").retrieve().bodyToFlux(User.class)
.subscribe(); // 第一次订阅,发请求
webClient.get().uri("/api/users").retrieve().bodyToFlux(User.class)
.subscribe(); // 第二次订阅,再发一次请求(浪费!)

// 热数据:多个订阅者共享同一份数据
Sinks.Many<User> sink = Sinks.many().multicast().onBackpressureBuffer();
sink.asFlux().subscribe(u -> System.out.println("A: " + u));
sink.asFlux().subscribe(u -> System.out.println("B: " + u));
sink.emitNext(user1, FAIL_FAST); // A和B都能收到

常见错误:把 HTTP 调用放在冷 Flux 里,然后在多个地方订阅——每次订阅都会重复发请求。如果需要”多个消费者共享同一份数据”,必须用热数据发布者。

5.3 坑三:心智模型——响应式不是”异步的同步写法”

很多人在初次接触响应式编程时,会把它当作”用 lambda 写的同步代码”,把 .map() 当作”赋值语句”,把 .flatMap() 当作”串行执行”。这个心智模型是错的。

正确的响应式心智模型

  1. 数据流思维:不是”调用一个方法获得一个结果”,而是”一条河流上有水流动,沿着河流有处理节点”。
  2. 订阅驱动:没有 .subscribe(),就没有任何数据流动。所有操作符只是描述”如何处理数据”,真正的执行从第一次 request 开始。
  3. 操作符组合优于嵌套flatMap 嵌套层数越多,调试越困难,尽量用操作符组合替代嵌套。
  4. 背压是消费者主导:不是生产者”推送”数据,是消费者”拉取”数据。

5.4 坑四:subscribe 位置决定线程模型

subscribeOnpublishOn 决定代码运行在哪个线程池。但它们的放置位置影响整个链路:

1
2
3
4
5
6
// 所有操作都在 boundedElastic 线程执行
mono.subscribeOn(Schedulers.boundedElastic())
.map(this::syncTransform) // 在 boundedElastic 执行
.filter(this::syncCheck) // 在 boundedElastic 执行
.publishOn(Schedulers.parallel()) // 之后切换到parallel线程
.map(this::asyncTransform) // 在 parallel 执行

subscribeOn 决定源头的执行线程,publishOn 决定其后的操作符的执行线程。只能有一个 subscribeOn(多个只有第一个生效),但可以有多个 publishOn(每次切换线程)。


6. 响应式与传统命令式代码对比

维度 命令式(Spring MVC) 响应式(Spring WebFlux)
线程模型 每请求一个线程 少量事件循环线程(固定数量)
阻塞 I/O 线程等待时浪费 不阻塞,线程始终在做有用功
背压 依赖容器(Tomcat maxThreads) 语言级背压,request(n) 精确控制
适用场景 CPU 密集型、业务逻辑复杂 I/O 密集、高并发、微服务间通信
调试难度 低,堆栈清晰 高,异步链路堆栈深
生态兼容 JDBC、JPA、Spring Data 全支持 R2DBC、部分 NoSQL、部分 Spring Data

不要为了”响应式”而响应式。如果你的系统瓶颈是 CPU 密集型的业务逻辑(如复杂计算、图像处理),Spring MVC 的同步模型反而更简单高效。如果你的系统 I/O 占比高(网络请求多、数据库查询频繁),响应式才能发挥优势。

Spring Boot 3.4.x 下,两者可以共存于同一个应用:Spring MVC 部分用 @RestController,WebFlux 部分用 RouterFunction,通过不同的 URL 路径区分。这是平滑过渡的推荐路径。


总结:响应式编程的实用建议

  1. 从小处开始:先在单个 endpoint 上试点,用 WebClient 替代 RestTemplate,用 R2DBC Repository 替代 JdbcTemplate,感受异步链路。
  2. 永远不要阻塞事件循环:所有可能的阻塞调用,必须显式指定线程池。
  3. 选好数据库:PostgreSQL + R2DBC 是目前最成熟的响应式关系型数据库方案。
  4. 用 Mono 替代 Flux 能用就用:更轻量,语义更清晰。
  5. 热数据要显式构造:需要多订阅者共享数据时,用 Sinks.Many 构造热发布者。
  6. 测试用 StepVerifierStepVerifier.create(mono).expectNextCount(5).verifyComplete() 是检验响应式代码的标准姿势。

响应式编程不是银弹,但它是 I/O 密集场景下的最优解之一。理解它的心智模型,用对地方,它会是你工具箱里最锋利的那把刀。


相关阅读:


(本文基于 Spring Boot 3.4.x + Project Reactor 3.7.x + Java 21 编写)