Java 多线程最终要落到数据结构和排查能力上。供应链系统里,仓储任务调度、库存缓存、波次队列、接口指标都会用到并发集合和原子类。选择正确的数据结构,可以减少手写锁;具备排查能力,才能在线上出现卡顿时定位问题。
ConcurrentHashMap:本地任务状态缓存
仓储系统可能需要缓存正在处理的上架任务,避免同一个任务在当前实例内重复提交:
1 2 3 4 5 6 7 8 9 10 11
| public class PutawayTaskRegistry { private final ConcurrentHashMap<Long, PutawayTask> runningTasks = new ConcurrentHashMap<>();
public boolean register(PutawayTask task) { return runningTasks.putIfAbsent(task.id(), task) == null; }
public void unregister(Long taskId) { runningTasks.remove(taskId); } }
|
putIfAbsent 是原子操作。多个线程同时注册同一个任务,只有一个会成功。
注意,这只保护当前 JVM。多实例部署时,仍然要靠数据库状态条件防重:
1 2 3 4
| UPDATE scm_putaway_task SET status = 'RUNNING' WHERE id = #{taskId} AND status = 'WAITING';
|
BlockingQueue:生产者消费者
仓库波次任务可以用阻塞队列实现生产者消费者:
1 2 3 4 5 6 7 8 9 10 11
| public class WaveDispatchQueue { private final BlockingQueue<WaveTask> queue = new ArrayBlockingQueue<>(1000);
public void submit(WaveTask task) throws InterruptedException { queue.put(task); }
public WaveTask take() throws InterruptedException { return queue.take(); } }
|
生产者生成波次任务,消费者线程处理任务:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| public void startWorkers() { for (int i = 0; i < 8; i++) { executor.execute(() -> { while (!Thread.currentThread().isInterrupted()) { try { WaveTask task = queue.take(); waveService.dispatch(task); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } }); } }
|
阻塞队列的好处是自然支持等待和唤醒,不需要自己写 wait/notify。
CopyOnWriteArrayList:读多写少配置
仓库作业规则通常读多写少,比如上架策略规则:
1 2 3 4 5 6 7 8 9 10 11 12
| public class PutawayRuleHolder { private final CopyOnWriteArrayList<PutawayRule> rules = new CopyOnWriteArrayList<>();
public List<PutawayRule> currentRules() { return rules; }
public void reload(List<PutawayRule> latest) { rules.clear(); rules.addAll(latest); } }
|
CopyOnWriteArrayList 写入时复制数组,读取时不加锁。它适合规则、配置、监听器列表,不适合高频写入的数据。
CAS 和 AtomicReference
本地策略快照可以用 AtomicReference 原子替换:
1 2 3 4 5 6 7 8 9 10 11 12
| public class RoutingPolicyCache { private final AtomicReference<RoutingPolicy> policyRef = new AtomicReference<>(RoutingPolicy.defaultPolicy());
public RoutingPolicy current() { return policyRef.get(); }
public void refresh(RoutingPolicy latest) { policyRef.set(latest); } }
|
如果要防止旧版本覆盖新版本:
1 2 3 4 5 6 7 8 9 10 11
| public boolean refreshIfNewer(RoutingPolicy latest) { while (true) { RoutingPolicy current = policyRef.get(); if (latest.version() <= current.version()) { return false; } if (policyRef.compareAndSet(current, latest)) { return true; } } }
|
这适合单 JVM 内的配置引用。业务库存余额不应该用本地 CAS 保存,因为库存需要跨实例一致、事务回滚和审计。
死锁排查
Java 死锁常见于多个线程以不同顺序获取锁。例如两个仓储任务同时锁两个库位:
1 2 3 4 5
| synchronized (binA) { synchronized (binB) { transfer(); } }
|
另一个线程反过来:
1 2 3 4 5
| synchronized (binB) { synchronized (binA) { transfer(); } }
|
解决办法是固定加锁顺序:
1 2 3 4 5 6 7 8 9 10
| public void transfer(Bin from, Bin to) { Bin first = from.id() < to.id() ? from : to; Bin second = from.id() < to.id() ? to : from;
synchronized (first) { synchronized (second) { doTransfer(from, to); } } }
|
线上排查可以用:
或者 Arthas:
重点看线程是否大量 BLOCKED,以及堆栈里等待的是哪把锁。
小结
并发集合和原子类能减少手写锁,让并发代码更清晰。供应链系统里,ConcurrentHashMap 适合本地任务注册,BlockingQueue 适合生产者消费者,CopyOnWriteArrayList 适合读多写少规则,AtomicReference 适合配置快照替换。核心业务数据仍然要用数据库状态机和事务保护。线上卡顿时,要能用 jstack、Arthas 定位线程阻塞和死锁。