protectedfinalvoidsetState(int newState){ state = newState; }
// 使用CAS设置当前状态,可以保证原子性 protectedfinalbooleancompareAndSetState(int expect, int update){ // See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }
/**
 * Threshold, in nanoseconds, used by the timed condition waits in this file:
 * they call {@code LockSupport.parkNanos} only when the remaining timeout is
 * at least this value, and otherwise re-check in a loop without parking.
 */
static final long spinForTimeoutThreshold = 1000L;
// 获取锁 独占锁 publicfinalvoidacquire(int arg){ // tryAcquire需要具体子类去实现,保证线程安全的获取同步状态 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 同步状态获取失败,通过addWaiter将节点加入到同步队列尾部,acquireQueued以无限循环的方式获取同步状态 selfInterrupt(); } // 以不可中断方式获取锁 finalbooleanacquireQueued(final Node node, int arg){ boolean failed = true; try { boolean interrupted = false; for (;;) { // 获取给定节点的前驱节点,这里要么是头部节点head,要么是其他排队的节点 final Node p = node.predecessor(); // 如果node的前驱结点p是head,表示node是第二个节点,可以尝试去获取资源 if (p == head && tryAcquire(arg)) { // 拿到资源后,将head指向该节点 setHead(node); p.next = null; // help GC // 设置获取成功状态 failed = false; return interrupted; } // 走到这里说明锁的状态不可以获取,判断是否可以挂起当前线程,如果判断为真,则可以挂起当前线程,否则继续循环 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) // 阻塞等待被唤醒 interrupted = true; } } finally { // 如果获取失败则取消获取 if (failed) cancelAcquire(node); } } private Node addWaiter(Node mode){ // 生成该线程对应的Node节点 Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure // 获取当前尾结点 Node pred = tail; if (pred != null) { // 尾结点不为空 // 当前节点的前驱节点指向尾结点 node.prev = pred; // 设置当前节点为尾结点 if (compareAndSetTail(pred, node)) { // 旧的尾结点的后继节点指向当前节点 pred.next = node; return node; } } // 如果等待队列为空或者上述CAS操作失败,则自旋CAS插入 //(可能有多个线程并发加入队尾产生竞争) enq(node); return node; } private Node enq(final Node node){ for (;;) { Node t = tail; // 第一次循环,尾结点为null,需要进行初始化 if (t == null) { // Must initialize // 构建一个哨兵节点作为头节点 if (compareAndSetHead(new Node())) // 尾结点同样指向哨兵节点 tail = head; } else { // 尾结点不为空 // 将该节点的前驱节点指向当前的尾结点 node.prev = t; // 设置该节点为尾结点 if (compareAndSetTail(t, node)) { // 原本的尾结点的后继节点设置为当前节点 t.next = node; return t; } } } }
publicfinalvoidacquireInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) thrownew InterruptedException(); if (!tryAcquire(arg)) doAcquireInterruptibly(arg); }
publicfinalbooleantryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) thrownew InterruptedException(); return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout); }
// 释放锁 publicfinalbooleanrelease(int arg){ if (tryRelease(arg)) { // 获取head节点 Node h = head; // 如果head节点不为空并且等待状态不为0则唤醒后继节点 if (h != null && h.waitStatus != 0) unparkSuccessor(h); returntrue; } returnfalse; } // 唤醒后继节点 privatevoidunparkSuccessor(Node node){ /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ // 获取给定节点的等待状态 int ws = node.waitStatus; // 将等待状态更新为0 if (ws < 0) compareAndSetWaitStatus(node, ws, 0);
/* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ // 获取给定节点的后继节点 Node s = node.next; // 后继节点为空或者等待状态为CANCELLED if (s == null || s.waitStatus > 0) { s = null; // 从后向前遍历找到第一个不是CANCELLED状态的节点 for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } // 唤醒 if (s != null) LockSupport.unpark(s.thread); } // 判断是否可以将当前节点挂起 privatestaticbooleanshouldParkAfterFailedAcquire(Node pred, Node node){ // 获取前驱结点的等待状态 int ws = pred.waitStatus; // 如果等待状态为SIGNAL,表明前驱结点会唤醒当前节点,可以挂起 if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ returntrue; // 大于0的状态目前是CANCELLED if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ // 清理队列中已取消的前驱节点 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { // 走到这里可能是CONDITION和PROPAGATE或者是0 /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ // 尝试将前驱结点的等待状态设置为SIGNAL compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } returnfalse; }
staticfinalclassNode{ /** Marker to indicate a node is waiting in shared mode */ // 标记该线程是获取共享资源时被阻塞挂起后放入AQS队列的 staticfinal Node SHARED = new Node(); /** Marker to indicate a node is waiting in exclusive mode */ // 标记线程是获取独占锁资源时被挂起后放入AQS队列的 staticfinal Node EXCLUSIVE = null;
publicclassConditionObjectimplementsCondition, java.io.Serializable{ privatestaticfinallong serialVersionUID = 1173984872572414699L; /** First node of condition queue. */ privatetransient Node firstWaiter; /** Last node of condition queue. */ privatetransient Node lastWaiter;
/** * Creates a new {@code ConditionObject} instance. */ publicConditionObject(){ }
// Internal methods
/** * Adds a new waiter to wait queue. * @return its new wait node */ private Node addConditionWaiter(){ Node t = lastWaiter; // If lastWaiter is cancelled, clean out. if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; }
/** * Removes and transfers nodes until hit non-cancelled one or * null. Split out from signal in part to encourage compilers * to inline the case of no waiters. * @param first (non-null) the first node on condition queue */ privatevoiddoSignal(Node first){ do { // 向后移一位 if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); }
/** * Removes and transfers all nodes. * @param first (non-null) the first node on condition queue */ privatevoiddoSignalAll(Node first){ lastWaiter = firstWaiter = null; do { Node next = first.nextWaiter; first.nextWaiter = null; transferForSignal(first); first = next; } while (first != null); }
/** * Unlinks cancelled waiter nodes from condition queue. * Called only while holding lock. This is called when * cancellation occurred during condition wait, and upon * insertion of a new waiter when lastWaiter is seen to have * been cancelled. This method is needed to avoid garbage * retention in the absence of signals. So even though it may * require a full traversal, it comes into play only when * timeouts or cancellations occur in the absence of * signals. It traverses all nodes rather than stopping at a * particular target to unlink all pointers to garbage nodes * without requiring many re-traversals during cancellation * storms. */ privatevoidunlinkCancelledWaiters(){ Node t = firstWaiter; Node trail = null; while (t != null) { Node next = t.nextWaiter; if (t.waitStatus != Node.CONDITION) { t.nextWaiter = null; if (trail == null) firstWaiter = next; else trail.nextWaiter = next; if (next == null) lastWaiter = trail; } else trail = t; t = next; } }
// public methods
/** * Moves the longest-waiting thread, if one exists, from the * wait queue for this condition to the wait queue for the * owning lock. * * @throws IllegalMonitorStateException if {@link #isHeldExclusively} * returns {@code false} */ // 唤醒条件队列中的下一个节点 publicfinalvoidsignal(){ // 判断当前线程是否持有锁 if (!isHeldExclusively()) thrownew IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); }
/** * Moves all threads from the wait queue for this condition to * the wait queue for the owning lock. * * @throws IllegalMonitorStateException if {@link #isHeldExclusively} * returns {@code false} */ publicfinalvoidsignalAll(){ if (!isHeldExclusively()) thrownew IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignalAll(first); }
/** * Implements uninterruptible condition wait. * <ol> * <li> Save lock state returned by {@link #getState}. * <li> Invoke {@link #release} with saved state as argument, * throwing IllegalMonitorStateException if it fails. * <li> Block until signalled. * <li> Reacquire by invoking specialized version of * {@link #acquire} with saved state as argument. * </ol> */ publicfinalvoidawaitUninterruptibly(){ Node node = addConditionWaiter(); int savedState = fullyRelease(node); boolean interrupted = false; while (!isOnSyncQueue(node)) { LockSupport.park(this); if (Thread.interrupted()) interrupted = true; } if (acquireQueued(node, savedState) || interrupted) selfInterrupt(); }
/* * For interruptible waits, we need to track whether to throw * InterruptedException, if interrupted while blocked on * condition, versus reinterrupt current thread, if * interrupted while blocked waiting to re-acquire. */
/** Mode meaning to reinterrupt on exit from wait */ privatestaticfinalint REINTERRUPT = 1; /** Mode meaning to throw InterruptedException on exit from wait */ privatestaticfinalint THROW_IE = -1;
/** * Checks for interrupt, returning THROW_IE if interrupted * before signalled, REINTERRUPT if after signalled, or * 0 if not interrupted. */ privateintcheckInterruptWhileWaiting(Node node){ return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0; }
/** * Throws InterruptedException, reinterrupts current thread, or * does nothing, depending on mode. */ privatevoidreportInterruptAfterWait(int interruptMode) throws InterruptedException { if (interruptMode == THROW_IE) thrownew InterruptedException(); elseif (interruptMode == REINTERRUPT) selfInterrupt(); }
/** * Implements interruptible condition wait. * <ol> * <li> If current thread is interrupted, throw InterruptedException. * <li> Save lock state returned by {@link #getState}. * <li> Invoke {@link #release} with saved state as argument, * throwing IllegalMonitorStateException if it fails. * <li> Block until signalled or interrupted. * <li> Reacquire by invoking specialized version of * {@link #acquire} with saved state as argument. * <li> If interrupted while blocked in step 4, throw InterruptedException. * </ol> */ publicfinalvoidawait()throws InterruptedException { if (Thread.interrupted()) thrownew InterruptedException(); // 将当前线程添加到条件队列尾部 Node node = addConditionWaiter(); // 在进入条件等待之前先完全释放锁 int savedState = fullyRelease(node); int interruptMode = 0; // 线程一直进行条件等待 while (!isOnSyncQueue(node)) { // 进行条件等待的都在这里挂起,等待被唤醒 LockSupport.park(this); // 当前线程被唤醒后检查是否被中断,如果被中断表示节点取消条件等待,移出条件队列 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
/** * Implements timed condition wait. * <ol> * <li> If current thread is interrupted, throw InterruptedException. * <li> Save lock state returned by {@link #getState}. * <li> Invoke {@link #release} with saved state as argument, * throwing IllegalMonitorStateException if it fails. * <li> Block until signalled, interrupted, or timed out. * <li> Reacquire by invoking specialized version of * {@link #acquire} with saved state as argument. * <li> If interrupted while blocked in step 4, throw InterruptedException. * </ol> */ publicfinallongawaitNanos(long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) thrownew InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); finallong deadline = System.nanoTime() + nanosTimeout; int interruptMode = 0; while (!isOnSyncQueue(node)) { if (nanosTimeout <= 0L) { transferAfterCancelledWait(node); break; } if (nanosTimeout >= spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; nanosTimeout = deadline - System.nanoTime(); } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); return deadline - System.nanoTime(); }
/** * Implements absolute timed condition wait. * <ol> * <li> If current thread is interrupted, throw InterruptedException. * <li> Save lock state returned by {@link #getState}. * <li> Invoke {@link #release} with saved state as argument, * throwing IllegalMonitorStateException if it fails. * <li> Block until signalled, interrupted, or timed out. * <li> Reacquire by invoking specialized version of * {@link #acquire} with saved state as argument. * <li> If interrupted while blocked in step 4, throw InterruptedException. * <li> If timed out while blocked in step 4, return false, else true. * </ol> */ publicfinalbooleanawaitUntil(Date deadline) throws InterruptedException { long abstime = deadline.getTime(); if (Thread.interrupted()) thrownew InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); boolean timedout = false; int interruptMode = 0; while (!isOnSyncQueue(node)) { if (System.currentTimeMillis() > abstime) { timedout = transferAfterCancelledWait(node); break; } LockSupport.parkUntil(this, abstime); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); return !timedout; }
/** * Implements timed condition wait. * <ol> * <li> If current thread is interrupted, throw InterruptedException. * <li> Save lock state returned by {@link #getState}. * <li> Invoke {@link #release} with saved state as argument, * throwing IllegalMonitorStateException if it fails. * <li> Block until signalled, interrupted, or timed out. * <li> Reacquire by invoking specialized version of * {@link #acquire} with saved state as argument. * <li> If interrupted while blocked in step 4, throw InterruptedException. * <li> If timed out while blocked in step 4, return false, else true. * </ol> */ publicfinalbooleanawait(long time, TimeUnit unit) throws InterruptedException { long nanosTimeout = unit.toNanos(time); if (Thread.interrupted()) thrownew InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); finallong deadline = System.nanoTime() + nanosTimeout; boolean timedout = false; int interruptMode = 0; while (!isOnSyncQueue(node)) { if (nanosTimeout <= 0L) { timedout = transferAfterCancelledWait(node); break; } if (nanosTimeout >= spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; nanosTimeout = deadline - System.nanoTime(); } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); return !timedout; }
// support for instrumentation
/** * Returns true if this condition was created by the given * synchronization object. * * @return {@code true} if owned */ finalbooleanisOwnedBy(AbstractQueuedSynchronizer sync){ return sync == AbstractQueuedSynchronizer.this; }
/** * Queries whether any threads are waiting on this condition. * Implements {@link AbstractQueuedSynchronizer#hasWaiters(ConditionObject)}. * * @return {@code true} if there are any waiting threads * @throws IllegalMonitorStateException if {@link #isHeldExclusively} * returns {@code false} */ protectedfinalbooleanhasWaiters(){ if (!isHeldExclusively()) thrownew IllegalMonitorStateException(); for (Node w = firstWaiter; w != null; w = w.nextWaiter) { if (w.waitStatus == Node.CONDITION) returntrue; } returnfalse; }
/** * Returns an estimate of the number of threads waiting on * this condition. * Implements {@link AbstractQueuedSynchronizer#getWaitQueueLength(ConditionObject)}. * * @return the estimated number of waiting threads * @throws IllegalMonitorStateException if {@link #isHeldExclusively} * returns {@code false} */ protectedfinalintgetWaitQueueLength(){ if (!isHeldExclusively()) thrownew IllegalMonitorStateException(); int n = 0; for (Node w = firstWaiter; w != null; w = w.nextWaiter) { if (w.waitStatus == Node.CONDITION) ++n; } return n; }
/** * Returns a collection containing those threads that may be * waiting on this Condition. * Implements {@link AbstractQueuedSynchronizer#getWaitingThreads(ConditionObject)}. * * @return the collection of threads * @throws IllegalMonitorStateException if {@link #isHeldExclusively} * returns {@code false} */ protectedfinal Collection<Thread> getWaitingThreads(){ if (!isHeldExclusively()) thrownew IllegalMonitorStateException(); ArrayList<Thread> list = new ArrayList<Thread>(); for (Node w = firstWaiter; w != null; w = w.nextWaiter) { if (w.waitStatus == Node.CONDITION) { Thread t = w.thread; if (t != null) list.add(t); } } return list; } }
publicclassLockSupport{ privateLockSupport(){} // Cannot be instantiated.
privatestaticvoidsetBlocker(Thread t, Object arg){ // Even though volatile, hotspot doesn't need a write barrier here. UNSAFE.putObject(t, parkBlockerOffset, arg); }
// 唤醒线程,如果thread之前因调用park()方法而被挂起,则调用unpark后,该线程会被唤醒;如果Thread之前没有调用park,则调用unpark后,再次调用park方法,会立即返回 publicstaticvoidunpark(Thread thread){ if (thread != null) UNSAFE.unpark(thread); }
staticfinalintnextSecondarySeed(){ int r; Thread t = Thread.currentThread(); if ((r = UNSAFE.getInt(t, SECONDARY)) != 0) { r ^= r << 13; // xorshift r ^= r >>> 17; r ^= r << 5; } elseif ((r = java.util.concurrent.ThreadLocalRandom.current().nextInt()) == 0) r = 1; // avoid zero UNSAFE.putInt(t, SECONDARY, r); return r; }