AQS源码解析
AQS源码解析
⽂章⽬录
⼀、AQS简介
AQS(AbstractQuenedSynchronizer)是抽象的队列式同步器框架,是除了java⾃带的synchronized关键字之外的锁机制。其底层采⽤乐观锁,⼤量使⽤了CAS操作,同时采⽤⾃旋⽅式重试,以实现轻量级和⾼效地获取锁。
AQS的核⼼思想
如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的⼯作线程,并将共享资源设置为锁定状态。如果被请求的共享资源被占⽤,那么就需要⼀套线程阻塞等待以及被唤醒时锁分配的机制,这个机制AQS是⽤CLH队列锁实现的,即将暂时获取不到锁的线程加⼊到队列中。
CLH队列
CLH(Craig,Landin,and Hagersten)队列:⼀个虚拟的双向队列,虚拟的双向队列即不存在队列实例,仅存在节点之间的关联关系。 AQS是将每⼀条请求共享资源的线程封装成⼀个CLH锁队列的⼀个结点(Node),来实现锁的分配。
总的来说,AQS维护了⼀个CLH队列,以及⼀个⽤volatile修饰的state(共享资源),其中state由线程通过CAS去改变。其⼤致框架如下图所⽰():
AQS本⾝是基于模板⽅法模式设计的,在使⽤时⽆需关注具体的维护和实现(如获取资源失败、⼊队、出队、唤醒等),只需要重写获取和释放共享资源state的⽅法即可。⽬前AQS定义了两种资源共享⽅式:Exclusive(独占,如:ReentrantLock)和Share(共享,如:Semaphore、CountDownLatch)。
主要的重写⽅法有:
tryAcquire(int):独占式尝试获取资源
tryRelea(int):独占式尝试释放资源
tryAcquireShared(int):共享式获取同步状态
tryReleaShared(int):共享式释放同步状态
isHeldExclusively():是否在独占模式下被线程占⽤,通常只有⽤到Condition时才需要实现
注意:
这些⽅法都没有定义成abstract,这保证了独占模式下只⽤实现tryAcquire和tryRelea,共享模式下只⽤实现
tryAcquireShared和tryReleaShared。
⽬前实现了AQS的组件有:ReentrantLock、ReentrantReadWriteLock、Semaphore、CountDownLatch、CyclicBarrier等。⼆、AQS的节点类及state状态
Node节点类
static final class Node {
// waitStatus值
static final int CANCELLED =1;
static final int SIGNAL =-1;
static final int CONDITION =-2;
static final int PROPAGATE =-3;
// 节点的状态
volatile int waitStatus;
// 节点的前驱
volatile Node prev;
// 节点的后继
volatile Node next;
// 节点封装的线程
volatile Thread thread;
}
// 头节点和尾节点
private transient volatile Node head;
private transient volatile Node tail;
注意:
head头节点是⼀个哑结点或者是当前持有锁的线程,真正的等待线程是从第⼆个节点开始的。
waitStatus表⽰当前节点的等待状态,总共只会有五种取值:
1. CANCELLED:当前节点已取消,也是唯⼀⼤于0的状态。
2. SIGNAL:表⽰当前节点的后继节点在等待唤醒,在后继节点找到安全点时,会更新其前驱的状态为此。
3. CONDITION:节点调⽤了Condition的await(),正在等待。
4. PROPAGATE:共享模式下,可以唤醒多个后继节点
90后经典电视剧5. 0:初始默认状态,在源码的注释中有提到。
同步状态
private volatile int state;
一句表达回家的心情
该属性表⽰了锁的状态,state为0表⽰锁没有被占⽤,state⼤于0表⽰当前已有线程持有该锁,⼤于1则说明存在重⼊的情况。
state的⽅法
// getter/tter⽅法
protected final int getState(){
return state;
}
protected final void tState(int newState){
state = newState;
}
// state 通过 CAS 进⾏设置
protected final boolean compareAndSetState(int expect,int update){
pareAndSwapInt(this, stateOfft, expect, update);
}
持有锁的线程信息
// 当前持有锁的线程
private transient Thread exclusiveOwnerThread;
该属性来源于AQS继承的抽象类AbstractOwnableSynchronizer,⽤于记录占⽤锁的线程是哪⼀个。在监视器锁中,通过ObjectMonitor对象的_owner属性,⽽在AQS中,则通过exclusiveOwnerThread属性。
三、主要⽅法源码解析
3.1 acquire(int)——独占模式
// 独占模式的acquire,忽略中断
public final void acquire(int arg){
if(!tryAcquire(arg)&&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
lfInterrupt();
}
acquire()⽅法是AQS的核⼼⽅法之⼀,它是独占模式下线程获取共享资源的顶层⼊⼝,在获得资源之前,是不响应中断的。它的具体流程如下:
1. 先调⽤⾃定义的tryAcquire() 获取资源,成功则结束整个逻辑。
2. 没能直接获取资源,调⽤addWaiter() ⽅法通过CAS操作将当前包装线程的节点进⾏尾插。
3. 执⾏acquireQueued() 逻辑,它主要是实现节点进⼊队列,安全地进⼊阻塞状态,直到其它线程唤醒,并不断尝试获取资源。每次被唤醒,它都
会进⼊循环,如果⾃⼰是第⼆个节点(即head后⾯⼀个),才能尝试请求资源,只有获取到资源后才会返回。如果在等待过程中被中断过,会返回true。
4. acquireQueued()调⽤的shouldParkAfterFailedAcquire() 保证线程被阻塞前能够找到安全的前驱,并设置前为SIGNAL。
5. acquireQueued()调⽤的parkAndCheckInterrupt()让线程进⼊阻塞状态,如果有中断,返回true。
6. 如果线程在等待过程中被中断,它是不响应的,但会记录中断。在获取资源后才调⽤lfInterrupt() 进⾏⾃我中断。
7. acquire⽅法的基本流程图如下所⽰(以及):
tryAcquire(int)
// 该逻辑需要⾃定义实现
protected boolean tryAcquire(int arg){
throw new UnsupportedOperationException();
}
addWaiter(Node) 及 enq(Node)
由下⾯的代码,可以发现该⽅法不仅保证是CAS⼊队,也负责在队列为空时, 初始化队列,即队列是延时初始化(懒加载) 的。
// 节点⼊队
private Node addWaiter(Node mode){
Node node =new Node(Thread.currentThread(), mode);
// 通过 CAS 进⾏⼀次快速尝试,尾插⼊队
Node pred = tail;
if(pred != null){
node.prev = pred;
if(compareAndSetTail(pred, node)){
< = node;
return node;
}
}
// 1、队列是个空队列
// 2、未能 CAS 快速插⼊,则通过 enq() ⼊队
enq(node);
return node;
}
// 循环 CAS ⼊队
private Node enq(final Node node){
// 循环 CAS 直到⼊队成功
for(;;){
Node t = tail;
// 队列为空,创建⼀个空节点作为 head
// 然后进⼊下⼀个循环进⾏尾插
if(t == null){
if(compareAndSetHead(new Node()))
tail = head;
}el{
// 队列不空,CAS 尾插
node.prev = t;
if(compareAndSetTail(t, node)){
< = node;
return t;
}
用塑料瓶做手工}
}
}
尾分叉问题:
在enq进⾏尾插,三个步骤不是原⼦操作。其中第⼀步容易成功,第⼆步CAS在并发条件下有可能失败,第三步只有在第⼆步成功的条件下才执⾏。
所以,当有⼤量的线程在同时⼊队时,只有⼀个线程能完整地完成这三步,⽽其它线程只能完成第⼀步(即,node.prev = t),于是就出现了尾分叉。
基于上述原因,⼀个节点如果能⼊队(完成了第⼆步),则它的prev属性⼀定是有值的,但是它的next属性可能暂时还没有值。
因此,在AQS相关的源码中,常常会出现从尾节点逆向遍历链表。
acquireQueued(Node, int)
线程在队列中阻塞,并且不断尝试获取资源,成功则返回,记录中断
红烧肉的做法大全final boolean acquireQueued(final Node node,int arg){
// 标记资源是否获取成功
boolean failed =true;
try{
// 记录阻塞过程中的中断
boolean interrupted =fal;
// ⽆限循环,直到获取成功
for(;;){
// 拿到前驱节点
final Node p = node.predecessor();
// 只有前驱是 head 时,才有资格去尝试获取资源
if(p == head &&tryAcquire(arg)){
// 获取成功,将当前节点设为头节点
// 即队列的头节点为当前运⾏的线程
红辣椒炒鸡蛋
tHead(node);
< = null;// help GC
failed =fal;
// 返回中断信息
return interrupted;
}
// 没能拿到资源时,找到安全前驱节点,并进⼊阻塞状态,直到被唤醒
if(shouldParkAfterFailedAcquire(p, node)&&parkAndCheckInterrupt())
万圣节快乐
// 记录阻塞过程中是否被中断
interrupted =true;
}
}finally{
// 没有成功获取资源,则取消结点在队列中的等待
if(failed)
cancelAcquire(node);
}
}
shouldParkAfterFailedAcquire(Node, Node)
线程阻塞前的状态检查。
SIGNAL:⼀个节点的waitStatus为SIGNAL(由后继节点设置),说明它的后继节点已经被挂起了,当释放锁或者放弃获取锁时,该节点还要唤醒它的后继节点。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node){
// 前驱的状态
int ws = pred.waitStatus;
大龙虾的做法
if(ws == Node.SIGNAL)
// 如果前驱直接是 SIGNAL,直接返回,并进⾏阻塞操作
return true;
// 前驱状态为 CANCELLED,需要找到还在队列中等待的有效节点
if(ws >0){
// 循环⼀直查找到有效节点为⽌
do{
4月28node.prev = pred = pred.prev;
}while(pred.waitStatus >0);
< = node;
}el{
// 通过 CAS 设置前驱的状态为 SIGNAL
// 保证前驱释放资源时能够唤醒⾃⼰
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return fal;
}
parkAndCheckInterrupt()