SQL里面各种锁的原理

SQL 里面的锁,表面上看是数据库为了防止并发冲突做的限制,实际本质是数据库在并发读写之间做秩序管理。只要系统里存在多个事务同时读写同一批数据,就一定会遇到锁。锁设计得好,系统可以同时保证数据正确和较高吞吐;锁用得不好,轻则接口变慢,重则死锁、阻塞、库存扣错、订单状态错乱。

很多开发者第一次接触锁,是因为线上出现了 Lock wait timeout exceededDeadlock found。但真正理解锁,不能只背“共享锁、排他锁、行锁、表锁”这些名称,而要看清楚三个问题:锁保护的对象是什么、锁之间是否兼容、锁在事务什么时候加上和释放。

SQL 锁类型和事务执行流程

为什么数据库需要锁

数据库锁要解决的核心问题是并发一致性。

假设供应链系统里有一条库存记录:

1
2
3
sku_id = 1001
warehouse_id = 8
available_qty = 10

现在两个订单同时提交,每个订单都要锁定 8 件库存。如果两个事务都先读到 available_qty = 10,然后都认为库存足够,再分别把库存更新为 2,就会出现超卖。数据库必须让这两个更新按某种顺序执行,或者让其中一个事务发现条件已经不满足。

锁就是这个顺序的基础。它告诉数据库:某个事务正在读或写某个资源,其他事务能不能同时读、能不能同时写、要不要等待。

共享锁和排他锁

共享锁也叫 S 锁,英文是 Shared Lock。它表示当前事务要读取数据,并且希望读取期间数据不要被别人修改。

排他锁也叫 X 锁,英文是 Exclusive Lock。它表示当前事务要修改数据,其他事务不能同时修改,也通常不能再加共享锁读取同一行。

它们的兼容关系可以简单理解为:

已有锁 新申请共享锁 新申请排他锁
共享锁 兼容 不兼容
排他锁 不兼容 不兼容

共享锁之间兼容,是因为多个事务同时读同一行数据不会破坏数据。排他锁和任何锁都不兼容,是因为写操作必须独占资源。

在 MySQL InnoDB 中,可以通过下面的方式显式加锁:

1
2
3
SELECT * FROM inventory
WHERE sku_id = 1001
LOCK IN SHARE MODE;

或者:

1
2
3
SELECT * FROM inventory
WHERE sku_id = 1001
FOR UPDATE;

LOCK IN SHARE MODE 倾向于加共享锁,FOR UPDATE 会对命中的记录加排他锁。业务里更常见的是更新语句自动加排他锁:

1
2
3
4
UPDATE inventory
SET available_qty = available_qty - 8
WHERE sku_id = 1001
AND available_qty >= 8;

这条 SQL 在更新命中的记录时,会自动对相关记录加排他锁。

表锁和行锁

表锁保护的是整张表。一个事务锁住表以后,其他事务对这张表的读写可能都会受到影响。表锁粒度大,管理简单,但并发能力弱。

行锁保护的是某一行或某个索引范围。行锁粒度小,并发能力强,但实现复杂,也更容易出现死锁。

举个例子,ERP 系统里有一张 order 表。如果系统对整张订单表加表锁,那么一个用户修改订单时,其他用户可能连其他订单也无法修改。这对高并发系统非常不友好。

如果使用行锁,一个用户修改订单 A001,另一个用户修改订单 A002,两者互不影响。只有两个事务同时修改同一张订单时,才需要等待。

InnoDB 支持行级锁,但有一个非常重要的前提:行锁通常是加在索引上的。如果查询条件没有命中索引,数据库可能扫描大量记录,锁范围也会扩大,甚至表现得像锁了很多行。

例如:

1
2
3
UPDATE order_info
SET status = 'CLOSED'
WHERE order_no = 'SO20220917001';

