
Kafka Network Communication

Network Communication

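When KafkaServer starts up, it initializes and starts a SocketServer, which accepts client connections, reads client requests, and sends back responses. It also creates a KafkaRequestHandlerPool to manage the KafkaRequestHandler threads.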

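SocketServer is a network component built on Java NIO. Its thread model is: one Acceptor thread per listener, which accepts all incoming connections on that listener; {num.network.threads} Processor threads per listener, each with its own Selector, reading requests from the many connections assigned to it; and {num.io.threads} KafkaRequestHandler threads, which process the requests and pass the resulting responses back to the Processor threads.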

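The KafkaRequestHandler threads are managed by KafkaRequestHandlerPool. Between the Processors and the KafkaRequestHandlers, requests are buffered in a RequestChannel: each KafkaRequestHandler takes a RequestChannel.Request from RequestChannel.requestQueue and passes it to KafkaApis.handle(). The resulting Response is handed back through the RequestChannel to the response queue of the Processor that read the request. In the version quoted in this article each Processor owns its own response queue; older releases kept these queues in RequestChannel.responseQueues.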
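To make this data flow concrete, here is a minimal single-process sketch of the RequestChannel idea, assuming simplified request and response types: every Processor feeds one shared bounded request queue, and each response is routed back to the response queue of the Processor whose id the request carries. ToyRequest, ToyResponse and ToyRequestChannel are hypothetical names, not Kafka classes.

```scala
import java.util.concurrent.{ArrayBlockingQueue, LinkedBlockingDeque, TimeUnit}

// Hypothetical, simplified stand-ins for RequestChannel.Request / Response.
// processorId records which Processor read the request, so the response can
// be routed back to the network thread that owns the connection.
final case class ToyRequest(processorId: Int, connectionId: String, payload: String)
final case class ToyResponse(request: ToyRequest, payload: String)

// Sketch of a request channel: one bounded request queue shared by all
// Processors, plus one response queue per Processor.
final class ToyRequestChannel(queueSize: Int, numProcessors: Int) {
  private val requestQueue = new ArrayBlockingQueue[ToyRequest](queueSize)
  private val responseQueues =
    Vector.fill(numProcessors)(new LinkedBlockingDeque[ToyResponse]())

  // Processor side: blocks while the request queue is full.
  def sendRequest(request: ToyRequest): Unit = requestQueue.put(request)

  // Handler side: None if no request arrives within timeoutMs.
  def receiveRequest(timeoutMs: Long): Option[ToyRequest] =
    Option(requestQueue.poll(timeoutMs, TimeUnit.MILLISECONDS))

  // Handler side: route the response to the Processor that read the request.
  def sendResponse(response: ToyResponse): Unit =
    responseQueues(response.request.processorId).put(response)

  // Processor side: take the next pending response for this Processor, if any.
  def receiveResponse(processorId: Int): Option[ToyResponse] =
    Option(responseQueues(processorId).poll())
}
```

Carrying the processor id inside the request is what lets a handler thread, which knows nothing about sockets, return the response to the one network thread that owns the connection.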

Network Thread Model

Acceptor

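The broker runs one Acceptor thread for each port (listener) it listens on. The Acceptor hands newly accepted inbound connections to the Processor network threads in round-robin order, so connections are spread evenly across them.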
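The round-robin choice can be sketched as a tiny helper over a fixed list of processors. As in the Acceptor loop shown below, the counter is incremented after each connection and reduced modulo the processor count only at the moment a processor is picked. In Kafka the processor list is guarded by a lock and may change size at runtime (num.network.threads can be reconfigured dynamically in recent versions), which is presumably why the modulo is applied inside the synchronized block; this sketch keeps the same order of operations. RoundRobinPicker is a hypothetical name.

```scala
// Sketch of the Acceptor's round-robin choice (hypothetical helper).
// The counter grows by one per accepted connection and is reduced modulo
// the processor count only when a processor is picked, as in Acceptor.run().
final class RoundRobinPicker[P](processors: IndexedSeq[P]) {
  require(processors.nonEmpty, "need at least one processor")
  private var currentProcessor = 0

  def next(): P = synchronized {
    currentProcessor = currentProcessor % processors.size
    val chosen = processors(currentProcessor)
    // round robin to the next processor for the following connection
    currentProcessor += 1
    chosen
  }
}
```

With three processors, seven connections are assigned p0, p1, p2, p0, p1, p2, p0.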

```scala
private[kafka] class Acceptor(val endPoint: EndPoint,
                              val sendBufferSize: Int,
                              val recvBufferSize: Int,
                              brokerId: Int,
                              connectionQuotas: ConnectionQuotas) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup
```

Acceptor extends AbstractServerThread, which implements Runnable, so each Acceptor runs on its own thread.
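The lifecycle helpers that the Acceptor and Processor run loops call (startupComplete(), shutdownComplete(), isRunning) come from AbstractServerThread. Below is a simplified sketch of that pattern, assuming only two CountDownLatches and an atomic flag; the real class also wakes up its NIO selector on shutdown so a blocked select() returns promptly. ServerThreadLifecycle and CountingWorker are hypothetical names.

```scala
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicBoolean

// Simplified sketch of the lifecycle pattern used by AbstractServerThread.
// ServerThreadLifecycle is a hypothetical name, not Kafka's class.
abstract class ServerThreadLifecycle extends Runnable {
  private val startupLatch = new CountDownLatch(1)
  // Starts at 0 so shutdown() does not hang if run() never started.
  @volatile private var shutdownLatch = new CountDownLatch(0)
  private val alive = new AtomicBoolean(true)

  // Blocks the caller until run() signals that startup is complete.
  def awaitStartup(): Unit = startupLatch.await()

  // Asks the loop to stop, then waits until run() has finished cleaning up.
  def shutdown(): Unit = {
    alive.set(false)
    shutdownLatch.await()
  }

  protected def isRunning: Boolean = alive.get()

  // Called at the start of run(): arm the shutdown latch, then release awaitStartup().
  protected def startupComplete(): Unit = {
    shutdownLatch = new CountDownLatch(1)
    startupLatch.countDown()
  }

  // Called from run()'s finally block: release any caller waiting in shutdown().
  protected def shutdownComplete(): Unit = shutdownLatch.countDown()
}

// Hypothetical worker that counts loop iterations until asked to stop.
final class CountingWorker extends ServerThreadLifecycle {
  @volatile var iterations = 0L

  override def run(): Unit = {
    startupComplete()
    try {
      while (isRunning) {
        iterations += 1
        Thread.sleep(1)
      }
    } finally shutdownComplete()
  }
}
```

A caller starts the worker on a thread, calls awaitStartup() to know the loop is live, and later calls shutdown(), which returns only after run() has reached its finally block.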

```scala
def run() {
  // Register interest in SelectionKey.OP_ACCEPT events on the listening server channel
  serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
  startupComplete()
  try {
    var currentProcessor = 0
    while (isRunning) {
      try {
        val ready = nioSelector.select(500)
        if (ready > 0) {
          val keys = nioSelector.selectedKeys()
          val iter = keys.iterator()
          while (iter.hasNext && isRunning) {
            try {
              val key = iter.next
              iter.remove()
              if (key.isAcceptable) {
                val processor = synchronized {
                  currentProcessor = currentProcessor % processors.size
                  processors(currentProcessor)
                }
                // Accept the new connection and hand it to the chosen Processor
                accept(key, processor)
              } else
                throw new IllegalStateException("Unrecognized key state for acceptor thread.")

              // round robin to the next processor thread, mod(numProcessors) will be done later
              currentProcessor = currentProcessor + 1
            } catch {
              case e: Throwable => error("Error while accepting connection", e)
            }
          }
        }
      }
      catch {
        // We catch all the throwables to prevent the acceptor thread from exiting on exceptions due
        // to a select operation on a specific channel or a bad request. We don't want
        // the broker to stop responding to requests from other clients in these scenarios.
        case e: ControlThrowable => throw e
        case e: Throwable => error("Error occurred", e)
      }
    }
  } finally {
    debug("Closing server socket and selector.")
    CoreUtils.swallow(serverChannel.close(), this, Level.ERROR)
    CoreUtils.swallow(nioSelector.close(), this, Level.ERROR)
    shutdownComplete()
  }
}
```
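Stripped of metrics and quotas, one pass of this loop can be sketched with plain java.nio: wait on OP_ACCEPT with a timeout, remove each selected key, accept the connection, switch it to non-blocking mode and queue it for a processor in round-robin order. This is an illustration under stated assumptions, not Kafka's accept(): the per-processor ConcurrentLinkedQueue stands in for Processor.accept(), and ToyAcceptor is a hypothetical name.

```scala
import java.net.InetSocketAddress
import java.nio.channels.{SelectionKey, Selector, ServerSocketChannel, SocketChannel}
import java.util.concurrent.ConcurrentLinkedQueue

// Sketch of one pass of an Acceptor-style loop using plain java.nio.
// ToyAcceptor is hypothetical; each ConcurrentLinkedQueue stands in for a
// Processor's queue of new connections.
object ToyAcceptor {
  // Bind a non-blocking server channel to an ephemeral local port and
  // register it for OP_ACCEPT, as the Acceptor does at the top of run().
  def openServer(selector: Selector): ServerSocketChannel = {
    val server = ServerSocketChannel.open()
    server.configureBlocking(false)
    server.bind(new InetSocketAddress("127.0.0.1", 0))
    server.register(selector, SelectionKey.OP_ACCEPT)
    server
  }

  // Wait up to timeoutMs for acceptable keys, accept them, and hand each new
  // connection to a processor queue in round-robin order.
  // Returns the updated round-robin counter.
  def acceptPass(selector: Selector,
                 processorQueues: IndexedSeq[ConcurrentLinkedQueue[SocketChannel]],
                 currentProcessor: Int,
                 timeoutMs: Long): Int = {
    var current = currentProcessor
    if (selector.select(timeoutMs) > 0) {
      val iter = selector.selectedKeys().iterator()
      while (iter.hasNext()) {
        val key = iter.next()
        iter.remove() // the selector never clears selected keys by itself
        if (key.isAcceptable()) {
          val server = key.channel().asInstanceOf[ServerSocketChannel]
          val socket = server.accept() // null if no connection is pending
          if (socket != null) {
            socket.configureBlocking(false)
            socket.socket().setTcpNoDelay(true)
            processorQueues(current % processorQueues.size).add(socket)
            current += 1
          }
        } else throw new IllegalStateException("Unrecognized key state for acceptor")
      }
    }
    current
  }
}
```

Removing each key from selectedKeys() matters: the selector only adds to that set, so a key left there would still be present on the next pass even if no new connection had arrived.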

Processor

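When a Processor (network thread) receives a request from a client or from another broker, it puts the request into the request queue, where the I/O thread pool picks it up for processing.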

```scala
private[kafka] class Processor(val id: Int,
                               time: Time,
                               maxRequestSize: Int,
                               requestChannel: RequestChannel,
                               connectionQuotas: ConnectionQuotas,
                               connectionsMaxIdleMs: Long,
                               failedAuthenticationDelayMs: Int,
                               listenerName: ListenerName,
                               securityProtocol: SecurityProtocol,
                               config: KafkaConfig,
                               metrics: Metrics,
                               credentialProvider: CredentialProvider,
                               memoryPool: MemoryPool,
                               logContext: LogContext) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup
```

Processor also extends AbstractServerThread, so each Processor runs on its own thread.

```scala
override def run() {
  // Count down the startup CountDownLatch, releasing threads blocked in awaitStartup()
  startupComplete()
  try {
    while (isRunning) {
      try {
        // setup any new connections that have been queued up
        configureNewConnections()
        // register any new responses for writing
        // (take this Processor's pending Responses and start sending them)
        processNewResponses()
        // Poll the selector; fully received requests and finished sends are
        // collected in the selector's completedReceives and completedSends
        poll()
        // Handle completedReceives: build a Request from each and send it to the RequestChannel
        processCompletedReceives()
        // Handle completedSends (responses that have been fully written)
        processCompletedSends()
        // Handle connections that were closed (disconnected)
        processDisconnected()
      } catch {
        // We catch all the throwables here to prevent the processor thread from exiting. We do this because
        // letting a processor exit might cause a bigger impact on the broker. This behavior might need to be
        // reviewed if we see an exception that needs the entire broker to stop. Usually the exceptions thrown would
        // be either associated with a specific socket channel or a bad request. These exceptions are caught and
        // processed by the individual methods above which close the failing channel and continue processing other
        // channels. So this catch block should only ever see ControlThrowables.
        case e: Throwable => processException("Processor got uncaught exception.", e)
      }
    }
  } finally {
    debug("Closing selector - processor " + id)
    CoreUtils.swallow(closeAll(), this, Level.ERROR)
    shutdownComplete()
  }
}
```
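The poll() step is where raw socket bytes become completed receives. Kafka requests are size-delimited on the wire (a 4-byte big-endian length followed by the request bytes), and a single read may deliver any fraction of a frame. The sketch below shows that reassembly over arbitrary chunks, with maxSize playing the role of the Processor's maxRequestSize; it assumes plain heap buffers and ignores the memory pool, and FrameAssembler is a hypothetical name rather than Kafka's receive class.

```scala
import java.nio.ByteBuffer
import scala.collection.mutable.ArrayBuffer

// Sketch of reassembling size-delimited frames (4-byte big-endian length,
// then the payload) from arbitrary partial reads. FrameAssembler is a
// hypothetical name; maxSize plays the role of the Processor's maxRequestSize.
final class FrameAssembler(maxSize: Int) {
  private val sizeBuf = ByteBuffer.allocate(4)
  private var payload: ByteBuffer = null // non-null while a payload is being filled
  val completed = ArrayBuffer.empty[Array[Byte]]

  // Consumes every remaining byte of `chunk`, recording each finished frame.
  def feed(chunk: ByteBuffer): Unit = {
    while (chunk.hasRemaining()) {
      if (payload == null) {
        // Still reading the 4-byte length header.
        while (sizeBuf.hasRemaining() && chunk.hasRemaining()) sizeBuf.put(chunk.get())
        if (!sizeBuf.hasRemaining()) {
          sizeBuf.flip()
          val size = sizeBuf.getInt()
          sizeBuf.clear()
          if (size < 0 || size > maxSize)
            throw new IllegalArgumentException(s"Invalid frame size $size (max $maxSize)")
          payload = ByteBuffer.allocate(size)
        }
      }
      if (payload != null) {
        // Copy as much of the payload as this chunk provides.
        val n = math.min(payload.remaining(), chunk.remaining())
        val slice = chunk.duplicate()
        slice.limit(slice.position() + n)
        payload.put(slice)
        chunk.position(chunk.position() + n)
        if (!payload.hasRemaining()) {
          completed += payload.array()
          payload = null
        }
      }
    }
  }
}
```

Feeding the bytes three at a time still yields whole frames, and an oversized length is rejected before any payload buffer is allocated, which is the point of checking the size against a maximum.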

RequestChannel

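RequestChannel provides the data buffer between the Processor threads and the handler threads: it is the channel in which Requests and Responses are held in transit, and the place where the two groups of threads exchange data.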

```scala
private val requestQueue = new ArrayBlockingQueue[BaseRequest](queueSize)
```

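Internally it uses an ArrayBlockingQueue to buffer the Requests added by the Processors. The queue is bounded by queueSize, which the broker takes from queued.max.requests, so when it is full a Processor blocks in put() until a handler thread takes a request off the queue.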
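The two ArrayBlockingQueue properties this design leans on, a put() that blocks while the queue is full and a timed poll() that returns null when nothing arrives (the `case null` branch in the handler loop below), can be demonstrated with a capacity of 1. BoundedRequestQueue and BackpressureDemo are hypothetical names, and receive() wraps the null in an Option.

```scala
import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}

// Hypothetical wrapper showing the ArrayBlockingQueue behaviour a request
// queue relies on. capacity stands in for queued.max.requests.
final class BoundedRequestQueue[R](capacity: Int) {
  private val queue = new ArrayBlockingQueue[R](capacity)

  // Processor side: blocks while the queue is full (backpressure).
  def send(request: R): Unit = queue.put(request)

  // Like send(), but gives up after timeoutMs; false means the queue stayed full.
  def trySend(request: R, timeoutMs: Long): Boolean =
    queue.offer(request, timeoutMs, TimeUnit.MILLISECONDS)

  // Handler side: None if nothing arrives within timeoutMs (Kafka sees null).
  def receive(timeoutMs: Long): Option[R] =
    Option(queue.poll(timeoutMs, TimeUnit.MILLISECONDS))
}

object BackpressureDemo {
  // With capacity 1, the producer thread's send("r2") cannot complete until
  // the first receive() frees the slot; the second receive() then gets "r2".
  def run(): Seq[String] = {
    val q = new BoundedRequestQueue[String](capacity = 1)
    q.send("r1")
    val producer: Runnable = () => q.send("r2")
    val t = new Thread(producer)
    t.start()
    val received = Seq(q.receive(1000), q.receive(1000)).flatten
    t.join()
    received
  }
}
```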

KafkaRequestHandler

When KafkaRequestHandlerPool is instantiated, it creates {num.io.threads} KafkaRequestHandler threads. These threads are where requests are actually processed.
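Before looking at the real loop, here is a simplified sketch of the pool structure, assuming a plain handle function instead of KafkaApis: N threads poll a shared queue with a 300 ms timeout, and shutdown enqueues one sentinel per thread (playing the role of RequestChannel.ShutdownRequest) and then waits on a CountDownLatch. Work, Req, Shutdown and ToyHandlerPool are hypothetical names.

```scala
import java.util.concurrent.{ArrayBlockingQueue, CountDownLatch, TimeUnit}
import scala.util.control.NonFatal

// Hypothetical work items: a shutdown sentinel plays the role of
// RequestChannel.ShutdownRequest.
sealed trait Work
final case class Req(id: Int) extends Work
case object Shutdown extends Work

// Simplified sketch of a handler pool (not Kafka's KafkaRequestHandlerPool):
// numThreads threads poll a shared queue with a timeout and call `handle`.
final class ToyHandlerPool(numThreads: Int,
                           queue: ArrayBlockingQueue[Work],
                           handle: Req => Unit) {
  private val stoppedThreads = new CountDownLatch(numThreads)

  private def loop(id: Int): Unit = {
    var stopped = false
    while (!stopped) {
      queue.poll(300, TimeUnit.MILLISECONDS) match {
        case Shutdown => stopped = true
        case r: Req =>
          try handle(r)
          catch { case NonFatal(e) => System.err.println(s"handler $id failed on $r: $e") }
        case null => // timed out; poll again
      }
    }
    stoppedThreads.countDown()
  }

  private val threads = (0 until numThreads).map { id =>
    val body: Runnable = () => loop(id)
    val t = new Thread(body, s"toy-request-handler-$id")
    t.start()
    t
  }

  // Enqueue one sentinel per thread, then wait until every thread has exited.
  def shutdown(): Unit = {
    threads.foreach(_ => queue.put(Shutdown))
    stoppedThreads.await()
  }
}
```

Because the queue is FIFO and each thread exits right after taking a sentinel, every request enqueued before shutdown() is handled before the pool stops.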

```scala
def run() {
  while (!stopped) {
    // We use a single meter for aggregate idle percentage for the thread pool.
    // Since meter is calculated as total_recorded_value / time_window and
    // time_window is independent of the number of threads, each recorded idle
    // time should be discounted by # threads.
    val startSelectTime = time.nanoseconds
    // Take a Request from requestChannel, waiting at most 300 ms (null on timeout)
    val req = requestChannel.receiveRequest(300)
    val endTime = time.nanoseconds
    val idleTime = endTime - startSelectTime
    aggregateIdleMeter.mark(idleTime / totalHandlerThreads.get)

    req match {
      case RequestChannel.ShutdownRequest => // shutdown request
        debug(s"Kafka request handler $id on broker $brokerId received shut down command")
        shutdownComplete.countDown()
        return

      case request: RequestChannel.Request => // regular request
        try {
          request.requestDequeueTimeNanos = endTime
          trace(s"Kafka request handler $id on broker $brokerId handling request $request")
          // KafkaApis.handle routes on request.header.apiKey to the matching handler method
          apis.handle(request)
        } catch {
          case e: FatalExitError =>
            shutdownComplete.countDown()
            Exit.exit(e.statusCode)
          case e: Throwable => error("Exception when handling request", e)
        } finally {
          request.releaseBuffer()
        }

      case null => // continue
    }
  }
  shutdownComplete.countDown()
}
```
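The comment at the top of this loop divides each idle interval by the number of handler threads. A small worked sketch, assuming a meter whose rate is simply the sum of marked values divided by the window length: with 4 threads that are each idle for a whole 1-second window, each marks 1,000,000,000 / 4 = 250,000,000 ns, the marks sum to 1,000,000,000 ns, and the pool-wide idle ratio is 1.0. Without the division the same situation would read as 4.0. IdleMeterMath is a hypothetical helper.

```scala
// Worked sketch of the idle-time discounting described in the loop's comment.
// IdleMeterMath is a hypothetical helper; it assumes a meter whose rate is
// (sum of marked values) / window, with both measured in nanoseconds.
object IdleMeterMath {
  // idleNanosPerThread: how long each handler thread spent waiting in
  // receiveRequest() during the window. Each thread marks idle / numThreads,
  // using the same integer division as the handler loop.
  def poolIdleRatio(idleNanosPerThread: Seq[Long], windowNanos: Long): Double = {
    require(windowNanos > 0, "window must be positive")
    val numThreads = idleNanosPerThread.size
    val marked = idleNanosPerThread.map(idle => idle / numThreads).sum
    marked.toDouble / windowNanos
  }

  // The same computation without the discount, for comparison.
  def undiscountedRatio(idleNanosPerThread: Seq[Long], windowNanos: Long): Double =
    idleNanosPerThread.sum.toDouble / windowNanos
}
```

If two of the four threads are fully idle and two are fully busy, the discounted ratio is 0.5, which reads directly as "half the pool's capacity was idle".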
