Kafka startup and shutdown

Kafka startup

Kafka depends on ZooKeeper, so ZooKeeper must be started before Kafka. The Kafka distribution bundles ZooKeeper together with startup scripts for it; to avoid version conflicts between Kafka and ZooKeeper, use the bundled ZooKeeper.

I am running the Windows version, so I switched to the bin/windows directory.
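For example (the install path below is just an illustration; the relative ../../config paths used in later commands assume they are run from this directory):

>cd E:\kafka_2.11-2.2.0\bin\windows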

Starting ZooKeeper

zookeeper.properties configuration

# Directory where snapshots are stored; by default transaction logs are also written here (default: /tmp/zookeeper)
dataDir=E:\\zookeeper\\data
# Directory for transaction logs (default: same as dataDir)
dataLogDir=E:\\zookeeper\\logs
# The basic time unit of ZooKeeper, in milliseconds (default: 2000)
# tickTime=
# Leader/follower initial connection limit: the maximum number of ticks (tickTime intervals)
# a follower (F) may take to establish its initial connection to the leader (L). If exceeded,
# the follower's connection is considered failed (default: 10, i.e. 10 * 2000 ms = 20 s)
# initLimit=
# Leader/follower sync limit: how long a request/response exchange between the leader and a
# follower may take, in ticks. A follower that cannot communicate with the leader within
# this window is dropped.
# syncLimit=
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0

Start ZooKeeper

>zookeeper-server-start ../../config/zookeeper.properties

One thing to watch out for: if the Kafka or ZooKeeper directory path contains spaces, the commands can fail, so it is best to install into a path without spaces. Otherwise you will see errors like:

The system cannot find the path specified.
The syntax of the command is incorrect.
Error: Could not find or load main class Files\kafka_2.11-2.2.0.logs
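Once ZooKeeper starts without errors, a quick sanity check is to run a one-off command through the bundled shell (localhost:2181 assumes the clientPort configured above; on a fresh instance only the built-in /zookeeper node exists):

>zookeeper-shell localhost:2181 ls /
[zookeeper]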

Start Kafka

>kafka-server-start ../../config/server.properties

# To avoid tying up one console window per broker, use -daemon to run it in the background
kafka-server-start -daemon ../../config/server.properties
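With the broker up, a simple end-to-end check is to create and list a topic (a sketch; kafka-topics accepts --bootstrap-server from Kafka 2.2 onward, which matches the kafka_2.11-2.2.0 build used here):

>kafka-topics --bootstrap-server localhost:9092 --create --topic test --partitions 1 --replication-factor 1
>kafka-topics --bootstrap-server localhost:9092 --list
test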

Startup script

if [ $# -lt 1 ];
then
    # server.properties must be supplied; -daemon runs the broker in the background as a
    # daemon, and --override property=value overrides defaults from the properties file
    echo "USAGE: $0 [-daemon] server.properties [--override property=value]*"
    exit 1
fi
base_dir=$(dirname $0)

# log4j configuration
if [ "x$KAFKA_LOG4J_OPTS" = "x" ]; then
    export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../config/log4j.properties"
fi
# JVM heap size
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
fi

EXTRA_ARGS=${EXTRA_ARGS-'-name kafkaServer -loggc'}

COMMAND=$1
case $COMMAND in
  -daemon)
    EXTRA_ARGS="-daemon "$EXTRA_ARGS
    shift
    ;;
  *)
    ;;
esac

exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"

In the end, the script simply runs the main method of the kafka.Kafka class via kafka-run-class.sh.
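As the usage line in the script shows, individual properties from server.properties can also be overridden on the command line, for example (the overridden values here are illustrative):

>kafka-server-start ../../config/server.properties --override broker.id=1 --override log.dirs=E:/kafka/logs-1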

Kafka startup source analysis

def main(args: Array[String]): Unit = {
  try {
    val serverProps = getPropsFromArgs(args)
    val kafkaServerStartable = KafkaServerStartable.fromProps(serverProps)

    try {
      if (!OperatingSystem.IS_WINDOWS && !Java.isIbmJdk)
        new LoggingSignalHandler().register()
    } catch {
      case e: ReflectiveOperationException =>
        warn("Failed to register optional signal handler that logs a message when the process is terminated " +
          s"by a signal. Reason for registration failure is: $e", e)
    }

    // attach shutdown handler to catch terminating signals as well as normal termination
    Runtime.getRuntime().addShutdownHook(new Thread("kafka-shutdown-hook") {
      override def run(): Unit = kafkaServerStartable.shutdown()
    })
    // start the server
    kafkaServerStartable.startup()
    kafkaServerStartable.awaitShutdown()
  }
  catch {
    case e: Throwable =>
      fatal("Exiting Kafka due to fatal exception", e)
      Exit.exit(1)
  }
  Exit.exit(0)
}

Kafka's main method calls kafkaServerStartable.startup() to start the server.

def startup() {
  try server.startup()
  catch {
    case _: Throwable =>
      // KafkaServer.startup() calls shutdown() in case of exceptions, so we invoke `exit` to set the status code
      fatal("Exiting Kafka.")
      Exit.exit(1)
  }
}

KafkaServerStartable.startup() delegates to server.startup(), where server is a KafkaServer instance. The real work therefore happens in KafkaServer.startup(), which initializes the broker's components one by one.

Broker state diagram:
 *                +-----------+
 *                |Not Running|
 *                +-----+-----+
 *                      |
 *                      v
 *                +-----+-----+
 *                |Starting   +--+
 *                +-----+-----+  | +----+------------+
 *                      |        +>+RecoveringFrom   |
 *                      v          |UncleanShutdown  |
 *               +-------+-------+ +-------+---------+
 *               |RunningAsBroker|         |
 *               +-------+-------+<--------+
 *                       |
 *                       v
 *                +-----+------------+
 *                |PendingControlled |
 *                |Shutdown          |
 *                +-----+------------+
 *                      |
 *                      v
 *               +-----+----------+
 *               |BrokerShutting  |
 *               |Down            |
 *               +-----+----------+
 *                      |
 *                      v
 *                +-----+-----+
 *                |Not Running|
 *                +-----------+
 *
def startup() {
  try {
    info("starting")

    if (isShuttingDown.get)
      throw new IllegalStateException("Kafka server is still shutting down, cannot re-start!")

    if (startupComplete.get)
      return

    val canStartup = isStartingUp.compareAndSet(false, true)
    if (canStartup) {
      // record the broker state transition
      /**
       * Broker states:
       * NotRunning                    (state 0)  broker not started
       * Starting                      (state 1)  broker is starting
       * RecoveringFromUncleanShutdown (state 2)  broker was shut down uncleanly, i.e. a path
       *                                          under ${log.dirs} is missing its .kafka_cleanshutdown marker file
       * RunningAsBroker               (state 3)  broker started normally
       * PendingControlledShutdown     (state 6)  broker has entered controlled shutdown and is waiting
       *                                          for the controller to move partition leadership away
       * BrokerShuttingDown            (state 7)  broker is shutting down
       */
      brokerState.newState(Starting)

      /* setup zookeeper */
      // Connect to ZooKeeper and create the following metadata znodes if they do not exist:
      // ConsumerPathZNode.path                    /consumers                   old consumer path; legacy consumers create a consumer-group node here on startup
      // BrokerIdsZNode.path                       /brokers/ids                 each KafkaServer registers a broker.id child node here on startup
      // TopicsZNode.path                          /brokers/topics              stores topic metadata such as partition assignments
      // ConfigEntityChangeNotificationZNode.path  /config/changes              stores notifications for dynamic config changes
      // DeleteTopicsZNode.path                    /admin/delete_topics         holds topics pending deletion
      // BrokerSequenceIdZNode.path                /brokers/seqid               helps generate broker ids: when broker.id is not configured,
      //                                                                        a globally unique id is generated by reading the current
      //                                                                        maximum from this node and incrementing it
      // IsrChangeNotificationZNode.path           /isr_change_notification     path for notifications of ISR changes on replicas
      // ProducerIdBlockZNode.path                 /latest_producer_id_block
      // LogDirEventNotificationZNode.path         /log_dir_event_notification
      initZkClient(time)

      /* Get or create cluster_id */
      // generate a UUID as cluster.id and store it in /cluster/id
      _clusterId = getOrGenerateClusterId(zkClient)
      info(s"Cluster ID = $clusterId")

      /* generate brokerId */
      val (brokerId, initialOfflineDirs) = getBrokerIdAndOfflineDirs
      config.brokerId = brokerId
      logContext = new LogContext(s"[KafkaServer id=${config.brokerId}] ")
      this.logIdent = logContext.logPrefix

      // initialize dynamic broker configs from ZooKeeper. Any updates made after this will be
      // applied after DynamicConfigManager starts.
      config.dynamicConfig.initialize(zkClient)

      /* start scheduler */
      // Start the task scheduler. It is backed by a ScheduledThreadPoolExecutor; at startup it
      // creates a thread pool of ${background.threads} threads (default 10) that drives the
      // periodic work of replica management and log management.
      kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
      kafkaScheduler.startup()

      /* create and configure metrics */
      // Register Kafka metrics: dynamic JMX beans are registered at startup so that Kafka
      // can be tracked and monitored externally.
      val reporters = new util.ArrayList[MetricsReporter]
      reporters.add(new JmxReporter(jmxPrefix))
      val metricConfig = KafkaServer.metricConfig(config)
      metrics = new Metrics(metricConfig, reporters, time, true)

      /* register broker metrics */
      _brokerTopicStats = new BrokerTopicStats

      // QuotaManagers used for throttling
      quotaManagers = QuotaFactory.instantiate(config, metrics, time, threadNamePrefix.getOrElse(""))
      notifyClusterListeners(kafkaMetricsReporters ++ metrics.reporters.asScala)

      logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size)

      /* start log manager */
      // instantiate and start the log manager (LogManager)
      logManager = LogManager(config, initialOfflineDirs, zkClient, brokerState, kafkaScheduler, time, brokerTopicStats, logDirFailureChannel)
      logManager.startup()

      metadataCache = new MetadataCache(config.brokerId)
      // Enable delegation token cache for all SCRAM mechanisms to simplify dynamic update.
      // This keeps the cache up-to-date if new SCRAM mechanisms are enabled dynamically.
      tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames)
      credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, tokenCache)

      // Create and start the socket server acceptor threads so that the bound port is known.
      // Delay starting processors until the end of the initialization sequence to ensure
      // that credentials have been loaded before processing authentications.
      // instantiate and start the SocketServer
      socketServer = new SocketServer(config, metrics, time, credentialProvider)
      socketServer.startup(startupProcessors = false)

      /* start replica manager */
      // instantiate and start the replica manager (ReplicaManager), which manages partition replicas
      replicaManager = createReplicaManager(isShuttingDown)
      replicaManager.startup()

      val brokerInfo = createBrokerInfo
      val brokerEpoch = zkClient.registerBroker(brokerInfo)

      // Now that the broker id is successfully registered, checkpoint it
      checkpointBrokerId(config.brokerId)

      /* start token manager */
      tokenManager = new DelegationTokenManager(config, tokenCache, time , zkClient)
      tokenManager.startup()

      /* start kafka controller */
      // Instantiate and start the controller (KafkaController). Each broker runs one KafkaController
      // instance; after startup, one of them is elected leader controller of the cluster.
      kafkaController = new KafkaController(config, zkClient, time, metrics, brokerInfo, brokerEpoch, tokenManager, threadNamePrefix)
      kafkaController.startup()

      adminManager = new AdminManager(config, metrics, metadataCache, zkClient)

      /* start group coordinator */
      // Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue
      // Instantiate and start the group coordinator (GroupCoordinator). One broker is elected as the
      // coordinator for a consumer group; it manages the group's consumers and triggers a rebalance
      // when the consumers or their subscribed partitions change.
      groupCoordinator = GroupCoordinator(config, zkClient, replicaManager, Time.SYSTEM)
      groupCoordinator.startup()

      /* start transaction coordinator, with a separate background thread scheduler for transaction expiration and log loading */
      // Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue
      transactionCoordinator = TransactionCoordinator(config, replicaManager, new KafkaScheduler(threads = 1, threadNamePrefix = "transaction-log-manager-"), zkClient, metrics, metadataCache, Time.SYSTEM)
      transactionCoordinator.startup()

      /* Get the authorizer and initialize it if one is specified.*/
      // authorization component
      authorizer = Option(config.authorizerClassName).filter(_.nonEmpty).map { authorizerClassName =>
        val authZ = CoreUtils.createObject[Authorizer](authorizerClassName)
        authZ.configure(config.originals())
        authZ
      }

      val fetchManager = new FetchManager(Time.SYSTEM,
        new FetchSessionCache(config.maxIncrementalFetchSessionCacheSlots,
          KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS))

      /* start processing requests */
      dataPlaneRequestProcessor = new KafkaApis(socketServer.dataPlaneRequestChannel, replicaManager, adminManager, groupCoordinator, transactionCoordinator,
        kafkaController, zkClient, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers,
        fetchManager, brokerTopicStats, clusterId, time, tokenManager)

      // handler thread pool: creates ${num.io.threads} KafkaRequestHandler threads
      dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time,
        config.numIoThreads, s"${SocketServer.DataPlaneMetricPrefix}RequestHandlerAvgIdlePercent", SocketServer.DataPlaneThreadPrefix)

      socketServer.controlPlaneRequestChannelOpt.foreach { controlPlaneRequestChannel =>
        controlPlaneRequestProcessor = new KafkaApis(controlPlaneRequestChannel, replicaManager, adminManager, groupCoordinator, transactionCoordinator,
          kafkaController, zkClient, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers,
          fetchManager, brokerTopicStats, clusterId, time, tokenManager)

        controlPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.controlPlaneRequestChannelOpt.get, controlPlaneRequestProcessor, time,
          1, s"${SocketServer.ControlPlaneMetricPrefix}RequestHandlerAvgIdlePercent", SocketServer.ControlPlaneThreadPrefix)
      }

      Mx4jLoader.maybeLoad()

      /* Add all reconfigurables for config change notification before starting config handlers */
      config.dynamicConfig.addReconfigurables(this)

      /* start dynamic config manager */
      dynamicConfigHandlers = Map[String, ConfigHandler](ConfigType.Topic -> new TopicConfigHandler(logManager, config, quotaManagers, kafkaController),
        ConfigType.Client -> new ClientIdConfigHandler(quotaManagers),
        ConfigType.User -> new UserConfigHandler(quotaManagers, credentialProvider),
        ConfigType.Broker -> new BrokerConfigHandler(config, quotaManagers))

      // Create the config manager. start listening to notifications
      // Instantiate the dynamic config manager, which watches the child nodes under /config in ZooKeeper.
      dynamicConfigManager = new DynamicConfigManager(zkClient, dynamicConfigHandlers)
      dynamicConfigManager.startup()

      socketServer.startDataPlaneProcessors()
      socketServer.startControlPlaneProcessor()
      // transition to RunningAsBroker: the KafkaServer has finished starting up
      brokerState.newState(RunningAsBroker)
      shutdownLatch = new CountDownLatch(1)
      startupComplete.set(true)
      isStartingUp.set(false)
      AppInfoParser.registerAppInfo(jmxPrefix, config.brokerId.toString, metrics)
      info("started")
    }
  }
  catch {
    case e: Throwable =>
      fatal("Fatal error during KafkaServer startup. Prepare to shutdown", e)
      isStartingUp.set(false)
      shutdown()
      throw e
  }
}
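The znodes created by initZkClient can be inspected directly from a zookeeper-shell session; the root of a single-broker setup looks roughly like this (ordering varies):

>zookeeper-shell localhost:2181
ls /
[cluster, controller_epoch, controller, brokers, zookeeper, admin, isr_change_notification, consumers, log_dir_event_notification, latest_producer_id_block, config]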

Logging configuration

The logging configuration file is config/log4j.properties.

Log layout (an example of following one of these files appears after the list):

  • controller.log: KafkaController runtime log

  • kafka-authorizer.log: authorization-related operations

  • kafka-request.log: network requests handled by the broker

  • kafkaServer-gc.log: JVM GC log written while the broker runs

  • log-cleaner.log: statistics from log-cleaning (compaction) operations

  • server.log: KafkaServer runtime log

  • state-change.log: state-transition log, e.g. partition leader/follower role changes
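For example, to watch partition role changes as a broker starts, you can follow state-change.log (a sketch, assuming the default logs directory inside the Kafka install; on Windows, PowerShell's Get-Content -Wait is the equivalent of tail -f):

tail -f ../../logs/state-change.log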

Inspecting state

To view the current ZooKeeper state, open a ZooKeeper client session:

>zookeeper-shell localhost:2181

View broker information

ls /brokers/ids
// only one broker is running at the moment, so there is a single id
[0]

get /controller
{"version":1,"brokerid":0,"timestamp":"1589255159885"}
cZxid = 0x2e
ctime = Tue May 12 11:45:59 CST 2020
mZxid = 0x2e
mtime = Tue May 12 11:45:59 CST 2020
pZxid = 0x2e
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x1001506f5890002
dataLength = 54
numChildren = 0


get /brokers/ids/0
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://localhost:9092"],"jmx_port":-1,"host":"localhost","timestamp":"1589255159581","port":9092,"version":4}
cZxid = 0x2d
ctime = Tue May 12 11:45:59 CST 2020
mZxid = 0x2d
mtime = Tue May 12 11:45:59 CST 2020
pZxid = 0x2d
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x1001506f5890002
dataLength = 188
numChildren = 0
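The cluster id generated at startup (see getOrGenerateClusterId above) lives next to these nodes and can be read the same way; the id value below is only an illustration:

get /cluster/id
{"version":"1","id":"W3CzfO1MQNWGfmsdCBjXuQ"}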

Kafka shutdown

>kafka-server-stop

Shutdown script

SIGNAL=${SIGNAL:-TERM}

# find the PIDs of processes running the kafka.Kafka class
if [[ $(uname -s) == "OS/390" ]]; then
    if [ -z $JOBNAME ]; then
        JOBNAME="KAFKSTRT"
    fi
    PIDS=$(ps -A -o pid,jobname,comm | grep -i $JOBNAME | grep java | grep -v grep | awk '{print $1}')
else
    PIDS=$(ps ax | grep -i 'kafka\.Kafka' | grep java | grep -v grep | awk '{print $1}')
fi

if [ -z "$PIDS" ]; then
    echo "No kafka server to stop"
    exit 1
else
    kill -s $SIGNAL $PIDS
fi
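Because the default signal is TERM, the shutdown hook registered in kafka.Kafka.main runs and the broker shuts down cleanly. The SIGNAL environment variable can override this; as an illustration (not a recommendation), KILL bypasses the shutdown hook, so the next startup would go through RecoveringFromUncleanShutdown:

SIGNAL=KILL bin/kafka-server-stop.sh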
