跨境互联网 跨境互联网
首页
  • AI 工具

    • 绘图提示词工具 (opens new window)
    • ChatGPT 指令 (opens new window)
  • ChatGPT

    • ChatGP T介绍
    • ChatGPT API 中文开发手册
    • ChatGPT 中文调教指南
    • ChatGPT 开源项目
  • Midjourney

    • Midjourney 文档
  • Stable Diffusion

    • Stable Diffusion 文档
  • 其他

    • AIGC 热门文章
    • 账号合租 (opens new window)
    • 有趣的网站
  • Vue

    • Vue3前置
  • JAVA基础

    • Stream
    • Git
    • Maven
    • 常用第三方类库
    • 性能调优工具
    • UML系统建模
    • 领域驱动设计
    • 敏捷开发
    • Java 测试
    • 代码规范及工具
    • Groovy 编程
  • 并发编程&多线程

    • 并发编程
    • 高性能队列 Disruptor
    • 多线程并发在电商系统下的应用
  • 其他

    • 面试题
  • 消息中间中间件

    • Kafka
    • RabbitMQ
    • RocketMQ
  • 任务调度

    • Quartz
    • XXL-Job
    • Elastic-Job
  • 源码解析

    • Mybatis 高级使用
    • Mybatis 源码剖析
    • Mybatis-Plus
    • Spring Data JPA
    • Spring 高级使用
    • Spring 源码剖析
    • SpringBoot 高级使用
    • SpringBoot 源码剖析
    • Jdk 解析
    • Tomcat 架构设计&源码剖析
    • Tomcat Web应用服务器
    • Zookeeper 高级
    • Netty
  • 微服务框架

    • 分布式原理
    • 分布式集群架构场景化解决方案
    • Dubbo 高级使用
    • Dubbo 核心源码剖析
    • Spring Cloud Gateway
    • Nacos 实战应用
    • Sentinel 实战应用
    • Seata 分布式事务
  • 数据结构和算法的深入应用
  • 存储

    • 图和Neo4j
    • MongoDB
    • TiDB
    • MySQL 优化
    • MySQL 平滑扩容实战
    • MySQL 海量数据存储与优化
    • Elasticsearch
  • 缓存

    • Redis
    • Aerospike
    • Guava Cache
    • Tair
  • 文件存储

    • 阿里云 OSS 云存储
    • FastDF 文件存储
  • 基础

    • Linux 使用
    • Nginx 使用与配置
    • OpenResty 使用
    • LVS+Keepalived 高可用部署
    • Jekins
  • 容器技术

    • Docker
    • K8S
    • K8S
  • 01.全链路(APM)
  • 02.电商终极搜索解决方案
  • 03.电商亿级数据库设计
  • 04.大屏实时计算
  • 05.分库分表的深入实战
  • 06.多维系统下单点登录
  • 07.多服务之间分布式事务
  • 08.业务幂等性技术架构体系
  • 09.高并发下的12306优化
  • 10.每秒100W请求的秒杀架构体系
  • 11.集中化日志管理平台的应用
  • 12.数据中台配置中心
  • 13.每天千万级订单的生成背后痛点及技术突破
  • 14.红包雨的架构设计及源码实现
  • 人工智能

    • Python 笔记
    • Python 工具库
    • 人工智能(AI) 笔记
    • 人工智能(AI) 项目笔记
  • 大数据

    • Flink流处理框架
  • 加密区

    • 机器学习(ML) (opens new window)
    • 深度学习(DL) (opens new window)
    • 自然语言处理(NLP) (opens new window)
AI 导航 (opens new window)

Revin

