SpringBoot项目配置Kafka消费组监听教程
要在 Spring Boot 项目中配置 Kafka 消费组监听,需要进行以下步骤:
- 添加 Kafka 依赖:在项目的 pom.xml 文件中添加 Kafka 依赖。
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
- 创建 Kafka 消费者配置类:创建一个 Kafka 消费者配置类,用于配置 Kafka 连接和消费者的相关属性。
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
@Value('${kafka.bootstrap-servers}')
private String bootstrapServers;
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
- 创建 Kafka 消费者:创建一个 Kafka 消费者类,用于监听 Kafka 消息并处理。
@Component
public class KafkaConsumer {
@KafkaListener(topics = '${kafka.topic}')
public void consume(String message) {
// 处理 Kafka 消息
System.out.println('Received message: ' + message);
}
}
- 配置 Kafka 连接和消费者属性:在项目的 application.properties 或 application.yml 文件中配置 Kafka 连接和消费者的相关属性。
kafka.bootstrap-servers=localhost:9092
kafka.topic=my-topic
或
kafka:
bootstrap-servers: localhost:9092
topic: my-topic
通过以上步骤,就可以在 Spring Boot 项目中配置 Kafka 消费组监听了。Kafka 消费者会自动连接到 Kafka 服务器并监听指定的主题,一旦有消息到达,就会调用consume方法进行处理。
原文地址: https://www.cveoy.top/t/topic/qnWV 著作权归作者所有。请勿转载和采集!