想象这样一个场景:你组织了一场朋友聚会,需要等所有人都到齐才能开始切蛋糕。CountDownLatch 就像那个站在门口数人数的朋友,每到一个就减少一个数字,直到人数归零才宣布派对正式开始。
1.1 CountDownLatch 核心原理与工作机制
CountDownLatch 是 Java 并发包中的一个同步工具类。它的工作方式很直观——初始化时设定一个计数值,每当有线程完成特定任务时调用 countDown() 方法,计数器就会减1。当计数器归零时,所有在 await() 方法上等待的线程都会被唤醒继续执行。
这个机制让我想起去年参与的一个项目。我们需要在系统启动时加载多个配置模块,只有所有配置都加载完成后才能对外提供服务。使用 CountDownLatch 后,主线程只需简单调用 await() 等待,各个配置加载线程完成后调用 countDown(),代码简洁性和可读性都得到了很大提升。
它的内部实现基于 AQS(AbstractQueuedSynchronizer),通过状态值来维护当前计数。这种设计保证了线程安全性,同时避免了复杂的锁竞争问题。
1.2 CountDownLatch 主要方法详解
构造函数 CountDownLatch(int count) 创建时必须指定初始计数值,这个值一旦设定就不能重新修改。计数值必须是非负整数,否则会抛出 IllegalArgumentException。
await() 系列方法 - await(): 使当前线程等待直到计数器归零 - await(long timeout, TimeUnit unit): 带超时机制的等待,避免无限期阻塞
countDown() 方法 每次调用都会使计数器减1,当计数器达到零时,所有等待的线程都会被释放。这个方法可以在任何线程中调用,不受创建 CountDownLatch 的线程限制。
记得我第一次使用时犯了个错误,在多个线程中重复调用 countDown(),结果计数器变成了负数。实际上 CountDownLatch 的设计很巧妙,当计数器已经为零时,后续的 countDown() 调用不会产生任何效果,也不会抛出异常。
1.3 CountDownLatch 适用场景分析
任务分治与结果汇总 当需要将一个大任务拆分成多个子任务并行执行,然后等待所有子任务完成后再进行结果汇总时,CountDownLatch 是理想选择。比如数据报表生成,多个数据源并行查询,最后统一生成报表。
服务启动依赖管理 在微服务架构中,经常需要等待多个依赖服务都启动成功后才能对外提供服务。使用 CountDownLatch 可以优雅地实现这种启动协调。
测试中的并发控制 在单元测试中,需要模拟多个线程同时执行某个操作的场景。CountDownLatch 可以精确控制线程的启动时机和等待条件。
不过要注意,CountDownLatch 是一次性的,计数器归零后就不能重复使用。如果需要可重复的同步机制,可能需要考虑 CyclicBarrier。这个限制在实际使用中需要特别注意,我曾经就因为忽略这一点而遇到了一个难以发现的 bug。 public class UserDataProcessor {
private final ExecutorService executor = Executors.newFixedThreadPool(5);
public void processBatchUsers(List<User> users) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(users.size());
for (User user : users) {
executor.submit(() -> {
try {
processSingleUser(user); // 处理单个用户数据
} finally {
latch.countDown(); // 无论如何都要减少计数
}
});
}
latch.await(); // 等待所有用户处理完成
generateSummaryReport(); // 生成汇总报告
}
}
// CountDownLatch 示例:主线程等待工作线程完成 CountDownLatch latch = new CountDownLatch(3); for (int i = 0; i < 3; i++) {
new Thread(() -> {
doWork();
latch.countDown();
}).start();
} latch.await(); // 主线程等待所有工作线程完成 proceedToNextPhase();
// CyclicBarrier 示例:所有线程相互等待 CyclicBarrier barrier = new CyclicBarrier(3); for (int i = 0; i < 3; i++) {
new Thread(() -> {
doWork();
barrier.await(); // 所有线程在这里相互等待
proceedToNextPhase();
}).start();
}