def run() {
  // Register for the SelectionKey.OP_ACCEPT event
  serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
  startupComplete()
  try {
    var currentProcessor = 0
    while (isRunning) {
      try {
        val ready = nioSelector.select(500)
        if (ready > 0) {
          val keys = nioSelector.selectedKeys()
          val iter = keys.iterator()
          while (iter.hasNext && isRunning) {
            try {
              val key = iter.next
              iter.remove()
              if (key.isAcceptable) {
                val processor = synchronized {
                  currentProcessor = currentProcessor % processors.size
                  processors(currentProcessor)
                }
                // Hand the new connection off to the chosen Processor
                accept(key, processor)
              } else
                throw new IllegalStateException("Unrecognized key state for acceptor thread.")

              // round robin to the next processor thread, mod(numProcessors) will be done later
              currentProcessor = currentProcessor + 1
            } catch {
              case e: Throwable => error("Error while accepting connection", e)
            }
          }
        }
      } catch {
        // We catch all the throwables to prevent the acceptor thread from exiting on exceptions due
        // to a select operation on a specific channel or a bad request. We don't want
        // the broker to stop responding to requests from other clients in these scenarios.
        case e: ControlThrowable => throw e
        case e: Throwable => error("Error occurred", e)
      }
    }
  } finally {
    debug("Closing server socket and selector.")
    CoreUtils.swallow(serverChannel.close(), this, Level.ERROR)
    CoreUtils.swallow(nioSelector.close(), this, Level.ERROR)
    shutdownComplete()
  }
}
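The synchronized block above is where the Acceptor spreads new connections across its Processor pool: the index is only reduced modulo the pool size at the moment a processor is picked, and is incremented afterwards for the next connection. The following is a minimal standalone sketch of that round-robin pattern; the names (RoundRobinSketch, Processor here) are illustrative stand-ins, not Kafka's actual classes.

object RoundRobinSketch {
  // Hypothetical stand-in for kafka.network.Processor
  final class Processor(val id: Int)

  def main(args: Array[String]): Unit = {
    val processors = Vector(new Processor(0), new Processor(1), new Processor(2))
    var currentProcessor = 0

    // Simulate five incoming connections being assigned to processors in turn.
    for (connection <- 1 to 5) {
      val processor = {
        currentProcessor = currentProcessor % processors.size
        processors(currentProcessor)
      }
      println(s"connection $connection -> processor ${processor.id}")
      currentProcessor += 1 // mod(numProcessors) is applied on the next pick
    }
  }
}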
override def run() {
  // startupComplete() counts down a CountDownLatch to wake up the thread blocked waiting for startup
  startupComplete()
  try {
    while (isRunning) {
      try {
        // setup any new connections that have been queued up
        configureNewConnections()
        // register any new responses for writing
        processNewResponses()
        // poll the selector; fully received packets and successfully sent requests are added to
        // the corresponding queues it maintains (e.g. completedReceives)
        poll()
        // process the completedReceives queue of fully received packets
        processCompletedReceives()
        // process the completedSends queue
        processCompletedSends()
        // process the disconnected queue
        processDisconnected()
      } catch {
        // We catch all the throwables here to prevent the processor thread from exiting. We do this because
        // letting a processor exit might cause a bigger impact on the broker. This behavior might need to be
        // reviewed if we see an exception that needs the entire broker to stop. Usually the exceptions thrown would
        // be either associated with a specific socket channel or a bad request. These exceptions are caught and
        // processed by the individual methods above which close the failing channel and continue processing other
        // channels. So this catch block should only ever see ControlThrowables.
        case e: Throwable => processException("Processor got uncaught exception.", e)
      }
    }
  } finally {
    debug("Closing selector - processor " + id)
    CoreUtils.swallow(closeAll(), this, Level.ERROR)
    shutdownComplete()
  }
}
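The startupComplete()/shutdownComplete() calls in both run() methods are the latch-based handshake mentioned above: the thread that starts the Acceptor or Processor blocks until the worker signals it is up, and shutdown blocks until the worker has cleaned up. Below is a simplified sketch of that pattern, assuming it behaves roughly like Kafka's AbstractServerThread; the class and field names here are illustrative, not the broker's exact ones.

import java.util.concurrent.CountDownLatch

class LatchedThreadSketch extends Runnable {
  private val startupLatch  = new CountDownLatch(1)
  private val shutdownLatch = new CountDownLatch(1)
  @volatile private var alive = true

  protected def startupComplete(): Unit = startupLatch.countDown()
  protected def shutdownComplete(): Unit = shutdownLatch.countDown()

  // Caller blocks here until run() signals readiness
  def awaitStartup(): Unit = startupLatch.await()

  // Caller blocks here until run() has finished cleaning up
  def shutdown(): Unit = {
    alive = false
    shutdownLatch.await()
  }

  override def run(): Unit = {
    startupComplete()
    try {
      while (alive) {
        Thread.sleep(10) // event-loop body would go here
      }
    } finally {
      shutdownComplete()
    }
  }
}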
def run() {
  while (!stopped) {
    // We use a single meter for aggregate idle percentage for the thread pool.
    // Since meter is calculated as total_recorded_value / time_window and
    // time_window is independent of the number of threads, each recorded idle
    // time should be discounted by # threads.
    val startSelectTime = time.nanoseconds
    // Pull the next Request from the requestChannel
    val req = requestChannel.receiveRequest(300)
    val endTime = time.nanoseconds
    val idleTime = endTime - startSelectTime
    aggregateIdleMeter.mark(idleTime / totalHandlerThreads.get)

    req match {
      // shutdown request
      case RequestChannel.ShutdownRequest =>
        debug(s"Kafka request handler $id on broker $brokerId received shut down command")
        shutdownComplete.countDown()
        return

      // normal request
      case request: RequestChannel.Request =>
        try {
          request.requestDequeueTimeNanos = endTime
          trace(s"Kafka request handler $id on broker $brokerId handling request $request")
          // Route to the appropriate handler method based on request.header.apiKey
          apis.handle(request)
        } catch {
          case e: FatalExitError =>
            shutdownComplete.countDown()
            Exit.exit(e.statusCode)
          case e: Throwable => error("Exception when handling request", e)
        } finally {
          request.releaseBuffer()
        }
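The apis.handle(request) call is where the request is routed by request.header.apiKey to a per-API handler method. The sketch below illustrates that match-based dispatch with hypothetical types; it is not KafkaApis itself, and the handler names are invented for the example.

object ApiDispatchSketch {
  sealed trait ApiKey
  case object Produce  extends ApiKey
  case object Fetch    extends ApiKey
  case object Metadata extends ApiKey

  final case class Request(apiKey: ApiKey, body: String)

  // A single match on the request's API key fans out to one handler per API,
  // analogous to how KafkaApis.handle dispatches on request.header.apiKey.
  def handle(request: Request): String = request.apiKey match {
    case Produce  => handleProduce(request)
    case Fetch    => handleFetch(request)
    case Metadata => handleMetadata(request)
  }

  private def handleProduce(r: Request): String  = s"produced: ${r.body}"
  private def handleFetch(r: Request): String    = s"fetched: ${r.body}"
  private def handleMetadata(r: Request): String = s"metadata: ${r.body}"

  def main(args: Array[String]): Unit =
    println(handle(Request(Fetch, "topic-0")))
}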