源码剖析-Consumer消费者流程
# 4.5 Kafka源码剖析之Consumer消费者流程
# 4.5.1 Consumer示例
KafkaConsumer
消费者的根本目的是从Kafka服务端拉取消息,并交给业务逻辑进行处理。
开发人员不必关心与Kafka服务端之间网络连接的管理、心跳检测、请求超时重试等底层操作
也不必关心订阅Topic的分区数量、分区Leader副本的网络拓扑以及消费组的Rebalance等细节,另外还提供了自动提交offset的功能。
案例:
public static void main(String[] args) throws InterruptedException {
// 是否自动ᨀ交
Boolean autoCommit = false;
// 是否异步ᨀ交
Boolean isSync = true;
Properties props = new Properties();
// kafka地址,列表格式为host1:port1,host2:port2,…,无需添加所有的集群地址,kafka会根据ᨀ供的地址发现其他的地址(建议多ᨀ供几个,以防ᨀ供的服务器关闭)
props.put("bootstrap.servers", "localhost:9092");
// 消费组
props.put("group.id", "test");
// 开启自动ᨀ交offset
props.put("enable.auto.commit", autoCommit.toString());
// 1s自动ᨀ交
props.put("auto.commit.interval.ms", "1000");
// 消费者和群组协调器的最大心跳时间,如果超过该时间则认为该消费者已经死亡或者故障,需要踢出消费者组
props.put("session.timeout.ms", "60000");
// 一次poll间隔最大时间
props.put("max.poll.interval.ms", "1000");
// 当消费者读取偏移量无效的情况下,需要重置消费起始位置,默认为latest(从消费者启动后生成的记录),另外一个选项值是 earliest,将从有效的最小位移位置开始消费
props.put("auto.offset.reset", "latest");
// consumer端一次拉取数据的最大字节数
props.put("fetch.max.bytes", "1024000");
// key序列化方式
props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
// value序列化方式
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
String topic = "lagou_edu";
// 订阅topic列表
consumer.subscribe(Arrays.asList(topic));
while (true) {
// 消息拉取
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n",record.offset(), record.key(), record.value());
}
if (!autoCommit) {
if (isSync) {
// 处理完成单次消息以后,ᨀ交当前的offset,如果失败会一直重试直至成功
consumer.commitSync();
} else {
// 异步ᨀ交
consumer.commitAsync((offsets, exception) -> {
exception.printStackTrace();
System.out.println(offsets.size());
}
);
}
}
TimeUnit.SECONDS.sleep(3);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
Kafka服务端并不会记录消费者的消费位置,而是由消费者自己决定如何保存如何记录其消费的offset。在Kafka服务端中添加了一个名为“__consumer_offsets"的内部topic来保存消费者提交的offset,当出现消费者上、下线时会触发Consumer Group进行Rebalance操作,对分区进行重新分配,待Rebalance操作完成后。消费者就可以读取该topic中记录的offset,并从此offset位置继续消费。当然,使用该topic记录消费者的offset只是默认选项,开发人员可以根据业务需求将offset记录在别的存储中。
在消费者消费消息的过程中,提交offset的时机非常重要,因为它决定了消费者故障重启后的消费位置。在上面的示例中,我们通过将 enable.auto.commit 选项设置为true可以起到自动提交offset的功能, auto.commit.interval.ms 选项则设置了自动提交的时间间隔。每次在调用KafkaConsumer.poll() 方法时都会检测是否需要自动提交,并提交上次 poll() 方法返回的最后一个消息的offset。为了避免消息丢失,建议poll()方法之前要处理完上次poll()方法拉取的全部消息。
KafkaConsumer中还提供了两个手动提交offset的方法,分别是 commitSync() 和 commitAsync() ,它们都可以指定提交的offset值,区别在于前者是同步提交,后者是异步提交。
# 4.5.2 KafkaConsumer实例化
了解了 KafkaConsumer 的基本使用,开始深入了解 KafkaConsumer 原理和实现,先看一下构造方法核心逻辑
private KafkaConsumer(ConsumerConfig config,
Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer) {
try {
// 获取client.id,如果为空则默认生成一个,默认:consumer-1
String clientId =config.getString(ConsumerConfig.CLIENT_ID_CONFIG);
if (clientId.isEmpty())
clientId = "consumer-" +CONSUMER_CLIENT_ID_SEQUENCE.getAndIncrement();
this.clientId = clientId;
// 获取消费组名
String groupId =config.getString(ConsumerConfig.GROUP_ID_CONFIG);
LogContext logContext = new LogContext("[Consumer clientId=" +clientId + ", groupId=" + groupId + "] ");
this.log = logContext.logger(getClass());
log.debug("Initializing the Kafka consumer");
this.requestTimeoutMs =config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
int sessionTimeOutMs =config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG);
int fetchMaxWaitMs =config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG);
if (this.requestTimeoutMs <= sessionTimeOutMs ||this.requestTimeoutMs <= fetchMaxWaitMs)
throw new ConfigException(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG + " should be greater than " + ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG + " and " + ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG);
this.time = Time.SYSTEM;
// 与生产者逻辑相同
Map<String, String> metricsTags =Collections.singletonMap("client-id", clientId);
MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG))
.timeWindow(config.getLong(ConsumerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG),TimeUnit.MILLISECONDS)
.recordLevel(Sensor.RecordingLevel.forName(config.getString(ConsumerConfig.METRICS_RECORDING_LEVEL_CONFIG)))
.tags(metricsTags);
List<MetricsReporter> reporters =config.getConfiguredInstances(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG
,
MetricsReporter.class);
reporters.add(new JmxReporter(JMX_PREFIX));
this.metrics = new Metrics(metricConfig, reporters, time);
this.retryBackoffMs =config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
// 消费者拦截器
// load interceptors and make sure they get clientId
Map<String, Object> userProvidedConfigs = config.originals();
userProvidedConfigs.put(ConsumerConfig.CLIENT_ID_CONFIG,clientId);
List<ConsumerInterceptor<K, V>> interceptorList = (List) (new ConsumerConfig(userProvidedConfigs,false)).getConfiguredInstances(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
ConsumerInterceptor.class);
this.interceptors = interceptorList.isEmpty() ? null : new ConsumerInterceptors<>(interceptorList);
// key反序列化
if (keyDeserializer == null) {
this.keyDeserializer = config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
Deserializer.class);
this.keyDeserializer.configure(config.originals(), true);
} else {
config.ignore(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
this.keyDeserializer = keyDeserializer;
}
// value反序列化
if (valueDeserializer == null) {
this.valueDeserializer =config.getConfiguredInstance(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG
,
Deserializer.class);
this.valueDeserializer.configure(config.originals(),false);
} else {
config.ignore(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
this.valueDeserializer = valueDeserializer;
}
ClusterResourceListeners clusterResourceListeners =configureClusterResourceListeners(keyDeserializer, valueDeserializer,reporters, interceptorList);
this.metadata = new Metadata(retryBackoffMs,config.getLong(ConsumerConfig.METADATA_MAX_AGE_CONFIG),
true, false, clusterResourceListeners);
List<InetSocketAddress> addresses =ClientUtils.parseAndValidateAddresses(config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
this.metadata.update(Cluster.bootstrap(addresses), Collections.<String>emptySet(), 0);
String metricGrpPrefix = "consumer";
ConsumerMetrics metricsRegistry = new ConsumerMetrics(metricsTags.keySet(), "consumer");
ChannelBuilder channelBuilder =ClientUtils.createChannelBuilder(config);
// 事务隔离级别
IsolationLevel isolationLevel = IsolationLevel.valueOf(
config.getString(ConsumerConfig.ISOLATION_LEVEL_CONFIG).toUpperCase(Locale.ROOT));
Sensor throttleTimeSensor = Fetcher.throttleTimeSensor(metrics,metricsRegistry.fetcherMetrics);
// 网络组件
NetworkClient netClient = new NetworkClient(
new Selector(config.getLong(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),metrics, time, metricGrpPrefix, channelBuilder, logContext),
this.metadata,
clientId,
100, // a fixed large enough value will suffice for max in-flight requests
config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG),
config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG),
config.getInt(ConsumerConfig.SEND_BUFFER_CONFIG),
config.getInt(ConsumerConfig.RECEIVE_BUFFER_CONFIG),
config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG),
time,
true,
new ApiVersions(),
throttleTimeSensor,
logContext);
// 客户端
this.client = new ConsumerNetworkClient(
logContext,
netClient,
metadata,
time,
retryBackoffMs,
config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG));
// offset重置策略,默认是自动ᨀ交
OffsetResetStrategy offsetResetStrategy =OffsetResetStrategy.valueOf(config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase(Locale.ROOT));
this.subscriptions = new SubscriptionState(offsetResetStrategy);
this.assignors = config.getConfiguredInstances(
ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
PartitionAssignor.class);
// offset协调者
this.coordinator = new ConsumerCoordinator(logContext,
this.client,
groupId,
config.getInt(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG),
config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG),
config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG),
assignors,
this.metadata,
this.subscriptions,
metrics,
metricGrpPrefix,
this.time,
retryBackoffMs,
config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG),
config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG),
this.interceptors,
config.getBoolean(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG),
config.getBoolean(ConsumerConfig.LEAVE_GROUP_ON_CLOSE_CONFIG));
// 拉取器
this.fetcher = new Fetcher<>(
logContext,
this.client,
config.getInt(ConsumerConfig.FETCH_MIN_BYTES_CONFIG),
config.getInt(ConsumerConfig.FETCH_MAX_BYTES_CONFIG),
config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG),
config.getInt(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG),
config.getInt(ConsumerConfig.MAX_POLL_RECORDS_CONFIG),
config.getBoolean(ConsumerConfig.CHECK_CRCS_CONFIG),
this.keyDeserializer,
this.valueDeserializer,
this.metadata,
this.subscriptions,
metrics,
metricsRegistry.fetcherMetrics,
this.time,
this.retryBackoffMs,
isolationLevel);
// 打印用户设置,但是没有使用的配置项
config.logUnused();
AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics);
log.debug("Kafka consumer initialized");
}
catch (Throwable t) {
// call close methods if internal objects are already constructed
// this is to prevent resource leak. see KAFKA-2121
close(0, true);
// now propagate the exception
throw new KafkaException("Failed to construct kafka consumer",t);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
初始化参数配置
- client.id、group.id、消费者拦截器、key/value序列化、事务隔离级别
初始化网络客户端 NetworkClient
初始化消费者网络客户端 ConsumerNetworkClient
初始化offset提交策略,默认自动提交
初始化消费者协调器 ConsumerCoordinator
初始化拉取器 Fetcher
# 4.5.3 订阅Topic
下面我们先来看一下subscribe方法都有哪些逻辑:
public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {
// 轻量级锁
acquireAndEnsureOpen();
try {
if (topics == null) {
throw new IllegalArgumentException("Topic collection to subscribe to cannot be null");
} else if (topics.isEmpty()) {
// topics为空,则开始取消订阅的逻辑
this.unsubscribe();
} else {
// topic合法性判断,包含null或者空字符串直接抛异常
for (String topic : topics) {
if (topic == null || topic.trim().isEmpty())
throw new IllegalArgumentException("Topic collection to subscribe to cannot contain null or empty topic");
}
// 如果没有消费协调者直接抛异常
throwIfNoAssignorsConfigured();
log.debug("Subscribed to topic(s): {}", Utils.join(topics, ", "));
// 开始订阅
this.subscriptions.subscribe(new HashSet<>(topics), listener);
// 更新元数据,如果metadata当前不包括所有的topics则标记强制更新
metadata.setTopics(subscriptions.groupSubscription());
}
}
finally {
release();
}
}
public void subscribe(Set<String> topics, ConsumerRebalanceListener listener) {
if (listener == null)
throw new IllegalArgumentException("RebalanceListener cannot be null");
// 按照指定的Topic名字进行订阅,自动分配分区
setSubscriptionType(SubscriptionType.AUTO_TOPICS);
// 监听
this.listener = listener;
// 修改订阅信息
changeSubscription(topics);
}
private void changeSubscription(Set<String> topicsToSubscribe) {
if (!this.subscription.equals(topicsToSubscribe)) {
// 如果使用AUTO_TOPICS或AUTO_PARTITION模式,则使用此集合记录所有订阅的Topic
this.subscription = topicsToSubscribe;
// Consumer Group中会选一个Leader,Leader会使用这个集合记录Consumer Group中所有消费者订阅的Topic,而其他的Follower的这个集合只会保存自身订阅的Topic
this.groupSubscription.addAll(topicsToSubscribe);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
- KafkaConsumer不是线程安全类,开启轻量级锁,topics为空抛异常,topics是空集合开始取消订阅,再次判断topics集合中是否有非法数据,判断消费者协调者是否为空。开始订阅对应topic。listener默认为 NoOpConsumerRebalanceListener ,一个空操作
轻量级锁:分别记录了当前使用KafkaConsumer的线程id和重入次数,KafkaConsumer的acquire()和release()方法实现了一个”轻量级锁“,它并非真正的锁,仅是检测是否有多线程并发操作KafkaConsumer而已
每一个KafkaConsumer实例内部都拥有一个SubscriptionState对象,subscribe内部调用了subscribe方法,subscribe方法订阅信息记录到 SubscriptionState ,多次订阅会覆盖旧数据。
更新metadata,判断如果metadata中不包含当前groupSubscription,开始标记更新(后面会有更新的逻辑),并且消费者侧的topic不会过期
# 4.5.4 消息消费过程
下面KafkaConsumer的核心方法poll是如何拉取消息的,先来看一下下面的代码:
# 4.5.4.1 poll
public ConsumerRecords<K, V> poll(long timeout) {
// 使用轻量级锁检测kafkaConsumer是否被其他线程使用
acquireAndEnsureOpen();
try {
// 超时时间小于0抛异常
if (timeout < 0)
throw new IllegalArgumentException("Timeout must not be negative");
// 订阅类型为NONE抛异常,表示当前消费者没有订阅任何topic或者没有分配分区
if (this.subscriptions.hasNoSubscriptionOrUserAssignment())
throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");
// poll for new data until the timeout expires
long start = time.milliseconds();
long remaining = timeout;
do {
// 核心方法,拉取消息
Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollOnce(remaining);
if (!records.isEmpty()) {
// before returning the fetched records, we can send off the next round of fetches
// and avoid block waiting for their responses to enable pipelining while the user
// is handling the fetched records.
//
// NOTE: since the consumed position has already been updated, we must not allow
// wakeups or any other errors to be triggered prior to returning the fetched records.
// 如果拉取到了消息,发送一次消息拉取的请求,不会阻塞不会被中断
// 在返回数据之前,发送下次的 fetch 请求,避免用户在下次获取数据时线程 block
if (fetcher.sendFetches() > 0 || client.hasPendingRequests())
client.pollNoWakeup();
// 经过拦截器处理后返回
if (this.interceptors == null)
return new ConsumerRecords<>(records); else
return this.interceptors.onConsume(new ConsumerRecords<>(records));
}
long elapsed = time.milliseconds() - start;
// 拉取超时就结束
remaining = timeout - elapsed;
}
while (remaining > 0);
return ConsumerRecords.empty();
}
finally {
release();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
使用轻量级锁检测kafkaConsumer是否被其他线程使用
检查超时时间是否小于0,小于0抛出异常,停止消费
检查这个 consumer 是否订阅的相应的 topic-partition
调用 pollOnce() 方法获取相应的 records
在返回获取的 records 前,发送下一次的 fetch 请求,避免用户在下次请求时线程 block 在pollOnce() 方法中
如果在给定的时间(timeout)内获取不到可用的 records,返回空数据
这里可以看出,poll 方法的真正实现是在 pollOnce 方法中,poll 方法通过 pollOnce 方法获取可用的数据
# 4.5.4.2 pollOnce
// 除了获取新数据外,还会做一些必要的 offset-commit和reset-offset的操作
private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
client.maybeTriggerWakeup();
// 1. 获取 GroupCoordinator 地址并连接、加入 Group、sync Group、自动commit, join 及 sync 期间 group 会进行 rebalance
coordinator.poll(time.milliseconds(), timeout);
// 2. 更新订阅的 topic-partition 的 offset(如果订阅的 topic-partition list 没有有效的 offset 的情况下)
if (!subscriptions.hasAllFetchPositions())
updateFetchPositions(this.subscriptions.missingFetchPositions());
// 3. 获取 fetcher 已经拉取到的数据
Map<TopicPartition, List<ConsumerRecord<K, V>>> records =fetcher.fetchedRecords();
if (!records.isEmpty())
return records;
// 4. 发送 fetch 请求,会从多个 topic-partition 拉取数据(只要对应的 topic-partition 没有未完成的请求)
fetcher.sendFetches();
long now = time.milliseconds();
long pollTimeout = Math.min(coordinator.timeToNextPoll(now),timeout);
// 5. 调用 poll 方法发送请求(底层发送请求的接口)
client.poll(pollTimeout, now, new PollCondition() {
@Override
public boolean shouldBlock() {
// since a fetch might be completed by the background thread, we need this poll condition
// to ensure that we do not block unnecessarily in poll()
return !fetcher.hasCompletedFetches();
}
}
);
// 6. 如果 group 需要 rebalance,直接返回空数据,这样更快地让 group 进行稳定状态
if (coordinator.needRejoin())
return Collections.emptyMap();
// 获取到请求的结果
return fetcher.fetchedRecords();
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
pollOnce 可以简单分为6步来看,其作用分别如下:
# 4.5.4.2.1 coordinator.poll()
获取 GroupCoordinator 的地址,并建立相应 tcp 连接,发送 join-group、sync-group,之后才真正加入到了一个 group 中,这时会获取其要消费的 topic-partition 列表,如果设置了自动 commit,也会在这一步进行 commit。总之,对于一个新建的 group,group 状态将会从 Empty –>PreparingRebalance –> AwaiSync –> Stable
;
获取 GroupCoordinator 的地址,并建立相应 tcp 连接;
发送 join-group 请求,然后 group 将会进行 rebalance;
发送 sync-group 请求,之后才正在加入到了一个 group 中,这时会通过请求获取其要消费的topic-partition 列表;
如果设置了自动 commit,也会在这一步进行 commit offset
# 4.5.4.2.2 updateFetchPositions()
这个方法主要是用来更新这个 consumer 实例订阅的 topic-partition 列表的 fetch-offset 信息。目的就是为了获取其订阅的每个 topic-partition 对应的 position,这样 Fetcher 才知道从哪个 offset 开始去拉取这个 topic-partition 的数据
private void updateFetchPositions(Set<TopicPartition> partitions) {
// 先重置那些调用 seekToBegin 和 seekToEnd 的 offset 的 tp,设置其 the fetch position 的 offset
fetcher.resetOffsetsIfNeeded(partitions);
if (!subscriptions.hasAllFetchPositions(partitions)) {
// 获取所有分配 tp 的 offset, 即 committed offset, 更新到TopicPartitionState 中的 committed offset 中
coordinator.refreshCommittedOffsetsIfNeeded();
// 如果 the fetch position 值无效,则将上步获取的 committed offset 设置为 the fetch position
fetcher.updateFetchPositions(partitions);
}
}
2
3
4
5
6
7
8
9
10
在 Fetcher 中,这个 consumer 实例订阅的每个 topic-partition 都会有一个对应的TopicPartitionState 对象,在这个对象中会记录以下这些内容:
private static class TopicPartitionState {
// Fetcher 下次去拉取时的 offset,Fecher 在拉取时需要知道这个值
private Long position;
// last consumed position
// 最后一次获取的高水位标记
private Long highWatermark;
// the high watermark from last fetch
private Long lastStableOffset;
// consumer 已经处理完的最新一条消息的 offset,consumer 主动调用 offset-commit 时会更新这个值;
private OffsetAndMetadata committed;
// last committed position
// 是否暂停
private boolean paused;
// whether this partition has been paused by the user
// 这 topic-partition offset 重置的策略,重置之后,这个策略就会改为 null,防止再次操作
private OffsetResetStrategy resetStrategy;
// the strategy to use if the offset needs resetting
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 4.5.4.2.3 fetcher.fetchedRecords()
返回其 fetched records,并更新其 fetch-position offset,只有在 offset-commit 时(自动commit 时,是在第一步实现的),才会更新其 committed offset;
public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {
Map<TopicPartition, List<ConsumerRecord<K, V>>> fetched = new HashMap<>();
// 在 max.poll.records 中设置单词最大的拉取条数
int recordsRemaining = maxPollRecords;
try {
while (recordsRemaining > 0) {
if (nextInLineRecords == null || nextInLineRecords.isFetched) {
// 从队列中获取但不移除此队列的头;如果此队列为空,返回null
CompletedFetch completedFetch = completedFetches.peek();
if (completedFetch == null) break;
// 获取下一个要处理的 nextInLineRecords
nextInLineRecords = parseCompletedFetch(completedFetch);
completedFetches.poll();
} else {
// 拉取records,更新 position
List<ConsumerRecord<K, V>> records = fetchRecords(nextInLineRecords, recordsRemaining);
TopicPartition partition = nextInLineRecords.partition;
if (!records.isEmpty()) {
List<ConsumerRecord<K, V>> currentRecords = fetched.get(partition);
if (currentRecords == null) {
fetched.put(partition, records);
} else {
List<ConsumerRecord<K, V>> newRecords = new ArrayList<>(records.size() + currentRecords.size());
newRecords.addAll(currentRecords);
newRecords.addAll(records);
fetched.put(partition, newRecords);
}
recordsRemaining -= records.size();
}
}
}
}
catch (KafkaException e) {
if (fetched.isEmpty())
throw e;
}
return fetched;
}
private List<ConsumerRecord<K, V>> fetchRecords(PartitionRecords partitionRecords, int maxRecords) {
if (!subscriptions.isAssigned(partitionRecords.partition)) {
log.debug("Not returning fetched records for partition {} since it is no longer assigned",
partitionRecords.partition);
} else {
long position = subscriptions.position(partitionRecords.partition);
// 这个 tp 不能来消费了,比如调用 pause方法暂停消费
if (!subscriptions.isFetchable(partitionRecords.partition)) {
log.debug("Not returning fetched records for assigned partition {} since it is no longer fetchable",
partitionRecords.partition);
} else if (partitionRecords.nextFetchOffset == position) {
// 获取该 tp 对应的records,并更新 partitionRecords 的fetchOffset(用于判断是否顺序)
List<ConsumerRecord<K, V>> partRecords = partitionRecords.fetchRecords(maxRecords);
long nextOffset = partitionRecords.nextFetchOffset;
log.trace("Returning fetched records at offset {} for assigned partition {} and update " +
"position to {}", position,partitionRecords.partition, nextOffset);
// 更新消费的到 offset( the fetch position)
subscriptions.position(partitionRecords.partition,nextOffset);
// 获取 Lag(即 position与 hw 之间差值),hw 为 null 时,才返回null
Long partitionLag = subscriptions.partitionLag(partitionRecords.partition, isolationLevel);
if (partitionLag != null)
this.sensors.recordPartitionLag(partitionRecords.partition, partitionLag);
return partRecords;
} else {
log.debug("Ignoring fetched records for {} at offset {} since the current position is {}",
partitionRecords.partition,partitionRecords.nextFetchOffset, position);
}
}
partitionRecords.drain();
return emptyList();
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# 4.5.4.2.4 fetcher.sendFetches()
只要订阅的 topic-partition list 没有未处理的 fetch 请求,就发送对这个 topic-partition 的 fetch 请求,在真正发送时,还是会按 node 级别去发送,leader 是同一个 node 的 topic-partition 会合成一个请求去发送;
// 向订阅的所有 partition (只要该 leader 暂时没有拉取请求)所在 leader 发送 fetch请求
public int sendFetches() {
// 1. 创建 Fetch Request
Map<Node, FetchRequest.Builder> fetchRequestMap =createFetchRequests();
for (Map.Entry<Node, FetchRequest.Builder> fetchEntry :fetchRequestMap.entrySet()) {
final FetchRequest.Builder request = fetchEntry.getValue();
final Node fetchTarget = fetchEntry.getKey();
log.debug("Sending {} fetch for partitions {} to broker {}",isolationLevel, request.fetchData().keySet(),
fetchTarget);
// 2 发送 Fetch Request
client.send(fetchTarget, request)
.addListener(new RequestFutureListener<ClientResponse>() {
@Override
public void onSuccess(ClientResponse resp) {
FetchResponse response = (FetchResponse) resp.responseBody();
if (!matchesRequestedPartitions(request,response)) {
log.warn("Ignoring fetch response containing partitions {} since it does not match " +
"the requested partitions {}",response.responseData().keySet(),
request.fetchData().keySet());
return;
}
Set<TopicPartition> partitions = new HashSet<>(response.responseData().keySet());
FetchResponseMetricAggregator metricAggregator = new FetchResponseMetricAggregator(sensors, partitions);
for (Map.Entry<TopicPartition,FetchResponse.PartitionData> entry : response.responseData().entrySet()) {
TopicPartition partition = entry.getKey();
long fetchOffset =request.fetchData().get(partition).fetchOffset;
FetchResponse.PartitionData fetchData =entry.getValue();
log.debug("Fetch {} at offset {} for partition {} returned fetch data {}",
isolationLevel, fetchOffset,partition, fetchData);
completedFetches.add(new CompletedFetch(partition, fetchOffset, fetchData, metricAggregator,
resp.requestHeader().apiVersion()));
}
sensors.fetchLatency.record(resp.requestLatencyMs());
}
@Override
public void onFailure(RuntimeException e) {
log.debug("Fetch request {} to {} failed",request.fetchData(), fetchTarget, e);
}
}
);
}
return fetchRequestMap.size();
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
createFetchRequests():为订阅的所有 topic-partition list 创建 fetch 请求(只要该topic-partition 没有还在处理的请求),创建的 fetch 请求依然是按照 node 级别创建的;
client.send():发送 fetch 请求,并设置相应的 Listener,请求处理成功的话,就加入到completedFetches 中,在加入这个 completedFetches 集合时,是按照 topic-partition 级别去加入,这样也就方便了后续的处理。
从这里可以看出,在每次发送 fetch 请求时,都会向所有可发送的 topic-partition 发送 fetch 请求,调用一次 fetcher.sendFetches,拉取到的数据,可需要多次 pollOnce 循环才能处理完,因为Fetcher 线程是在后台运行,这也保证了尽可能少地阻塞用户的处理线程,因为如果 Fetcher 中没有可处理的数据,用户的线程是会阻塞在 poll 方法中的
# 4.5.4.2.5 client.poll()
调用底层 NetworkClient 提供的接口去发送相应的请求;
# 4.5.4.2.6 coordinator.needRejoin()
如果当前实例分配的 topic-partition 列表发送了变化,那么这个 consumer group 就需要进行rebalance
# 4.5.5 自动提交
最简单的提交方式是让悄费者自动提交偏移量。如果enable.auto.commit被设为 true,消费者会自动把从 poll() 方法接收到的最大偏移量提交上去。提交时间间隔由auto.commit.interval.ms
控制,默认值是 5s。与消费者里的其他东西 一样,自动提交也是在轮询(poll() )里进行的。消费者每次在进行轮询时会检查是否该提交偏移量了,如果是,那 么就会提交从上一次轮询返回的偏移量。
不过,这种简便的方式也会带来一些问题,来看一下下面的例子:
假设我们仍然使用默认的 5s提交时间间隔,在最近一次提交之后的 3s发生了再均衡,再 均衡之后,消费者从最后一次提交的偏移量位置开始读取消息。这个时候偏移量已经落后 了 3s,所以在这 3s 内到达的消息会被重复处理。可以通过修改提交时间间隔来更频繁地提交偏移量,减小可能出现重复消息的时间窗,不过这种情况是无也完全避免的
# 4.5.6 手动提交
# 4.5.6.1 同步提交
取消自动提交,把 auto.commit.offset
设为 false,让应用程序决定何时提交 偏 移量。使用commitSync()
提交偏移量最简单也最可靠。这个 API会提交由 poll() 方法返回 的最新偏移量,提交成功后马上返回,如果提交失败就抛出异常
while (true) {
// 消息拉取
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(),record.key(), record.value());
}
// 处理完成单次消息以后,ᨀ交当前的offset,如果ᨀ交失败就抛出异常
consumer.commitSync();
}
2
3
4
5
6
7
8
9
# 4.5.6.2 异步提交
同步提交有一个不足之处,在 broker对提交请求作出回应之前,应用程序会一直阻塞,这样会限制应用程序的吞吐量。我们可以通过降低提交频率来提升吞吐量,但如果发生了再均衡, 会增加重复消息的数量。这个时候可以使用异步提交 API。我们只管发送提交请求,无需等待 broker的响应。
while (true) {
// 消息拉取
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n",record.offset(), record.key(), record.value());
}
// 异步ᨀ交
consumer.commitAsync((offsets, exception) -> {
exception.printStackTrace();
System.out.println(offsets.size());
}
);
}
2
3
4
5
6
7
8
9
10
11
12
13