可以使用 KafkaConsumer API 来消费 Kafka 中的消息,不需要使用 @KafkaListener 注解。以下是使用 Spring + KafkaConsumer API 的示例代码:

  1. 配置 KafkaConsumer:
@Configuration
public class KafkaConsumerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;

    @Bean
    public KafkaConsumer<String, String> kafkaConsumer() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        return new KafkaConsumer<>(props);
    }
}
  1. 编写 Kafka 消费者:
@Component
public class KafkaMessageConsumer {

    @Autowired
    private KafkaConsumer<String, String> kafkaConsumer;

    @PostConstruct
    public void init() {
        kafkaConsumer.subscribe(Arrays.asList("test-topic"));
        Executors.newSingleThreadExecutor().submit(() -> {
            while (true) {
                ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("Received message: " + record.value());
                }
            }
        });
    }
}

在上述代码中,我们使用 @PostConstruct 注解来初始化 Kafka 消费者,然后订阅指定的 topic,并通过 KafkaConsumer.poll() 方法来获取消息并消费。在实际应用中,我们需要根据业务需要来处理消息。

使用 spring+kafka demo 并不使用KafkaListener注解

原文地址: https://www.cveoy.top/t/topic/bZam 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录