kafka是一个高吞吐量、分布式的发布-订阅系统,核心模块使用Scala语言开发,开源的、轻量级的、分布式的、可分区和有复制备份的、基于zookeeper协调管理的分布式流平台的功能强大的消息系统。
基本结构
生产者负责生产消息,将消息写入kafka集群。
消费者从kafka集群中拉取消息。
offset在0.9之前存储在zookeeper,0.9之后存储在本地topic中
基本概念
代理Broker kafka集群就是由一个或多个kafka实例构成的,我们将每个kafka实例称为代理(Broker),也称之为kafka服务器(kafkaServer),kafka集群一般包括一台或多台服务器,可以在一台服务器上配置一个或多个代理。每一个代理都有唯一标识id,在一个kafka集群中,每增加一个代理就需要为这个代理配置一个与该集群中其他代理不同的id,保证在整个kafka集群中唯一,这个id就是broker.id
主题Topic kafka将一组消息抽象归纳为一个主题(Topic),生产者将消息发送到特定主题,消费者订阅主题或主题的某些分区来进行消费。 每个topic有多个partition,每个partition由多个segment组成
消息 消息(Record)是kafka通信的基本单位,由一个固定长度的消息头和一个可变长度的消息体构成
分区Partition 为了扩展性,一个topic可以分布到多个broker上,一个topic分为一个或多个分区(Partition)。每个分区由一系列有序、不可变的消息组成,是一个有序队列,任何发布到该分区的消息都会直接追加到log文件的尾部。每个分区在物理上对应为一个文件夹,分区命名规则为主题名称后接”-“连接符,之后再接分区编号,分区编号从0开始,编号最大值为分区总数减一,维护了一个offsets的标识,用于标识分区中的每条记录。
分区数越多吞吐量越高,分区是kafka保证消息被顺序消费以及对消息进行负载均衡的基础。kafka只能保证一个分区之内消息的有序性,并不能保证跨分区消息的有序性。
假如只有一个Broker,创建一个topic名为test1,该topic有两个分区,那么此时会有
1
2test1-0
test1-1两个文件夹
副本Replica 为了保证集群中的某个节点发生故障时,该节点上的partition数据不丢失,每个分区有一个或多个副本(Replica),一个leader和若干个follower,分区的副本分布在集群的不同代理上,以提高可用性。从存储角度上分析,分区的每个副本在逻辑上抽象为一个日志对象,一一对应。
Leader副本和Follower副本 由于kafka副本的存在,需要保证一个分区的多个副本之间的数据一致性,kafka会选择该分区的一个副本作为Leader副本,其他副本为Follower副本,只有Leader副本才负责处理客户端读写请求,Follower只负责从Leader副本同步数据,当leader发生故障时,某个follower会成为新的leader
偏移量Offset 任何发布到分区的消息会被直接追加到日志文件(分区目录下一”.log”后缀的文件)的尾部,而每条消息在日志文件中的位置都会对应一个按序递增的偏移量。偏移量是一个分区下严格有序的逻辑值,消费者可以通过控制消息偏移量来对消息进行消费。
日志段 一个日志又被划分为多个日志段(LogSegment),日志段是kafka日志对象分片的最小单位。为了防止log日志过大导致数据检索效率低下。一个日志段对应磁盘上一个具体日志文件和两个索引文件。两个索引文件分别是”.index”和”.timeindex”,分别表示消息偏移量索引文件和消息时间戳索引文件。一个partition在物理上是由一个或多个segment构成的
1
2
3
4
5
6
7
8
9| --test1-0
| --00000000000000000000.index
| --00000000000000000000.log
| --00000000000000368769.index
| --00000000000000368769.log
| --00000000000000737337.index
| --00000000000000737337.log
| --00000000000001105814.index
| --00000000000001105814.logPartition全局的第一个Segment从0开始,后续每个Segment文件名为上一个Segment最后一条消息的offset值
当某个segment上的消息条数达到配置值或者消息发布事件超过阈值后,segment上的消息会被flush到磁盘,只有flush到磁盘的消息才会被消费者消费,segment达到一定大小后就会创建新的segment
生产者Producer 生产者(Producer)负责将消息发送给代理
消费者Consumer 和消费者组Consumer Group 消费者(Comsumer)以拉取(pull)的方式拉取数据
采用pull而不是push是为了适应消费速率不同的消费者,不过如果kafka没有数据,消费者会一直拉取返回空数据,设置timeout使得如果没有数据可供消费,消费者会等待一段时间返回
1
consumer.poll(Duration.ofSeconds(10));
在kafka中每一个消费者都属于一个消费者组(CommsumerGroup),可以为每个消费者指定一个消费者组,通过group.id配置,每个消费者也有一个全局唯一的id,通过client.id配置,如果没有指定client.id,会默认生成,格式为${groupId}-${hostName}-${timestamp}-${UUID前8位},同一个主题的一条消息只能被同一个消费者组下的某一个消费者消费,但不同的消费者组的消费者可同时消费该消息。消费者组是kafka用来实现对一个主题消息进行广播和单播的手段,实现消息广播只需执行各消费者均属于不同的消费者组,消息单播只需让各消费者属于同一个消费者组。
可以通过增加消费者组的消费者来进行水平扩展提升消费能力,每个消费者组中的消费者接收一个分区的数据,所以如果消费者组中的消费者数量比分区多的话,那么多出来的消费者是空闲的
这里就存在一个问题,当一个新的消费者加入消费者组,该消费者会消费一个或多个分区,而这些分区之前是由其他消费者负责的,且如果有消费者离开消费者组,也会将其负责的分区分配给其他的消费者,这种现象称为重平衡。在重平衡期间,所有的消费者都不能消费消息,会使得整个消费者组短暂不可用,这个机制是怎么做到的?
在kafka中有一个组协调器的组件,消费者定期发送心跳到组协调器来保持在消费者组内存活,如果消费者超过一定时间没有发送心跳,那么该消费者的session会话就会过期,组协调者会认为该消费者已经宕机,然后触发重平衡
AR Assigned Replicas已分配副本,每个分区的所有副本集合被称为已分配副本(AR)
ISR In-Sync Replicas保持同步的副本,ISR是AR的子集,kafka在zookeeper中动态维护了一个ISR(In-sync-Replica),即保持同步的副本列表,该列表中保存的是与Leader副本保持消息同步的所有副本对应的代理节点id,如果一个Follower副本宕机或落后太多,该节点会从ISR列表中移除。每个Partition都有一个ISR,这个ISR里一定会有Leader自己,因为Leader数据肯定是最新的,然后与Leader保持同步的Follower也会在ISR列表中
根据参数
replica.lag.time.max.ms
,默认10s,含义是当Follower超过10s没发送Fetch请求同步Leader时,就会认为不同步而被踢出ISRzookeeper kafka利用zookeeper保存相关元数据信息,包括代理节点信息、kafka集群信息、消费偏移量信息、主题信息、分区状态信息、分区副本分配方案信息、动态配置信息等。kafka在启动或运行过程中会在zookeeper上创建相应节点来保存元数据信息,kafka通过监听机制在这些节点注册相应监听器来监听节点元数据变化,从而由zookeeper负责管理和维护kafka集群
kafka特性
消息持久化 依赖于文件系统来存储和缓存消息,使用顺序写的方式
机械磁盘的顺序读的速度是随机读的400倍,顺序写是随机写的100倍。可以达到几十M/s,速度还是很快的
高吞吐量 通过sendfile和pagecache来实现zero copy机制,顺序读写的特性使得使用普通磁盘就可以做到很大的吞吐,每秒可达百万级别的消息
扩展性
多客户端支持
kafka streams 流处理
安全机制
数据备份 每个topic可以指定副本数
高可用 通过replica和ISR机制保证了数据的高可用
轻量级
消息压缩 支持Gzip、Snappy、LZ4三种压缩方式,把多条消息放在一起组成MessageSet,再把MessageSet放到一条消息中,从而提高压缩比率来提高吞吐量。
应用场景
消息队列用来解决应用解耦、异步通信、流量控制等问题
- 消息系统
- 应用监控
- 网站用户行为追踪
- 流处理
- 持久性日志
若是想要体验最新版(SNAPSHOT版本),可以查看master分支目前的版本号,在项目中引用该版本,需要在pom.xml中增加对应的快照仓库
1 | <repository> |