如果 order_no 有唯一索引,InnoDB 可以精准锁住这一行。如果 order_no 没有索引,数据库需要扫描全表判断哪些行满足条件,锁冲突风险就会明显增加。

意向锁

意向锁是很多人容易忽略的一类锁。它不是直接锁某一行业务数据,而是用来协调表锁和行锁。

假设事务 A 已经对订单表中的某一行加了排他行锁。此时事务 B 想对整张订单表加表级排他锁。数据库必须知道表里是否已经有行锁,否则就要扫描整张表逐行检查,成本很高。

意向锁就是一个提示:某个事务打算在这张表里的某些行上加锁。

常见意向锁有:

  • IS,意向共享锁,表示事务准备在某些行上加共享锁。
  • IX,意向排他锁,表示事务准备在某些行上加排他锁。

当事务要给某行加共享锁时,会先在表上加 IS 锁。当事务要给某行加排他锁时,会先在表上加 IX 锁。

意向锁的价值是让表级锁判断冲突更快。它像是在表门口挂了一个牌子:里面已经有人在某些行上操作,整表加锁前先看看是否兼容。

记录锁

记录锁是 InnoDB 最容易理解的行锁,它锁住的是索引上的一条记录。

例如库存表有唯一索引:

1
UNIQUE KEY uk_sku_warehouse (sku_id, warehouse_id)

执行:

1
2
3
4
5
SELECT *
FROM inventory
WHERE sku_id = 1001
AND warehouse_id = 8
FOR UPDATE;

如果命中一条记录,InnoDB 会对这条索引记录加排他记录锁。其他事务再想更新同一条库存记录,就必须等待当前事务提交或回滚。

记录锁适合解决“同一行业务数据不能被并发修改”的问题,例如订单状态流转、库存数量变更、账户余额扣减。

间隙锁

间隙锁锁住的不是已经存在的记录,而是索引记录之间的空隙。它的目的主要是防止幻读。

假设库存预警表里已有预警阈值:

1
threshold: 10, 20, 50

一个事务执行范围查询:

1
2
3
4
SELECT *
FROM stock_warning_rule
WHERE threshold BETWEEN 10 AND 50
FOR UPDATE;

如果数据库只锁住 10、20、50 这几条已经存在的记录,另一个事务仍然可以插入 threshold = 30 的新记录。第一个事务再次查询时,就会发现多了一条之前不存在的数据,这就是幻读。

间隙锁会锁住索引范围中的空隙,让其他事务不能在这个范围里插入新记录。它牺牲了一部分并发能力,换取范围查询的一致性。

需要注意,间隙锁依赖索引范围。如果 SQL 没有合适索引,锁范围可能比预期大很多。

Next-Key Lock

Next-Key Lock 可以理解为记录锁加间隙锁。它既锁住已经存在的索引记录,也锁住记录前后的范围。

在 InnoDB 的可重复读隔离级别下,范围查询加锁时经常会使用 Next-Key Lock 来防止幻读。

例如:

1
2
3
4
5
SELECT *
FROM purchase_order
WHERE supplier_id = 88
AND amount BETWEEN 10000 AND 50000
FOR UPDATE;

如果 supplier_id, amount 上有联合索引,InnoDB 会锁定这个索引范围内的记录和间隙。其他事务不能随便插入符合这个范围的新采购单。

Next-Key Lock 的好处是一致性强,坏处是容易让范围更新、范围查询变得更容易互相阻塞。因此业务 SQL 要尽量让范围条件走合适索引,避免锁住过大的范围。

乐观锁

乐观锁不是数据库引擎内部固定的一种锁,而是一种并发控制思想。它假设冲突不常发生,所以不提前阻塞别人,而是在提交更新时检查数据有没有被别人改过。

最常见实现是版本号字段:

1
2
3
4
5
6
7
UPDATE inventory
SET available_qty = available_qty - 8,
version = version + 1
WHERE sku_id = 1001
AND warehouse_id = 8
AND available_qty >= 8
AND version = 12;

