快速掌握并发编程---深入学习Condition

回复“000”获取大量电子书

目录

notify和waitConditionCondition使用案例生产者消费者测试类结果Condition源码分析await方法addConditionWaiter 方法fullyRelease方法isOnSyncQueue 方法signal方法doSignal 方法transferForSignal 方法从lock、await、signal,release的整个过程Condition等待通知的本质总结

notify和wait

在前面学习 synchronized 的时候:快速掌握并发编程---synchronized篇(上),有讲到 wait/notify 的基本使用,结合synchronized 可以实现对线程的通信。

waitnotifynotifyAll是Object对象的属性,并不属于线程Thread。

我们先解释这三个的一个很重要的概念:

wait使持有该对象的线程把该对象的控制权交出去,然后处于等待状态(这句话很重要,也就是说当调用wait的时候会释放锁并处于等待的状态)

notify:通知某个正在等待这个对象的控制权的线程可以继续运行(这个就是获取锁,使自己的程序开始执行,最后通过notify同样去释放锁,并唤醒正在等待的线程)

notifyAll:会通知所有等待这个对象控制权的线程继续运行(和上面一样,只不过是唤醒所有等待的线程继续执行)

从上面的解释我们可以看出通过wait和notify可以做线程之间的通信,当A线程处理完毕通知B线程执行,B线程执行完毕以后A线程可以继续执行。

那么这个时候我就在思考了,既然 J.U.C 里面提供了Lock锁的实现机制,那·J.U.C里面有没有提供类似的线程通信的工具呢?

Condition

Condition 是一个多线程协调通信的工具类,可以让某些线程一起等待某个条件(condition),只有满足条件时,线程才会被唤醒。

Condition使用案例

下面来实现一个非常典型的生产者和消费者模式;

生产者

import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

public class Producer implements Runnable{

private Queue<String> msg;
 private int maxSize;

Lock lock;
 Condition condition;

public Producer(Queue<String> msg, int maxSize, Lock lock, Condition condition) {
  this.msg = msg;
  this.maxSize = maxSize;
  this.lock = lock;
  this.condition = condition;
 }

@Override
 public void run() {
  int i=0;
  while(true){
   i++;
   lock.lock();
 //队列中消息满了,此时生产者不能再生产了,因为装不下了
 //所以生产者就开始等待状态
 while(msg.size()==maxSize){
  System.out.println("生产者队列满了,先等待");
  try {
   condition.await(); //阻塞线程并释放锁
  } catch (InterruptedException e) {
   e.printStackTrace();
  }
 }
   try {
 Thread.sleep(1000);
   } catch (InterruptedException e) {
 e.printStackTrace();
   }
   System.out.println("生产消息:"+i);
 msg.add("生产者的消息内容"+i);
 condition.signal(); //唤醒阻塞状态下的线程
   lock.unlock();
  }
 }
}

消费者

import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

public class Consumer implements Runnable{
 private Queue<String> msg;
 private int maxSize;

Lock lock;
 Condition condition;

public Consumer(Queue<String> msg, int maxSize, Lock lock, Condition condition) {
  this.msg = msg;
  this.maxSize = maxSize;
  this.lock = lock;
  this.condition = condition;
 }

@Override
 public void run() {
  int i=0;
  while(true){
   i++;
   lock.lock(); //synchronized
   //消费者进来的时候需要判断是有可用的消息,
   //没有可用的消息就等待状态
   while(msg.isEmpty()){
 System.out.println("消费者队列空了,先等待");
 try {
  condition.await(); //阻塞线程并释放锁   wait
 } catch (InterruptedException e) {
  e.printStackTrace();
 }
   }
   try {
 Thread.sleep(1000);
   } catch (InterruptedException e) {
 e.printStackTrace();
   }
   System.out.println("消费消息:"+msg.remove());
   condition.signal(); //唤醒阻塞状态下的线程
   lock.unlock();
  }
 }
}

测试类

public class TestCondition {
 public static void main( String[] args ){
  Queue<String> queue=new LinkedList<>();
  Lock lock=new ReentrantLock(); //重入锁
  Condition condition=lock.newCondition();
  int maxSize=5;

Producer producer=new Producer(queue,maxSize,lock,condition);
  Consumer consumer=new Consumer(queue,maxSize,lock,condition);

Thread t1=new Thread(producer);
  Thread t2=new Thread(consumer);
  t1.start();
  t2.start();

}
}

结果

通过这个案例简单实现了 wait 和 notify 的功能,当调用 await 方法后,当前线程会释放锁并等待,而其他线程调用 condition 对象的 signal 或者 signalall 方法通知并被阻塞的线程,然后自己执行 unlock 释放锁,被唤醒的线程获得之前的锁继续执行,最后释放锁。

所以,condition 中两个最重要的方法,一个是 await,一个是 signal 方法。

