要在 Spring Boot 项目中配置 Kafka 消费组监听,需要进行以下步骤:

  1. 添加 Kafka 依赖:在项目的 pom.xml 文件中添加 Kafka 依赖。
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
  1. 创建 Kafka 消费者配置类:创建一个 Kafka 消费者配置类,用于配置 Kafka 连接和消费者的相关属性。
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Value('${kafka.bootstrap-servers}')
    private String bootstrapServers;

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

}
  1. 创建 Kafka 消费者:创建一个 Kafka 消费者类,用于监听 Kafka 消息并处理。
@Component
public class KafkaConsumer {

    @KafkaListener(topics = '${kafka.topic}')
    public void consume(String message) {
        // 处理 Kafka 消息
        System.out.println('Received message: ' + message);
    }

}
  1. 配置 Kafka 连接和消费者属性:在项目的 application.properties 或 application.yml 文件中配置 Kafka 连接和消费者的相关属性。
kafka.bootstrap-servers=localhost:9092
kafka.topic=my-topic

kafka:
  bootstrap-servers: localhost:9092
  topic: my-topic

通过以上步骤,就可以在 Spring Boot 项目中配置 Kafka 消费组监听了。Kafka 消费者会自动连接到 Kafka 服务器并监听指定的主题,一旦有消息到达,就会调用consume方法进行处理。

SpringBoot项目配置Kafka消费组监听教程

原文地址: https://www.cveoy.top/t/topic/qnWV 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录