本文将详细介绍 ReentrantLock 的实现原理。
在进入源码分析之前,我先提出如下观点:希望大家纠正与讨论:
如果一个节点的状态设置为Node.SIGNAL,则说明它有后继节点,并处于阻塞状态。 ReentantLock的head节点,如果不为空,在该节点代表的线程为锁的占有者。这是对CLH算法的改进之处。众所周知,CLH算法的head节点为假节点,不代表任何线程。 ReentantLock几个编码技巧值得借鉴: 利用内部类实现功能扩展,使得java.util.concurrent.locks包类数量少,十分清晰。 利用了模板模式,AbstractQueuedSynchronizer就是锁机制的模板(CLH算法的一个变种)。 本文重点关注如下几个方法的实现:
lock() unlock() lockInterruptibly() 进入源码分析之前,希望读者带着如下问题边看边想:
问题1:一个线程用lock方法申请锁而被阻塞后,调用线程的interput方法,会发生什么情况,能中断锁的获取吗?
问题2:什么是CLH算法,RenntrantLock针对CLH算法做了哪些变化。
问题3:Node.CANCEL状态的节点在什么时候会删除。
1、ReentrantLock#lock 方法详解 如下摘录自 ReentrantLock.NonFairSync
final void lock() { if (compareAndSetState(0, 1)) // @1 setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); // @2 } 代码@1 首先线程请求锁时,第一步,直接通过锁的状态 state, 如果 state 为0,通过 CAS 尝试去获取锁,如果获取,直接返回,这里就是所谓的不公平,先抢占,然后再尝试排队。如果当前锁被占用,则尝试申请锁, 进入代码 @2;
继续查看 acquire(1)方法,该方法存在于 AbstractQueuedSynchronizer 类,该类是 java.util.concurent.locks 锁的队列机制实现类,基于CLH 算法的变体的基本思想。,附上 AbstractQueuedSynchronizer 的 acquire 方法源码。
/** * Acquires in exclusive mode, ignoring interrupts. Implemented * by invoking at least once {@link #tryAcquire}, * returning on success. Otherwise the thread is queued, possibly * repeatedly blocking and unblocking, invoking {@link * #tryAcquire} until success. This method can be used * to implement method {@link Lock#lock}. * * @param arg the acquire argument. This value is conveyed to * {@link #tryAcquire} but is otherwise uninterpreted and * can represent anything you like. */ public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } //那我们先进入到 tryAcquire(arg)方法,查看获取锁的逻辑,该方法不阻塞。
protected boolean tryAcquire(int arg) { // 说明,该方法在具体的子类中实现。
throw new UnsupportedOperationException();
} 我们一路跟踪进来,发现尝试获取锁的代码在 ReentrantLock内部类 Sync 汇总,Sync 是 NonFairSync 和 FairSync 的父类。
/** * Performs non-fair tryLock. tryAcquire is * implemented in subclasses, but both need nonfair * try for trylock method. */ final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { // @1 if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { // @2 int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } 该方法,尝试获取锁,如果成功获取锁,则返回true,否则,返回false;
重点关注 代码@1, 再次查看 锁的 state,该字段,表示该锁被占用的次数,如果为0,表示没有线程持有该锁,如果 大于1,表示同一个线程,多次请求锁;也就是可重入锁的实现原理。
代码@2:进一步说明可重入锁的实现机制。再次回到上文提到的 AbstractQueuedSynchronizer的 acquire(arg)方法:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
}
如果 tryAcquire(arg) 返回 true,则不会执行 acquireQueued,表示成功获取锁,如果 tryAcquire(arg) 返回 false, 说明没有成功获取锁,则加入请求队列中。接着请看 addWaiter (Node.EXCLUSIVE) 方法。
addWaiter 中涉及的逻辑,就是 CLH 思想的实现,故在 AbstractQueuedSynchronizer 中,源码如下:
/** * Creates and enqueues node for current thread and given mode. * 创建并入队一节点,为当前线程和给定的模式, Node.EXCLUSIVE 独占模式 * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared * @return the new node */ private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { //@1 start node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } //@1 end enq(node); return node; } 对于上面的代码@1,处说,如果当前该锁的尾部节点不为空时,只需要原子性的将新增节点放入原先的尾部,然后更新锁的 tail 属性即可。如果尾部节点不为空,说明有线程已经在该锁上等待,那如果尾部为空,是什么情况呢?尾部为空,表示没有线程持有锁,为什么该获取锁没有成功呢?我们不妨设想一下,该线程在没有执行到 addWaiter 时,尾部不为空,无法获取锁,当执行到 addWaiter 时,别的线程释放了锁,导致尾部为空,可以重新获取锁了;(其实这个就是并发编程的魅力,与 synchronized 关键字不同的机制);为了解答上述疑问,我们进入到 enq(node) 方法中一探究竟。
/** * Inserts node into queue, initializing if necessary. See picture above. * @param node the node to insert * @return node's predecessor */ private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize @1 if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } } 使用自旋来加入,众所周知,CLH算法,需要初始化一个假的 head 节点,也就是 head 节点并不代表一个等待获取锁的对象,AbstractQueuedSynchronzier 选择初始化 head,tail 的时机为第一次产生锁争用的时候。@1处为初始化head,tail,设置成功后,初始化后,再将新添加的节点放入到队列的尾部,然后该方法会返回原先的尾节点。addWaiter方法执行后,继续回到acquire(args)方法处:
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } 接下来,查看 acquireQueued 方法,addWaiter 方法返回的是代表当前线程的 Node 节点。
/** * Acquires in exclusive uninterruptible mode for thread already in * queue. Used by condition wait methods as well as acquire. * * @param node the node * @param arg the acquire argument * @return {@code true} if interrupted while waiting */ final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); // @1 if (p == head && tryAcquire(arg)) { // @2 setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt() ) //@3 interrupted = true; } } finally { if (failed) cancelAcquire(node); } } 首先@1,获取该节点的 node 的上一个节点。
@2如果node的前节点是head,因为head初始化时,都是假节点,不代表有线程拥有锁,所以,再次尝试获取锁,如果获取锁,则将锁的 head 设置为当前获取锁的线程的 Node,然后返回 false。返回 false, 则代表 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 的结果为 false,直接返回,并不需要设置中断标记。如果当前节点不是head的话,则说明该锁被别的线程占用了,那就需要等待其他线程释放该锁,具体,我们看一下shouldParkAfterFailedAcquire,为了更好的理解 shouldParkAfterFailedAcquire, 我们先看一下parkAndCheckInterrupt 方法。
/** * Convenience method to park and then check if interrupted * 阻塞该线程,然等待唤醒后,会返回 当前线程的中断位; * @return {@code true} if interrupted */ private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
/** * Checks and updates status for a node that failed to acquire. * Returns true if thread should block. This is the main signal * control in all acquire loops. Requires that pred == node.prev * * @param pred node's predecessor holding status * @param node the node * @return {@code true} if thread should block
该方法,如果返回true,则代表该线程将被阻塞。
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus; // @1
if (ws == Node.SIGNAL) // @2
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) { // @3
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do { // @4 start
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0); //@4 end
pred.next = node; // @5
} else { // @6
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
只有前置节点的状态为 0 或 PROPAGATE,,才能进入到该代码块,表明我们需要一个信号,但暂不挂起线程,调用者需要重 试一次,确保它不能获取到锁,从而阻塞该线程。
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
@1 首先获取前置节点的 waitStatus。
@2 如果前置节点的waitStatus = Node.SIGNAL,那么当前节点,直接阻塞,说明状态是一个信号,如果前置节点状态为 Node.SIGNAL,那么后续节点应该阻塞的信号量,为什么这么说,情况代码@6,一个节点,新增的时候,为 0 正常。
@3,ws > 0 ,则代表前置节点已取消。
@4 处的代码,就是当前 Node 的第一个不为取消状态的前置节点,重构 CLH 队列后,返回 false, 再次进入到 acquireQueued 的无限循环中,又继续 acquireQueued 的流程,继续尝试获取锁,获取锁,或者阻塞。
@6,如果前置节点为0或 PROPAGATE(可传播),如果前置节点为0,还没有其他节点通过(prev)来判断该 prev 的后继节点是否需要阻塞过,所以,通过 CAS 设置前置节点为 Node.SIGNAL, 重试获取锁过程,避免不必要的线程阻塞。
至此,获取锁的过程就结束了,为了直观体现上述获取锁的过程,现给出如下流程图:
2、ReentrantLock unlock public void unlock() {
sync.release(1);
}
//代码直接进入到AbstractQueuedSynchronzier 的 relase方法。 /** * Releases in exclusive mode. Implemented by unblocking one or * more threads if {@link #tryRelease} returns true. * This method can be used to implement method {@link Lock#unlock}. * * @param arg the release argument. This value is conveyed to * {@link #tryRelease} but is otherwise uninterpreted and * can represent anything you like. * @return the value returned from {@link #tryRelease} */ public final boolean release(int arg) { if (tryRelease(arg)) { @1 Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } 直接看代码 tryRelease(arg)方法:tryRelease 方法,是由具体的子类实现的,故将目光转移到 NonFairSync 类的 tryRelease() 方法。
protected final boolean tryRelease(int releases) { int c = getState() - releases; // @1 if (Thread.currentThread() != getExclusiveOwnerThread()) //@2 throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { // @3 free = true; setExclusiveOwnerThread(null); } setState(c); //@4 return free; } 代码@1,首先,计算持有锁的次数=当前被持有锁的次数-减去释放的锁的数量;
代码@2,判断当前锁的持有线程释放与释放锁的线程是否相同,否则,直接抛出运行时异常
代码@3,如果释放锁后,占有次数为0,则代表该锁被释放,设置锁的占有线程为null,
代码@4,设置锁的state,如果返回true,表示锁被释放,如果返回false,表示,锁继续被该线程占有(重入了多次,就需要释放多次)。再次回到release方法,如果tryRelease方法返回true,表示可以释放锁,
public final boolean release(int arg) { if (tryRelease(arg)) { @1 Node h = head; if (h != null && h.waitStatus != 0) // @2 unparkSuccessor(h); return true; } return false; } 代码@2为什么需要判断 h!=null && h.waitStatus != 0的判断呢?,在讲解获取锁的时候,方法 shouldParkAfterFailedAcquire 中对于代码@6处的讲解,其实不难发现,一个节点在请求锁时,只有当它的前驱节点的waitStatus=Node.SIGNAL时,才会阻塞。如果 head为空,则说明 CLH 队列为空,压根就不会有线程阻塞,故无需执行 unparkSuccessor(h), 同样的道理,如果根节点的waitStatus=0,则说明压根就没有 head 后继节点判断是否要绑定的逻辑,故也没有线程被阻塞这一说。原来一个更重要的原因:改进后的CLH,head如果不为空,该节点代表获取锁的那个线程对于的Node,请看获取锁代码acquireQueued中的代码@2处,如果获得锁,setHead(node);知道这一点,就不难理解为什么在释放锁时调用unparkSuccessor(h)时,参数为head了。
现在将目光转移到 AbstractQueuedSynchronizer. unparkSuccessor(h)方法中:
/**
* Wakes up node's successor, if one exists.
*
* @param node the node
/
private void unparkSuccessor(Node node) {
/
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0) // @1
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) { //@2 start
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
} // @2 end
if (s != null) // @3
LockSupport.unpark(s.thread);
}
代码@1,目前waitStatus > 0表示取消,等于0表示正常(新建),该步骤主要是 为了保护,避免重复释放。
代码@2 start-end,此处,主要是从占有锁的节点,往后找,找到第一个没有被取 消的节点,然后唤醒它所代表的线程。这里为什么要从尾部寻址呢?
代码@3,唤醒线程,释放锁的逻辑代码已经结束,那调用LockSupport.unpark(s.thread)后,会进入到哪呢?此时,请再次进入获取锁代码的 acquireQueue方法和shouldParkAfterFailedAcquire方法,先解读如下:
当LockSupport.unpark(s.thread)事,那acquireQueued的代码@3处parkAndCheckInterrupt方法会解除阻塞,继续放下执行,进入到 acquireQueued的for循环处:此时会有两种情况
HEAD --> Node ... > 其中Node 为 LockSupport.unpark 中的 s; HEAD --> A Cancel Node --> Node(s) 如果为第一种情况,直接进入 @2去尝试获取锁。
如果为第二种情况,shouldParkAfterFailedAcquire(prev,node) 中的 prev 为一个取消的节点,然后会重构整个 CLH 链表,删除Node 到 head 节点直接的取消节点,使得被唤醒线程的节点的上一个节点为 head,从而满足@2处的条件,进入获取锁方法。至此, lock 方法与 unlock 方法流通畅。
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); // @1 if (p == head && tryAcquire(arg)) { // @2 setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt() ) //@3 interrupted = true; } } finally { if (failed) cancelAcquire(node); } } 与shouldParkAfterFailedAcquire方法: / private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; // @1 if (ws == Node.SIGNAL) // @2 / * This node has already set status asking a release * to signal it, so it can safely park. / return true; if (ws > 0) { // @3 / * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { // @4 start node.prev = pred = pred.prev; } while (pred.waitStatus > 0); //@4 end
pred.next = node; // @5
} else { // @6
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
只有前置节点的状态为 0 或 PROPAGATE,,才能进入到该代码块,表明我们需要一个信号,但暂不挂起线程,调用者需要重 试一次,确保它不能获取到锁,从而阻塞该线程。
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
为了方便大家理解,给出一个简要的释放锁的流程图:
3、ReentrantLock lockInterruptibly 源码分析 void lockInterruptibly() throws InterruptedException; 首先先提一个问题: void lock(),通过该方法去获取锁,如果锁被占用,线程阻塞,如果调用被阻塞线程的 interupt()方法,会取消获取锁吗?答案是否定的。
首先需要知道 LockSupport.park 会响应中断,但不会抛出 InterruptedException。
接下来,我们就从lockInterruptibly()方法入手,一步一步解析,并分析与lock方法的差异。
首先进入的是AbstractQueuedSynchronizer的acquireInterruptibly方法。
/** * Acquires in exclusive mode, aborting if interrupted. * Implemented by first checking interrupt status, then invoking * at least once {@link #tryAcquire}, returning on * success. Otherwise the thread is queued, possibly repeatedly * blocking and unblocking, invoking {@link #tryAcquire} * until success or the thread is interrupted. This method can be * used to implement method {@link Lock#lockInterruptibly}. * * @param arg the acquire argument. This value is conveyed to * {@link #tryAcquire} but is otherwise uninterpreted and * can represent anything you like. * @throws InterruptedException if the current thread is interrupted / public final void acquireInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (!tryAcquire(arg)) doAcquireInterruptibly(arg); // @1 } 如果尝试获取锁失败后,进入获取锁并等待锁逻辑,doAcquireInterruptibly /* * Acquires in exclusive interruptible mode. * @param arg the acquire argument */ private void doAcquireInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.EXCLUSIVE); // @1 boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { // @2 setHead(node); p.next = null; // help GC failed = false; return; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); //@3 } } finally { if (failed) cancelAcquire(node); //@4 } } 整个获取锁的逻辑与 lock 方法一样,唯一的区别在于 @3 处,如果 parkAndCheckInterrupt 如果是通过 t.interupt 方法,使LockSupport.park 取消阻塞的话,会抛出 InterruptedException,停止尝试获取锁,然后将添加的节点取消,那重点关注一下cancelAcquire(node);
/** * Cancels an ongoing attempt to acquire. * * @param node the node */ private void cancelAcquire(Node node) { // Ignore if node doesn't exist if (node == null) return;
node.thread = null;
// Skip cancelled predecessors
Node pred = node.prev;
while (pred.waitStatus > 0) // @1
node.prev = pred = pred.prev;
// predNext is the apparent node to unsplice. CASes below will
// fail if not, in which case, we lost race vs another cancel
// or signal, so no further action is necessary.
Node predNext = pred.next; //@2
// Can use unconditional write instead of CAS here.
// After this atomic step, other Nodes can skip past us.
// Before, we are free of interference from other threads.
node.waitStatus = Node.CANCELLED;
// If we are the tail, remove ourselves.
if (node == tail && compareAndSetTail(node, pred)) { // @3
compareAndSetNext(pred, predNext, null);
} else { // @4
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) { // @5
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else { // @6
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
代码@1:此处的目的就是, 设置prev的值为从当前取消节点往head节点方向,第一个未取消节点。并将中间的取消节点脱离这条链。
代码@2 Node predNext = pred.next;
代码@3 如果被取消的节点是尾节点的话,那么将pred设置为尾节点,compareAndSetTail(node, pred),如果设置失败,说明,有别的线程在申请锁,使得尾部节点发生了变化,那这样的话,我当前节点取消的工作,就到此可以结束了;如果设置成功了,既然pred是尾节点,那么再次将pred的next域设置为null;当然也能设置失败,表明又有新的线程在申请说,创建了节点。所以取消操作,也到此结束。
代码@4,如果取消的节点,不是尾部节点的话,这时,需要维护CLH链,请看代码@5
代码@5,首先pred不是head节点,接下来判断是否需要设置pred.next = 当前待取消节点的next。如果pred.waitStatus == Node.SIGNAL, 或者试图将 pred.waitStatus = Node.SIGNAL 状态成功,并且pred.thread 的线程不为空;此时进一步判断待取消的节点的 next 不为空,并且状态为非取消的时,将 pred.next 设置为 node.next;该取消节点被删除。
代码@6,如果pred为head,执行一次唤醒操作。
处于Node.CANCEL状态节点的删除发生在shouldParkAfterFailedAcquire,一处就发生在cancelAcquire方法。
