Kafka 批量消费可以通过设置消费者的批处理大小来实现。具体步骤如下:

  1. 创建消费者配置
Properties props = new Properties();
props.put('bootstrap.servers', 'localhost:9092');
props.put('group.id', 'group1');
props.put('key.deserializer', 'org.apache.kafka.common.serialization.StringDeserializer');
props.put('value.deserializer', 'org.apache.kafka.common.serialization.StringDeserializer');
  1. 创建消费者
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
  1. 订阅主题
consumer.subscribe(Arrays.asList('topic1'));
  1. 设置批处理大小
consumer.poll(Duration.ofMillis(100));
consumer.seekToBeginning(consumer.assignment());
consumer.commitSync();

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    int count = records.count();
    if (count == 0) {
        continue;
    }
    List<String> messages = new ArrayList<>();
    for (ConsumerRecord<String, String> record : records) {
        messages.add(record.value());
    }
    // 批量处理消息
    processBatch(messages);
}

在上面的代码中,我们通过设置consumer.poll(Duration.ofMillis(100))来控制每次消费的批处理大小。当消费者接收到一批消息后,将它们添加到一个List中,并在while循环中批量处理这些消息。注意,我们也可以通过调整auto.commit.interval.ms属性来控制提交偏移量的频率。

Java Kafka 批量消费实现详解

原文地址: https://www.cveoy.top/t/topic/oocT 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录