如果更新影响行数为 1,说明版本没变,扣减成功。如果影响行数为 0,说明数据已经被别人改过,当前事务需要重试或提示失败。

乐观锁适合读多写少、冲突概率低的场景。例如商品资料编辑、客户档案修改、配置项更新。它的优点是不会长时间阻塞,缺点是冲突发生时需要业务处理重试和失败提示。

悲观锁

悲观锁也是一种思想。它假设冲突很可能发生,所以在操作前先把数据锁住。

典型写法是:

1
2
3
4
5
SELECT *
FROM inventory
WHERE sku_id = 1001
AND warehouse_id = 8
FOR UPDATE;

当前事务拿到锁以后,再计算库存、写入订单、更新库存。其他事务想修改同一条库存记录,就必须等待。

悲观锁适合写冲突高、数据不能错的场景,例如库存扣减、余额扣减、核心单据状态流转。缺点也明显:事务时间越长,等待越多,吞吐越低。

所以悲观锁一定要控制事务范围。不要在持有锁期间调用外部接口、发送 MQ、请求第三方系统,也不要在事务里做复杂计算。

元数据锁

元数据锁也叫 MDL,Metadata Lock。它保护的是表结构,而不是具体业务行。

当一个事务正在查询或修改某张表时,数据库会持有这张表的元数据锁,防止另一个会话同时修改表结构。否则就可能出现一个事务读表读到一半,另一个事务把字段删了。

常见问题是:一个长事务一直不提交,导致 ALTER TABLE 等 DDL 操作被阻塞;DDL 又反过来阻塞后续普通查询,最后形成一串等待。

例如:

1
2
3
BEGIN;
SELECT * FROM order_info WHERE id = 1;
-- 长时间不提交

此时另一个会话执行:

1
ALTER TABLE order_info ADD COLUMN source_type varchar(32);

DDL 可能会等待前面的事务释放 MDL。后续新的查询又可能排在 DDL 后面,导致业务接口突然大面积变慢。

线上做 DDL 时,必须关注长事务和元数据锁等待。大表变更最好使用在线 DDL 工具或低峰期执行。

自增锁

自增锁用于处理自增主键分配。多个事务同时插入数据时,数据库要保证自增 ID 不重复。

InnoDB 对自增锁做过很多优化,不同配置下表现不同。简单理解,普通插入通常可以较快分配自增值;批量插入、INSERT ... SELECT 这类语句可能持有自增相关锁更久。

业务上不建议依赖自增 ID 的连续性。事务回滚、插入失败、并发插入都可能造成 ID 跳号。自增 ID 的目标是唯一和大体递增,不是绝对连续。

死锁是怎么发生的

死锁是两个或多个事务互相等待对方释放锁。

例如供应链系统里同时更新订单和库存:

事务 A:

1
2
1. 锁订单 O1001
2. 再锁库存 S1001

事务 B:

1
2
1. 锁库存 S1001
2. 再锁订单 O1001

事务 A 拿到了订单锁,等待库存锁。事务 B 拿到了库存锁,等待订单锁。双方都不释放,就形成死锁。

数据库通常会检测死锁,并主动回滚其中一个事务。业务系统看到的就是死锁异常。

减少死锁的关键方法是:

  • 多表更新保持固定顺序。
  • 批量更新时按主键排序。
  • 事务尽量短。
  • 查询条件命中索引,减少锁范围。
  • 避免在事务中做远程调用。
  • 捕获死锁异常,对幂等操作做有限重试。

怎么排查锁等待

线上出现锁等待时,不要只盯着慢 SQL。要看谁在等锁,谁持有锁,事务已经执行了多久。

MySQL 里常用的排查方向包括:

1
SHOW PROCESSLIST;

查看当前连接状态。

1
SHOW ENGINE INNODB STATUS;

查看最近死锁、锁等待、事务信息。

