使用 spring+kafka demo 并不使用KafkaListener注解
可以使用 KafkaConsumer API 来消费 Kafka 中的消息,不需要使用 @KafkaListener 注解。以下是使用 Spring + KafkaConsumer API 的示例代码:
- 配置 KafkaConsumer:
@Configuration
public class KafkaConsumerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.consumer.group-id}")
private String groupId;
@Value("${spring.kafka.consumer.auto-offset-reset}")
private String autoOffsetReset;
@Bean
public KafkaConsumer<String, String> kafkaConsumer() {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
return new KafkaConsumer<>(props);
}
}
- 编写 Kafka 消费者:
@Component
public class KafkaMessageConsumer {
@Autowired
private KafkaConsumer<String, String> kafkaConsumer;
@PostConstruct
public void init() {
kafkaConsumer.subscribe(Arrays.asList("test-topic"));
Executors.newSingleThreadExecutor().submit(() -> {
while (true) {
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.println("Received message: " + record.value());
}
}
});
}
}
在上述代码中,我们使用 @PostConstruct 注解来初始化 Kafka 消费者,然后订阅指定的 topic,并通过 KafkaConsumer.poll() 方法来获取消息并消费。在实际应用中,我们需要根据业务需要来处理消息。
原文地址: https://www.cveoy.top/t/topic/bZam 著作权归作者所有。请勿转载和采集!