0%

kafka之日志管理器

日志管理器

日志管理器是kafka用来管理所有日志的,负责管理日志的创建与删除、日志检索、日志加载和恢复、检查点及日志文件刷写磁盘以及日志清理等

日志存储

kafka中的消息是以日志的格式进行存储的,所以kafka中的数据就是日志,日志存储在broke下的命名形式为topic-partition(如topic为first的主题,有两个partition,则存在first-0,first-1文件夹下)的文件夹下

kafka日志结构

kafka的消息是以主题为基本单位进行组织的,各个主题之间相互独立。每个主题在逻辑上又由一个或多个分区构成,分区数可以在创建主题的时候指定,也可以在主题创建后修改。每个分区可以有一个或多个副本,从副本中选出一个副本作为Leader,Leader负责与客户端进行读写操作,其他副本作为Follower。生产者将消息发送到Leader副本的代理节点,而Follower副本从Leader副本同步数据。

在存储结构上分区的每个副本在逻辑上对应一个Log对象,每个Log又划分为多个LogSegment,每个LogSegment包括一个日志文件和两个索引文件,其中两个索引文件分别为偏移量索引文件和时间戳索引文件。

分段的原因是为了更好的维护数据,因为kafka中数据很多,如果不分段查找数据时会非常的麻烦,分段之后只需要知道数据在哪个段中,然后在对应的段中查找数据即可,而且分段也方便进行老数据删除

日志文件的名称是以偏移量进行命名的,这样是为了方便知道数据在哪个日志段中(采用了跳跃表的方式)

在Log对象中维护了一个ConcurrentSkipListMap(底层是跳跃表),保存该主题所有分区对应的所有LogSegment

1
2
/* the actual segments of the log */
private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]

LogSegment中封装有一个FileRecords对象(日志文件),一个OffsetIndex对象(偏移量索引文件)和一个TimeIndex对象(时间戳索引文件)

1
2
3
4
5
6
7
8
class LogSegment private[log] (val log: FileRecords,
val offsetIndex: OffsetIndex,
val timeIndex: TimeIndex,
val txnIndex: TransactionIndex,
val baseOffset: Long,
val indexIntervalBytes: Int,
val rollJitterMs: Long,
val time: Time) extends Logging

在存储结构上每个分区副本对应一个目录,每个分区副本由一个或多个日志段(LogSegment)组成。每个日志段在物理结构上对应一个以.index后缀的偏移量索引文件、一个以.timeindex后缀的时间戳索引文件和一个以.log结尾的日志文件

使用.index后缀的偏移量索引文件是为了方便定位数据,在.index文件中记录了许多偏移量的索引,每隔一个范围区间创建一个索引,这种方式称之为稀疏索引。这样可以避免索引文件过大,从而使得内存中可以保存更多的索引

使用.timeindex文件是为了kafka清理数据准备的,kafka默认是保留七天内的数据的,主要根据timeindex时间索引文件里最大的时间来判断的,如果最大时间与当前时间差值超过7天,那么对应的数据段就会被清理掉

  • log.segment.bytes 日志文件的大小(默认1073741824 1G),如果数据文件大小没有达到log.segment.bytes,但是达到了log.roll.ms或log.roll.hours同样会创建新的日志段

  • log.index.interval.bytes 索引文件跨度 默认4K

日志文件

可以使用命令来查看消息内容

1
kafka-run-class.sh kafka.tools.DumpLogSegments --files 日志文件路径 --print-data-log

payload为消息体的实际内容

偏移量索引文件

偏移量索引文件存储了若干个索引条目,索引条目用来将逻辑偏移量映射成消息在数据文件中的物理位置,每个索引条目由offset和position组成,offset表示与之对应的数据文件中某条消息的offset,position为与之对应的数据文件中某条消息的position。

并不是每条消息都有索引,而是采用了稀疏索引的方式,每隔一定字节的数据建立一条索引,可以通过index.interval.bytes来设置跨度,写入日志时需要判断bytesSinceLastIndexEntry是否大于索引跨度

1
2
3
4
5
if (bytesSinceLastIndexEntry > indexIntervalBytes) {
offsetIndex.append(largestOffset, physicalPosition)
timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestamp)
bytesSinceLastIndexEntry = 0
}

通过索引文件,我们就能够根据指定的偏移量快速定位到消息物理位置。首先根据指定的偏移量,通过二分查找,查询出该偏移量对应消息所在的日志文件和索引文件,然后在索引文件中通过二分查找,查找值小于等于指定偏移量的最大偏移量,最后从查找出的最大偏移量处开始顺序扫描日志文件,直至在日志文件中查询到偏移量于指定偏移量相等的数据

时间戳索引文件

该索引文件与之对应的日志文件文件名相同,后缀为.timeindex,该索引文件包括一个8字节的时间戳字段和一个4字节的偏移量字段,时间戳记录的是该日志段目前为止最大的时间戳,偏移量则记录的是插入新的索引条目时,当前消息的偏移量

时间戳索引文件中的时间戳对应的类型可以是消息创建时间,也可以是消息写入数据文件的时间,也采用了稀疏存储的方式

日志清理

  • log.retention.check.interval.ms 每隔多久检查一次是否进行日志删除
  • log.retention.hours/log.retention.minutes/log.retention.ms 日志保留时长,默认为168小时,即7天
  • log.retention.bytes 基于日志大小来进行删除

欢迎关注我的其它发布渠道