Skip to content

Latest commit

 

History

History
655 lines (561 loc) · 20.6 KB

Lock锁组件.md

File metadata and controls

655 lines (561 loc) · 20.6 KB

类图结构

J.U.C 的锁组件中 类相对较少,从 JDK 相应的包中也能看出来,下图标记了其中最主要的几个接口和类,也是本文要分析的重点。

avatar

下图 将这几个接口和类 以类图的方式展现出来,其中包含了它们所声明的主要方法。

avatar

Lock 组件

Lock 组件的结构很简单,只有一个接口和一个实现类,源码如下。

publicinterfaceLock { /** * 获取锁 */voidlock(); /** * 获取锁,除非当前线程中断 */voidlockInterruptibly() throwsInterruptedException; /** * 只有当调用时 锁是空闲的情况下,才获取锁 */booleantryLock(); /** * 如果锁在给定的等待时间内空闲且当前线程未被中断,则获取该锁 */booleantryLock(longtime, TimeUnitunit) throwsInterruptedException; /** * 释放锁 */voidunlock(); } publicclassReentrantLockimplementsLock, java.io.Serializable { /** 提供所有实现机制的同步器,ReentrantLock 的主要方法都依赖于该对象进行实现 */privatefinalSyncsync; /** * ReentrantLock锁 的同步控制基础。它的两个子类分别实现了公平锁和非公平锁,如下。 */abstractstaticclassSyncextendsAbstractQueuedSynchronizer { privatestaticfinallongserialVersionUID = -5179523762034025860L; abstractvoidlock(); /** * Performs non-fair tryLock. tryAcquire is implemented in * subclasses, but both need nonfair try for trylock method. */finalbooleannonfairTryAcquire(intacquires) { finalThreadcurrent = Thread.currentThread(); intc = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); returntrue; } } elseif (current == getExclusiveOwnerThread()) { intnextc = c + acquires; if (nextc < 0) // overflowthrownewError("Maximum lock count exceeded"); setState(nextc); returntrue; } returnfalse; } protectedfinalbooleantryRelease(intreleases) { intc = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) thrownewIllegalMonitorStateException(); booleanfree = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); returnfree; } finalbooleanisLocked() { returngetState() != 0; } } /** * 非公平锁,基于上面的 Sync类 */staticfinalclassNonfairSyncextendsSync { privatestaticfinallongserialVersionUID = 7316153563782823691L; finalvoidlock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); elseacquire(1); } protectedfinalbooleantryAcquire(intacquires) { returnnonfairTryAcquire(acquires); } } /** * 公平锁,基于上面的 Sync类 */staticfinalclassFairSyncextendsSync { privatestaticfinallongserialVersionUID = -3000897897090466540L; finalvoidlock() { acquire(1); } 
protectedfinalbooleantryAcquire(intacquires) { finalThreadcurrent = Thread.currentThread(); intc = getState(); if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); returntrue; } } elseif (current == getExclusiveOwnerThread()) { intnextc = c + acquires; if (nextc < 0) thrownewError("Maximum lock count exceeded"); setState(nextc); returntrue; } returnfalse; } } /** * 无参初始化时,默认实例化 非公平锁 */publicReentrantLock() { sync = newNonfairSync(); } /** * 可通过参数fair 控制实例化的是 公平锁还是非公平锁 */publicReentrantLock(booleanfair) { sync = fair ? newFairSync() : newNonfairSync(); } publicvoidlock() { sync.lock(); } publicbooleantryLock() { returnsync.nonfairTryAcquire(1); } publicbooleantryLock(longtimeout, TimeUnitunit) throwsInterruptedException { returnsync.tryAcquireNanos(1, unit.toNanos(timeout)); } publicvoidunlock() { sync.release(1); } publicbooleanisLocked() { returnsync.isLocked(); } publicfinalbooleanisFair() { returnsyncinstanceofFairSync; } }

ReadWriteLock 组件

ReadWriteLock 组件的结构也很简单,同样只有一个接口和一个实现类。与上面的 Lock 组件不同的是,它把锁拆分成读锁(共享式获取)和写锁(独占式获取)两把锁;和 ReentrantLock 一样,读锁写锁也都支持公平与非公平两种模式。

