并发编程之:CountDownLatch
大家好,我是小黑,一个在互联网苟且偷生的农民工。
先问大家一个问题,在主线程中创建多个线程,在这多个线程被启动之后,主线程需要等子线程执行完之后才能接着执行自己的代码,应该怎么实现呢?
Thread.join()
看过我 并发编程之:线程 的朋友应该知道怎么做,在Thread类中有一个方法join(),这个方法是一个阻塞方法,当前线程会等待调动join()方法的线程死亡之后再继续执行。
我们通过代码来看看执行结果。
public class JoinDemo { public static void main(String[] args) throws InterruptedException { for (int i = 0; i < 100; i++) { Thread t = new Thread(() -> { System.out.println(Thread.currentThread().getName() + " run ~"); }); t.start(); t.join(); } System.out.println("main线程执行结束"); } }
从结果可以看出,main线程要等到所有子线程都执行完之后才会继续执行,并且每一个子线程是按顺序执行的。
我们在来看一下join()方法是如何让主线程阻塞的呢?来看一下源码。
public final void join() throws InterruptedException { // 默认传入0毫秒 join(0); } // 本方法是synchronized的 public final synchronized void join(long millis) throws InterruptedException { long base = System.currentTimeMillis(); long now = 0; if (millis < 0) { throw new IllegalArgumentException("timeout value is negative"); } if (millis == 0) { // 测试当前线程是否还活着 while (isAlive()) { // 执行wait,当前线程等待 wait(0); } } else { while (isAlive()) { long delay = millis - now; if (delay <= 0) { break; } wait(delay); now = System.currentTimeMillis() - base; } } }
从join方法的源码中我们可以看到几个重要的信息,首先join()方法默认是等待0毫秒;join(long millis)方法是一个synchronized方法;循环判断当前线程是否还活着。什么意思呢?
main线程在调用线程T的join()方法时,会先获取T对象的锁;
在join方法中会调用T对象的wait()方法等待,而wait()方法会释放T对象的锁,并且main线程在执行完wait()之后会进入阻塞状态;
最后main线程在被notify唤醒之后,需要再循环判断T对象是否还活着,如果还活着会再次执行wait()。
而在线程执行完run()方法之后,JVM会调用该线程的exit()方法,通过notifyAll()唤醒处于等待状态的线程。
private void exit() { if (group != null) { // 终止group中的线程this group.threadTerminated(this); group = null; } /* Aggressively null out all reference fields: see bug 4006245 */ target = null; /* Speed the release of some of these resources */ threadLocals = null; inheritableThreadLocals = null; inheritedAccessControlContext = null; blocker = null; uncaughtExceptionHandler = null; } void threadTerminated(Thread t) { synchronized (this) { remove(t); if (nthreads == 0) { // 唤醒等待线程 notifyAll(); } if (daemon && (nthreads == 0) && (nUnstartedThreads == 0) && (ngroups == 0)) { destroy(); } } }
细心的话你会发现,使用Thread.join()只能做到让一个线程执行完之后,做不到同时等待多个线程,比如我们上面的代码,线程1执行完之后才能执行线程2,无法做到让线程1和线程2同时处理。
CountDownLatch
而在JUC包中的工具类CountDownLatch具备和Thread.join()方法同样的能力,可以等待一个线程执行完之后再处理,并且支持同时等待多个线程。我们来修改一下上面Thread.join()的例子。
public class CountDownLatchDemo { public static void main(String[] args) throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(100); for (int i = 0; i < 100; i++) { Thread t = new Thread(() -> { System.out.println(Thread.currentThread().getName() + " run ~"); countDownLatch.countDown(); }); t.start(); } countDownLatch.await(); System.out.println("main线程执行结束"); } }
CountDownLatch需要在创建时指定一个计数值,在子线程中执行完之后调用countDown()方法进行递减,主线程的await()方法会等到值减为0之后继续执行。
从运行结果我们可以看到,100个子线程并不是按顺序执行的,而是随机的。
我们通过CountDownLatch的源码来看一下是如何实现的。
private final Sync sync; public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); }
在CountDownLatch中我们看到有一个Sync变量,从上一期AQS源码解析内容中我们知道Sync是AQS的一个子类实现;
首先构造方法传入的count值会作为参数赋值给Sync中的state变量。
然后我们来看一下在线程中的CountDownLath.countDown()方法会做些什么事情。
public void countDown() { // 释放共享锁 sync.releaseShared(1); } public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
如果有看我上期AQS源码解析的同学一定很熟悉,这段代码就是共享锁的解锁过程,本质上就是对state-1。
那么主线程是如何实现的等待呢?我们猜一下,应该是去判断state有没有减为0,如果减为0则代表所有的线程都执行完countDown()方法,则可以继续执行,如果state还不等于0,则表示还有线程正在执行,等待就OK啦。
我们来看看源码,是否和我们猜想的一样呢?
public void await() throws InterruptedException { // 可中断地获取共享锁 sync.acquireSharedInterruptibly(1); } public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); // 尝试获取共享锁 if (tryAcquireShared(arg) < 0) // state还不是1 doAcquireSharedInterruptibly(arg); } // 获取锁状态,当state减为0时,返回1 protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { // 排入队尾 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } // 线程在这里park if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
可以发现await()方法和我们昨天看到的共享锁解锁过程一模一样,符合我们的猜想。
所以,CountDownLatch的底层实现也是依靠AQS来完成的,现在大家肯定对于AQS有更深刻的认识了。
区别
我们现在来对比一下Thread.join()和CountDownLatch有哪些区别:
Thread.join()是Thread类的一个方法,而CountDownLatch是JUC包中的一个工具类;
Thread.join()的实现是依靠Object的wait()和notifyAll()来完成的,而CountDownLatch是通过AQS完成的;
Thread.join()只支持让一个线程等待,不支持同时等待多个线程,而CountDownLatch可以支持,所以CountDownLatch的效率要更高。