AQS 骑猪看日落 2022-12-07 01:53 154阅读 0赞 ![在这里插入图片描述][watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3pzeDE1NzMyNg_size_16_color_FFFFFF_t_70_pic_center] 抽象的队列式的同步器,AQS定义了一套多线程访问共享资源的同步器框架。 它维护了一个volatile int state(代表共享资源)和一个FIFO线程等待队列(多线程争用资源时会进入此队列)。 state的访问方式有三种: * getState() * setState() * compareAndSetState() AQS定义两种资源共享方式:Exclusive(独占,只有一个线程能执行,如ReentrantLock)和Share(共享,多个线程可同时执行,如Semaphore/CountDownLatch) 不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现**共享资源state的获取与释放方式**即可,至于线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。自定义同步器实现时主要实现以下几种方法: * tryAcquire(int):独占方式。尝试获取资源,成功则返回true。 * tryRelease(int):独占方式。尝试释放资源,成功则返回true。 * tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。 * tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。 * isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。 以ReentrantLock为例,state初始化为0,表示未锁定状态。A线程lock()时,会调用tryAcquire()独占该锁并将其置为1。此后,其他线程再tryAcquire()时就会失败,直到A线程unlock()到state为0为止,其他线程才有机会获取该锁。当然,释放锁之前,A线程自己是可以重复获取此锁的(state会累加)。但要注意,获取多少次就要释放多少次,这样才能保证state回到0。 再以CountDownLatch以例,任务分为N个子线程去执行,state也初始化为N(N与线程个数一致)。这N个线程是并行执行的,每个子线程执行完后countDown()一次,state会CAS减1。等到所有子线程都执行完后(即state=0),会unpark()主调用线程,然后主调用线程就会await()函数返回,继续后续操作。 一般来说,自定义同步器要么是独占方法,要么是共享方式,他们也只需实现tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一种即可。但AQS也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock。 ## Node结点 ## Node结点是对每一个等待获取资源的线程的封装。包括线程 `volatile Thread thread;` ,等待状态 `volatile int waitStatus` * CANCELLED(1):表示当前结点已取消调度。当timeout或被中断(响应中断的情况下),会触发变更为此状态,进入该状态后的结点将不会再变化。 * SINGAL(-1):表示后继结点在等待当前结点唤醒。后继结点入队时,会将前继结点的状态更新为SIGNAL。 * CONDITION(-2):表示结点等待在Condition上,当其他线程调用了Condition的signal()方法后,CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁。 * PROPAGATE(-3):共享模式下,前继结点不仅会唤醒其后继结点,同时也可能会唤醒后继的后继结点。 * 0:新节点入队时的默认状态 负值表示结点处于有效等待状态,而正值表示结点已被取消。 static final class Node { static final Node SHARED = new Node(); static final Node EXCLUSIVE = null; static final int CANCELLED = 1; static final int SIGNAL = -1; static final int CONDITION = -2; static final int PROPAGATE = -3; volatile int waitStatus; volatile Node prev; volatile Node next; volatile Thread thread; Node nextWaiter; final boolean isShared() { return nextWaiter == SHARED; } final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() { // Used to establish initial head or SHARED marker } Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; } Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; } } ## acquire(int) ## 独占模式下线程获取共享资源的顶层入口。如果获取到资源,线程直接返回,否则进入等待队列,直到获取到资源为止,整个过程忽略中断的影响(中断后还是会去尝试获取锁,获取到后返回,否则进入等待队列)。 public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) { //如果等待期间被中断过,设置一个中断状态 selfInterrupt(); } } static void selfInterrupt() { Thread.currentThread().interrupt(); } * tryAcquire:尝试获取资源,如果获取到了则返回true * addWaiter:将该线程加入等待队列的尾部,并标记为独占模式。返回包含当前线程的Node * acquireQueued:使线程阻塞在等待队列中获取资源,一直获取到资源后才返回(如果在整个等待中被中断过,则返回true,否则返回false)。 * 如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才进行自我中断selfInterrupt(),将中断补上。 ## tryAcquire(int) ## 尝试获取独占资源,如果获取成功,则返回true。 protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); } 这里直接抛出异常,没有具体实现。因为AQS只是一个框架,具体资源的获取/释放方式交由自定义同步器去实现。 这里没有定义成abstract,是因为独占模式下只需实现tryAcquire-tryRelease,而共享模式下只用实现tryAcquireShared-tryRealeaseShared。如果都定义成abstract,那么每个模式也要去实现另一个模式下的接口。Doug Lea站在开发者的角度,尽量减少不必要的工作量。 ## addWaiter(Node) ## 将当前线程加入到等待队列的队尾,并返回当前线程所在的结点。 nc: nodeCurrent,保存当前线程的node private Node addWaiter(Node mode) { //根据当前线程和给定模式构造Node //mode有两种:EXCLUSIVE(独占)和SHARED(共享) Node node = new Node(Thread.currentThread(), mode); //尝试快速方式将node放到队尾 //tail为队尾,赋值为pred Node pred = tail; //判断pred是否为空,其实就是判断队尾是否有节点,其实只要队列被初始化了队尾肯定不为空 //换言之就是判断队列有没有初始化 //如果pred不为null即队列初始化了 if (pred != null) { //直接将nc的上一个节点设置成pred,即原来的队尾 node.prev = pred; //cas,确保nc入队时是原子操作,尝试设置tail=nc if (compareAndSetTail(pred, node)) { //将pred的下一个节点设置成nc,nc成为队尾了 pred.next = node; 返回nc return node; } } //如果上面的if没有成立就会执行到这里 //表示队列没有初始化,或者队列初始化了多线程下tail没有设置成功 enq(node); //返回nc return node; } enq将将node加入队尾 这里CAS自旋volatile变量,是一种很经典的用法。 private Node enq(final Node node) { //cas自旋,直到成功加入队尾 for (;;) { //队尾赋值给t Node t = tail; //第一次循环的时候t==null,代表队列没有初始化 if (t == null) { // Must initialize //new Node()就是实例化一个thread为null,waitStatus为0的结点,下面简称nn //cas设置head=nn,时候队列中只有一个nn if (compareAndSetHead(new Node())) { //设置tail=head,即为nn //然后第一次循环结束,接着执行第二次循环,第二次循环写在了下边 tail = head; } } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } } //第二次循环 private Node enq(final Node node) { for (;;) { //此时tail=nn,所以t=nn Node t = tail; //此时队列已经初始化t==null不成立 if (t == null) { // Must initialize if (compareAndSetHead(new Node())) { tail = head; } } else { //此时队列已经初始化了 // 将nc入队 // 将nc的前一个节点设置为nn(tail) node.prev = t; //尝试入队,直到入队成功,否则一直循环然后进行cas if (compareAndSetTail(t, node)) { t.next = node; //返回t,即nn,死循环结束,end(node)方法返回 //这个返回其实就是为了终止循环,返回出去的t没有意义 return t; } } } } ## acquireQueued(Node,int) ## 通过tryAcquire()和addWaiter(),该线程获取资源失败,并且已经被放入等待队列尾部了。 acquireQueued方法接下来要做的就是:让线程进入等待状态,直到拿到资源(当其他线程释放资源后唤醒该线程时,该线程会尝试去拿资源),然后做自己的事儿。 final boolean acquireQueued(final Node node, int arg) { //是否成功拿到了资源,true代表没有拿到 boolean failed = true; try { //等待过程中是否被中断过 boolean interrupted = false; //自旋 for (;;) { //获取nc的上一个节点 final Node p = node.predecessor(); //如果上一个节点为头部,即nc为第二个节点,便有资格执行tryAcquire尝试获取资源 //只有nc为第二个节点,第一个排队的情况下才会尝试获取资源 //如果获取到了资源就不去park了(head节点的线程释放了资源),否则去park if (p == head && tryAcquire(arg)) { //如果nc获取到了资源则将nc设置为head,nc的thread设置为null //所以说head节点的线程获取到了资源 setHead(node); p.next = null; // help GC failed = false; //返回等待过程中是否被中断过 return interrupted; } //此时nc的上一个节点不是head,或者是head但是head的线程还没有释放资源 //先将上一个节点设置成-1,然后进入等待状态,直到unpark或中断, //然后进入下一次循环尝试获取资源 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()){ interrupted = true; } } } finally { //如果等待过程中没有成功获取资源(如timeout,或者可中断的情况下被中断了), //那么取消结点在队列中的等待。 if (failed) { cancelAcquire(node); } } } //将node设置为头节点,thread设置为null private void setHead(Node node) { head = node; node.thread = null; node.prev = null; } //前驱节点是否park了,如果不是当前节点就不能park //park之前需要把上一个节点的状态改为park状态(SIGNAL) //为什么需要改变上一个节点的park状态?因为每个node都有一个状态,默认为0,表示无状态 //-1表示在park,这个状态只能是由别人来修改,因为如果已经park了,就没有办法执行代码了 //所以只能别人来改,每次都是自己的后一个节点把自己改成-1状态 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { //前驱节点的状态 int ws = pred.waitStatus; if (ws == Node.SIGNAL) { //如果已经告诉前驱节点释放资源后通知自己一下,那就可以安心休息了 return true; } if (ws > 0) { //将取消的节点剔除 //如果前驱节点状态为CANCELLED,则一直往前找,直到找到一个正常等待的状态,并排在它的后边 //那些CANCELLED的节点,形成一个无引用链,会被GC do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { //ws<=0 //如果前驱状态正常,就把前驱的状态设置成SIGNAL,告诉他释放资源后或取消时通知自己一下 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } private final boolean parkAndCheckInterrupt() { LockSupport.park(this); //当前线程执行此处后进行等待 return Thread.interrupted();//unpark或中断后会执行,检查并清除中断状态 } ## setHead(Node) ## //将node设置为头节点,thread设置为null private void setHead(Node node) { head = node; node.thread = null; node.prev = null; } ## release(int) ## 独占模式下线程是否共享资源的顶层入口。会释放指定量的资源,如果彻底释放了(state=0),会唤醒等待队列里的其他线程来获取资源。 public final boolean release(int arg) { //尝试释放资源 if (tryRelease(arg)) { //如果成功释放了资源则unpark节点 Node h = head; if (h != null && h.waitStatus != 0) { unparkSuccessor(h); } return true; } return false; } ## tryRelease(int) ## 尝试释放资源,释放了则返回true。正常来说,tryRelease都会成功,因为这是独占模式,该线程来释放资源,那么他肯定拿到独占资源了,直接减掉相应量的资源即可(state-=arg),也不需要考虑线程安全的问题(因为只有当前线程可以执行代码)。 尝试释放指定量的资源。返回true代表state=0,完全释放了 protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); } ## unparkSuccessor(Node) ## 唤醒后边的节点 //node为head节点,node一般为当前线程所在的节点 private void unparkSuccessor(Node node) { int ws = node.waitStatus; //如果head节点为正常状态,尝试将其设置为0 if (ws < 0) { compareAndSetWaitStatus(node, ws, 0); } //找到第二个节点 Node s = node.next; //第二个节点为null或被取消了, if (s == null || s.waitStatus > 0) { s = null; //从后往前遍历,找到等待队列中最前边的有效的节点 for (Node t = tail; t != null && t != node; t = t.prev) { if (t.waitStatus <= 0) { s = t; } } } //如果第二个节点为有效节点则unpark第二个节点对应的线程, //否则找到等待队列中最前边的一个有效的节点来unpark //如果有效的节点不为空,则unpark节点对应的线程 if (s != null) { LockSupport.unpark(s.thread); } } ## acquireShared(int)—共享 ## 共享模式下线程获取共享资源的顶层入口。它会获取指定量的资源,获取成功则直接返回,获取失败则进入等待队列,直到获取到资源为止,整个过程忽略中断。 public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) { doAcquireShared(arg); } } 1. tryAcquireShared()尝试获取资源,成功则直接返回; 2. 失败则通过doAcquireShared()进入等待队列,直到获取到资源为止才返回。 ## tryAcquireShare(int) ## tryAcquireShared需要自定义同步器去实现。但是AQS已经把其返回值的语义定义好了:负值代表获取失败;0代表获取成功,但没有剩余资源;正数表示获取成功,还有剩余资源,其他线程还可以去获取。 尝试以共享模式进行获取。此方法应查询对象的状态是否允许以共享模式获取对象,如果允许则获取对象。该方法始终由执行获取的线程调用。如果此方法报告失败,则acquire方法可以将线程排队(如果尚未排队),直到其他某个线程释放该信号为止。 protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException(); } ## doAcquireShared(int) ## private void doAcquireShared(int arg) { // 加入队列尾部 // Node.SHARED,new Node() final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { //将head指向自己,还有剩余资源可以再唤醒之后的线程 setHeadAndPropagate(node, r); p.next = null; // help GC // 如果等待过程中被打断过,此时将中断补上 if (interrupted) selfInterrupt(); failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } 和acquireQueued()很相似。对,其实流程并没有太大区别。只不过这里将补中断的selfInterrupt()放到doAcquireShared()里了,而独占模式是放到acquireQueued()之外,其实都一样。 ## setHeadAndPropagate(Node,int) ## 此方法在setHead()的基础上多了一步,就是自己苏醒的同时,如果条件符合(比如还有剩余资源),还回去唤醒后继节点。 private void setHeadAndPropagate(Node node, int propagate) { Node h = head; //将node设置为头节点,thread设置为null setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) { doReleaseShared(); } } } ## releaseShared(int) ## 共享模式下线程释放共享资源的顶层入口。它会释放指定量的资源,如果成功释放且允许唤醒等待线程,它会唤醒等待队列里的其他线程来获取资源。 public final boolean releaseShared(int arg) { // 尝试释放资源 if (tryReleaseShared(arg)) { // 唤醒后继节点 doReleaseShared(); return true; } return false; } 跟独占模式下的release()相似,但有一点稍微需要注意:独占模式下的tryRelease()在完全释放掉资源(state=0)后,才会返回true去唤醒其他线程,这主要是基于独占下可重入的考量;而共享模式下的releaseShared()则没有这种要求,\*\*共享模式实质就是控制一定量的线程并发执行,那么拥有资源的线程在释放掉部分资源时就可以唤醒后继等待结点。\*\*例如,资源总量是13,A(5)和B(7)分别获取到资源并发运行,C(4)来时只剩1个资源就需要等待。A在运行过程中释放掉2个资源量,然后tryReleaseShared(2)返回true唤醒C,C一看只有3个仍不够继续等待;随后B又释放2个,tryReleaseShared(2)返回true唤醒C,C一看有5个够自己用了,然后C就可以跟A和B一起运行。而ReentrantReadWriteLock读锁的tryReleaseShared()只有在完全释放掉资源(state=0)才返回true,所以自定义同步器可以根据需要决定tryReleaseShared()的返回值。 ## tryReleaseShared(int) ## protected boolean tryReleaseShared(int arg) { throw new UnsupportedOperationException(); } ## doReleaseShared() ## 唤醒后继节点 private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases //唤醒后继 unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } } ## 其他-------- ## ### hasQueuedPredecessors() ### 查询是否有任何线程比当前线程等待的时间更长(如果没有会尝试去获取资源),这个方法的调用等同于(但是或许比以下方法更有效): getFirstQueuedThread() != Thread.currentThread() && hasQueuedThreads() 注意由于打断和超时导致的取消可能随时发生,因此,返回true不能保证某些其他线程将在当前线程之前获取锁。 同样,在此方法返回false后,另一个线程也可能在排队竞赛中胜出,因为队列是空的。 如果当前线程之前有一个排队的线程,则返回true;如果当前线程位于队列的开头或队列为空,则返回false。 如果返回false,就去cas尝试加锁 public final boolean hasQueuedPredecessors() { Node t = tail; Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); } 当第一个线程tf来的时候:**tail和head都没有初始化,所以都为null**,即队列还没有初始化。所以h!=t为false,此时短路,后边的就都不执行了,直接返回**false**。 除了队列没有初始化时h!=t为false,**当队列中只有一个head结点的时候**,head==tail,所以h!=t,直接返回**false**。什么时候会出现这种情况?假设原队列里边有5个线程在排队,当前面四个都执行完的时候,轮到第5个线程得到锁的时候,他会把自己设置成头部,而尾部又没有,此时队列总只有一个head。 **如果队列初始化了并且有排队的节点**,此时h!=t为true,并且h.next不为null,(s=h.next)==null的值为false,继续判断s对应的线程是否为当前线程,如果**是当前线程则返回false**,true && ( false || false)整体返回false。 如果s对应的线程**不为当前线程**,true && ( false || true),**则整体返回true**。 [watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3pzeDE1NzMyNg_size_16_color_FFFFFF_t_70_pic_center]: /images/20221123/eaab99e09f364ea995c7324c89fa1a9f.png
相关 AQS详解 前言:之前AQS、ReentrantLock、CountLatchDown大概原理都看懂了,面试的时候一问能说个大概,感觉这样也就行了,跟背课文一样背源码也没什么意思,但是根据 今天药忘吃喽~/ 2022年12月11日 15:29/ 0 赞/ 176 阅读
相关 AQS ![在这里插入图片描述][watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ub 骑猪看日落/ 2022年12月07日 01:53/ 0 赞/ 155 阅读
相关 AQS 1 AQS抽象的队列同步器,AQS定义了一套多线程访问共享资源的同步器框架,许多同步类实现都依赖于它,如常用的ReentrantLock/Semaphore/CountD ﹏ヽ暗。殇╰゛Y/ 2022年11月09日 03:55/ 0 赞/ 182 阅读
相关 AQS简介 AQS简介 > AQS,AbstractQueuedSynchronizer,即队列同步器 > AQS即是AbstractQueuedSynchronizer,一个用 末蓝、/ 2022年05月25日 00:16/ 0 赞/ 184 阅读
相关 aqs学习 aqs是一个很重要的并发框架,熟悉之后可以很方便的构造自己的并发工具。 ![watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text 「爱情、让人受尽委屈。」/ 2022年04月24日 13:24/ 0 赞/ 181 阅读
相关 AQS AQS 继承AOS(含独占线程) volatile int state 组合使用 可重写tryAcquire tryRelease tryAcquireShared tryR àì夳堔傛蜴生んèń/ 2022年04月14日 03:37/ 0 赞/ 219 阅读
相关 AQS详解 AQS的介绍 AQS的全称为(AbstractQueuedSynchronizer),这个类在java.util.concurrent.locks包下面。 ![在这里插 电玩女神/ 2022年02月28日 11:20/ 0 赞/ 283 阅读
相关 AQS笔记 AQS笔记 概述 AQS 是 `AbstractQueueSynchronized` 抽象同步队列的简称,它是实现同步器的基础组件,并发包中锁的底层就是使 我不是女神ヾ/ 2022年02月21日 15:36/ 0 赞/ 203 阅读
相关 AQS架构 AQS,全称是AbstractQueuedSynchronizer,中文译为抽象队列式同步器 AQS架构: ![watermark_type_ZmFuZ3poZW5 水深无声/ 2021年12月10日 01:07/ 0 赞/ 301 阅读
还没有评论,来说两句吧...