在 MySQL 8 中,也可以通过 performance_schemasys 库查看锁等待关系。

排查时重点看:

  • 哪个事务持有锁。
  • 持锁事务执行了多久。
  • 等待的 SQL 是什么。
  • 是否存在未提交长事务。
  • SQL 是否走了索引。
  • 是否有 DDL 和普通业务 SQL 互相阻塞。

实际开发里的用锁建议

第一,能用一条原子 SQL 解决的,不要拆成先查再改。

库存扣减推荐写成:

1
2
3
4
5
6
UPDATE inventory
SET available_qty = available_qty - 8,
locked_qty = locked_qty + 8
WHERE sku_id = 1001
AND warehouse_id = 8
AND available_qty >= 8;

然后根据影响行数判断是否成功。

第二,加锁查询必须有合适索引。

FOR UPDATE 不是魔法。如果条件没有索引,锁范围会扩大,性能和并发都会出问题。

第三,事务里只放必须保持一致的操作。

订单创建、库存锁定、订单主表写入可以放在事务里;短信通知、日志上报、消息推送应该放到事务提交后。

第四,统一更新顺序。

如果业务规定先锁订单,再锁库存,再锁财务单据,那么所有代码都要遵守这个顺序。不要一个接口先锁订单,另一个接口先锁库存。

第五,锁冲突高的热点数据要做业务拆分。

例如某个爆款 SKU 的库存行成为热点,可以按仓库、批次、库存桶拆分,降低单行竞争。

总结

SQL 里的锁不是孤立概念,而是一套并发控制体系。共享锁和排他锁决定读写是否兼容,表锁和行锁决定锁粒度,意向锁协调表级和行级锁,记录锁保护已有记录,间隙锁和 Next-Key Lock 保护索引范围,乐观锁和悲观锁是两种业务并发控制思想,元数据锁保护表结构,自增锁保证自增值分配。

真正写业务代码时,最重要的不是记住所有锁名,而是控制三个东西:索引、事务范围、更新顺序。索引决定锁得准不准,事务范围决定锁持有多久,更新顺序决定是否容易死锁。把这三点做好,绝大多数 SQL 锁问题都会少很多。

Java并发集合与排查:仓储任务并发安全怎么落地

Java 多线程最终要落到数据结构和排查能力上。供应链系统里,仓储任务调度、库存缓存、波次队列、接口指标都会用到并发集合和原子类。选择正确的数据结构,可以减少手写锁;具备排查能力,才能在线上出现卡顿时定位问题。

ConcurrentHashMap:本地任务状态缓存

仓储系统可能需要缓存正在处理的上架任务,避免同一个任务在当前实例内重复提交:

1
2
3
4
5
6
7
8
9
10
11
public class PutawayTaskRegistry {
private final ConcurrentHashMap<Long, PutawayTask> runningTasks = new ConcurrentHashMap<>();

public boolean register(PutawayTask task) {
return runningTasks.putIfAbsent(task.id(), task) == null;
}

public void unregister(Long taskId) {
runningTasks.remove(taskId);
}
}

putIfAbsent 是原子操作。多个线程同时注册同一个任务,只有一个会成功。

注意,这只保护当前 JVM。多实例部署时,仍然要靠数据库状态条件防重:

1
2
3
4
UPDATE scm_putaway_task
SET status = 'RUNNING'
WHERE id = #{taskId}
AND status = 'WAITING';

BlockingQueue:生产者消费者

仓库波次任务可以用阻塞队列实现生产者消费者:

1
2
3
4
5
6
7
8
9
10
11
public class WaveDispatchQueue {
private final BlockingQueue<WaveTask> queue = new ArrayBlockingQueue<>(1000);

public void submit(WaveTask task) throws InterruptedException {
queue.put(task);
}

public WaveTask take() throws InterruptedException {
return queue.take();
}
}

