AQS-CountDownLatch&CyclicBarrier&Semaphore 淩亂°似流年 2022-12-08 14:24 162阅读 0赞 # AQS-CountDownLatch&CyclicBarrier&Semaphore # ### 文章目录 ### * AQS-CountDownLatch&CyclicBarrier&Semaphore * * CountDownLatch * * 源码分析 * Semaphore * * 源码分析 * CyclicBarrier * * 源码分析 ## CountDownLatch ## CountDownLatch是JUC包下的一个基于AQS实现的并发工具类,利用他可以实现类似计数器的功能,比如有一个任务A,他要等待其他4个任务执行完毕之后才能执行,此时就可以利用CountDownLatch来实现 简单使用demo: public static void main(String[] args) throws Exception{ final CountDownLatch countDownLatch = new CountDownLatch(2); Thread thread1 = new Thread(()->{ try { System.out.println("线程一执行中。。。。"); Thread.sleep(2000); System.out.println("线程一执行完成。"); countDownLatch.countDown(); }catch (Exception e){ } },"t1"); Thread thread2 = new Thread(()->{ try { System.out.println("线程二执行中。。。。"); Thread.sleep(2000); System.out.println("线程二执行完成。"); countDownLatch.countDown(); }catch (Exception e){ } },"t2"); thread1.start(); thread2.start(); countDownLatch.await(); System.out.println("----------线程一二执行完成,继续执行主线程"); } 执行结果: ![在这里插入图片描述][watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl80MTkyMjI4OQ_size_16_color_FFFFFF_t_70_pic_center] CountDownLatch中最重要的三个方法: public void countDown() { }; //将count值减1 public boolean await(long timeout, TimeUnit unit) throws InterruptedException { }; //和await()类似,只不过等待一定的时间后count值还没变为0的话就会继续执行 public void await() throws InterruptedException { }; //调用await()方法的线程会被挂起,它会等待直到count值为0才继续执行 ### 源码分析 ### **构造方法:** 传入计数大小,并且实例化同步器 public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); } **同步器实现:** // 基于AQS实现 private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; Sync(int count) { setState(count); } int getCount() { return getState(); } // 获取共享锁 protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } // 释放共享锁 protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } } **await方法:** public void await() throws InterruptedException { // await方法就是可中断的获取共享锁 sync.acquireSharedInterruptibly(1); } protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1;// getState中获取的state,state是我们传入的count,即count != 0返回-1 } public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); } // 返回-1后就会将其线程封装为节点链接到等待队列中,自旋获取共享锁 private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } } **countDown方法** public void countDown() { // countDown方法其实就是释放共享锁 sync.releaseShared(1); } ## Semaphore ## Semaphore是和CountDownLatch一样,也是JUC包下的并发工具类,他可以控制并发访问资源的线程数通过 acquire() 获取一个许可,如果没有就等待,而 release() 释放一个许可。 public class Test { public static void main(String[] args) { int N = 8; //工人数 Semaphore semaphore = new Semaphore(5); //机器数目 for(int i=0;i<N;i++) new Worker(i,semaphore).start(); } static class Worker extends Thread{ private int num; private Semaphore semaphore; public Worker(int num,Semaphore semaphore){ this.num = num; this.semaphore = semaphore; } @Override public void run() { try { semaphore.acquire(); System.out.println("工人"+this.num+"占用一个机器在生产..."); Thread.sleep(2000); System.out.println("工人"+this.num+"释放出机器"); semaphore.release(); } catch (InterruptedException e) { e.printStackTrace(); } } } } 执行结果: 工人0占用一个机器在生产... 工人1占用一个机器在生产... 工人2占用一个机器在生产... 工人4占用一个机器在生产... 工人5占用一个机器在生产... 工人0释放出机器 工人2释放出机器 工人3占用一个机器在生产... 工人7占用一个机器在生产... 工人4释放出机器 工人5释放出机器 工人1释放出机器 工人6占用一个机器在生产... 工人3释放出机器 工人7释放出机器 工人6释放出机器 ### 源码分析 ### 构造方法: public Semaphore(int permits) { // 直接传入许可证数目 sync = new NonfairSync(permits); } public Semaphore(int permits, boolean fair) { // 传入许可证加是否是公平锁的标志,等待时间越久的越先获取许可 sync = fair ? new FairSync(permits) : new NonfairSync(permits); } 获取许可: public void acquire() throws InterruptedException { // 获取共享锁 sync.acquireSharedInterruptibly(1); } public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); } static final class FairSync extends Sync { private static final long serialVersionUID = 2014338818796000944L; FairSync(int permits) { super(permits); } protected int tryAcquireShared(int acquires) { for (;;) { // 等待队列该线程前面有节点在等待,阻塞 if (hasQueuedPredecessors()) return -1; // 无线程等待,许可证减少 int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } } **释放许可证** public void release() { sync.releaseShared(1); } public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } protected final boolean tryReleaseShared(int releases) { // 当前许可加上释放的许可,CAS更新后返回true for (;;) { int current = getState(); int next = current + releases; if (next < current) // overflow throw new Error("Maximum permit count exceeded"); if (compareAndSetState(current, next)) return true; } } ## CyclicBarrier ## 通过它可以实现让一组线程等待至某个状态之后再全部同时执行。叫做回环是因为当所有等待线程都被释放以后,CyclicBarrier可以被重用。我们暂且把这个状态就叫做barrier,当调用await()方法之后,线程就处于barrier了。 public class Test { public static void main(String[] args) { int N = 4; CyclicBarrier barrier = new CyclicBarrier(N); for(int i=0;i<N;i++) new Writer(barrier).start(); } static class Writer extends Thread{ private CyclicBarrier cyclicBarrier; public Writer(CyclicBarrier cyclicBarrier) { this.cyclicBarrier = cyclicBarrier; } @Override public void run() { System.out.println("线程"+Thread.currentThread().getName()+"正在写入数据..."); try { Thread.sleep(5000); //以睡眠来模拟写入数据操作 System.out.println("线程"+Thread.currentThread().getName()+"写入数据完毕,等待其他线程写入完毕"); cyclicBarrier.await(); } catch (InterruptedException e) { e.printStackTrace(); }catch(BrokenBarrierException e){ e.printStackTrace(); } System.out.println("所有线程写入完毕,继续处理其他任务..."); } } } 结果: 线程Thread-0正在写入数据... 线程Thread-3正在写入数据... 线程Thread-2正在写入数据... 线程Thread-1正在写入数据... 线程Thread-2写入数据完毕,等待其他线程写入完毕 线程Thread-0写入数据完毕,等待其他线程写入完毕 线程Thread-3写入数据完毕,等待其他线程写入完毕 线程Thread-1写入数据完毕,等待其他线程写入完毕 所有线程写入完毕,继续处理其他任务... 所有线程写入完毕,继续处理其他任务... 所有线程写入完毕,继续处理其他任务... 所有线程写入完毕,继续处理其他任务... CountDownLatch 可以实现多个线程的协调,在所有指定线程完成任务后,主线程才继续任务,但是CountDownLatch 有个缺点就是,不可重用,每次都需要创建新的CountDownLatch 实例 ### 源码分析 ### **构造方法** 当parties个线程准备就绪后即都调用await方法后,执行barrierAction public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; } 准备就绪后,啥事不干 public CyclicBarrier(int parties) { this(parties, null); } **await方法** 有一个我们常用的方法 await,还有一个内部类,Generation ,仅有一个参数,有什么作用呢? 在 CyclicBarrier 中,有一个 “代” 的概念,因为 CyclicBarrier 是可以复用的,那么每次所有的线程通过了栅栏,就表示一代过去了,就像我们的新年一样。当所有人跨过了元旦,日历就更新了。 CyclicBarrier 支持在所有线程通过栅栏的时候,执行一个线程的任务。 private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; // 锁住 lock.lock(); try { // 当前代 final Generation g = generation; // 如果这代损坏了,抛出异常 if (g.broken) throw new BrokenBarrierException(); // 如果线程中断了,抛出异常 if (Thread.interrupted()) { // 将损坏状态设置为 true // 并通知其他阻塞在此栅栏上的线程 breakBarrier(); throw new InterruptedException(); } // 获取下标 int index = --count; // 如果是 0 ,说明到头了 if (index == 0) { // tripped boolean ranAction = false; try { final Runnable command = barrierCommand; // 执行栅栏任务 if (command != null) command.run(); ranAction = true; // 更新一代,将 count 重置,将 generation 重置. // 唤醒之前等待的线程 nextGeneration(); // 结束 return 0; } finally { // 如果执行栅栏任务的时候失败了,就将栅栏失效 if (!ranAction) breakBarrier(); } } for (;;) { try { // 如果没有时间限制,则直接等待,直到被唤醒 if (!timed) trip.await(); // 如果有时间限制,则等待指定时间 else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { // g == generation >> 当前代 // ! g.broken >>> 没有损坏 if (g == generation && ! g.broken) { // 让栅栏失效 breakBarrier(); throw ie; } else { // 上面条件不满足,说明这个线程不是这代的. // 就不会影响当前这代栅栏执行逻辑.所以,就打个标记就好了 Thread.currentThread().interrupt(); } } // 当有任何一个线程中断了,会调用 breakBarrier 方法. // 就会唤醒其他的线程,其他线程醒来后,也要抛出异常 if (g.broken) throw new BrokenBarrierException(); // g != generation >>> 正常换代了 // 一切正常,返回当前线程所在栅栏的下标 // 如果 g == generation,说明还没有换代,那为什么会醒了? // 因为一个线程可以使用多个栅栏,当别的栅栏唤醒了这个线程,就会走到这里,所以需要判断是否是当前代。 // 正是因为这个原因,才需要 generation 来保证正确。 if (g != generation) return index; // 如果有时间限制,且时间小于等于0,销毁栅栏,并抛出异常 if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } } [watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl80MTkyMjI4OQ_size_16_color_FFFFFF_t_70_pic_center]: /images/20221123/3c580bbd45634dceb53662443195e38b.png
还没有评论,来说两句吧...