首页
  • AI 工具

    • 绘图提示词工具 (opens new window)
    • ChatGPT 指令 (opens new window)
  • ChatGPT

    • ChatGP T介绍
    • ChatGPT API 中文开发手册
    • ChatGPT 中文调教指南
    • ChatGPT 开源项目
  • Midjourney

    • Midjourney 文档
  • Stable Diffusion

    • Stable Diffusion 文档
  • 其他

    • AIGC 热门文章
    • 账号合租 (opens new window)
    • 有趣的网站
  • Vue

    • Vue3前置
  • JAVA基础

    • Stream
    • Git
    • Maven
    • 常用第三方类库
    • 性能调优工具
    • UML系统建模
    • 领域驱动设计
    • 敏捷开发
    • Java 测试
    • 代码规范及工具
    • Groovy 编程
  • 并发编程&多线程

    • 并发编程
    • 高性能队列 Disruptor
    • 多线程并发在电商系统下的应用
  • 其他

    • 面试题
  • 消息中间中间件

    • Kafka
    • RabbitMQ
    • RocketMQ
  • 任务调度

    • Quartz
    • XXL-Job
    • Elastic-Job
  • 源码解析

    • Mybatis 高级使用
    • Mybatis 源码剖析
    • Mybatis-Plus
    • Spring Data JPA
    • Spring 高级使用
    • Spring 源码剖析
    • SpringBoot 高级使用
    • SpringBoot 源码剖析
    • Jdk 解析
    • Tomcat 架构设计&源码剖析
    • Tomcat Web应用服务器
    • Zookeeper 高级
    • Netty
  • 微服务框架

    • 分布式原理
    • 分布式集群架构场景化解决方案
    • Dubbo 高级使用
    • Dubbo 核心源码剖析
    • Spring Cloud Gateway
    • Nacos 实战应用
    • Sentinel 实战应用
    • Seata 分布式事务
  • 数据结构和算法的深入应用
  • 存储

    • 图和Neo4j
    • MongoDB
    • TiDB
    • MySQL 优化
    • MySQL 平滑扩容实战
    • MySQL 海量数据存储与优化
    • Elasticsearch
  • 缓存

    • Redis
    • Aerospike
    • Guava Cache
    • Tair
  • 文件存储

    • 阿里云 OSS 云存储
    • FastDF 文件存储
  • 基础

    • Linux 使用
    • Nginx 使用与配置
    • OpenResty 使用
    • LVS+Keepalived 高可用部署
    • Jekins
  • 容器技术

    • Docker
    • K8S
    • K8S
  • 01.全链路(APM)
  • 02.电商终极搜索解决方案
  • 03.电商亿级数据库设计
  • 04.大屏实时计算
  • 05.分库分表的深入实战
  • 06.多维系统下单点登录
  • 07.多服务之间分布式事务
  • 08.业务幂等性技术架构体系
  • 09.高并发下的12306优化
  • 10.每秒100W请求的秒杀架构体系
  • 11.集中化日志管理平台的应用
  • 12.数据中台配置中心
  • 13.每天千万级订单的生成背后痛点及技术突破
  • 14.红包雨的架构设计及源码实现
  • 人工智能

    • Python 笔记
    • Python 工具库
    • 人工智能(AI) 笔记
    • 人工智能(AI) 项目笔记
  • 大数据

    • Flink流处理框架
  • 加密区

    • 机器学习(ML) (opens new window)
    • 深度学习(DL) (opens new window)
    • 自然语言处理(NLP) (opens new window)
AI 导航 (opens new window)
  • 任务调度

  • 消息队列

    • 消息中间件(MQ)介绍
    • Kafka

      • Kafka基础使用
      • Kafka深入

        • Kafka架构与实战
        • Kafka高级特性解析
        • Kafka高级特性-消费者
        • Kafka高级特性-主题
        • Kafka高级特性-分区
        • Kafka高级特性-物理存储
        • Kafka高级特性-稳定性
        • Kafka高级特性-延时队列
        • Kafka高级特性-重试队列
        • Kafka集群与运维
        • Kafka源码剖析
        • 源码剖析-Broker启动流程
        • 源码剖析-Topic创建流程
        • 源码剖析-Producer生产者流程
        • 源码剖析-Consumer消费者流程
        • 源码剖析-消息存储机制
        • 源码剖析-SocketServer
        • 源码剖析-KafkaRequestHandlerPool
        • 源码剖析-LogManager
          • 4.9.1 清除日志片段
          • 4.9.2 日志片段刷盘
          • 4.9.3 将当前broker上各个分区的恢复点写到文本文件
          • 4.9.4 将当前broker上各个分区起始偏移量写到文本文件
          • 4.9.5 删除日志片段
          • 4.9.6 clearner
        • 源码剖析-ReplicaManager
        • 源码剖析-OffsetManager
        • 源码剖析-KafkaApis
        • 源码剖析-KafkaController
        • 源码剖析-KafkaHealthcheck
        • 源码剖析-DynamicConfigManager
        • 源码剖析-分区消费模式
        • 源码剖析-组消费模式
        • 源码剖析-同步发送模式
        • 源码剖析-异步发送模式
    • RabbitMQ

    • RocketMQ

  • Zookeeper

  • java组件
  • 消息队列
  • Kafka
  • Kafka深入
