跨境互联网 跨境互联网
首页
  • AI 工具

    • 绘图提示词工具 (opens new window)
    • ChatGPT 指令 (opens new window)
  • ChatGPT

    • ChatGP T介绍
    • ChatGPT API 中文开发手册
    • ChatGPT 中文调教指南
    • ChatGPT 开源项目
  • Midjourney

    • Midjourney 文档
  • Stable Diffusion

    • Stable Diffusion 文档
  • 其他

    • AIGC 热门文章
    • 账号合租 (opens new window)
    • 有趣的网站
  • Vue

    • Vue3前置
  • JAVA基础

    • Stream
    • Git
    • Maven
    • 常用第三方类库
    • 性能调优工具
    • UML系统建模
    • 领域驱动设计
    • 敏捷开发
    • Java 测试
    • 代码规范及工具
    • Groovy 编程
  • 并发编程&多线程

    • 并发编程
    • 高性能队列 Disruptor
    • 多线程并发在电商系统下的应用
  • 其他

    • 面试题
  • 消息中间中间件

    • Kafka
    • RabbitMQ
    • RocketMQ
  • 任务调度

    • Quartz
    • XXL-Job
    • Elastic-Job
  • 源码解析

    • Mybatis 高级使用
    • Mybatis 源码剖析
    • Mybatis-Plus
    • Spring Data JPA
    • Spring 高级使用
    • Spring 源码剖析
    • SpringBoot 高级使用
    • SpringBoot 源码剖析
    • Jdk 解析
    • Tomcat 架构设计&源码剖析
    • Tomcat Web应用服务器
    • Zookeeper 高级
    • Netty
  • 微服务框架

    • 分布式原理
    • 分布式集群架构场景化解决方案
    • Dubbo 高级使用
    • Dubbo 核心源码剖析
    • Spring Cloud Gateway
    • Nacos 实战应用
    • Sentinel 实战应用
    • Seata 分布式事务
  • 数据结构和算法的深入应用
  • 存储

    • 图和Neo4j
    • MongoDB
    • TiDB
    • MySQL 优化
    • MySQL 平滑扩容实战
    • MySQL 海量数据存储与优化
    • Elasticsearch
  • 缓存

    • Redis
    • Aerospike
    • Guava Cache
    • Tair
  • 文件存储

    • 阿里云 OSS 云存储
    • FastDF 文件存储
  • 基础

    • Linux 使用
    • Nginx 使用与配置
    • OpenResty 使用
    • LVS+Keepalived 高可用部署
    • Jekins
  • 容器技术

    • Docker
    • K8S
    • K8S
  • 01.全链路(APM)
  • 02.电商终极搜索解决方案
  • 03.电商亿级数据库设计
  • 04.大屏实时计算
  • 05.分库分表的深入实战
  • 06.多维系统下单点登录
  • 07.多服务之间分布式事务
  • 08.业务幂等性技术架构体系
  • 09.高并发下的12306优化
  • 10.每秒100W请求的秒杀架构体系
  • 11.集中化日志管理平台的应用
  • 12.数据中台配置中心
  • 13.每天千万级订单的生成背后痛点及技术突破
  • 14.红包雨的架构设计及源码实现
  • 人工智能

    • Python 笔记
    • Python 工具库
    • 人工智能(AI) 笔记
    • 人工智能(AI) 项目笔记
  • 大数据

    • Flink流处理框架
  • 加密区

    • 机器学习(ML) (opens new window)
    • 深度学习(DL) (opens new window)
    • 自然语言处理(NLP) (opens new window)
AI 导航 (opens new window)

Revin

