串行与并行:高并发系统里的优雅接口设计

串行与并行:高并发系统里的优雅接口设计

Scroll Down

串行与并行:高并发系统里的优雅接口设计

最近做 CR(Code Review) 时发现:某个接口对同一下游服务发起了两次 Dubbo RPC 调用,但代码是串行等待。如果这些 RPC 之间没有真实依赖,端到端耗时会被“等待链”相加。本文从 Java 工程视角,讨论如何把串行编排升级为可控的并行扇出/汇总,并把超时、线程池隔离与降级兜底一起设计好。


1. 串行调用的问题,到底在哪?

先看一个最常见的串行伪代码:

Result a = rpcA(); // 100ms
Result b = rpcB(); // 120ms
Result c = rpcC(); // 80ms
return combine(a, b, c);

假设每个 RPC 平均 100ms 左右,三个串起来就是 300ms 左右。只要你是串行一步步等结果,整体耗时就是所有调用时间之和。

如果是 10 个 RPC 呢?
哪怕每个只要 80ms,串行下来也接近 800ms,用户基本能感觉到“卡了一下”。
如果你在一个高并发场景(比如大促、秒杀、首页加载)这么干,就很容易把自己卡死在下游调用上。

但有个关键点:很多 RPC 之间其实是互不依赖的。
既然不依赖,为什么一定要一个一个排队调用呢?


2. 能并行的调用,尽量别串行

最容易想到的优化是:把互不依赖的 RPC 并行化

假设有 10 个 RPC 调用,其中前 8 个互不依赖,最后 2 个依赖前面 8 个的结果。这其实是一个很典型的“扇出 + 汇总”的调用结构:

  • 第一层:扇出 8 个相互独立的 RPC 调用
  • 第二层:在拿到前 8 个结果之后,再调用 2 个依赖它们结果的 RPC

如果还是串行写法:

  • 第一层:8 × 80ms ≈ 640ms
  • 第二层:2 × 80ms ≈ 160ms
  • 总计接近 800ms

如果你把前 8 个变成并行调用

  • 第一层:max(8 个 RPC 耗时) ≈ 80~100ms
  • 第二层:2 × 80ms ≈ 160ms(假设这里还是串行)
  • 整体耗时:约 260ms 左右

同样 10 次调用,耗时直接砍掉三分之二。

很多人在第一次做这类优化时会发现:卡的往往不是下游速度,而是业务代码把“互不依赖”的调用强行串起来了(即同步等待思维)。


3. 如何优雅地实现并行?别把线程池用乱

知道“要并行”很容易,难的是“线程池/超时/异常语义如何统一”,否则很容易把吞吐换成排队。

常见的几个实现思路:

  • 自己开线程池 + Future
  • 使用语言提供的异步编程模型(如 Java CompletableFuture、Go goroutine + channel、Node.js Promise/async/await 等)
  • 使用框架封装好的并发工具或流水线框架

以 Java 为例,用 CompletableFuture 可以很自然地把“互不依赖的一批任务”并行化:

CompletableFuture<ResultA> fa = supplyAsync(this::rpcA, executor);
CompletableFuture<ResultB> fb = supplyAsync(this::rpcB, executor);
CompletableFuture<ResultC> fc = supplyAsync(this::rpcC, executor);

// 等待全部完成
CompletableFuture.allOf(fa, fb, fc).join();

ResultA a = fa.get();
ResultB b = fb.get();
ResultC c = fc.get();

return combine(a, b, c);

调用方写起来还是“同步思路”,但底层实际是并行执行的。
这就是所谓的:用同步风格写异步逻辑

工程上建议同时注意三点:

  • supplyAsync/异步执行必须显式传入业务隔离的 executor,避免落入公共线程池导致全局互相影响。
  • allOf(...).join() 只负责“等待全部完成”;真正取结果时按 get()/join() 的异常语义处理(join() 会把异常包装成 CompletionException)。
  • “超时快速返回”与“取消/中断底层任务”是两件事,只有调用链支持取消时,超时才可能真正释放资源。

4. 典型场景:8 个互不依赖 + 2 个有依赖

接下来把重点放到“依赖型后续”怎么落地:先把互不依赖的那 8 次 RPC 并行扇出拿到结果,再把这批结果做一次聚合/摘要,最后基于摘要触发依赖那 2 次调用。

工程上再把编排和执行分开即可:应用层负责“依赖编排”,基础设施层负责“并发执行 + 容错兜底”。

// 8 个互不依赖 + 2 个有依赖(依赖 summary 的后续示例)
// 演示用:给每个任务加超时和失败兜底(返回 InfoN.empty() / DepResultN.empty() 之类的默认值)
CompletableFuture<Info1> f1 = CompletableFuture
    .supplyAsync(this::rpc1, executor)
    .orTimeout(200, java.util.concurrent.TimeUnit.MILLISECONDS)
    .exceptionally(ex -> Info1.empty());
CompletableFuture<Info2> f2 = CompletableFuture
    .supplyAsync(this::rpc2, executor)
    .orTimeout(200, java.util.concurrent.TimeUnit.MILLISECONDS)
    .exceptionally(ex -> Info2.empty());
CompletableFuture<Info3> f3 = CompletableFuture
    .supplyAsync(this::rpc3, executor)
    .orTimeout(200, java.util.concurrent.TimeUnit.MILLISECONDS)
    .exceptionally(ex -> Info3.empty());
CompletableFuture<Info4> f4 = CompletableFuture
    .supplyAsync(this::rpc4, executor)
    .orTimeout(200, java.util.concurrent.TimeUnit.MILLISECONDS)
    .exceptionally(ex -> Info4.empty());
