Java并发之AQS原理剖析
优质文章,第一时间送达
作者 | Yanci丶
来源 | urlify.cn/IFJ3Mb
概述:
AbstractQueuedSynchronizer
,可以称为抽象队列同步器。
AQS有独占模式和共享模式两种:
独占模式:
公平锁:
非公平锁:
共享模式:
数据结构:
基本属性:
/**
* 同步等待队列的头结点
*/
private transient volatile Node head;
/**
* 同步等待队列的尾结点
*/
private transient volatile Node tail;
/**
* 同步资源状态
*/
private volatile int state;
内部类:
static final class Node { /** * 标记节点为共享模式 */ static final Node SHARED = new Node(); /** * 标记节点为独占模式 */ static final Node EXCLUSIVE = null;
static final int CANCELLED = 1; static final int SIGNAL = -1; static final int CONDITION = -2; static final int PROPAGATE = -3;
/** * CANCELLED: 值为1,表示当前的线程被取消 * SIGNAL: 值为-1,表示当前节点的后继节点包含的线程需要运行,也就是unpark; * CONDITION: 值为-2,表示当前节点在等待condition,也就是在condition队列中; * PROPAGATE: 值为-3,表示当前场景下后续的acquireShared能够得以执行; * 0: 表示当前节点在sync队列中,等待着获取锁。 * 表示当前节点的状态值 */ volatile int waitStatus;
/** * 前置节点 */ volatile Node prev;
/** * 后继节点 */ volatile Node next;
/** * 节点同步状态的线程 */ volatile Thread thread;
/** * 存储condition队列中的后继节点 */ Node nextWaiter;
/** * 是否为共享模式 */ final boolean isShared() { return nextWaiter == SHARED; }
/** * 获取前驱结点 */ final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; }
Node() { // Used to establish initial head or SHARED marker }
Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; }
Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; }}
主要方法解析:
tryAcquire/tryAcquireShared(int arg)
独占/共享模式获取锁;由子类实现,仅仅获取锁,获取锁失败时不进行阻塞排队。
tryRelease/tryReleaseShared(int arg)
独占/共享模式释放锁;由子类实现,仅仅释放锁,释放锁成功不对后继节点进行唤醒操作。
acquire/acquireShared(int arg)
独占/共享模式获取锁,如果线程被中断唤醒,会返回线程中断状态,不会抛异常中止执行操作(忽略中断)。
acquireInterruptibly/acquireSharedInterruptibly(int arg)
独占/共享模式获取锁,线程如果被中断唤醒,则抛出InterruptedException异常(中断即中止)。
tryAcquireNanos/tryAcquireSharedNanos(int arg, long nanosTimeout)
独占/共享时间中断模式获取锁,线程如果被中断唤醒,则抛出InterruptedException异常(中断即中止);如果超出等待时间则返回加锁失败。
release/releaseShared(int arg)
独占/共享模式释放锁。
addWaiter(Node mode)
将给定模式节点进行入队操作。
private Node addWaiter(Node mode) {
// 根据指定模式,新建一个当前节点的对象
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
// 将当前节点的前置节点指向之前的尾结点
node.prev = pred;
// 将当前等待的节点设置为尾结点(原子操作)
if (compareAndSetTail(pred, node)) {
// 之前尾结点的后继节点设置为当前等待的节点
pred.next = node;
return node;
}
}
enq(node);
return node;
}
enq(final Node node)
将节点设置为尾结点。注意这里会进行自旋操作,确保节点设置成功。因为等待的线程需要被唤醒操作;如果操作失败,当前节点没有与其他节点没有引用指向关系,一直就不会被唤醒(除非程序代码中断线程)。
private Node enq(final Node node) { for (;;) { Node t = tail; // 判断尾结点是否为空,尾结点初始值是为空 if (t == null) { // Must initialize // 尾结点为空,需要初始化 if (compareAndSetHead(new Node())) tail = head; } else { // 设置当前节点设置为尾结点 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
acquireQueued(final Node node, int arg)
已经在队列当中的节点,准备阻塞获取锁。在阻塞前会判断前置节点是否为头结点,如果为头结点;这时会尝试获取下锁(因为这时头结点有可能会释放锁)。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 当前节点的前置节点
final Node p = node.predecessor();
// 入队前会先判断下该节点的前置节点是否是头节点(此时头结点有可能会释放锁);然后尝试去抢锁
// 在非公平锁场景下有可能会抢锁失败,这时候会继续往下执行 阻塞线程
if (p == head && tryAcquire(arg)) {
//如果抢到锁,将头节点后移(也就是将该节点设置为头结点)
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 如果前置节点不是头结点,或者当前节点抢锁失败;通过shouldParkAfterFailedAcquire判断是否应该阻塞
// 当前置节点的状态为SIGNAL=-1,才可以安全被parkAndCheckInterrupt阻塞线程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// 该线程已被中断
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
shouldParkAfterFailedAcquire(Node pred, Node node)
检查和更新未能获取锁节点的状态,返回是否可以被安全阻塞。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; // 获取前置节点的状态 if (ws == Node.SIGNAL) /* * 前置节点的状态waitStatus为SIGNAL=-1,当前线程可以安全的阻塞 */ return true; if (ws > 0) { /* * 如果前置节点的状态waitStatus>0,即waitStatus为CANCELLED=1(无效节点),需要从同步状态队列中取消等待(移除队列) */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * 将前置状态的waitStatus修改为SIGNAL=-1,然后当前节点才可以被安全的阻塞 */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
parkAndCheckInterrupt()
阻塞当前节点,返回当前线程的中断状态。
1 private final boolean parkAndCheckInterrupt() {
2 LockSupport.park(this); //阻塞
3 return Thread.interrupted();
4 }
cancelAcquire(Node node)
取消进行的获取锁操作,在非忽略中断模式下,线程被中断唤醒抛异常时会调用该方法。
// 将当前节点的状态设置为CANCELLED,无效的节点,同时移除队列 private void cancelAcquire(Node node) { if (node == null) return;
node.thread = null; Node pred = node.prev; while (pred.waitStatus > 0) node.prev = pred = pred.prev;
Node predNext = pred.next; node.waitStatus = Node.CANCELLED; if (node == tail && compareAndSetTail(node, pred)) { compareAndSetNext(pred, predNext, null); } else { int ws; if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { Node next = node.next; if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); } else { unparkSuccessor(node); }
node.next = node; // help GC } }
hasQueuedPredecessors()
判断当前线程是否应该排队。
1.第一种结果——返回true:(1.1和1.2同时存在,1.2.1和1.2.2有一个存在)
1.1 h != t为true,说明头结点和尾结点不相等,表示队列中至少有两个不同节点存在,至少有一点不为null。
1.2 ((s = h.next) == null || s.thread != Thread.currentThread())为true
1.2.1 (s = h.next) == null为true,表示头结点之后没有后续节点。
1.2.2 (s = h.next) == null为false,s.thread != Thread.currentThread()为true
表示头结点之后有后续节点,但是头节点的下一个节点不是当前线程
2.第二种结果——返回false,无需排队。(2.1和2.2有一个存在)
2.1 h != t为false,即h == t;表示h和t同时为null或者h和t是同一个节点,无后续节点。
2.2 h != t为true,((s = h.next) == null || s.thread != Thread.currentThread())为false
表示队列中至少有两个不同节点存在,同时持有锁的线程为当前线程。
public final boolean hasQueuedPredecessors() {
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}