jdk源码解析七之ReadWriteLock 灰太狼 2023-02-21 11:42 38阅读 0赞 ### 文章目录 ### * ReadWriteLock * * ReentrantReadWriteLock * * 构造 * 获取读写锁 * 读锁 * * lock * unlock * 写锁 * * lock * unlock * 锁降级 * 总结 # ReadWriteLock # ![在这里插入图片描述][watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L051YW5fRmVuZw_size_16_color_FFFFFF_t_70] 分别维护2个锁,写锁是独占锁,读锁是共享锁,因为读的时间通常比写的时间长,所以写锁优先级比读锁高 ## ReentrantReadWriteLock ## ### 构造 ### public ReentrantReadWriteLock() { //默认独占 this(false); } public ReentrantReadWriteLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); //静态内部类 readerLock = new ReadLock(this); ///静态内部类 writerLock = new WriteLock(this); } //高16读锁,低16写锁 static final int SHARED_SHIFT = 16; static final int SHARED_UNIT = (1 << SHARED_SHIFT); static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1; static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; /** Returns the number of shared holds represented in count */ //返回共享个数 static int sharedCount(int c) { return c >>> SHARED_SHIFT; } /** Returns the number of exclusive holds represented in count */ //返回独占个数 static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; } ### 获取读写锁 ### public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; } public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; } ### 读锁 ### #### lock #### public void lock() { sync.acquireShared(1); } protected final int tryAcquireShared(int unused) { /* * Walkthrough: * 1. If write lock held by another thread, fail. * 2. Otherwise, this thread is eligible for * lock wrt state, so ask if it should block * because of queue policy. If not, try * to grant by CASing state and updating count. * Note that step does not check for reentrant * acquires, which is postponed to full version * to avoid having to check hold count in * the more typical non-reentrant case. * 3. If step 2 fails either because thread * apparently not eligible or CAS fails or count * saturated, chain to version with full retry loop. */ ///获取当前线程 Thread current = Thread.currentThread(); //获取当前锁的数量 int c = getState(); //持有写锁失败 if ( //获取独占锁数量!=0 exclusiveCount(c) != 0 && //且当前拥有独占访问权限的线程不等于当前线程,这里写锁可降级 getExclusiveOwnerThread() != current) return -1; //获取共享锁数量 int r = sharedCount(c); /* 第一次线程A读取,则记录第一个堵得线程以及个数 第二次线程A读取,则当前访问线程个数+1 第三次线程B读取,利用cachedHoldCounter缓存当前线程tid以及访问次数 readHolds可以理解为一级缓存,绑定了每个线程的线程计数器 cachedHoldCounter:二级缓存,缓存上一个线程执行重入锁的次数 */ if (//线程是否应该被阻塞 !readerShouldBlock() && //共享锁数量<最大锁数量 r < MAX_COUNT && //更新共享锁数量 compareAndSetState(c, c + SHARED_UNIT)) { //如果是第一次读取 if (r == 0) { //记录第一个读的线程 firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { //如果当前访问的线程是第一个访问的线程,则访问线程数+1 firstReaderHoldCount++; } else { //获取计数器 HoldCounter rh = cachedHoldCounter; //计数器为null或者当前线程id不为正在运行线程id if (rh == null || rh.tid != getThreadId(current)) //获取当前计数线程对应计数器 cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) //设置线程计数器 readHolds.set(rh); //线程重入次数+1 rh.count++; } return 1; } //CAS尚未命中 return fullTryAcquireShared(current); } final int fullTryAcquireShared(Thread current) { /* * This code is in part redundant with that in * tryAcquireShared but is simpler overall by not * complicating tryAcquireShared with interactions between * retries and lazily reading hold counts. */ HoldCounter rh = null; for (;;) { int c = getState(); //如果当前有独占锁,且独占线程非当前线程,则返回 if (exclusiveCount(c) != 0) { if (getExclusiveOwnerThread() != current) return -1; // else we hold the exclusive lock; blocking here // would cause deadlock. } else if (readerShouldBlock()) { //写锁没有被获取,且读线程被阻塞 // Make sure we're not acquiring read lock reentrantly //当前第一个线程为读线程,说明当前线程重入 if (firstReader == current) { // assert firstReaderHoldCount > 0; } else { if (rh == null) { //获取缓存的计数器 rh = cachedHoldCounter; //没有缓存计数器,或者计数器线程tid非当前线程tid(也就是说非上一个获取锁的线程) //对写锁的让步,如果第一个获取锁的线程是写锁,那么后续所有线程AQS排队 if (rh == null || rh.tid != getThreadId(current)) { //获取线程计数器 rh = readHolds.get(); //重入锁数量为0 if (rh.count == 0) //删除计数器 readHolds.remove(); } } //重入锁数量为0 if (rh.count == 0) //AQS排队 return -1; } } //读锁数量超过上限,异常 if (sharedCount(c) == MAX_COUNT) throw new Error("Maximum lock count exceeded"); //设置读锁数量 //当前线程是第一个获取读锁的线程,或是上一个执行任务的线程会执行到这里 //也就是说重入获取读锁的线程才会执行到这里 if (compareAndSetState(c, c + SHARED_UNIT)) { // if (sharedCount(c) == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { if (rh == null) rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); //重入数量累加 rh.count++; //缓存计数器 cachedHoldCounter = rh; // cache for release } return 1; } } } final boolean readerShouldBlock() { //下一个锁是否是独占锁 return apparentlyFirstQueuedIsExclusive(); } final boolean apparentlyFirstQueuedIsExclusive() { Node h, s; return (h = head) != null && (s = h.next) != null && !s.isShared() && s.thread != null; } #### unlock #### protected final boolean tryReleaseShared(int unused) { //获取当前线程 Thread current = Thread.currentThread(); //当前线程为第一个获取读锁的线程 if (firstReader == current) { // assert firstReaderHoldCount > 0; //如果是最后一次释放锁,则直接置空firstReader if (firstReaderHoldCount == 1) firstReader = null; else //头锁重入次数-1 firstReaderHoldCount--; } else { //获取上一个获取读锁的线程计数器 HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) //非上一次执行的线程,则获取当前线程计数器 rh = readHolds.get(); int count = rh.count; if (count <= 1) { //删除当前线程计数器 readHolds.remove(); //说明没有之前获取锁,直接释放则异常 if (count <= 0) throw unmatchedUnlockException(); } //线程重入次数-1 --rh.count; } for (;;) { int c = getState(); int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) // Releasing the read lock has no effect on readers, // but it may allow waiting writers to proceed if // both read and write locks are now free. //锁为0说明正确释放 return nextc == 0; } } ### 写锁 ### #### lock #### protected final boolean tryAcquire(int acquires) { //获取当前线程 Thread current = Thread.currentThread(); ///获取当前线程状态 int c = getState(); //获取独占锁重入数量 int w = exclusiveCount(c); //如果有线程获取读写锁 if (c != 0) { // (Note: if c != 0 and w == 0 then shared count != 0) //写锁没被占用,或当前拥有独占访问权限的线程不等于当前线程,说明有读锁被占用,返回false if (w == 0 || current != getExclusiveOwnerThread()) return false; //读写锁数量超过最大数量,异常 if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); // Reentrant acquire //执行到这里,说明写锁被重入 setState(c + acquires); return true; } //执行到这里说明没有读写锁占用 if ( //写锁是否阻塞 writerShouldBlock() || //修改状态 !compareAndSetState(c, c + acquires)) //如果写锁没有被阻塞,且修改状态失败,则返回false return false; //设置锁被当前线程独占 setExclusiveOwnerThread(current); return true; } 非公平获取锁是否阻塞 final boolean writerShouldBlock() { return false; // writers can always barge } 公平获取锁是否阻塞 final boolean writerShouldBlock() { return hasQueuedPredecessors(); } public final boolean hasQueuedPredecessors() { // The correctness of this depends on head being initialized // before tail and on head.next being accurate if the current // thread is first in queue. Node t = tail; // Read fields in reverse initialization order Node h = head; Node s; //判断当前线程是否是等待线程节点的下一个线程 return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); } #### unlock #### protected final boolean tryRelease(int releases) { //拥有独占锁的不是当前线程,则异常 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); int nextc = getState() - releases; boolean free = exclusiveCount(nextc) == 0; //独占锁数量为0,则清空锁的持有者 if (free) setExclusiveOwnerThread(null); //设置锁数量 setState(nextc); return free; } ### 锁降级 ### ![在这里插入图片描述][watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L051YW5fRmVuZw_size_16_color_FFFFFF_t_70 1] * class CachedData { * Object data; * volatile boolean cacheValid; * final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); * * void processCachedData() { * rwl.readLock().lock(); * if (!cacheValid) { * // Must release read lock before acquiring write lock * rwl.readLock().unlock(); * rwl.writeLock().lock(); * try { * // Recheck state because another thread might have * // acquired write lock and changed state before we did. * if (!cacheValid) { * data = ... * cacheValid = true; * } * // Downgrade by acquiring read lock before releasing write lock * rwl.readLock().lock(); * } finally { * rwl.writeLock().unlock(); // Unlock write, still hold read * } * } * * try { * use(data); * } finally { * rwl.readLock().unlock(); * } * } * }} ### 总结 ### 当获取写锁的情况下,当前线程依旧能获取读锁,这称之为锁降级 当获取读锁情况下,不能获取写锁 这里始终保证了写锁>读锁等级. 避免了锁等级一样,出现获取锁的乱序问题. 读锁添加的State是SHARED\_UNIT的倍数 这里分别用firstReader记录第一个获取读锁的线程 cachedHoldCounter获取上一个获取读锁的线程计数器 readHolds记录每个线程锁对应的计数器 如果下一个锁是独占锁,且非公平模式下,则后续所有线程都进入AQS等待锁,只有已经获取读锁的依旧可以执行 final boolean readerShouldBlock() { //下一个锁是否是独占锁 return apparentlyFirstQueuedIsExclusive(); } protected final int tryAcquireShared(int unused) { / if (//线程是否应该被阻塞 !readerShouldBlock() &&{ //... } //CAS尚未命中 return fullTryAcquireShared(current); } final int fullTryAcquireShared(Thread current) { HoldCounter rh = null; for (;;) { int c = getState(); //如果当前有独占锁,且独占线程非当前线程,则返回 if (exclusiveCount(c) != 0) { if (getExclusiveOwnerThread() != current) return -1; / } else if (readerShouldBlock()) { / } //读锁数量超过上限,异常 if (sharedCount(c) == MAX_COUNT) throw new Error("Maximum lock count exceeded"); //设置读锁数量 //当前线程是第一个获取读锁的线程,或是上一个执行任务的线程会执行到这里 //也就是说重入获取读锁的线程才会执行到这里 if (compareAndSetState(c, c + SHARED_UNIT)) { // if (sharedCount(c) == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { if (rh == null) rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); //重入数量累加 rh.count++; //缓存计数器 cachedHoldCounter = rh; // cache for release } return 1; } } } ![在这里插入图片描述][20200603194410214.png] 公平模式下直接让下一个线程获取锁 final boolean readerShouldBlock() { return hasQueuedPredecessors(); } public final boolean hasQueuedPredecessors() { // The correctness of this depends on head being initialized // before tail and on head.next being accurate if the current // thread is first in queue. Node t = tail; // Read fields in reverse initialization order Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); } [watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L051YW5fRmVuZw_size_16_color_FFFFFF_t_70]: https://img-blog.csdnimg.cn/20200603162329411.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L051YW5fRmVuZw==,size_16,color_FFFFFF,t_70 [watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L051YW5fRmVuZw_size_16_color_FFFFFF_t_70 1]: https://img-blog.csdnimg.cn/20200603190442750.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L051YW5fRmVuZw==,size_16,color_FFFFFF,t_70 [20200603194410214.png]: https://img-blog.csdnimg.cn/20200603194410214.png
相关 jdk源码解析八之NIO 文章目录 Buffer ByteBuffer MappedByteBuffer DirectByteBu 素颜马尾好姑娘i/ 2023年02月22日 05:12/ 0 赞/ 113 阅读
相关 jdk源码解析八之BIO 文章目录 字节流 InputStream FilterInputStream ByteArrayInputSt 怼烎@/ 2023年02月21日 14:21/ 0 赞/ 30 阅读
相关 jdk源码解析七之Condition 文章目录 Condition newCondition await signal signalAll 素颜马尾好姑娘i/ 2023年02月21日 11:42/ 0 赞/ 40 阅读
相关 jdk源码解析七之ReadWriteLock 文章目录 ReadWriteLock ReentrantReadWriteLock 构造 获取读写锁 灰太狼/ 2023年02月21日 11:42/ 0 赞/ 39 阅读
相关 jdk源码解析三之CopyOnWriteArrayList 文章目录 CopyOnWriteArrayList add remove get 末蓝、/ 2023年02月20日 12:07/ 0 赞/ 23 阅读
相关 jdk源码解析三之ConcurrentHashMap 文章目录 ConcurrentHashMap put 初始化 扩容 ge た 入场券/ 2023年02月20日 12:07/ 0 赞/ 29 阅读
还没有评论,来说两句吧...