Java Kafka 批量消费实现详解
Kafka 批量消费可以通过设置消费者的批处理大小来实现。具体步骤如下:
- 创建消费者配置
Properties props = new Properties();
props.put('bootstrap.servers', 'localhost:9092');
props.put('group.id', 'group1');
props.put('key.deserializer', 'org.apache.kafka.common.serialization.StringDeserializer');
props.put('value.deserializer', 'org.apache.kafka.common.serialization.StringDeserializer');
- 创建消费者
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
- 订阅主题
consumer.subscribe(Arrays.asList('topic1'));
- 设置批处理大小
consumer.poll(Duration.ofMillis(100));
consumer.seekToBeginning(consumer.assignment());
consumer.commitSync();
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
int count = records.count();
if (count == 0) {
continue;
}
List<String> messages = new ArrayList<>();
for (ConsumerRecord<String, String> record : records) {
messages.add(record.value());
}
// 批量处理消息
processBatch(messages);
}
在上面的代码中,我们通过设置consumer.poll(Duration.ofMillis(100))来控制每次消费的批处理大小。当消费者接收到一批消息后,将它们添加到一个List中,并在while循环中批量处理这些消息。注意,我们也可以通过调整auto.commit.interval.ms属性来控制提交偏移量的频率。
原文地址: https://www.cveoy.top/t/topic/oocT 著作权归作者所有。请勿转载和采集!