CompletableFuture<Info5> f5 = CompletableFuture
    .supplyAsync(this::rpc5, executor)
    .orTimeout(200, java.util.concurrent.TimeUnit.MILLISECONDS)
    .exceptionally(ex -> Info5.empty());
CompletableFuture<Info6> f6 = CompletableFuture
    .supplyAsync(this::rpc6, executor)
    .orTimeout(200, java.util.concurrent.TimeUnit.MILLISECONDS)
    .exceptionally(ex -> Info6.empty());
CompletableFuture<Info7> f7 = CompletableFuture
    .supplyAsync(this::rpc7, executor)
    .orTimeout(200, java.util.concurrent.TimeUnit.MILLISECONDS)
    .exceptionally(ex -> Info7.empty());
CompletableFuture<Info8> f8 = CompletableFuture
    .supplyAsync(this::rpc8, executor)
    .orTimeout(200, java.util.concurrent.TimeUnit.MILLISECONDS)
    .exceptionally(ex -> Info8.empty());

// 先汇总前 8 个结果(这一步会等待 f1~f8 全部完成)
CompletableFuture<Summary> summaryFuture = CompletableFuture
    .allOf(f1, f2, f3, f4, f5, f6, f7, f8)
    .thenApply(v -> buildSummary(
        f1.join(), f2.join(), f3.join(), f4.join(),
        f5.join(), f6.join(), f7.join(), f8.join()
    ));

// 再用 summary 触发后面 2 个“依赖型”调用:thenCompose 体现依赖关系
CompletableFuture<DepResult1> d1Future = summaryFuture.thenCompose(s ->
    CompletableFuture.supplyAsync(() -> rpcDep1(s), executor)
        .orTimeout(200, java.util.concurrent.TimeUnit.MILLISECONDS)
        .exceptionally(ex -> DepResult1.empty())
);

CompletableFuture<DepResult2> d2Future = summaryFuture.thenCompose(s ->
    CompletableFuture.supplyAsync(() -> rpcDep2(s), executor)
        .orTimeout(200, java.util.concurrent.TimeUnit.MILLISECONDS)
        .exceptionally(ex -> DepResult2.empty())
);

// 最后把依赖结果合并成统一返回
return d1Future.thenCombine(d2Future, this::combineFinal).join();

补充说明:上面示例里的 orTimeout 更准确地说是“让等待 future 的调用方快速返回一个超时完成结果”。它不等价于一定会在底层把正在执行的 RPC 立刻停掉;如果你的 RPC 框架支持取消/中断,建议把超时联动到 RPC 层的取消与资源释放,避免后台任务继续占用连接、线程或下游配额。

另外,exceptionally(...) 会把异常转换成默认值并“吞掉”部分堆栈信息;如果你把它当作降级兜底,务必在兜底分支里同步记录日志/指标/链路,方便定位尾延迟来源。


5. 并行不是越多越好:三个容易踩的坑

很多人一听并行,就想“能并行的全并行”。但实战中有三个常见坑:

补一句:并行也有开销。线程切换、任务调度、连接池争用都会带来额外成本,甚至可能让尾延迟(P99/P999)变差,所以要同时看平均耗时和延迟分布。

并行任务通常要落在“有界线程池”:合理设置 core/maxPoolSize、有界队列容量和拒绝策略;同时对“下游并发度”再加一层控制(例如 Semaphore/限流令牌),避免峰值把公共资源打爆。

  • 坑一:忽略下游的承载能力
    • 你本地一开 100 个线程并发打下游,看起来自己接口变快了
    • 但下游服务可能直接被你打挂,整体系统反而变慢
  • 坑二:线程池乱配,抢占宝贵资源
    • 一不小心搞了一堆大线程池,结果 CPU 都被你自己的业务线程抢走了
    • 其他核心服务的线程拿不到资源,系统整体吞吐下降
  • 坑三:超时与降级没设计好
    • 并行任务里,只要有一个卡住,整体就跟着被拖慢
    • 没有单任务超时失败快速返回降级兜底,并行很容易演变成“同时排队卡死”

补一句工程经验:并行优化通常先改善平均耗时,但更要盯住 P99/P999 的尾延迟,否则高峰期会出现“越并越慢”的反噬。

所以实践标准可以简单记为:

  • 能并行的尽量并行
  • 下游能扛的适度并行
  • 有兜底、有超时的安全并行

6. 从“写逻辑”到“设计调用拓扑”

很多同学写接口时,只是把它当作“写一段业务逻辑”。
但在高并发系统里,你其实是在设计调用拓扑

  • 哪些调用可以并行扇出?
  • 哪些调用有强依赖关系,只能串行?
  • 哪些调用是可选的、可以降级或缓存?
  • 哪些调用适合通过消息队列异步化?

当你用“调用拓扑图”的视角看一个接口时,会发现:

  • 串行与并行,只是拓扑上的边的时序关系
  • 优化接口性能,本质是在优化依赖图的结构 + 每条边的调用方式

7. 写在最后:把并行思维写进日常

以后你在 Code Review 里遇到这种“看起来没问题但就是串着等”的调用链,可以按这个顺序快速检查:

  • 先问依赖:这些 RPC 是否存在真实数据依赖?如果是“同屏展示/汇总”类接口,通常就能并行扇出。
  • 再做并行:能并行的优先并行,但要确保线程池/连接池隔离到位,避免把公共资源打满。
  • 最后要兜底:每个并行子任务是否有单任务超时与失败快速返回?超时是否能联动到 RPC 层的取消/资源释放(框架支持的话)?兜底结果是否足够让业务降级可用,并且产出可观测告警(日志/指标/链路)。

写代码时把“等待关系”画清楚、问对问题,你的系统就会更快也更稳。