  /**
   * Thread-safe variant of tryComplete() that attempts completion only if the lock can be acquired
   * without blocking.
   *
   * If threadA acquires the lock and performs the check for completion before completion criteria is met
   * and threadB satisfies the completion criteria, but fails to acquire the lock because threadA has not
   * yet released the lock, we need to ensure that completion is attempted again without blocking threadA
   * or threadB. `tryCompletePending` is set by threadB when it fails to acquire the lock and at least one
   * of threadA or threadB will attempt completion of the operation if this flag is set. This ensures that
   * every invocation of `maybeTryComplete` is followed by at least one invocation of `tryComplete` until
   * the operation is actually completed.
   */
  private[server] def maybeTryComplete(): Boolean = {
    var retry = false
    var done = false
    do {
      if (lock.tryLock()) {
        try {
          tryCompletePending.set(false)
          done = tryComplete()
        } finally {
          lock.unlock()
        }
        // While we were holding the lock, another thread may have invoked `maybeTryComplete` and set
        // `tryCompletePending`. In this case we should retry.
        retry = tryCompletePending.get()
      } else {
        // Another thread is holding the lock. If `tryCompletePending` is already set and this thread failed to
        // acquire the lock, then the thread that is holding the lock is guaranteed to see the flag and retry.
        // Otherwise, we should set the flag and retry on this thread since the thread holding the lock may have
        // released the lock and returned by the time the flag is set.
        retry = !tryCompletePending.getAndSet(true)
      }
    } while (!isCompleted && retry)
    done
  }
  /*
   * run() method defines a task that is executed on timeout
   */
  override def run(): Unit = {
    if (forceComplete())
      onExpiration()
  }
}
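
To make the DelayedOperation contract concrete, here is a minimal sketch of a subclass. The class name `DelayedFlag` and its `flagSatisfied` parameter are illustrative only, not Kafka code:

// A hypothetical delayed operation that completes once an external condition holds.
// DelayedFlag and flagSatisfied are made-up names for illustration.
class DelayedFlag(delayMs: Long, flagSatisfied: () => Boolean) extends DelayedOperation(delayMs) {

  // Invoked exactly once, either when the condition is met or when the operation times out.
  override def onComplete(): Unit = println("completed")

  // Invoked only when completion was forced by the timeout path (run() -> forceComplete()).
  override def onExpiration(): Unit = println("expired before the condition was satisfied")

  // Check the completion criteria; forceComplete() guarantees onComplete() runs at most once.
  override def tryComplete(): Boolean =
    if (flagSatisfied()) forceComplete() else false
}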
object DelayedOperationPurgatory {

  private val Shards = 512 // Shard the watcher list to reduce lock contention
/**
 * A helper purgatory class for bookkeeping delayed operations with a timeout, and expiring timed out operations.
 */
final class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String,
                                                             timeoutTimer: Timer,
                                                             brokerId: Int = 0,
                                                             purgeInterval: Int = 1000,
                                                             reaperEnabled: Boolean = true,
                                                             timerEnabled: Boolean = true)
  extends Logging with KafkaMetricsGroup {

  /* a list of operation watching keys */
  private class WatcherList {
    val watchersByKey = new Pool[Any, Watchers](Some((key: Any) => new Watchers(key)))

    val watchersLock = new ReentrantLock()

    /*
     * Return all the current watcher lists,
     * note that the returned watchers may be removed from the list by other threads
     */
    def allWatchers = {
      watchersByKey.values
    }
  }
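
The methods below rely on a `watcherLists` array, a `watcherList(key)` lookup, and an `estimatedTotalOperations` counter that are not shown in this excerpt. Roughly, the key is hashed into one of the `Shards` WatcherList instances to spread lock contention; a sketch of those omitted members:

  // Sketch of members omitted from this excerpt: one WatcherList per shard, the shard lookup,
  // and the counter of operations added since the last purge (java.util.concurrent.atomic.AtomicInteger).
  private val watcherLists = Array.fill[WatcherList](DelayedOperationPurgatory.Shards)(new WatcherList)

  private def watcherList(key: Any): WatcherList =
    watcherLists(Math.abs(key.hashCode() % watcherLists.length))

  // estimated number of operations in the purgatory, used to decide when to purge the watch lists
  private[this] val estimatedTotalOperations = new AtomicInteger(0)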
  /**
   * First check whether the operation can be completed; if not, watch it under each of the given keys
   * and re-check before handing it to the timer.
   */
  def tryCompleteElseWatch(operation: T, watchKeys: Seq[Any]): Boolean = {
    assert(watchKeys.nonEmpty, "The watch key list can't be empty")

    // check whether the operation can already be completed
    var isCompletedByMe = operation.tryComplete()
    if (isCompletedByMe)
      return true

    var watchCreated = false
    for (key <- watchKeys) {
      // If the operation is already completed, stop adding it to the rest of the watcher list.
      if (operation.isCompleted)
        return false
      watchForOperation(key, operation)

      if (!watchCreated) {
        watchCreated = true
        estimatedTotalOperations.incrementAndGet()
      }
    }

    isCompletedByMe = operation.maybeTryComplete()
    if (isCompletedByMe)
      return true

    // if it cannot be completed by now and hence is watched, add to the expire queue also
    if (!operation.isCompleted) {
      if (timerEnabled)
        timeoutTimer.add(operation)
      if (operation.isCompleted) {
        // cancel the timer task
        operation.cancel()
      }
    }

    false
  }
  /**
   * Check if some delayed operations can be completed with the given watch key,
   * and if yes complete them.
   *
   * @return the number of completed operations during this process
   */
  def checkAndComplete(key: Any): Int = {
    val wl = watcherList(key)
    val watchers = inLock(wl.watchersLock) { wl.watchersByKey.get(key) }
    if (watchers == null)
      0
    else
      watchers.tryCompleteWatched()
  }
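
As a rough usage sketch, reusing the hypothetical `DelayedFlag` from the earlier sketch (the purgatory construction and the string keys are illustrative): a caller lets `tryCompleteElseWatch` attempt immediate completion, otherwise parks the operation, and later pokes the purgatory with `checkAndComplete` whenever the state behind one of the keys changes:

// Hypothetical usage: the purgatory instance, operation, and keys are illustrative only.
val flag = new java.util.concurrent.atomic.AtomicBoolean(false)
val purgatory: DelayedOperationPurgatory[DelayedFlag] = ???  // assume one was constructed elsewhere
val op = new DelayedFlag(delayMs = 500L, flagSatisfied = () => flag.get())

// Try to complete immediately; otherwise watch the operation under both keys and start its timer.
val completedNow = purgatory.tryCompleteElseWatch(op, Seq("key-1", "key-2"))

// Later, when the state behind "key-1" changes, re-check every operation watching that key.
flag.set(true)
val completedCount = purgatory.checkAndComplete("key-1")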
  /**
   * Return the total size of the watch lists in the purgatory. Since an operation may be watched
   * on multiple lists, and some of its watched entries may still be in the watch lists
   * even when it has been completed, this number may be larger than the number of real operations watched.
   */
  def watched: Int = {
    watcherLists.foldLeft(0) { case (sum, watcherList) => sum + watcherList.allWatchers.map(_.countWatched).sum }
  }
  /**
   * Return the number of delayed operations in the expiry queue
   */
  def delayed: Int = timeoutTimer.size

  /**
   * Cancel watching on any delayed operations for the given key. Note the operation will not be completed
   */
  def cancelForKey(key: Any): List[T] = {
    val wl = watcherList(key)
    inLock(wl.watchersLock) {
      val watchers = wl.watchersByKey.remove(key)
      if (watchers != null)
        watchers.cancel()
      else
        Nil
    }
  }

  /*
   * Return the watch list of the given key, note that we need to
   * grab the removeWatchersLock to avoid the operation being added to a removed watcher list
   */
  private def watchForOperation(key: Any, operation: T) {
    val wl = watcherList(key)
    inLock(wl.watchersLock) {
      val watcher = wl.watchersByKey.getAndMaybePut(key)
      watcher.watch(operation)
    }
  }

  /*
   * Remove the key from watcher lists if its list is empty
   */
  private def removeKeyIfEmpty(key: Any, watchers: Watchers) {
    val wl = watcherList(key)
    inLock(wl.watchersLock) {
      // if the current key is no longer correlated to the watchers to remove, skip
      if (wl.watchersByKey.get(key) != watchers)
        return

  /**
   * Shutdown the expire reaper thread
   */
  def shutdown() {
    if (reaperEnabled)
      expirationReaper.shutdown()
    timeoutTimer.shutdown()
  }
  /**
   * A linked list of watched delayed operations based on some key
   */
  private class Watchers(val key: Any) {
    private[this] val operations = new ConcurrentLinkedQueue[T]()

    // count the current number of watched operations. This is O(n), so use isEmpty() if possible
    def countWatched: Int = operations.size

    def isEmpty: Boolean = operations.isEmpty
    // add the element to watch: append the DelayedOperation to the operations queue
    def watch(t: T) {
      operations.add(t)
    }

    // traverse the list and try to complete some watched elements
    // Iterate over the DelayedOperations in the queue and use isCompleted to check whether each has
    // already been completed. If so, remove it; otherwise call maybeTryComplete to attempt completion.
    def tryCompleteWatched(): Int = {
      var completed = 0

      val iter = operations.iterator()
      while (iter.hasNext) {
        val curr = iter.next()
        if (curr.isCompleted) {
          // another thread has completed this operation, just remove it
          iter.remove()
        } else if (curr.maybeTryComplete()) {
          // the completion attempt succeeded here, so remove it and count it
          iter.remove()
          completed += 1
        }
      }
      // if the queue is now empty, all operations have completed; remove this Watchers from the pool
      if (operations.isEmpty)
        removeKeyIfEmpty(key, this)

      completed
    }
    def cancel(): List[T] = {
      val iter = operations.iterator()
      val cancelled = new ListBuffer[T]()
      while (iter.hasNext) {
        val curr = iter.next()
        curr.cancel()
        iter.remove()
        cancelled += curr
      }
      cancelled.toList
    }
    // traverse the list and purge elements that are already completed by others
    // Basically the same as tryCompleteWatched, except that it does not attempt to complete
    // operations that are still pending.
    def purgeCompleted(): Int = {
      var purged = 0

      val iter = operations.iterator()
      while (iter.hasNext) {
        val curr = iter.next()
        if (curr.isCompleted) {
          iter.remove()
          purged += 1
        }
      }

      if (operations.isEmpty)
        removeKeyIfEmpty(key, this)

      purged
    }
  }

  /**
   * Advance the clock. This will trigger expired operations to be completed.
   */
  def advanceClock(timeoutMs: Long) {
    timeoutTimer.advanceClock(timeoutMs)

    // Trigger a purge if the number of completed but still being watched operations is larger than
    // the purge threshold. That number is computed by the difference btw the estimated total number of
    // operations and the number of pending delayed operations.
    if (estimatedTotalOperations.get - delayed > purgeInterval) {
      // now set estimatedTotalOperations to delayed (the number of pending operations) since we are going to
      // clean up watchers. Note that, if more operations are completed during the clean up, we may end up with
      // a little overestimated total number of operations.
      estimatedTotalOperations.getAndSet(delayed)
      debug("Begin purging watch lists")
      val purged = watcherLists.foldLeft(0) {
        case (sum, watcherList) => sum + watcherList.allWatchers.map(_.purgeCompleted()).sum
      }
      debug("Purged %d elements from watch lists.".format(purged))
    }
  }
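
The expiration reaper drives this clock-advancing path periodically. A small worked example of the purge threshold, with made-up numbers:

// Made-up numbers to illustrate when a purge of the watch lists is triggered.
val purgeInterval = 1000       // the default constructor argument above
val estimatedTotal = 5000      // operations added since estimatedTotalOperations was last reset
val pendingDelayed = 3200      // operations still pending in the timeout timer (`delayed`)
// Roughly 5000 - 3200 = 1800 operations are completed but still referenced by watch lists,
// which exceeds purgeInterval, so purgeCompleted() is invoked on every Watchers instance.
val shouldPurge = estimatedTotal - pendingDelayed > purgeInterval   // true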
  /**
   * A background reaper to expire delayed operations that have timed out
   */
  private class ExpiredOperationReaper extends ShutdownableThread(
    "ExpirationReaper-%d-%s".format(brokerId, purgatoryName),
    false) {
/**
 * A delayed produce operation that can be created by the replica manager and watched
 * in the produce operation purgatory
 */
class DelayedProduce(delayMs: Long, // how long the operation may stay pending before it times out
                     produceMetadata: ProduceMetadata, // the acks requirement of this ProduceRequest and the per-partition append result status
                     replicaManager: ReplicaManager,
                     responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
                     lockOpt: Option[Lock] = None)
  extends DelayedOperation(delayMs, lockOpt) {

  // first update the acks pending variable according to the error code
  produceMetadata.produceStatus.foreach { case (topicPartition, status) =>
    if (status.responseStatus.error == Errors.NONE) {
      // Timeout error state will be cleared when required acks are received
      status.acksPending = true
      status.responseStatus.error = Errors.REQUEST_TIMED_OUT
    } else {
      status.acksPending = false
    }

    trace(s"Initial partition status for $topicPartition is $status")
  }
  /**
   * The delayed produce operation can be completed if every partition
   * it produces to is satisfied by one of the following:
   *
   * Case A: This broker is no longer the leader: set an error in response
   * Case B: This broker is the leader:
   *   B.1 - If there was a local error thrown while checking if at least requiredAcks
   *         replicas have caught up to this operation: set an error in response
   *   B.2 - Otherwise, set the response with no error.
   */
  override def tryComplete(): Boolean = {
    // check for each partition if it still has pending acks
    produceMetadata.produceStatus.foreach { case (topicPartition, status) =>
      trace(s"Checking produce satisfaction for $topicPartition, current status $status")
      // skip those partitions that have already been satisfied
      if (status.acksPending) {
        val (hasEnough, error) = replicaManager.getPartition(topicPartition) match {
          case Some(partition) =>
            if (partition eq ReplicaManager.OfflinePartition)
              (false, Errors.KAFKA_STORAGE_ERROR)
            else
              partition.checkEnoughReplicasReachOffset(status.requiredOffset)
          case None =>
            // Case A
            (false, Errors.UNKNOWN_TOPIC_OR_PARTITION)
        }
        // Case B.1 || B.2
        if (error != Errors.NONE || hasEnough) {
          status.acksPending = false
          status.responseStatus.error = error
        }
      }
    }

    // check if every partition has satisfied at least one of case A or B
    if (!produceMetadata.produceStatus.values.exists(_.acksPending))
      forceComplete()
    else
      false
  }
  override def onExpiration() {
    produceMetadata.produceStatus.foreach { case (topicPartition, status) =>
      if (status.acksPending) {
        debug(s"Expiring produce request for partition $topicPartition with status $status")
        DelayedProduceMetrics.recordExpiration(topicPartition)
      }
    }
  }
  /**
   * Upon completion, invoke responseCallback to return the current response status along with the error code per partition
   */
  override def onComplete() {
    val responseStatus = produceMetadata.produceStatus.mapValues(status => status.responseStatus)
    responseCallback(responseStatus)
  }
}
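
For context on how this operation reaches the purgatory: after the local append, the replica manager parks the DelayedProduce in the produce purgatory, watched under one key per produced partition, so that a follower fetch advancing a partition's high watermark only re-checks operations watching that partition. A rough sketch only; the variable names and the use of TopicPartition values as watch keys are illustrative (Kafka uses a dedicated per-partition key type):

// Sketch only: timeoutMs, produceMetadata, replicaManager, responseCallback, topicPartition
// and delayedProducePurgatory are assumed to exist in the surrounding code.
val delayedProduce = new DelayedProduce(timeoutMs, produceMetadata, replicaManager, responseCallback)

// Watch the operation under one key per produced partition so completion checks stay targeted.
val produceKeys: Seq[Any] = produceMetadata.produceStatus.keys.toSeq
delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, produceKeys)

// Later, when a follower fetch advances the high watermark of topicPartition:
delayedProducePurgatory.checkAndComplete(topicPartition)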
  /**
   * The operation can be completed if:
   *
   * Case A: This broker is no longer the leader for some partitions it tries to fetch
   * Case B: This broker does not know of some partitions it tries to fetch
   * Case C: The fetch offset locates not on the last segment of the log
   * Case D: The accumulated bytes from all the fetching partitions exceeds the minimum bytes
   * Case E: The partition is in an offline log directory on this broker
   * Case F: This broker is the leader, but the requested epoch is now fenced
   *
   * Upon completion, should return whatever data is available for each valid partition
   */
  override def tryComplete(): Boolean = {
    var accumulatedSize = 0
    fetchMetadata.fetchPartitionStatus.foreach { case (topicPartition, fetchStatus) =>
      val fetchOffset = fetchStatus.startOffsetMetadata
      val fetchLeaderEpoch = fetchStatus.fetchInfo.currentLeaderEpoch
      try {
        if (fetchOffset != LogOffsetMetadata.UnknownOffsetMetadata) {
          val partition = replicaManager.getPartitionOrException(topicPartition,
            expectLeader = fetchMetadata.fetchOnlyLeader)
          val offsetSnapshot = partition.fetchOffsetSnapshot(fetchLeaderEpoch, fetchMetadata.fetchOnlyLeader)

          val endOffset = fetchMetadata.fetchIsolation match {
            case FetchLogEnd => offsetSnapshot.logEndOffset
            case FetchHighWatermark => offsetSnapshot.highWatermark
            case FetchTxnCommitted => offsetSnapshot.lastStableOffset
          }

          // Go directly to the check for Case D if the message offsets are the same. If the log segment
          // has just rolled, then the high watermark offset will remain the same but be on the old segment,
          // which would incorrectly be seen as an instance of Case C.
          if (endOffset.messageOffset != fetchOffset.messageOffset) {
            if (endOffset.onOlderSegment(fetchOffset)) {
              // Case C, this can happen when the new fetch operation is on a truncated leader
              debug(s"Satisfying fetch $fetchMetadata since it is fetching later segments of partition $topicPartition.")
              return forceComplete()
            } else if (fetchOffset.onOlderSegment(endOffset)) {
              // Case C, this can happen when the fetch operation is falling behind the current segment
              // or the partition has just rolled a new segment
              debug(s"Satisfying fetch $fetchMetadata immediately since it is fetching older segments.")
              // We will not force complete the fetch request if a replica should be throttled.
              if (!replicaManager.shouldLeaderThrottle(quota, topicPartition, fetchMetadata.replicaId))
                return forceComplete()
            } else if (fetchOffset.messageOffset < endOffset.messageOffset) {
              // we take the partition fetch size as upper bound when accumulating the bytes (skip if a throttled partition)
              val bytesAvailable = math.min(endOffset.positionDiff(fetchOffset), fetchStatus.fetchInfo.maxBytes)
              if (!replicaManager.shouldLeaderThrottle(quota, topicPartition, fetchMetadata.replicaId))
                accumulatedSize += bytesAvailable
            }
          }
        }
      } catch {
        case _: KafkaStorageException => // Case E
          debug(s"Partition $topicPartition is in an offline log directory, satisfy $fetchMetadata immediately")
          return forceComplete()
        case _: UnknownTopicOrPartitionException => // Case B
          debug(s"Broker no longer knows of partition $topicPartition, satisfy $fetchMetadata immediately")
          return forceComplete()
        case _: FencedLeaderEpochException => // Case F
          debug(s"Broker is the leader of partition $topicPartition, but the requested epoch " +
            s"$fetchLeaderEpoch is fenced by the latest leader epoch, satisfy $fetchMetadata immediately")
          return forceComplete()
        case _: NotLeaderForPartitionException => // Case A
          debug("Broker is no longer the leader of %s, satisfy %s immediately".format(topicPartition, fetchMetadata))
          return forceComplete()
      }
    }

    // Case D
    if (accumulatedSize >= fetchMetadata.fetchMinBytes)
      forceComplete()
    else
      false
  }
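
Case D is simply a byte-count threshold across all partitions in the fetch. A minimal sketch of that accumulation in isolation (the helper and its inputs are illustrative, not DelayedFetch fields):

// Minimal illustration of Case D: sum the bytes newly available per partition (each already capped
// by that partition's max fetch bytes above) and compare against the request's fetch.min.bytes.
def enoughBytesAccumulated(bytesAvailablePerPartition: Seq[Int], fetchMinBytes: Int): Boolean =
  bytesAvailablePerPartition.sum >= fetchMinBytes

enoughBytesAccumulated(Seq(200, 0, 900), fetchMinBytes = 1024)  // true: 1100 >= 1024, complete now
enoughBytesAccumulated(Seq(200, 0, 900), fetchMinBytes = 2048)  // false: stay in the purgatory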
  override def onExpiration() {
    if (fetchMetadata.isFromFollower)
      DelayedFetchMetrics.followerExpiredRequestMeter.mark()
    else
      DelayedFetchMetrics.consumerExpiredRequestMeter.mark()
  }

  /**
   * Upon completion, read whatever data is available and pass to the complete callback
   */
  override def onComplete() {
    val logReadResults = replicaManager.readFromLocalLog(
      replicaId = fetchMetadata.replicaId,
      fetchOnlyFromLeader = fetchMetadata.fetchOnlyLeader,
      fetchIsolation = fetchMetadata.fetchIsolation,
      fetchMaxBytes = fetchMetadata.fetchMaxBytes,
      hardMaxBytesLimit = fetchMetadata.hardMaxBytesLimit,
      readPartitionInfo = fetchMetadata.fetchPartitionStatus.map { case (tp, status) => tp -> status.fetchInfo },
      quota = quota)

    val fetchPartitionData = logReadResults.map { case (tp, result) =>
      tp -> FetchPartitionData(result.error, result.highWatermark, result.leaderLogStartOffset, result.info.records,
        result.lastStableOffset, result.info.abortedTransactions)
    }