await:把当前线程阻塞挂起;

signal:唤醒阻塞的线程。

Condition源码分析

await方法

在Condition接口只是定义了await方法

void await() throws InterruptedException;

实现类在AQS

public final void await() throws InterruptedException {
 //表示 await 允许被中断
 if (Thread.interrupted()) throw new InterruptedException();
 //创建一个新的节点,节点状态为 condition,采用的数据结构仍然是单向链表
  Node node = addConditionWaiter();
 //释放当前的锁,得到锁的状态,并唤醒 AQS 队列中的一个线程
 //不管重入几次,都把state释放为0
 int savedState = fullyRelease(node);
 int interruptMode = 0;
 //如果当前节点没有在同步队列上,即还没有被 signal,则将当前线程阻塞
 while (!isOnSyncQueue(node)) {
   ////通过 park 挂起当前线程
   LockSupport.park(this);
   // 当这个线程醒来,会尝试拿锁, 当 acquireQueued 返回 false 就是拿到锁了.
   if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
  break;
  }
 // 当这个线程醒来,会尝试拿锁, 当 acquireQueued 返回 false 就是拿到锁了.
 // interruptMode != THROW_IE -> 表示这个线程没有成功将 node 入队,但 signal 执行了 enq 方法让其入队了.
 // 将这个变量设置成 REINTERRUPT
 if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
 interruptMode = REINTERRUPT;
 // 如果 node 的下一个等待者不是 null, 则进行清理,清理 Condition 队列上的节点. 
 // 如果是 null ,就没有什么好清理的了.
 if (node.nextWaiter != null) {
  //清理掉状态为cancelled状态的 
  nlinkCancelledWaiters();
 }
 // 如果线程被中断了,需要抛出异常.或者什么都不做
 if (interruptMode != 0)
 reportInterruptAfterWait(interruptMode);
}

接下来吧整个方法里涉及到的重要方法走一遍。

addConditionWaiter 方法

这个方法的主要作用是把当前线程封装成 Node,添加到等待队列。这里的队列不再是双向链表,而是单向链表。

  /**
   * Adds a new waiter to wait queue.
   * @return its new wait node
   */
  private Node addConditionWaiter() {
   Node t = lastWaiter;
   // If lastWaiter is cancelled, clean out.
   //如果lastWaiter为canclled状态,则把他从链表中清理出去
   if (t != null && t.waitStatus != Node.CONDITION) {
 unlinkCancelledWaiters();
 t = lastWaiter;
   }
   //构建一个 Node,waitStatus=CONDITION。这里的链表是一个单向的,
   //所以相比 AQS 里的双向队来说简单了很多
   Node node = new Node(Thread.currentThread(), Node.CONDITION);
   if (t == null){
 firstWaiter = node;
   } else{
 t.nextWaiter = node;
   }
   lastWaiter = node;
   return node;
  }

上面这段代码用图来展示:

fullyRelease方法

就是彻底的释放锁,什么叫彻底呢,就是如果当前锁存在多次重入,那么在这个方法中只需要释放一次就会把所有的重入次数归零。

final int fullyRelease(Node node) {
  boolean failed = true;
  try {
   //获取AQS中state值
   int savedState = getState();
   //释放锁并且唤醒下一个同步队列中的线程
   //注意这里处理的是同步队列
   if (release(savedState)) {
 failed = false;
 return savedState;
   } else {
 throw new IllegalMonitorStateException();
   }
  } finally {
   if (failed)
 node.waitStatus = Node.CANCELLED;
  }
 }

此时线程A释放了锁,线程B就获得的锁。下面用一张图来展示:

isOnSyncQueue 方法

判断当前节点是否在同步队列中,返回 false 表示不在,返回 true 表示在。

如果不在 AQS 同步队列,说明当前节点没有唤醒去争抢同步锁,所以需要把当前线程阻塞起来,直到其他的线程调用 signal 唤醒。

如果在 AQS 同步队列,意味着它需要去竞争同步锁去获得执行程序执行权限。

为什么要做这个判断呢?

因为在 condition 队列中的节点会重新加入到 AQS 队列去竞争锁。也就是当调用 signal的时候,会把当前节点从 condition 队列转移到 AQS 队列。

 final boolean isOnSyncQueue(Node node) {
  //如果当前节点状态是CONDITION或node.prev是null,则证明当前节点在等待队列上而不是同步队列上。
  //之所以可以用node.prev来判断,是因为一个节点如果要加入同步队列,在加入前就会设置好prev字段。
  if (node.waitStatus == Node.CONDITION || node.prev == null)
   return false;
  //如果node.next不为null,则一定在同步队列上,
  //因为node.next是在节点加入同步队列后设置的
  if (node.next != null)
   return true;
  //前面的两个判断没有返回的话,就
  //从同步队列队尾遍历一个一个看是不是当前节点。
  return findNodeFromTail(node);
 } 
   //这个方法就相当简单了,就是从同步队列队尾遍历一个一个看是不是当前节点。
 private boolean findNodeFromTail(Node node) {
  Node t = tail;
  for (;;) {
   if (t == node)
 return true;
   if (t == null)
 return false;
   t = t.prev;
  }
 }