Revin
2023-07-31
目录

源码剖析-LogManager

# 4.9 Kafka源码剖析之LogManager

  1. kafka日志管理子系统的入口。日志管理器负责日志的创建、抽取、和清理。
  2. 所有的读写操作都代理给具体的Log实例。
  3. 日志管理器在一个或多个目录维护日志。新的日志创建到拥有最少log的目录中。
  4. 分区不移动。
  5. 通过一个后台线程通过定期截断多余的日志段来处理日志保留。

启动Kafka服务器的脚本:

Kafka_Page294_002

main方法中创建KafkaServerStartable对象:

Kafka_Page294_003

该类中包含KakfaServer对象,startup方法调用的是KafkaServer的startup方法:

Kafka_Page295_001

KafkaServer的startup方法中,启动了LogManager:

Kafka_Page295_002

Kafka_Page295_003

/**
* @param logDirs 主题分区目录的File对象
* @param initialOfflineDirs
* @param topicConfigs 主题配置
* @param defaultConfig 主题的默认配置
* @param cleanerConfig 日志清理器配置
* @param ioThreads IO线程数
* @param flushCheckMs
* @param flushRecoveryOffsetCheckpointMs
* @param flushStartOffsetCheckpointMs
* @param retentionCheckMs 检查日志保留的时间
* @param maxPidExpirationMs
* @param scheduler
* @param brokerState
* @param brokerTopicStats
* @param logDirFailureChannel
* @param time 时间
*/
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

Kafka_Page296_001

LogManager的startup方法:

