java连接kafka服务器并监听队列
以下是Java连接Kafka服务器并监听队列的基本步骤:
-
安装Kafka服务器并启动。
-
在Java代码中引入Kafka相关的依赖,例如kafka-clients和slf4j等。
-
创建Kafka消费者对象,指定Kafka服务器地址和消费者组ID。
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
- 订阅要监听的Kafka队列。
consumer.subscribe(Arrays.asList("test-topic"));
- 在循环中不断拉取Kafka队列中的消息并进行处理。
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
- 关闭Kafka消费者对象。
consumer.close();
``
原文地址: https://www.cveoy.top/t/topic/dxE7 著作权归作者所有。请勿转载和采集!