如何去判断ThreadA这个节点是否存在于 AQS队列中呢?

  1. 如果 ThreadAwaitStatus 的状态为 CONDITION,说明它存在于 condition 队列中,不在 AQS队列。因为AQS队列的状态一定不可能有 CONDITION

  2. 如果 node.prev为空,说明也不存在于 AQS队列,原因是prev=nullAQS队列中只有一种可能性,就是它是head 节点,head 节点意味着它是获得锁的节点。

  3. 如果node.next 不等于空,说明一定存在于 AQS队列中,因为只有 AQS队列才会存在 next 和 prev的关系

  4. findNodeFromTail,表示从 tail 节点往前扫描 AQS队列,一旦发现 AQS队列的节点和当前节点相等,说明节点一定存在于 AQS队列中

signal方法

await 方法会阻塞 ThreadA,然后 ThreadB抢占到了锁获得了执行权限,这个时候在 ThreadB中调用了 Condition的 signal()方法,将会唤醒在等待队列中节点。

public final void signal() {
   //先判断当前线程是否获得了锁,这个判断比较简单,直接用获得锁的线程和当前线程相比即可
   if (!isHeldExclusively()){
 //如果同步状态不是被当前线程独占,直接抛出异常。从这里也能看出来,Condition只能配合独占类同步组件使用。
 throw new IllegalMonitorStateException();
 }
 // 拿到 Condition 队列上第一个节点
 Node first = firstWaiter;
 if (first != null){
  //通知等待队列队首的节点。
  doSignal(first);
  }
 } 

doSignal 方法

private void doSignal(Node first) {
   do {
 //从 Condition 队列中删除 first 节点
 if ( (firstWaiter = first.nextWaiter) == null){
  // 将 next 节点设置成 null
  lastWaiter = null;
  }
  first.nextWaiter = null;
   //transferForSignal方法尝试唤醒当前节点,如果唤醒失败,则继续尝试唤醒当前节点的后继节点。
  } while (!transferForSignal(first) &&(first = firstWaiter) != null);
}

transferForSignal 方法

final boolean transferForSignal(Node node) {
   //更新节点的状态为 0,如果更新失败,只有一种可能就是节点被 CANCELLED 了
  if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
   return false;
  //调用 enq,把当前节点添加到AQS 队列。并且返回返回按当前节点的上一个节点,也就是原tail 节点
  Node p = enq(node);
  int ws = p.waitStatus;
  // 如果上一个节点的状态被取消了, 或者尝试设置上一个节点的状态为 SIGNAL 
 //失败了(SIGNAL 表示: 他的 next 节点需要停止阻塞)
  if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
   // 唤醒节点上的线程
   LockSupport.unpark(node.thread);
  //如果 node 的 prev 节点已经是signal 状态,那么被阻塞的 ThreadA 的唤醒工作由 AQS 队列来完成
  return true;
}

执行完 doSignal 以后,会把 condition 队列中的节点转移到 AQS队列上,这个时候会判断 ThreadAprev 节点也就是 head 节点的 waitStatus。

如果大于 0 或者设置 SIGNAL 失败,表示点被设置成了 CANCELLED 状态。这个时候会唤醒ThreadA这个线程。否则就基于 AQS队列的机制来唤醒,也就是等到ThreadB释放锁之后来唤醒 ThreadA

逻辑结构图如下:

从lock、await、signal,release的整个过程

Condition等待通知的本质

总的来说,Condition的本质就是等待队列和同步队列的交互:

当一个持有锁的线程调用Condition.await()时,它会执行以下步骤:

  1. 构造一个新的等待队列节点加入到等待队列队尾

  2. 释放锁,也就是将它的同步队列节点从同步队列队首移除

  3. 自旋,直到它在等待队列上的节点移动到了同步队列(通过其他线程调用signal())或被中断

  4. 阻塞当前节点,直到它获取到了锁,也就是它在同步队列上的节点排队排到了队首。

当一个持有锁的线程调用Condition.signal()时,它会执行以下操作:

从等待队列的队首开始,尝试对队首节点执行唤醒操作;如果节点CANCELLED,就尝试唤醒下一个节点;如果再CANCELLED则继续迭代。