生产者生成波次任务,消费者线程处理任务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public void startWorkers() {
for (int i = 0; i < 8; i++) {
executor.execute(() -> {
while (!Thread.currentThread().isInterrupted()) {
try {
WaveTask task = queue.take();
waveService.dispatch(task);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
});
}
}

阻塞队列的好处是自然支持等待和唤醒,不需要自己写 wait/notify

CopyOnWriteArrayList:读多写少配置

仓库作业规则通常读多写少,比如上架策略规则:

1
2
3
4
5
6
7
8
9
10
11
12
public class PutawayRuleHolder {
private final CopyOnWriteArrayList<PutawayRule> rules = new CopyOnWriteArrayList<>();

public List<PutawayRule> currentRules() {
return rules;
}

public void reload(List<PutawayRule> latest) {
rules.clear();
rules.addAll(latest);
}
}

CopyOnWriteArrayList 写入时复制数组,读取时不加锁。它适合规则、配置、监听器列表,不适合高频写入的数据。

CAS 和 AtomicReference

本地策略快照可以用 AtomicReference 原子替换:

1
2
3
4
5
6
7
8
9
10
11
12
public class RoutingPolicyCache {
private final AtomicReference<RoutingPolicy> policyRef =
new AtomicReference<>(RoutingPolicy.defaultPolicy());

public RoutingPolicy current() {
return policyRef.get();
}

public void refresh(RoutingPolicy latest) {
policyRef.set(latest);
}
}

如果要防止旧版本覆盖新版本:

1
2
3
4
5
6
7
8
9
10
11
public boolean refreshIfNewer(RoutingPolicy latest) {
while (true) {
RoutingPolicy current = policyRef.get();
if (latest.version() <= current.version()) {
return false;
}
if (policyRef.compareAndSet(current, latest)) {
return true;
}
}
}

这适合单 JVM 内的配置引用。业务库存余额不应该用本地 CAS 保存,因为库存需要跨实例一致、事务回滚和审计。

死锁排查

Java 死锁常见于多个线程以不同顺序获取锁。例如两个仓储任务同时锁两个库位:

1
2
3
4
5
synchronized (binA) {
synchronized (binB) {
transfer();
}
}

另一个线程反过来:

1
2
3
4
5
synchronized (binB) {
synchronized (binA) {
transfer();
}
}

解决办法是固定加锁顺序:

1
2
3
4
5
6
7
8
9
10
public void transfer(Bin from, Bin to) {
Bin first = from.id() < to.id() ? from : to;
Bin second = from.id() < to.id() ? to : from;

synchronized (first) {
synchronized (second) {
doTransfer(from, to);
}
}
}

线上排查可以用:

1
jstack <pid>

或者 Arthas:

1
thread -b

重点看线程是否大量 BLOCKED,以及堆栈里等待的是哪把锁。

小结

并发集合和原子类能减少手写锁,让并发代码更清晰。供应链系统里,ConcurrentHashMap 适合本地任务注册,BlockingQueue 适合生产者消费者,CopyOnWriteArrayList 适合读多写少规则,AtomicReference 适合配置快照替换。核心业务数据仍然要用数据库状态机和事务保护。线上卡顿时,要能用 jstack、Arthas 定位线程阻塞和死锁。

Java并发工具类:采购对账和库存汇总如何并行协作

Java 并发工具类解决的是线程之间的协作问题。供应链系统里,很多流程不是简单加锁,而是多个任务并行执行后汇总结果,或者限制同时访问某个下游系统的并发量。常用工具包括 CountDownLatchSemaphoreCompletableFuture

CountDownLatch:等待多个任务完成

采购对账时,需要同时加载三类数据:

  • 采购入库单。
  • 供应商发票。
  • 付款记录。

三类数据查询互不依赖,可以并行查询,最后汇总差异。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public ReconcileResult reconcile(long supplierId, LocalDate month) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(3);

AtomicReference<List<Receipt>> receiptsRef = new AtomicReference<>();
AtomicReference<List<Invoice>> invoicesRef = new AtomicReference<>();
AtomicReference<List<Payment>> paymentsRef = new AtomicReference<>();

executor.execute(() -> {
try {
receiptsRef.set(receiptService.query(supplierId, month));
} finally {
latch.countDown();
}
});

executor.execute(() -> {
try {
invoicesRef.set(invoiceService.query(supplierId, month));
} finally {
latch.countDown();
}
});

executor.execute(() -> {
try {
paymentsRef.set(paymentService.query(supplierId, month));
} finally {
latch.countDown();
}
});

latch.await();
return reconcileEngine.compare(receiptsRef.get(), invoicesRef.get(), paymentsRef.get());
}

这里的收益是缩短对账等待时间。原来三类数据串行查询,现在可以并行加载。

Semaphore:限制并发访问下游

物流轨迹同步可能调用承运商 API。承运商接口有 QPS 限制,不能因为系统里有大量线程就无限请求。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class CarrierTrackClient {
private final Semaphore semaphore = new Semaphore(20);

public TrackInfo queryTrack(String trackingNo) {
boolean acquired = false;
try {
acquired = semaphore.tryAcquire(1, 2, TimeUnit.SECONDS);
if (!acquired) {
throw new BizException("承运商接口繁忙,请稍后重试");
}
return doQuery(trackingNo);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new BizException("查询被中断");
} finally {
if (acquired) {
semaphore.release();
}
}
}
}

Semaphore 控制的是并发许可数。这里最多允许 20 个线程同时访问承运商接口,保护下游系统,也保护自己。

CompletableFuture:异步编排更清晰

CompletableFuture 适合多个异步任务组合。订单详情页需要同时展示订单基础信息、库存状态、物流轨迹、应收金额:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public OrderDetail detail(long orderId) {
CompletableFuture<Order> orderFuture =
CompletableFuture.supplyAsync(() -> orderService.get(orderId), executor);

CompletableFuture<InventoryView> inventoryFuture =
CompletableFuture.supplyAsync(() -> inventoryService.viewByOrder(orderId), executor);

CompletableFuture<TrackInfo> trackFuture =
CompletableFuture.supplyAsync(() -> trackService.queryByOrder(orderId), executor);

CompletableFuture<Receivable> receivableFuture =
CompletableFuture.supplyAsync(() -> financeService.receivable(orderId), executor);

CompletableFuture.allOf(orderFuture, inventoryFuture, trackFuture, receivableFuture).join();

return new OrderDetail(
orderFuture.join(),
inventoryFuture.join(),
trackFuture.join(),
receivableFuture.join()
);
}

注意要传入业务线程池,不要默认依赖 ForkJoinPool.commonPool(),否则不同业务会混用同一个公共线程池,排查困难。

异常处理

异步任务一定要处理异常:

1
2
3
4
CompletableFuture<TrackInfo> trackFuture =
CompletableFuture
.supplyAsync(() -> trackService.queryByOrder(orderId), executor)
.exceptionally(e -> TrackInfo.empty("物流轨迹暂不可用"));

供应链系统的详情页通常允许部分信息降级,比如物流轨迹临时失败不应该导致整个订单详情不可用。但结算、扣库存这类核心流程不能随意吞异常。

小结

并发工具类的价值是让线程协作更清晰。CountDownLatch 适合等待多个并行任务完成;Semaphore 适合限制下游并发;CompletableFuture 适合异步任务编排。供应链系统使用这些工具时,必须区分查询类流程和交易类流程:查询可以并行和降级,交易必须保证状态一致、异常可追踪。

Java线程池实战:订单履约任务如何稳定提吞吐

线程池是 Java 多线程在业务系统中最常用的落地方式。它解决两个问题:复用线程,减少创建销毁成本;限制并发,避免请求无限堆积压垮系统。供应链系统里的订单履约、库存同步、物流轨迹拉取、报表生成,都应该用线程池管理并发。

不要无限创建线程

错误示例:

1
2
3
for (Long orderId : orderIds) {
new Thread(() -> fulfillmentService.fulfill(orderId)).start();
}

如果一次批处理有 5000 个订单,这段代码会创建 5000 个线程。线程本身占内存,调度也有成本,下游数据库、WMS、TMS 都可能被打爆。

应该使用 ThreadPoolExecutor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
ThreadPoolExecutor executor = new ThreadPoolExecutor(
8,
16,
60,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1000),
new ThreadFactory() {
private final AtomicInteger index = new AtomicInteger();

@Override
public Thread newThread(Runnable r) {
return new Thread(r, "fulfillment-worker-" + index.incrementAndGet());
}
},
new ThreadPoolExecutor.CallerRunsPolicy()
);

