以下是Java连接Kafka服务器并监听队列的基本步骤:

  1. 安装Kafka服务器并启动。

  2. 在Java代码中引入Kafka相关的依赖,例如kafka-clients和slf4j等。

  3. 创建Kafka消费者对象,指定Kafka服务器地址和消费者组ID。

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
  1. 订阅要监听的Kafka队列。
consumer.subscribe(Arrays.asList("test-topic"));
  1. 在循环中不断拉取Kafka队列中的消息并进行处理。
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    }
}
  1. 关闭Kafka消费者对象。
consumer.close();
``
java连接kafka服务器并监听队列

原文地址: https://www.cveoy.top/t/topic/dxE7 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录