protectedfinalbooleantryReleaseShared(int releases){ for (;;) { // 获取当前信号量值 int current = getState(); // 信号量+所释放的值 int next = current + releases; // 溢出 if (next < current) // overflow thrownew Error("Maximum permit count exceeded"); if (compareAndSetState(current, next)) // 更新信号量值 returntrue; } }
protectedinttryAcquireShared(int acquires){ for (;;) { // 查看队列中是否有等待的 if (hasQueuedPredecessors()) return -1; int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } }
protectedbooleantryReleaseShared(int releases){ // Decrement count; signal when transition to zero // 循环进行CAS,直到完成减一操作 for (;;) { int c = getState(); // 如果当前状态为0,则直接返回 if (c == 0) returnfalse; // 使用CAS让计算器减一 int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } } // CountDownLatch中的队列同步器 privatefinal Sync sync;
// 这个数就是线程需要等待完成的操作数目 publicCountDownLatch(int count){ if (count < 0) thrownew IllegalArgumentException("count < 0"); this.sync = new Sync(count); }
/** The lock for guarding barrier entry */ // 同步操作锁 privatefinal ReentrantLock lock = new ReentrantLock(); /** Condition to wait on until tripped */ // 线程拦截器 privatefinal Condition trip = lock.newCondition(); /** The number of parties */ // 每次拦截的线程数 privatefinalint parties; /* The command to run when tripped */ // 换代前执行的任务,当count减为0时表示当前代结束,需要转到下一代,在转到下一代之前会将所有阻塞的线程唤醒,在唤醒所有线程之前会执行该任务 privatefinal Runnable barrierCommand; /** The current generation */ // 栅栏的当前代 private Generation generation = new Generation();
/** * Number of parties still waiting. Counts down from parties to 0 * on each generation. It is reset to parties on each new * generation or when broken. */ // 计数器,初始值为parties,每次await减一,直到为0 privateint count; // parties需要达到的数量 publicCyclicBarrier(int parties){ this(parties, null); } // 达到该数量时,执行该Runnable publicCyclicBarrier(int parties, Runnable barrierAction){ if (parties <= 0) thrownew IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; }
privateintdowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; lock.lock(); try { final Generation g = generation; // broken记录当前屏障是否被打破 if (g.broken) thrownew BrokenBarrierException(); // 如果线程被中断,则会通过 breakBarrier 方法将 broken 设置为true,也就是说,如果有线程收到中断通知,直接就打破屏障,停止 CyclicBarrier, 并唤醒所有线程
if (Thread.interrupted()) { breakBarrier(); thrownew InterruptedException(); } // 如果index==0,表示所有的线程都到了屏障点,此时执行初始化时传递的任务 int index = --count; if (index == 0) { // tripped boolean ranAction = false; try { final Runnable command = barrierCommand; // 执行任务 if (command != null) command.run(); ranAction = true; // 激活其他因调用await方法而被阻塞的线程,并重置CyclicBarrier,转到下一代 nextGeneration(); return0; } finally { // 确保在任务未成功执行能将所有线程唤醒 if (!ranAction) breakBarrier(); } }
// index != 0,说明当前不是最后一个线程调用await方法 // loop until tripped, broken, interrupted, or timed out for (;;) { try { // 没有设置超时时间 if (!timed) // 进行等待 trip.await(); // 设置了超时时间 elseif (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { // 条件等待被中断,则判断是否有其他线程已经使屏障破坏。若没有则进行屏障破坏处理,并抛出异常;否则再次中断当前线程 if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { // We're about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. Thread.currentThread().interrupt(); } } // 如果线程因打破栅栏而被唤醒则抛出异常 if (g.broken) thrownew BrokenBarrierException(); // 如果线程因换代操作被唤醒则返回计数器的值 if (g != generation) return index; // 如果线程因时间到了被唤醒则打破栅栏抛出异常 if (timed && nanos <= 0L) { breakBarrier(); thrownew TimeoutException(); } } } finally { lock.unlock(); } }
privatevoidnextGeneration(){ // signal completion of last generation // 唤醒条件队列所有线程 trip.signalAll(); // set up next generation // 设置计数器的值为需要拦截的线程数 count = parties; // 重新设置栅栏代次 generation = new Generation(); }
// 表示已完成了当前阶段 // 到达进行减一操作,会进行阻塞,等待其他线程到达 publicintarriveAndAwaitAdvance(){ // Specialization of doArrive+awaitAdvance eliminating some reads/paths final Phaser root = this.root; for (;;) { long s = (root == this) ? state : reconcileState(); int phase = (int)(s >>> PHASE_SHIFT); if (phase < 0) return phase; int counts = (int)s; int unarrived = (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK); if (unarrived <= 0) thrownew IllegalStateException(badArrive(s)); if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s -= ONE_ARRIVAL)) { if (unarrived > 1) return root.internalAwaitAdvance(phase, null); if (root != this) return parent.arriveAndAwaitAdvance(); long n = s & PARTIES_MASK; // base of next state int nextUnarrived = (int)n >>> PARTIES_SHIFT; if (onAdvance(phase, nextUnarrived)) n |= TERMINATION_BIT; elseif (nextUnarrived == 0) n |= EMPTY; else n |= nextUnarrived; int nextPhase = (phase + 1) & MAX_PHASE; n |= (long)nextPhase << PHASE_SHIFT; if (!UNSAFE.compareAndSwapLong(this, stateOffset, s, n)) return (int)(state >>> PHASE_SHIFT); // terminated releaseWaiters(phase); return nextPhase; } } }