对每个节点执行唤醒操作时,首先将节点加入同步队列,此时await()操作的步骤3的解锁条件就已经开启了。然后分两种情况讨论:

  1. 如果先驱节点的状态为CANCELLED(>0) 或设置先驱节点的状态为SIGNAL失败,那么就立即唤醒当前节点对应的线程,此时await()方法就会完成步骤3,进入步骤4。

  2. 如果成功把先驱节点的状态设置为了SIGNAL,那么就不立即唤醒了。等到先驱节点成为同步队列首节点并释放了同步状态后,会自动唤醒当前节点对应线程的,这时候await()的步骤3才执行完成,而且有很大概率快速完成步骤4。

总结

用一张图来总结:

线程 awaitThread 先通过lock.lock()方法获取锁成功后调用了 condition.await 方法进入等待队列,而另一个线程signalThread 通过 lock.lock()方法获取锁成功后调用了 condition.signal 或者 signalAll 方法,使得线程 awaitThread 能够有机会移入到同步队列中。

当其他线程释放 lock 后使得线程 awaitThread 能够有机会获取 lock,从而使得线程 awaitThread 能够从 await 方法中退出执行后续操作。如果 awaitThread 获取 lock 失败会直接进入到同步队列。

阻塞:await()方法中,在线程释放锁资源之后,如果节点不在 AQS 等待队列,则阻塞当前线程,如果在等待队列,则自旋等待尝试获取锁;

释放:signal()后,节点会从 condition 队列移动到 AQS 等待队列,则进入正常锁的获取流程。

关注公众号“Java后端技术全栈”

免费获取500G最新学习资料

(0)

相关推荐

  • 诚之和:如何理解Java并发之同步器设计

    这篇文章主要讲解了"如何理解Java并发之同步器设计",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"如何理解Java并发之 ...

  • ReentrantLock源码分析

    转自:https://blog.csdn.net/qq_37682665/article/details/114363445 目录 ReentrantLock 使用 核心源码解析 时序图 类图 Ree ...

  • 学习JUC源码(3)——Condition等待队列(源码分析结合图文理解)

    前言 在Java多线程中的wait/notify通信模式结尾就已经介绍过,Java线程之间有两种种等待/通知模式,在那篇博文中是利用Object监视器的方法(wait(),notify().notif ...

  • 快速掌握并发编程---深入学习ThreadLocal

    生活中的ThreadLocal 考试题只有一套,老师把考试题打印出多份,发给每位考生,然后考生各自写各自的试卷.考生之间不能相互交头接耳(会当做作弊).各自写出来的答案不会影响他人的分数. 注意:考试 ...

  • 快速掌握并发编程---线程池的原理和实战

    池 上图是装水的池子--水池. 流行池化技术,那么到底什么是池化技术呢? 池化技术简单点来说,就是提前保存大量的资源,以备不时之需.在机器资源有限的情况下,使用池化技术可以大大的提高资源的利用率,提升 ...

  • 快速掌握并发编程---ArrayBlockingQueue 底层原理和实战

    背景 在JDK1.5的时候,在新增的Concurrent包中,BlockingQueue很好的解决了多线程中,如何高效安全"传输"数据的问题.通过这些高效并且线程安全的队列类,为我 ...

  • 中游体育:平蛙快速晋级波浪式蛙泳的学习技巧

    国外一般都是将蛙泳放到蝶泳之后进行教学,而国内由于普及型培训班的存在,所以大部分爱好者都是从平式蛙泳开始入门,当我们游到一定的阶段和水平,肯定不满足于现有的技术,都希望自己能像专业游泳运动员一样游出漂 ...

  • Java并发编程之内置锁(synchronized)

    简介 synchronized在JDK5.0的早期版本中是重量级锁,效率很低,但从JDK6.0开始,JDK在关键字synchronized上做了大量的优化,如偏向锁.轻量级锁等,使它的效率有了很大的提 ...

  • Java并发编程之线程的创建

    简介 线程是基本的调度单位,它被包含在进程之中,是进程中的实际运作单位,它本身是不会独立存在.一个进程至少有一个线程,进程中的多个线程共享进程的资源. Java中创建线程的方式有多种如继承Thread ...

  • Java并发编程实战(5)- 线程生命周期

    在这篇文章中,我们来聊一下线程的生命周期. 目录 概述 操作系统中的线程生命周期 Java中的线程生命周期 Java线程状态转换 运行状态和阻塞状态之间的转换 运行状态和无时限等待状态的切换 运行状态 ...

  • 学习编程和学习中西文打字的本质区别,在这里

    问:张老师好,我在使用fwrite函数的时候 发现数据块写到文件里 总是显示 ""烫烫",但是再用fread读出来显示到显示器上 格式是正确的,麻烦帮我看一下 谢谢! 代 ...

  • Java并发编程实战(4)- 死锁

    概述 在上一篇文章中,我们讨论了如何使用一个互斥锁去保护多个资源,以银行账户转账为例,当时给出的解决方法是基于Class对象创建互斥锁. 这样虽然解决了同步的问题,但是能在现实中使用吗?答案是不可以, ...