publicinterfaceReadWriteLock { /** * 获取一个 读锁 */LockreadLock(); /** * 获取一个 写锁 */LockwriteLock(); } publicclassReentrantReadWriteLockimplementsReadWriteLock, java.io.Serializable { /** 由内部类提供的读锁 */privatefinalReentrantReadWriteLock.ReadLockreaderLock; /** 由内部类提供的写锁 */privatefinalReentrantReadWriteLock.WriteLockwriterLock; /** 提供所有实现机制的同步器 */finalSyncsync; /** * 默认创建 非公平的读锁写锁 */publicReentrantReadWriteLock() { this(false); } /** * 由参数 fair 指定读锁写锁是公平的还是非公平的 */publicReentrantReadWriteLock(booleanfair) { sync = fair ? newFairSync() : newNonfairSync(); readerLock = newReadLock(this); writerLock = newWriteLock(this); } /** * 获取写锁 * 获取读锁 */publicReentrantReadWriteLock.WriteLockwriteLock() { returnwriterLock; } publicReentrantReadWriteLock.ReadLockreadLock() { returnreaderLock; } abstractstaticclassSyncextendsAbstractQueuedSynchronizer { protectedfinalbooleantryRelease(intreleases) { if (!isHeldExclusively()) thrownewIllegalMonitorStateException(); intnextc = getState() - releases; booleanfree = exclusiveCount(nextc) == 0; if (free) setExclusiveOwnerThread(null); setState(nextc); returnfree; } protectedfinalbooleantryAcquire(intacquires) { /* * Walkthrough: * 1. If read count nonzero or write count nonzero * and owner is a different thread, fail. * 2. If count would saturate, fail. (This can only * happen if count is already nonzero.) * 3. Otherwise, this thread is eligible for lock if * it is either a reentrant acquire or * queue policy allows it. If so, update state * and set owner. 
*/Threadcurrent = Thread.currentThread(); intc = getState(); intw = exclusiveCount(c); if (c != 0) { // (Note: if c != 0 and w == 0 then shared count != 0)if (w == 0 || current != getExclusiveOwnerThread()) returnfalse; if (w + exclusiveCount(acquires) > MAX_COUNT) thrownewError("Maximum lock count exceeded"); // Reentrant acquiresetState(c + acquires); returntrue; } if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) returnfalse; setExclusiveOwnerThread(current); returntrue; } protectedfinalbooleantryReleaseShared(intunused) { Threadcurrent = Thread.currentThread(); if (firstReader == current) { // assert firstReaderHoldCount > 0;if (firstReaderHoldCount == 1) firstReader = null; elsefirstReaderHoldCount--; } else { HoldCounterrh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); intcount = rh.count; if (count <= 1) { readHolds.remove(); if (count <= 0) throwunmatchedUnlockException(); } --rh.count; } for (;;) { intc = getState(); intnextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) // Releasing the read lock has no effect on readers,// but it may allow waiting writers to proceed if// both read and write locks are now free.returnnextc == 0; } } protectedfinalinttryAcquireShared(intunused) { /* * Walkthrough: * 1. If write lock held by another thread, fail. * 2. Otherwise, this thread is eligible for * lock wrt state, so ask if it should block * because of queue policy. If not, try * to grant by CASing state and updating count. * Note that step does not check for reentrant * acquires, which is postponed to full version * to avoid having to check hold count in * the more typical non-reentrant case. * 3. If step 2 fails either because thread * apparently not eligible or CAS fails or count * saturated, chain to version with full retry loop. 
*/Threadcurrent = Thread.currentThread(); intc = getState(); if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; intr = sharedCount(c); if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { if (r == 0) { firstReader = current; firstReaderHoldCount = 1; } elseif (firstReader == current) { firstReaderHoldCount++; } else { HoldCounterrh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); elseif (rh.count == 0) readHolds.set(rh); rh.count++; } return1; } returnfullTryAcquireShared(current); } /** * Performs tryLock for write, enabling barging in both modes. * This is identical in effect to tryAcquire except for lack * of calls to writerShouldBlock. */finalbooleantryWriteLock() { Threadcurrent = Thread.currentThread(); intc = getState(); if (c != 0) { intw = exclusiveCount(c); if (w == 0 || current != getExclusiveOwnerThread()) returnfalse; if (w == MAX_COUNT) thrownewError("Maximum lock count exceeded"); } if (!compareAndSetState(c, c + 1)) returnfalse; setExclusiveOwnerThread(current); returntrue; } /** * Performs tryLock for read, enabling barging in both modes. * This is identical in effect to tryAcquireShared except for * lack of calls to readerShouldBlock. 
*/finalbooleantryReadLock() { Threadcurrent = Thread.currentThread(); for (;;) { intc = getState(); if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) returnfalse; intr = sharedCount(c); if (r == MAX_COUNT) thrownewError("Maximum lock count exceeded"); if (compareAndSetState(c, c + SHARED_UNIT)) { if (r == 0) { firstReader = current; firstReaderHoldCount = 1; } elseif (firstReader == current) { firstReaderHoldCount++; } else { HoldCounterrh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); elseif (rh.count == 0) readHolds.set(rh); rh.count++; } returntrue; } } } finalbooleanisWriteLocked() { returnexclusiveCount(getState()) != 0; } } /** * 非公平锁 */staticfinalclassNonfairSyncextendsSync { finalbooleanwriterShouldBlock() { returnfalse; // writers can always barge } finalbooleanreaderShouldBlock() { /* As a heuristic to avoid indefinite writer starvation, * block if the thread that momentarily appears to be head * of queue, if one exists, is a waiting writer. This is * only a probabilistic effect since a new reader will not * block if there is a waiting writer behind other enabled * readers that have not yet drained from the queue. 
*/returnapparentlyFirstQueuedIsExclusive(); } } /** * 公平锁 */staticfinalclassFairSyncextendsSync { finalbooleanwriterShouldBlock() { returnhasQueuedPredecessors(); } finalbooleanreaderShouldBlock() { returnhasQueuedPredecessors(); } } /** * 读锁 */publicstaticclassReadLockimplementsLock, java.io.Serializable { privatefinalSyncsync; protectedReadLock(ReentrantReadWriteLocklock) { sync = lock.sync; } publicvoidlock() { sync.acquireShared(1); } publicvoidlockInterruptibly() throwsInterruptedException { sync.acquireSharedInterruptibly(1); } publicbooleantryLock() { returnsync.tryReadLock(); } publicbooleantryLock(longtimeout, TimeUnitunit) throwsInterruptedException { returnsync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); } publicvoidunlock() { sync.releaseShared(1); } } /** * 写锁 */publicstaticclassWriteLockimplementsLock, java.io.Serializable { privatefinalSyncsync; protectedWriteLock(ReentrantReadWriteLocklock) { sync = lock.sync; } publicvoidlock() { sync.acquire(1); } publicvoidlockInterruptibly() throwsInterruptedException { sync.acquireInterruptibly(1); } publicbooleantryLock( ) { returnsync.tryWriteLock(); } publicbooleantryLock(longtimeout, TimeUnitunit) throwsInterruptedException { returnsync.tryAcquireNanos(1, unit.toNanos(timeout)); } publicvoidunlock() { sync.release(1); } } publicfinalbooleanisFair() { returnsyncinstanceofFairSync; } publicbooleanisWriteLocked() { returnsync.isWriteLocked(); } }