这个线程池最多 16 个工作线程,队列最多缓存 1000 个任务。超过能力后,CallerRunsPolicy 会让提交任务的线程自己执行,形成反压。

七个核心参数

ThreadPoolExecutor 的关键参数包括:

  • corePoolSize:核心线程数。
  • maximumPoolSize:最大线程数。
  • keepAliveTime:非核心线程空闲多久回收。
  • unit:时间单位。
  • workQueue:任务队列。
  • threadFactory:线程工厂。
  • handler:拒绝策略。

供应链系统里,线程数不是越大越好。订单履约通常包含数据库、库存服务、仓储服务、物流服务调用,属于 IO 密集型,可以适度提高线程数。但如果下游 TMS 每秒只允许 100 次请求,线程池再大也只会制造超时。

订单履约 demo

批量履约订单时,可以这样提交任务:

1
2
3
4
5
6
7
8
9
10
11
public void fulfillBatch(List<Long> orderIds) {
for (Long orderId : orderIds) {
executor.execute(() -> {
try {
fulfillmentService.fulfill(orderId);
} catch (Exception e) {
fulfillmentLogService.recordFailed(orderId, e.getMessage());
}
});
}
}

任务内部要保证幂等:

1
2
3
4
5
6
7
8
9
10
11
@Transactional
public void fulfill(Long orderId) {
int affected = orderMapper.markFulfilling(orderId);
if (affected != 1) {
return;
}

inventoryService.reserve(orderId);
warehouseTaskService.createPickTask(orderId);
orderMapper.markFulfilled(orderId);
}