首页
  • AI 工具

    • 绘图提示词工具 (opens new window)
    • ChatGPT 指令 (opens new window)
  • ChatGPT

    • ChatGP T介绍
    • ChatGPT API 中文开发手册
    • ChatGPT 中文调教指南
    • ChatGPT 开源项目
  • Midjourney

    • Midjourney 文档
  • Stable Diffusion

    • Stable Diffusion 文档
  • 其他

    • AIGC 热门文章
    • 账号合租 (opens new window)
    • 有趣的网站
  • Vue

    • Vue3前置
  • JAVA基础

    • Stream
    • Git
    • Maven
    • 常用第三方类库
    • 性能调优工具
    • UML系统建模
    • 领域驱动设计
    • 敏捷开发
    • Java 测试
    • 代码规范及工具
    • Groovy 编程
  • 并发编程&多线程

    • 并发编程
    • 高性能队列 Disruptor
    • 多线程并发在电商系统下的应用
  • 其他

    • 面试题
  • 消息中间中间件

    • Kafka
    • RabbitMQ
    • RocketMQ
  • 任务调度

    • Quartz
    • XXL-Job
    • Elastic-Job
  • 源码解析

    • Mybatis 高级使用
    • Mybatis 源码剖析
    • Mybatis-Plus
    • Spring Data JPA
    • Spring 高级使用
    • Spring 源码剖析
    • SpringBoot 高级使用
    • SpringBoot 源码剖析
    • Jdk 解析
    • Tomcat 架构设计&源码剖析
    • Tomcat Web应用服务器
    • Zookeeper 高级
    • Netty
  • 微服务框架

    • 分布式原理
    • 分布式集群架构场景化解决方案
    • Dubbo 高级使用
    • Dubbo 核心源码剖析
    • Spring Cloud Gateway
    • Nacos 实战应用
    • Sentinel 实战应用
    • Seata 分布式事务
  • 数据结构和算法的深入应用
  • 存储

    • 图和Neo4j
    • MongoDB
    • TiDB
    • MySQL 优化
    • MySQL 平滑扩容实战
    • MySQL 海量数据存储与优化
    • Elasticsearch
  • 缓存

    • Redis
    • Aerospike
    • Guava Cache
    • Tair
  • 文件存储

    • 阿里云 OSS 云存储
    • FastDF 文件存储
  • 基础

    • Linux 使用
    • Nginx 使用与配置
    • OpenResty 使用
    • LVS+Keepalived 高可用部署
    • Jekins
  • 容器技术

    • Docker
    • K8S
    • K8S
  • 01.全链路(APM)
  • 02.电商终极搜索解决方案
  • 03.电商亿级数据库设计
  • 04.大屏实时计算
  • 05.分库分表的深入实战
  • 06.多维系统下单点登录
  • 07.多服务之间分布式事务
  • 08.业务幂等性技术架构体系
  • 09.高并发下的12306优化
  • 10.每秒100W请求的秒杀架构体系
  • 11.集中化日志管理平台的应用
  • 12.数据中台配置中心
  • 13.每天千万级订单的生成背后痛点及技术突破
  • 14.红包雨的架构设计及源码实现
  • 人工智能

    • Python 笔记
    • Python 工具库
    • 人工智能(AI) 笔记
    • 人工智能(AI) 项目笔记
  • 大数据

    • Flink流处理框架
  • 加密区

    • 机器学习(ML) (opens new window)
    • 深度学习(DL) (opens new window)
    • 自然语言处理(NLP) (opens new window)
AI 导航 (opens new window)
  • 任务调度

  • 消息队列

    • 消息中间件(MQ)介绍
    • Kafka

      • Kafka基础使用
      • Kafka深入

        • Kafka架构与实战
        • Kafka高级特性解析
        • Kafka高级特性-消费者
        • Kafka高级特性-主题
        • Kafka高级特性-分区
        • Kafka高级特性-物理存储
        • Kafka高级特性-稳定性
        • Kafka高级特性-延时队列
        • Kafka高级特性-重试队列
        • Kafka集群与运维
        • Kafka源码剖析
        • 源码剖析-Broker启动流程
        • 源码剖析-Topic创建流程
        • 源码剖析-Producer生产者流程
        • 源码剖析-Consumer消费者流程
        • 源码剖析-消息存储机制
        • 源码剖析-SocketServer
        • 源码剖析-KafkaRequestHandlerPool
        • 源码剖析-LogManager
        • 源码剖析-ReplicaManager
        • 源码剖析-OffsetManager
        • 源码剖析-KafkaApis
        • 源码剖析-KafkaController
        • 源码剖析-KafkaHealthcheck
        • 源码剖析-DynamicConfigManager
        • 源码剖析-分区消费模式
        • 源码剖析-组消费模式
        • 源码剖析-同步发送模式
        • 源码剖析-异步发送模式
    • RabbitMQ

    • RocketMQ

  • Zookeeper

  • java组件
  • 消息队列
  • Kafka
  • Kafka深入
Revin
2023-07-31

源码剖析-组消费模式

# 4.17 Kafka源码剖析之组消费模式

组消费模式指的是在消费者消费消息的时候,使用组协调器的再平衡机制自动分配要消费的分区(们)。

此时需要在消费者的配置中指定消费组ID,同时如果需要,设置偏移量重置的策略。

然后消费者订阅主题,就可以消费消息了。

Map<String, Object> configs = new HashMap<>();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
configs.put(ConsumerConfig.CLIENT_ID_CONFIG, "mycsmr" +System.currentTimeMillis());
configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// 设置消费组id
configs.put(ConsumerConfig.GROUP_ID_CONFIG, "csmr_grp_01");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(configs);
consumer.subscribe(Collections.singleton("tp_demo_01"));
ConsumerRecords<String, String> records = consumer.poll(1000);
records.forEach(record -> {
   System.out.println(record.topic() + "\t"
   + record.partition() + "\t"
   + record.offset() + "\t"
   + record.key() + "\t"
   + record.value());
}
);
// 最后关闭消费者
consumer.close();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