/**
* 启动后台线程们用于将日志刷盘以及日志的清理
*/
def startup() {
   if (scheduler != null) {
   	info("Starting log cleanup with a period of %d ms.".format(retentionCheckMs))
   	// 用于清除日志片段的调度任务,没有压缩,周期性执行
   	scheduler.schedule("kafka-log-retention",
   	cleanupLogs _,
   	delay = InitialTaskDelayMs,
   	period = retentionCheckMs,
   	TimeUnit.MILLISECONDS)
   	info("Starting log flusher with a default period of %d ms.".format(flushCheckMs))
   	// 用于日志片段刷盘的调度任务,周期性执行
   	scheduler.schedule("kafka-log-flusher",
   	flushDirtyLogs _,
   	delay = InitialTaskDelayMs,
   	period = flushCheckMs,
   	TimeUnit.MILLISECONDS)
   	// 用于将当前broker上各个分区的恢复点写到文本文件的调度任务,周期性执行
   	scheduler.schedule("kafka-recovery-point-checkpoint",
   	checkpointLogRecoveryOffsets _,
   	delay = InitialTaskDelayMs,
   	period = flushRecoveryOffsetCheckpointMs,
   	TimeUnit.MILLISECONDS)
   	// 用于将当前broker上各个分区起始偏移量写到文本文件的调度任务,周期性执行
   	scheduler.schedule("kafka-log-start-offset-checkpoint",
   	checkpointLogStartOffsets _,
   	delay = InitialTaskDelayMs,
   	period = flushStartOffsetCheckpointMs,
   	TimeUnit.MILLISECONDS)
   	scheduler.schedule("kafka-delete-logs",
   	deleteLogs _,
   	delay = InitialTaskDelayMs,
   	period = defaultConfig.fileDeleteDelayMs,
   	TimeUnit.MILLISECONDS)
   }
   // 如果配置了日志的清理,则启动清理任务
   if (cleanerConfig.enableCleaner)
   cleaner.startup()
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41

# 4.9.1 清除日志片段

Kafka_Page297_001

cleanupLogs的具体实现:

Kafka_Page297_002

deleteOldSegments()的实现:

Kafka_Page298_001

Kafka_Page298_002

首先找到所有可以删除的日志片段

然后执行删除

Kafka_Page298_003

Kafka_Page299_001

该方法执行日志片段的异步删除。步骤如下:

  1. 将日志片段的信息从map集合移除,之后再也不读了
  2. 在日志片段的索引和log文件名称后追加.deleted,加标记而已
  3. 调度异步删除操作,执行.deleted文件的真正删除。

异步删除允许在读取文件的同时执行删除,而不需要进行同步,避免了在读取一个文件的同时物理删除引起的冲突。

该方法不需要将IOException转换为KafkaStorageException,因为该方法要么在所有日志加载之前调用,要么在使用中由调用者处理IOException。

Kafka_Page299_002

根据日志片段大小进行删除:

Kafka_Page300_001

shouldDelete是一个函数,作为deleteOldSegments删除日志片段的判断条件。

根据偏移量删除日志片段:

对于当前日志片段是否需要删除,要看它的下一个日志片段的baseOffset是否小于等于日志对外暴露给消费者的日志偏移量,如果小,消费者不用读取,当前日志片段就可以删除。

Kafka_Page300_002

# 4.9.2 日志片段刷盘

在LogManager的startup中,启动了刷盘的线程:

调用flushDirtyLogs方法进行日志刷盘处理。

Kafka_Page301_001

Kafka推荐让操作系统后台进行刷盘,使用副本保证数据高可用,这样效率更高。

因此此种方式不推荐。

Kafka_Page301_002

执行刷盘的方法:

/**
* 日志片段刷盘到offset-1的偏移量位置。
*
* @param offset 从上一个恢复点开始刷盘到该偏移量-1的位置。offset偏移量的不刷盘。
* offset是新的恢复点值。
*/
def flush(offset: Long) : Unit = {
   maybeHandleIOException(s"Error while flushing log for $topicPartition in dir ${dir.getParent} with offset $offset") {
   	// 如果偏移量小于等于该日志的恢复点,则不需要刷盘
   	if (offset <= this.recoveryPoint)
   	return
   	debug(s"Flushing log up to offset $offset, last flushed: $lastFlushTime, current time: ${time.milliseconds()}, " +
   	s"unflushed: $unflushedMessages")
   	// 遍历需要刷盘的日志片段
   	for (segment <- logSegments(this.recoveryPoint, offset))
   	// 执行刷盘
   	segment.flush()
   	lock synchronized {
   		// 检查MMAP是否关闭
   		checkIfMemoryMappedBufferClosed()
   		// 如果偏移量大于恢复点
   		if (offset > this.recoveryPoint) {
   			// 设置新的恢复点,表示到达这个偏移量位置的消息都已经刷盘了
   			this.recoveryPoint = offset
   			// 设置当前时间为刷盘的时间
   			lastflushedTime.set(time.milliseconds)
   		}
   	}
   }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

# 4.9.3 将当前broker上各个分区的恢复点写到文本文件

Kafka_Page302_001

方法实现:

Kafka_Page302_002

方法实现:

Kafka_Page303_001

# 4.9.4 将当前broker上各个分区起始偏移量写到文本文件

Kafka_Page303_002

方法实现:

Kafka_Page303_003

写文本文件:

Kafka_Page304_001

# 4.9.5 删除日志片段

Kafka_Page304_002

对标记为删除的日志执行删除的动作:

Kafka_Page305_001

Kafka_Page305_002

# 4.9.6 clearner

如果配置了日志清理,则启动清理任务:

Kafka_Page306_001

Kafka_Page306_002

cleaners是多个CleanerThread集合:

Kafka_Page306_003

最终执行清理的是,压缩:

Kafka_Page306_004

上次更新: 2025/04/03, 11:07:08
源码剖析-KafkaRequestHandlerPool
源码剖析-ReplicaManager

← 源码剖析-KafkaRequestHandlerPool 源码剖析-ReplicaManager→

最近更新
01
tailwindcss
03-26
02
PaddleSpeech
02-18
03
whisper
02-18
更多文章>
Theme by Vdoing | Copyright © 2019-2025 跨境互联网 | 豫ICP备14016603号-5 | 豫公网安备41090002410995号
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式