对应状态更新:

1
2
3
4
UPDATE scm_sales_order
SET status = 'FULFILLING'
WHERE id = #{orderId}
AND status = 'WAIT_FULFILL';

线程池负责并发执行,数据库状态机负责避免重复履约。

队列选择

常见队列:

  • ArrayBlockingQueue:有界数组队列,容量固定,适合明确限流。
  • LinkedBlockingQueue:链表队列,可有界也可无界;业务中必须设置容量。
  • SynchronousQueue:不存储任务,直接移交线程,适合快速扩容线程的场景。
  • PriorityBlockingQueue:优先级队列,适合高优先级订单先处理。

供应链系统建议优先使用有界队列。无界队列在高峰期会隐藏问题,直到内存被耗尽。

监控线程池

线程池必须监控:

1
2
3
4
5
6
7
8
public ThreadPoolStats stats() {
return new ThreadPoolStats(
executor.getPoolSize(),
executor.getActiveCount(),
executor.getQueue().size(),
executor.getCompletedTaskCount()
);
}

核心指标:

  • 活跃线程数是否长期接近最大线程数。
  • 队列长度是否持续增长。
  • 拒绝任务是否出现。
  • 单任务耗时是否变长。

如果队列持续增长,不要只加线程。要确认瓶颈是数据库、外部接口、锁等待还是代码慢。

小结