AbstractQueuedSynchronizer

最后看一下抽象类 AbstractQueuedSynchronizer(下文简称 AQS)。在同步组件的实现中,AQS 是核心部分:同步组件的实现者借助 AQS 提供的模板方法来实现同步语义,AQS 则负责同步状态的管理、阻塞线程的排队,以及等待/通知等底层处理。AQS 的核心包括:同步队列,独占式锁的获取和释放,共享式锁的获取和释放,以及可中断、超时等待的锁获取;这些特性都是以 AQS 模板方法的形式提供出来的。源码如下。

publicabstractclassAbstractQueuedSynchronizerextendsAbstractOwnableSynchronizerimplementsjava.io.Serializable { /** * 当共享资源被某个线程占有,其他请求该资源的线程将会阻塞,从而进入同步队列。 * 就数据结构而言,队列的实现方式无外乎两者一是通过数组的形式,另外一种则是链表的形式。 * AQS中的同步队列则是通过链式方式进行实现,下面的内部类Node便是其实现的载体 */staticfinalclassNode { /** Marker to indicate a node is waiting in shared mode */staticfinalNodeSHARED = newNode(); /** Marker to indicate a node is waiting in exclusive mode */staticfinalNodeEXCLUSIVE = null; // 节点从同步队列中取消staticfinalintCANCELLED = 1; // 后继节点的线程处于等待状态,如果当前节点释放同步状态会通知后继节点,// 使得后继节点的线程能够运行;staticfinalintSIGNAL = -1; // 当前节点进入等待队列中staticfinalintCONDITION = -2; // 表示下一次共享式同步状态获取将会无条件传播下去staticfinalintPROPAGATE = -3; // 节点状态volatileintwaitStatus; // 当前节点/线程的前驱节点volatileNodeprev; // 当前节点/线程的后驱节点volatileNodenext; // 加入同步队列的线程引用volatileThreadthread; // 等待队列中的下一个节点NodenextWaiter; finalbooleanisShared() { returnnextWaiter == SHARED; } finalNodepredecessor() throwsNullPointerException { Nodep = prev; if (p == null) thrownewNullPointerException(); elsereturnp; } Node() { // Used to establish initial head or SHARED marker } Node(Threadthread, Nodemode) { // Used by addWaiterthis.nextWaiter = mode; this.thread = thread; } Node(Threadthread, intwaitStatus) { // Used by Conditionthis.waitStatus = waitStatus; this.thread = thread; } } /** * AQS实际上通过头尾指针来管理同步队列,同时实现包括获取锁失败的线程进行入队, * 释放锁时对同步队列中的线程进行通知等核心方法。 */privatetransientvolatileNodehead; privatetransientvolatileNodetail; /** * 获取独占式锁 */publicfinalvoidacquire(intarg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } /** * 释放独占式锁 */publicfinalbooleanrelease(intarg) { if (tryRelease(arg)) { Nodeh = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); returntrue; } returnfalse; } /** * 获取可中断式锁 */publicfinalvoidacquireInterruptibly(intarg) throwsInterruptedException { if (Thread.interrupted()) thrownewInterruptedException(); if (!tryAcquire(arg)) doAcquireInterruptibly(arg); } /** * 获取共享锁 
*/publicfinalvoidacquireShared(intarg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); } /** * 释放共享锁 */publicfinalbooleanreleaseShared(intarg) { if (tryReleaseShared(arg)) { doReleaseShared(); returntrue; } returnfalse; } }
close