consumer.subscribe 方法的实现:

Kafka_Page358_001

上面方法中第一个参数是订阅的主题集合,第二个参数是一个监听器,当发送再平衡的时候消费者想要执行的操作。

默认是NoOpConsumerRebalanceListener,即什么都不做:

NoOpConsumerRebalanceListener的实现:

Kafka_Page359_001

订阅方法的实现:

Kafka_Page359_002

subscriptions的订阅操作实现:

Kafka_Page360_001

就是对SubscriptionState的操作:

Kafka_Page360_002

用户的poll的操作调用pollOnce方法:

Kafka_Page360_003

pollOnce的实现:

Kafka_Page361_001

coordinator.poll负责周期性地向broker提交偏移量信息。

上面方法中updateFetchPositions方法表示:如果订阅的主题分区没有偏移量信息,则更新主题分区的偏移量信息,这样就知道消费的时候从哪里开始消费了:

Kafka_Page361_002

上图中的fetcher.resetOffsetsIfNeeded方法的实现:

Kafka_Page362_001

resetOffsets的具体实现:

Kafka_Page362_002

Kafka_Page363_001

上述的实现表示:首先根据重置策略重置主题分区的偏移量请求类型,然后发送请求,真正从主题的分区中获取偏移量。

其中上图中的

Kafka_Page363_002

需要向broker发请求,获取主题分区的偏移量,更新偏移量的值:

Kafka_Page363_003

发送请求的实现:

Kafka_Page363_004

Kafka_Page364_001

发送的请求是ListOffsetRequest请求:

Kafka_Page364_002

该请求在Broker中的处理:

Kafka_Page365_001

具体处理:

Kafka_Page365_002

该方法的实现:

Kafka_Page365_003

如果是最晚的,直接设置最晚的偏移量,如果不是最晚的,则需要根据主题分区以及时间戳查找:

Kafka_Page365_004

查找的逻辑:

Kafka_Page366_001

对于消费者,向指定的broker发送ListOffsetRequest请求,获取指定主题分区的偏移量和时间戳信息:

Kafka_Page366_002

调用handleListOffsetResponse处理获取的偏移量信息:

Kafka_Page367_001

Kafka_Page367_002

complete方法用于完成请求。当complete方法调用之后,successed方法返回true。

同时偏移量信息可以通过value方法获取:

Kafka_Page367_003

即:变量offsetsByTimes的值就是下图中future.value()的值。此时各个主题分区的偏移量已经设置好了:

Kafka_Page367_004

Kafka_Page367_005

pollOnce方法:

Kafka_Page368_001

在更新主题分区的偏移量之后,就可以发送请求消费消息了:

Kafka_Page368_002

对于组消费,还需要定期将偏移量提交到 __consumer_offsets 主题中:

Kafka_Page368_003

poll方法的实现:

Kafka_Page368_004

Kafka_Page369_001

如果是自动提交消费者偏移量到broker的 __consumer_offsets 主题,则maybeAutoCommitOffsetsAsync的实现:

Kafka_Page369_002

doAutoCommitOffsetsAsync的实现:

Kafka_Page369_003

commitOffsetsAsync的实现:

Kafka_Page370_001

在异步提交消费者偏移量的时候,如果组协调器已知,直接发送

如果未知,则异步提交等待,查找组协调器,等找到之后,异步提交消费者偏移量:

Kafka_Page370_002

上图中sendOffsetCommitRequest的实现:

  1. 首先查找消费组协调器
  2. 然后创建偏移量提交请求对象
  3. 发送请求

Kafka_Page371_001

Kafka_Page371_002

在KafkaServer处理的时候:

Kafka_Page371_003

handleOffsetCommitRequest的实现:

Kafka_Page371_004

Kafka_Page372_001

消费组协调器的处理:

Kafka_Page372_002

Kafka_Page372_003

doCommitOffsets的实现:

Kafka_Page372_004

Kafka_Page372_005

storeOffsets的实现:

其中:

Kafka_Page373_001

Kafka_Page373_002

Kafka_Page373_003

appendForGroup的实现如下,将当前消费组的偏移量消息追加到 __consumer_offsets 的指定分区中:

Kafka_Page373_004

上次更新: 2025/04/03, 11:07:08
源码剖析-分区消费模式
源码剖析-同步发送模式

← 源码剖析-分区消费模式 源码剖析-同步发送模式→

最近更新
01
tailwindcss
03-26
02
PaddleSpeech
02-18
03
whisper
02-18
更多文章>
Theme by Vdoing | Copyright © 2019-2025 跨境互联网 | 豫ICP备14016603号-5 | 豫公网安备41090002410995号
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式