Java并发工具类:采购对账和库存汇总如何并行协作

Java 并发工具类解决的是线程之间的协作问题。供应链系统里,很多流程不是简单加锁,而是多个任务并行执行后汇总结果,或者限制同时访问某个下游系统的并发量。常用工具包括 CountDownLatchSemaphoreCompletableFuture

CountDownLatch:等待多个任务完成

采购对账时,需要同时加载三类数据:

  • 采购入库单。
  • 供应商发票。
  • 付款记录。

三类数据查询互不依赖,可以并行查询,最后汇总差异。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public ReconcileResult reconcile(long supplierId, LocalDate month) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(3);

AtomicReference<List<Receipt>> receiptsRef = new AtomicReference<>();
AtomicReference<List<Invoice>> invoicesRef = new AtomicReference<>();
AtomicReference<List<Payment>> paymentsRef = new AtomicReference<>();

executor.execute(() -> {
try {
receiptsRef.set(receiptService.query(supplierId, month));
} finally {
latch.countDown();
}
});

executor.execute(() -> {
try {
invoicesRef.set(invoiceService.query(supplierId, month));
} finally {
latch.countDown();
}
});

executor.execute(() -> {
try {
paymentsRef.set(paymentService.query(supplierId, month));
} finally {
latch.countDown();
}
});

latch.await();
return reconcileEngine.compare(receiptsRef.get(), invoicesRef.get(), paymentsRef.get());
}

这里的收益是缩短对账等待时间。原来三类数据串行查询,现在可以并行加载。

Semaphore:限制并发访问下游

物流轨迹同步可能调用承运商 API。承运商接口有 QPS 限制,不能因为系统里有大量线程就无限请求。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class CarrierTrackClient {
private final Semaphore semaphore = new Semaphore(20);

public TrackInfo queryTrack(String trackingNo) {
boolean acquired = false;
try {
acquired = semaphore.tryAcquire(1, 2, TimeUnit.SECONDS);
if (!acquired) {
throw new BizException("承运商接口繁忙,请稍后重试");
}
return doQuery(trackingNo);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new BizException("查询被中断");
} finally {
if (acquired) {
semaphore.release();
}
}
}
}

Semaphore 控制的是并发许可数。这里最多允许 20 个线程同时访问承运商接口,保护下游系统,也保护自己。

CompletableFuture:异步编排更清晰

CompletableFuture 适合多个异步任务组合。订单详情页需要同时展示订单基础信息、库存状态、物流轨迹、应收金额:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public OrderDetail detail(long orderId) {
CompletableFuture<Order> orderFuture =
CompletableFuture.supplyAsync(() -> orderService.get(orderId), executor);

CompletableFuture<InventoryView> inventoryFuture =
CompletableFuture.supplyAsync(() -> inventoryService.viewByOrder(orderId), executor);

CompletableFuture<TrackInfo> trackFuture =
CompletableFuture.supplyAsync(() -> trackService.queryByOrder(orderId), executor);

CompletableFuture<Receivable> receivableFuture =
CompletableFuture.supplyAsync(() -> financeService.receivable(orderId), executor);

CompletableFuture.allOf(orderFuture, inventoryFuture, trackFuture, receivableFuture).join();

return new OrderDetail(
orderFuture.join(),
inventoryFuture.join(),
trackFuture.join(),
receivableFuture.join()
);
}

注意要传入业务线程池,不要默认依赖 ForkJoinPool.commonPool(),否则不同业务会混用同一个公共线程池,排查困难。

异常处理

异步任务一定要处理异常:

1
2
3
4
CompletableFuture<TrackInfo> trackFuture =
CompletableFuture
.supplyAsync(() -> trackService.queryByOrder(orderId), executor)
.exceptionally(e -> TrackInfo.empty("物流轨迹暂不可用"));

供应链系统的详情页通常允许部分信息降级,比如物流轨迹临时失败不应该导致整个订单详情不可用。但结算、扣库存这类核心流程不能随意吞异常。

小结

并发工具类的价值是让线程协作更清晰。CountDownLatch 适合等待多个并行任务完成;Semaphore 适合限制下游并发;CompletableFuture 适合异步任务编排。供应链系统使用这些工具时,必须区分查询类流程和交易类流程:查询可以并行和降级,交易必须保证状态一致、异常可追踪。