
Kafka's delayed operation components

Delayed operation components

These components help other Kafka components carry out their own functions.

DelayedOperation

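Kafka calls operations that do not execute immediately, but wait until certain conditions are met before they complete, delayed operations, and defines them as the abstract class DelayedOperation. A DelayedOperation is an event-driven TimerTask with an expiration time. TimerTask extends the Runnable interface and holds a reference to a TimerTaskEntry. Each TimerTaskEntry is bound to one TimerTask and is added to a TimerTaskList. A TimerTaskList is a circular doubly linked list with its own expiration time, and the lists (the timing-wheel buckets) are ordered by that expiration time in a DelayQueue.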

abstract class DelayedOperation(override val delayMs: Long,
lockOpt: Option[Lock] = None) extends TimerTask with Logging {
// flag that controls whether this delayed operation has already been completed
private val completed = new AtomicBoolean(false)
private val tryCompletePending = new AtomicBoolean(false)
// Visible for testing
private[server] val lock: Lock = lockOpt.getOrElse(new ReentrantLock)

/*
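* When the condition is met, checks whether the delayed task has not run yet. If it has not, cancel() is called first to unbind the operation from its TimerTaskEntry and remove it from the TimerTaskList, then onComplete() is called to finish the delayed operation.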
* Force completing the delayed operation, if not already completed.
* This function can be triggered when
*
* 1. The operation has been verified to be completable inside tryComplete()
* 2. The operation has expired and hence needs to be completed right now
*
* Return true iff the operation is completed by the caller: note that
* concurrent threads can try to complete the same operation, but only
* the first thread will succeed in completing the operation and return
* true, others will still return false
*/
def forceComplete(): Boolean = {
// atomic CAS operation, guarantees that onComplete() is called only once
if (completed.compareAndSet(false, true)) {
// cancel the timeout timer
cancel()
onComplete()
true
} else {
false
}
}

/**
* Check if the delayed operation is already completed
*/
def isCompleted: Boolean = completed.get()

/**
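* Implemented by subclasses: the logic to run once the delayed operation has reached its expiration time.
* Kafka uses SystemTimer to periodically check whether requests have timed out. SystemTimer is Kafka's timer built on a hierarchical timing wheel and a DelayQueue, and it maintains a thread pool created with newFixedThreadPool.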
* Call-back to execute when a delayed operation gets expired and hence forced to complete.
*/
def onExpiration(): Unit

/**
* Implemented by subclasses: the actual business logic.
* Process for completing an operation; This function needs to be defined
* in subclasses and will be called exactly once in forceComplete()
*/
def onComplete(): Unit

/**
* Implemented by subclasses: checks whether the completion condition is met and, if so, calls forceComplete() to complete the delayed operation.
* Try to complete the delayed operation by first checking if the operation
* can be completed by now. If yes execute the completion logic by calling
* forceComplete() and return true iff forceComplete returns true; otherwise return false
*
* This function needs to be defined in subclasses
*/
def tryComplete(): Boolean

/**
* Thread-safe variant of tryComplete() that attempts completion only if the lock can be acquired
* without blocking.
*
* If threadA acquires the lock and performs the check for completion before completion criteria is met
* and threadB satisfies the completion criteria, but fails to acquire the lock because threadA has not
* yet released the lock, we need to ensure that completion is attempted again without blocking threadA
* or threadB. `tryCompletePending` is set by threadB when it fails to acquire the lock and at least one
* of threadA or threadB will attempt completion of the operation if this flag is set. This ensures that
* every invocation of `maybeTryComplete` is followed by at least one invocation of `tryComplete` until
* the operation is actually completed.
*/
private[server] def maybeTryComplete(): Boolean = {
var retry = false
var done = false
do {
if (lock.tryLock()) {
try {
tryCompletePending.set(false)
done = tryComplete()
} finally {
lock.unlock()
}
// While we were holding the lock, another thread may have invoked `maybeTryComplete` and set
// `tryCompletePending`. In this case we should retry.
retry = tryCompletePending.get()
} else {
// Another thread is holding the lock. If `tryCompletePending` is already set and this thread failed to
// acquire the lock, then the thread that is holding the lock is guaranteed to see the flag and retry.
// Otherwise, we should set the flag and retry on this thread since the thread holding the lock may have
// released the lock and returned by the time the flag is set.
retry = !tryCompletePending.getAndSet(true)
}
} while (!isCompleted && retry)
done
}

/*
* run() method defines a task that is executed on timeout
*/
override def run(): Unit = {
if (forceComplete())
onExpiration()
}
}

object DelayedOperationPurgatory {

private val Shards = 512 // Shard the watcher list to reduce lock contention

def apply[T <: DelayedOperation](purgatoryName: String,
brokerId: Int = 0,
purgeInterval: Int = 1000,
reaperEnabled: Boolean = true,
timerEnabled: Boolean = true): DelayedOperationPurgatory[T] = {
val timer = new SystemTimer(purgatoryName)
new DelayedOperationPurgatory[T](purgatoryName, timer, brokerId, purgeInterval, reaperEnabled, timerEnabled)
}

}
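
The once-only completion guarantee in forceComplete() can be illustrated in isolation. The following is a minimal sketch, not Kafka's class: SketchOperation and CountingOperation are hypothetical names, and the timer, the lock, and maybeTryComplete() are left out on purpose. It only shows how compareAndSet lets exactly one caller run onComplete().

```scala
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}

// Hypothetical, stripped-down stand-in for DelayedOperation (no timer, no lock).
abstract class SketchOperation {
  private val completed = new AtomicBoolean(false)

  def onComplete(): Unit
  def tryComplete(): Boolean

  // Only the caller that flips the flag from false to true runs onComplete().
  def forceComplete(): Boolean =
    if (completed.compareAndSet(false, true)) { onComplete(); true } else false

  def isCompleted: Boolean = completed.get()
}

// Hypothetical subclass: it becomes completable once `ready` is set to true.
class CountingOperation extends SketchOperation {
  @volatile var ready = false
  val completions = new AtomicInteger(0)

  override def onComplete(): Unit = completions.incrementAndGet()
  override def tryComplete(): Boolean = if (ready) forceComplete() else false
}
```

With an instance op of CountingOperation, op.tryComplete() returns false until op.ready is set; after that the first tryComplete() returns true, any later forceComplete() returns false, and completions stays at 1.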

DelayedOperationPurgatory

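DelayedOperationPurgatory is a helper class that manages DelayedOperation instances. It is generic in the operation type: each DelayedOperation is added to the watchersByKey pool, of type Pool[Any, Watchers], that the purgatory maintains internally, and it is also added to the SystemTimer.

Watchers is an inner class of DelayedOperationPurgatory. Its operations field, a ConcurrentLinkedQueue, stores the watched DelayedOperation instances.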

/**
* A helper purgatory class for bookkeeping delayed operations with a timeout, and expiring timed out operations.
*/
final class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String,
timeoutTimer: Timer,
brokerId: Int = 0,
purgeInterval: Int = 1000,
reaperEnabled: Boolean = true,
timerEnabled: Boolean = true)
extends Logging with KafkaMetricsGroup {
/* a list of operation watching keys */
private class WatcherList {
val watchersByKey = new Pool[Any, Watchers](Some((key: Any) => new Watchers(key)))

val watchersLock = new ReentrantLock()

/*
* Return all the current watcher lists,
* note that the returned watchers may be removed from the list by other threads
*/
def allWatchers = {
watchersByKey.values
}
}

private val watcherLists = Array.fill[WatcherList](DelayedOperationPurgatory.Shards)(new WatcherList)
private def watcherList(key: Any): WatcherList = {
watcherLists(Math.abs(key.hashCode() % watcherLists.length))
}

// the number of estimated total operations in the purgatory
private[this] val estimatedTotalOperations = new AtomicInteger(0)

/* background thread expiring operations that have timed out */
private val expirationReaper = new ExpiredOperationReaper()

private val metricsTags = Map("delayedOperation" -> purgatoryName)

newGauge(
"PurgatorySize",
new Gauge[Int] {
def value: Int = watched
},
metricsTags
)

newGauge(
"NumDelayedOperations",
new Gauge[Int] {
def value: Int = delayed
},
metricsTags
)

if (reaperEnabled)
expirationReaper.start()

/**
* Check if the operation can be completed, if not watch it based on the given watch keys
*
* Note that a delayed operation can be watched on multiple keys. It is possible that
* an operation is completed after it has been added to the watch list for some, but
* not all of the keys. In this case, the operation is considered completed and won't
* be added to the watch list of the remaining keys. The expiration reaper thread will
* remove this operation from any watcher list in which the operation exists.
*
* @param operation the delayed operation to be checked
* @param watchKeys keys for bookkeeping the operation
* @return true iff the delayed operations can be completed by the caller
*/
def tryCompleteElseWatch(operation: T, watchKeys: Seq[Any]): Boolean = {
assert(watchKeys.nonEmpty, "The watch key list can't be empty")

// The cost of tryComplete() is typically proportional to the number of keys. Calling
// tryComplete() for each key is going to be expensive if there are many keys. Instead,
// we do the check in the following way. Call tryComplete(). If the operation is not completed,
// we just add the operation to all keys. Then we call tryComplete() again. At this time, if
// the operation is still not completed, we are guaranteed that it won't miss any future triggering
// event since the operation is already on the watcher list for all keys. This does mean that
// if the operation is completed (by another thread) between the two tryComplete() calls, the
// operation is unnecessarily added for watch. However, this is a less severe issue since the
// expire reaper will clean it up periodically.

// At this point the only thread that can attempt this operation is this current thread
// Hence it is safe to tryComplete() without a lock
// check whether the operation can already be completed
var isCompletedByMe = operation.tryComplete()
if (isCompletedByMe)
return true

var watchCreated = false
for(key <- watchKeys) {
// If the operation is already completed, stop adding it to the rest of the watcher list.
if (operation.isCompleted)
return false
watchForOperation(key, operation)

if (!watchCreated) {
watchCreated = true
estimatedTotalOperations.incrementAndGet()
}
}

isCompletedByMe = operation.maybeTryComplete()
if (isCompletedByMe)
return true

// if it cannot be completed by now and hence is watched, add to the expire queue also
if (!operation.isCompleted) {
if (timerEnabled)
timeoutTimer.add(operation)
if (operation.isCompleted) {
// cancel the timer task
operation.cancel()
}
}

false
}

/**
* Check if some delayed operations can be completed with the given watch key,
* and if yes complete them.
*
* @return the number of completed operations during this process
*/
def checkAndComplete(key: Any): Int = {
val wl = watcherList(key)
val watchers = inLock(wl.watchersLock) { wl.watchersByKey.get(key) }
if(watchers == null)
0
else
watchers.tryCompleteWatched()
}

/**
* Return the total size of watch lists the purgatory. Since an operation may be watched
* on multiple lists, and some of its watched entries may still be in the watch lists
* even when it has been completed, this number may be larger than the number of real operations watched
*/
def watched: Int = {
watcherLists.foldLeft(0) { case (sum, watcherList) => sum + watcherList.allWatchers.map(_.countWatched).sum }
}

/**
* Return the number of delayed operations in the expiry queue
*/
def delayed: Int = timeoutTimer.size

/**
* Cancel watching on any delayed operations for the given key. Note the operation will not be completed
*/
def cancelForKey(key: Any): List[T] = {
val wl = watcherList(key)
inLock(wl.watchersLock) {
val watchers = wl.watchersByKey.remove(key)
if (watchers != null)
watchers.cancel()
else
Nil
}
}

/*
* Return the watch list of the given key, note that we need to
* grab the removeWatchersLock to avoid the operation being added to a removed watcher list
*/
private def watchForOperation(key: Any, operation: T) {
val wl = watcherList(key)
inLock(wl.watchersLock) {
val watcher = wl.watchersByKey.getAndMaybePut(key)
watcher.watch(operation)
}
}

/*
* Remove the key from watcher lists if its list is empty
*/
private def removeKeyIfEmpty(key: Any, watchers: Watchers) {
val wl = watcherList(key)
inLock(wl.watchersLock) {
// if the current key is no longer correlated to the watchers to remove, skip
if (wl.watchersByKey.get(key) != watchers)
return

if (watchers != null && watchers.isEmpty) {
wl.watchersByKey.remove(key)
}
}
}

/**
* Shutdown the expire reaper thread
*/
def shutdown() {
if (reaperEnabled)
expirationReaper.shutdown()
timeoutTimer.shutdown()
}

/**
* A linked list of watched delayed operations based on some key
*/
private class Watchers(val key: Any) {
private[this] val operations = new ConcurrentLinkedQueue[T]()

// count the current number of watched operations. This is O(n), so use isEmpty() if possible
def countWatched: Int = operations.size

def isEmpty: Boolean = operations.isEmpty

// add the element to watch
// adds the DelayedOperation to the operations queue
def watch(t: T) {
operations.add(t)
}

// traverse the list and try to complete some watched elements
// iterates over the DelayedOperations in the operations queue, using isCompleted to check whether each one has already completed
def tryCompleteWatched(): Int = {
var completed = 0

val iter = operations.iterator()
while (iter.hasNext) {
val curr = iter.next()
if (curr.isCompleted) { // already completed
// another thread has completed this operation, just remove it
iter.remove()
} else if (curr.maybeTryComplete()) { // try to complete it, and remove it on success
iter.remove()
completed += 1
}
}
// empty means every operation has completed, so remove this Watchers from the pool
if (operations.isEmpty)
removeKeyIfEmpty(key, this)

completed
}

def cancel(): List[T] = {
val iter = operations.iterator()
val cancelled = new ListBuffer[T]()
while (iter.hasNext) {
val curr = iter.next()
curr.cancel()
iter.remove()
cancelled += curr
}
cancelled.toList
}

// traverse the list and purge elements that are already completed by others
// much like tryCompleteWatched, except that it does not try to complete operations that are still pending
def purgeCompleted(): Int = {
var purged = 0

val iter = operations.iterator()
while (iter.hasNext) {
val curr = iter.next()
if (curr.isCompleted) {
iter.remove()
purged += 1
}
}

if (operations.isEmpty)
removeKeyIfEmpty(key, this)

purged
}
}

def advanceClock(timeoutMs: Long) {
timeoutTimer.advanceClock(timeoutMs)

// Trigger a purge if the number of completed but still being watched operations is larger than
// the purge threshold. That number is computed by the difference btw the estimated total number of
// operations and the number of pending delayed operations.
if (estimatedTotalOperations.get - delayed > purgeInterval) {
// now set estimatedTotalOperations to delayed (the number of pending operations) since we are going to
// clean up watchers. Note that, if more operations are completed during the clean up, we may end up with
// a little overestimated total number of operations.
estimatedTotalOperations.getAndSet(delayed)
debug("Begin purging watch lists")
val purged = watcherLists.foldLeft(0) {
case (sum, watcherList) => sum + watcherList.allWatchers.map(_.purgeCompleted()).sum
}
debug("Purged %d elements from watch lists.".format(purged))
}
}

/**
* A background reaper to expire delayed operations that have timed out
*/
private class ExpiredOperationReaper extends ShutdownableThread(
"ExpirationReaper-%d-%s".format(brokerId, purgatoryName),
false) {

override def doWork() {
advanceClock(200L)
}
}
}
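
The watch-by-key flow (try once, register under every key, try again, then re-check whenever an event arrives for a key) can be sketched without the timer, the reaper thread, or the sharded locks. Everything below is an assumption made for illustration: KeyedOperation and SketchPurgatory are hypothetical names, keys are plain strings, and a TrieMap stands in for Kafka's Pool.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.concurrent.TrieMap

// Hypothetical operation that completes once `condition` returns true.
class KeyedOperation(condition: () => Boolean) {
  private val completed = new AtomicBoolean(false)
  def isCompleted: Boolean = completed.get()
  // True only for the caller that actually completes the operation.
  def tryComplete(): Boolean = condition() && completed.compareAndSet(false, true)
}

// Hypothetical, simplified purgatory: no timeout handling and no sharding.
class SketchPurgatory {
  private val watchersByKey = TrieMap.empty[String, ConcurrentLinkedQueue[KeyedOperation]]

  // Try once; if still pending, watch the operation under every key and try again.
  def tryCompleteElseWatch(op: KeyedOperation, keys: Seq[String]): Boolean =
    if (op.tryComplete()) true
    else {
      keys.foreach { key =>
        watchersByKey.getOrElseUpdate(key, new ConcurrentLinkedQueue[KeyedOperation]()).add(op)
      }
      op.tryComplete()
    }

  // Called when something changed for `key`; returns how many operations this call completed.
  def checkAndComplete(key: String): Int =
    watchersByKey.get(key) match {
      case None => 0
      case Some(queue) =>
        var completedNow = 0
        val iter = queue.iterator()
        while (iter.hasNext) {
          val op = iter.next()
          if (op.isCompleted) iter.remove() // completed elsewhere, just stop watching it
          else if (op.tryComplete()) { iter.remove(); completedNow += 1 }
        }
        completedNow
    }
}
```

An operation whose condition is not yet true is parked by tryCompleteElseWatch(); a later checkAndComplete() on one of its keys completes it once the condition holds, and a repeated call for the same key then returns 0.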

DelayedProduce

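This class helps ReplicaManager carry out delayed operations. ReplicaManager is mainly responsible for writing messages sent by producers to the leader replica, managing synchronization between follower replicas and the leader replica, and handling replica role transitions. It needs DelayedProduce when messages are written to the leader replica.

In ReplicaManager.appendRecords(), a DelayedProduce object is created when the acks of the ProduceRequest is -1.

After a producer calls KafkaProducer.send(), KafkaApis.handleProduceRequest() calls ReplicaManager.appendRecords() to append the messages to the leader replica of the corresponding partition.

When the acks of the ProduceRequest is -1, the request is acknowledged only after all in-sync replicas of the partition have caught up with the leader replica for the written messages. DelayedProduce holds back the response to the producer until the follower replicas of every partition in the request have finished synchronizing with the leader replica.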

/**
* A delayed produce operation that can be created by the replica manager and watched
* in the produce operation purgatory
*/
class DelayedProduce(delayMs: Long, // how long the operation may be delayed
produceMetadata: ProduceMetadata, // records the acks setting of this ProduceRequest and the append result for each partition
replicaManager: ReplicaManager,
responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
lockOpt: Option[Lock] = None)
extends DelayedOperation(delayMs, lockOpt) {

// first update the acks pending variable according to the error code
produceMetadata.produceStatus.foreach { case (topicPartition, status) =>
if (status.responseStatus.error == Errors.NONE) {
// Timeout error state will be cleared when required acks are received
status.acksPending = true
status.responseStatus.error = Errors.REQUEST_TIMED_OUT
} else {
status.acksPending = false
}

trace(s"Initial partition status for $topicPartition is $status")
}

/**
* The delayed produce operation can be completed if every partition
* it produces to is satisfied by one of the following:
*
* Case A: This broker is no longer the leader: set an error in response
* Case B: This broker is the leader:
* B.1 - If there was a local error thrown while checking if at least requiredAcks
* replicas have caught up to this operation: set an error in response
* B.2 - Otherwise, set the response with no error.
*/
override def tryComplete(): Boolean = {
// check for each partition if it still has pending acks
produceMetadata.produceStatus.foreach { case (topicPartition, status) =>
trace(s"Checking produce satisfaction for $topicPartition, current status $status")
// skip those partitions that have already been satisfied
if (status.acksPending) {
val (hasEnough, error) = replicaManager.getPartition(topicPartition) match {
case Some(partition) =>
if (partition eq ReplicaManager.OfflinePartition)
(false, Errors.KAFKA_STORAGE_ERROR)
else
partition.checkEnoughReplicasReachOffset(status.requiredOffset)
case None =>
// Case A
(false, Errors.UNKNOWN_TOPIC_OR_PARTITION)
}
// Case B.1 || B.2
if (error != Errors.NONE || hasEnough) {
status.acksPending = false
status.responseStatus.error = error
}
}
}

// check if every partition has satisfied at least one of case A or B
if (!produceMetadata.produceStatus.values.exists(_.acksPending))
forceComplete()
else
false
}

override def onExpiration() {
produceMetadata.produceStatus.foreach { case (topicPartition, status) =>
if (status.acksPending) {
debug(s"Expiring produce request for partition $topicPartition with status $status")
DelayedProduceMetrics.recordExpiration(topicPartition)
}
}
}

/**
* Invokes responseCallback
* Upon completion, return the current response status along with the error code per partition
*/
override def onComplete() {
val responseStatus = produceMetadata.produceStatus.mapValues(status => status.responseStatus)
responseCallback(responseStatus)
}
}

object DelayedProduceMetrics extends KafkaMetricsGroup {

private val aggregateExpirationMeter = newMeter("ExpiresPerSec", "requests", TimeUnit.SECONDS)

private val partitionExpirationMeterFactory = (key: TopicPartition) =>
newMeter("ExpiresPerSec",
"requests",
TimeUnit.SECONDS,
tags = Map("topic" -> key.topic, "partition" -> key.partition.toString))
private val partitionExpirationMeters = new Pool[TopicPartition, Meter](valueFactory = Some(partitionExpirationMeterFactory))

def recordExpiration(partition: TopicPartition) {
aggregateExpirationMeter.mark()
partitionExpirationMeters.getAndMaybePut(partition).mark()
}
}
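
The core of DelayedProduce.tryComplete() is a per-partition flag that is cleared once enough replicas have reached the required offset. A minimal sketch of that bookkeeping follows, under stated assumptions: PartitionStatus and ProduceSketch are hypothetical names, partitions are identified by plain strings, and the replicatedUpTo lookup is a simplified stand-in for partition.checkEnoughReplicasReachOffset(), which in Kafka also reports errors.

```scala
// Hypothetical per-partition state; Kafka's own status object carries more fields.
final case class PartitionStatus(requiredOffset: Long, var acksPending: Boolean = true)

object ProduceSketch {
  // `replicatedUpTo` is an assumed lookup: the offset up to which enough replicas have caught up.
  def canComplete(statuses: Map[String, PartitionStatus],
                  replicatedUpTo: String => Long): Boolean = {
    statuses.foreach { case (partition, status) =>
      // Skip partitions that were already satisfied by an earlier check.
      if (status.acksPending && replicatedUpTo(partition) >= status.requiredOffset)
        status.acksPending = false
    }
    // The whole request completes only when no partition is still waiting for acks.
    !statuses.values.exists(_.acksPending)
  }
}
```

Because the flags are kept between calls, each new check only has to re-examine the partitions that are still pending, which is why the real operation can be re-checked cheaply every time a follower fetches.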

DelayedFetch

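DelayedFetch is the delayed operation used while a FetchRequest is being handled.

In Kafka, only consumers and follower replicas issue FetchRequest. A FetchRequest is handled by KafkaApis.handleFetchRequest(), which calls ReplicaManager.fetchMessages() to read messages from the leader replica of the corresponding partitions. ReplicaManager.fetchMessages() is where the DelayedFetch operation is created.

A fetch is delayed so that the request can collect enough data before the response is returned.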

class DelayedFetch(delayMs: Long,
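fetchMetadata: FetchMetadata, // holds the minimum and maximum number of bytes for this fetch, the flags for reading only from the leader replica and only data below the HW, a replicaId field that tells whether the requester is a consumer or a follower replica, and the fetchPartitionStatus field that records the fetch state of each partition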
replicaManager: ReplicaManager,
quota: ReplicaQuota,
responseCallback: Seq[(TopicPartition, FetchPartitionData)] => Unit)
extends DelayedOperation(delayMs) {

/**
* The operation can be completed if:
*
* Case A: This broker is no longer the leader for some partitions it tries to fetch
* Case B: This broker does not know of some partitions it tries to fetch
* Case C: The fetch offset locates not on the last segment of the log
* Case D: The accumulated bytes from all the fetching partitions exceeds the minimum bytes
* Case E: The partition is in an offline log directory on this broker
* Case F: This broker is the leader, but the requested epoch is now fenced
*
* Upon completion, should return whatever data is available for each valid partition
*/
override def tryComplete(): Boolean = {
var accumulatedSize = 0
fetchMetadata.fetchPartitionStatus.foreach {
case (topicPartition, fetchStatus) =>
val fetchOffset = fetchStatus.startOffsetMetadata
val fetchLeaderEpoch = fetchStatus.fetchInfo.currentLeaderEpoch
try {
if (fetchOffset != LogOffsetMetadata.UnknownOffsetMetadata) {
val partition = replicaManager.getPartitionOrException(topicPartition,
expectLeader = fetchMetadata.fetchOnlyLeader)
val offsetSnapshot = partition.fetchOffsetSnapshot(fetchLeaderEpoch, fetchMetadata.fetchOnlyLeader)

val endOffset = fetchMetadata.fetchIsolation match {
case FetchLogEnd => offsetSnapshot.logEndOffset
case FetchHighWatermark => offsetSnapshot.highWatermark
case FetchTxnCommitted => offsetSnapshot.lastStableOffset
}

// Go directly to the check for Case D if the message offsets are the same. If the log segment
// has just rolled, then the high watermark offset will remain the same but be on the old segment,
// which would incorrectly be seen as an instance of Case C.
if (endOffset.messageOffset != fetchOffset.messageOffset) {
if (endOffset.onOlderSegment(fetchOffset)) {
// Case C, this can happen when the new fetch operation is on a truncated leader
debug(s"Satisfying fetch $fetchMetadata since it is fetching later segments of partition $topicPartition.")
return forceComplete()
} else if (fetchOffset.onOlderSegment(endOffset)) {
// Case C, this can happen when the fetch operation is falling behind the current segment
// or the partition has just rolled a new segment
debug(s"Satisfying fetch $fetchMetadata immediately since it is fetching older segments.")
// We will not force complete the fetch request if a replica should be throttled.
if (!replicaManager.shouldLeaderThrottle(quota, topicPartition, fetchMetadata.replicaId))
return forceComplete()
} else if (fetchOffset.messageOffset < endOffset.messageOffset) {
// we take the partition fetch size as upper bound when accumulating the bytes (skip if a throttled partition)
val bytesAvailable = math.min(endOffset.positionDiff(fetchOffset), fetchStatus.fetchInfo.maxBytes)
if (!replicaManager.shouldLeaderThrottle(quota, topicPartition, fetchMetadata.replicaId))
accumulatedSize += bytesAvailable
}
}
}
} catch {
case _: KafkaStorageException => // Case E
debug(s"Partition $topicPartition is in an offline log directory, satisfy $fetchMetadata immediately")
return forceComplete()
case _: UnknownTopicOrPartitionException => // Case B
debug(s"Broker no longer knows of partition $topicPartition, satisfy $fetchMetadata immediately")
return forceComplete()
case _: FencedLeaderEpochException => // Case F
debug(s"Broker is the leader of partition $topicPartition, but the requested epoch " +
s"$fetchLeaderEpoch is fenced by the latest leader epoch, satisfy $fetchMetadata immediately")
return forceComplete()

case _: NotLeaderForPartitionException => // Case A
debug("Broker is no longer the leader of %s, satisfy %s immediately".format(topicPartition, fetchMetadata))
return forceComplete()
}
}

// Case D
if (accumulatedSize >= fetchMetadata.fetchMinBytes)
forceComplete()
else
false
}

override def onExpiration() {
if (fetchMetadata.isFromFollower)
DelayedFetchMetrics.followerExpiredRequestMeter.mark()
else
DelayedFetchMetrics.consumerExpiredRequestMeter.mark()
}

/**
* Upon completion, read whatever data is available and pass to the complete callback
*/
override def onComplete() {
val logReadResults = replicaManager.readFromLocalLog(
replicaId = fetchMetadata.replicaId,
fetchOnlyFromLeader = fetchMetadata.fetchOnlyLeader,
fetchIsolation = fetchMetadata.fetchIsolation,
fetchMaxBytes = fetchMetadata.fetchMaxBytes,
hardMaxBytesLimit = fetchMetadata.hardMaxBytesLimit,
readPartitionInfo = fetchMetadata.fetchPartitionStatus.map { case (tp, status) => tp -> status.fetchInfo },
quota = quota)

val fetchPartitionData = logReadResults.map { case (tp, result) =>
tp -> FetchPartitionData(result.error, result.highWatermark, result.leaderLogStartOffset, result.info.records,
result.lastStableOffset, result.info.abortedTransactions)
}

responseCallback(fetchPartitionData)
}
}

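The delayed fetch can be completed as soon as any one of the following conditions holds:

  • A partition being fetched does not exist

  • The leader replica has moved, so the current broker is no longer the leader

  • The log segment has rolled, so the requested fetch offset is no longer in the active segment, and the leader replica is not being throttled

  • The bytes accumulated across the fetched partitions have reached the minimum byte limit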
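
The last condition, the accumulated-bytes check, can be sketched on its own. This is an illustration under assumptions rather than Kafka's code: FetchView and FetchSketch are hypothetical names, and bytesBehindEnd stands for the byte distance between the fetch position and the readable end of the log, which Kafka derives from offset metadata.

```scala
// Hypothetical per-partition view of a pending fetch.
final case class FetchView(bytesBehindEnd: Int, maxBytes: Int, throttled: Boolean = false)

object FetchSketch {
  // Each partition contributes at most its own fetch size; throttled partitions are skipped.
  def accumulatedBytes(views: Seq[FetchView]): Int =
    views.filterNot(_.throttled).map(v => math.min(v.bytesBehindEnd, v.maxBytes)).sum

  // The fetch can complete once the accumulated bytes reach the requested minimum.
  def hasEnoughData(views: Seq[FetchView], fetchMinBytes: Int): Boolean =
    accumulatedBytes(views) >= fetchMinBytes
}
```

For example, two unthrottled partitions with 300 and 5000 bytes available and a 1024-byte per-partition limit accumulate 300 + 1024 = 1324 bytes, which satisfies a minimum of 1024 but not a minimum of 2048.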

DelayedJoin

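DelayedJoin helps the group coordinator while a consumer group prepares to rebalance. When the group's state changes to PreparingRebalance and a rebalance is about to start, the group coordinator's prepareRebalance() method creates a DelayedJoin object and hands it to a DelayedOperationPurgatory to watch and manage.

DelayedJoin is needed during a rebalance so that the group coordinator can wait until every consumer in the group has asked to join, that is, has sent a JoinGroupRequest. Each time the group coordinator finishes handling a JoinGroupRequest, it checks whether the DelayedJoin can be completed.

The condition is met only when all consumers have requested to join the group.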
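
The completion condition amounts to a set comparison between the members the coordinator knows about and the members that have already rejoined. A minimal sketch, assuming member ids are plain strings and using the hypothetical names JoinSketch, notYetRejoined, and canCompleteJoin:

```scala
object JoinSketch {
  // Members the coordinator knows about that have not sent a JoinGroupRequest yet.
  def notYetRejoined(knownMembers: Set[String], rejoined: Set[String]): Set[String] =
    knownMembers.diff(rejoined)

  // The delayed join can complete only when nobody is still missing.
  def canCompleteJoin(knownMembers: Set[String], rejoined: Set[String]): Boolean =
    notYetRejoined(knownMembers, rejoined).isEmpty
}
```

If the join cannot complete before the delay expires, the operation is forced to complete by the timer, as with every other DelayedOperation.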

DelayedHeartbeat

A delayed operation that supports heartbeat detection between consumers and the group coordinator.

DelayedCreateTopics

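When topics are created, the callback that returns the creation result to the client is invoked only after every partition of the topic has been assigned a leader.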
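
A sketch of that check, assuming a hypothetical map from partition id to an optional leader broker id (CreateTopicsSketch and both method names are illustrative, not Kafka's API):

```scala
object CreateTopicsSketch {
  // Number of partitions that do not have a leader yet.
  def missingLeaderCount(leaders: Map[Int, Option[Int]]): Int =
    leaders.values.count(_.isEmpty)

  // The creation result can be returned once every partition has a leader.
  def canComplete(leaders: Map[Int, Option[Int]]): Boolean =
    missingLeaderCount(leaders) == 0
}
```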