线程池的作用是稳定地控制并发,而不是无上限地提高并发。供应链系统中,线程池适合订单履约、库存同步、物流轨迹拉取等后台任务。设计时要使用有界队列、明确拒绝策略、处理任务异常、保证业务幂等,并持续监控线程池运行状态。

Java内存模型与锁:库存同步中的可见性和互斥

Java 多线程的核心问题可以归纳为三类:原子性、可见性、有序性。Java 内存模型,也就是 JMM,定义了线程之间如何看见彼此的写入,以及哪些同步动作能建立 happens-before 关系。供应链系统里,库存同步、价格缓存、仓库配置刷新都离不开这些基础。

可见性问题

库存同步任务通常有一个停止标志:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class InventorySyncJob implements Runnable {
private boolean running = true;

public void stop() {
running = false;
}

@Override
public void run() {
while (running) {
syncOnce();
}
}
}

管理线程调用 stop() 后,工作线程不一定马上看到 running = false。这就是可见性问题。

可以使用 volatile

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class InventorySyncJob implements Runnable {
private volatile boolean running = true;

public void stop() {
running = false;
}

@Override
public void run() {
while (running) {
syncOnce();
}
}
}

volatile 保证写入对其他线程可见,并限制相关指令重排。它适合任务开关、配置引用、状态标志。

volatile 不保证复合操作原子性

下面这个计数器是错误的:

1
2
3
4
5
6
7
public class SyncCounter {
private volatile int successCount;

public void success() {
successCount++;
}
}

successCount++ 包含读取、加一、写回三个步骤。多个线程同时执行会丢失更新。正确方式是使用原子类:

1
2
3
4
5
6
7
public class SyncCounter {
private final AtomicInteger successCount = new AtomicInteger();

public void success() {
successCount.incrementAndGet();
}
}

如果是高并发指标统计,可以用 LongAdder

1
2
3
4
5
6
7
8
9
10
11
public class SyncCounter {
private final LongAdder successCount = new LongAdder();

public void success() {
successCount.increment();
}

public long value() {
return successCount.sum();
}
}

synchronized 解决互斥

如果多个线程要修改同一个本地库存快照 Map,需要互斥控制:

1
2
3
4
5
6
7
8
9
10
11
public class InventorySnapshotCache {
private final Map<String, Integer> cache = new HashMap<>();

public synchronized void put(String key, Integer qty) {
cache.put(key, qty);
}

public synchronized Integer get(String key) {
return cache.get(key);
}
}

synchronized 修饰实例方法时,锁对象是 this。同一个对象上的同步方法互斥。

更推荐使用私有锁对象,避免外部代码锁住当前实例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class InventorySnapshotCache {
private final Object lock = new Object();
private final Map<String, Integer> cache = new HashMap<>();

public void put(String key, Integer qty) {
synchronized (lock) {
cache.put(key, qty);
}
}

public Integer get(String key) {
synchronized (lock) {
return cache.get(key);
}
}
}

锁的边界

供应链系统一般是多实例部署。synchronized 只能保护当前 JVM 内存,不能保护数据库里的库存余额。如果两个应用实例同时扣同一条库存,Java 本地锁没有任何作用。

库存扣减必须落到数据库条件更新:

1
2
3
4
5
6
UPDATE scm_inventory
SET available_qty = available_qty - #{qty},
locked_qty = locked_qty + #{qty}
WHERE warehouse_id = #{warehouseId}
AND sku_id = #{skuId}
AND available_qty >= #{qty};

Java 锁适合保护本地缓存、内存队列、对象状态;数据库事务和行锁负责保护最终业务数据。

小结

JMM 是理解 Java 多线程的基础。volatile 解决可见性和有序性,不解决复合操作原子性;synchronized 解决单 JVM 内共享状态的互斥;数据库锁解决跨实例的业务数据一致性。供应链系统里要明确每把锁保护的对象,不能用本地锁替代数据库并发控制。