使用 spring+kafka+springMvc 并不使用KafkaListener注解 demo
在使用 spring+kafka+springMvc 的情况下,如果不想使用 @KafkaListener 注解来监听 Kafka 消息,可以使用 KafkaTemplate 来发送和接收消息。
首先,在 Spring 配置文件中配置 KafkaTemplate:
<bean id="kafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
<constructor-arg>
<bean class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
<constructor-arg>
<props>
<prop key="bootstrap.servers">localhost:9092</prop>
<prop key="key.serializer">org.apache.kafka.common.serialization.StringSerializer</prop>
<prop key="value.serializer">org.apache.kafka.common.serialization.StringSerializer</prop>
</props>
</constructor-arg>
</bean>
</constructor-arg>
</bean>
然后,在 SpringMvc 的 Controller 中注入 KafkaTemplate:
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
接下来,可以使用 KafkaTemplate 发送和接收消息。比如,发送消息可以使用以下代码:
kafkaTemplate.send("test-topic", "test-message");
其中,第一个参数是消息发送的主题,第二个参数是消息的内容。
接收消息可以使用以下代码:
ConsumerRecords<String, String> records = kafkaTemplate.execute(new KafkaOperations.ProducerCallback<String, String, ConsumerRecords<String, String>>() {
@Override
public ConsumerRecords<String, String> doInKafka(Producer<String, String> producer) {
ConsumerRecords<String, String> records = null;
while (records == null || records.isEmpty()) {
producer.flush();
records = kafkaConsumer.poll(Duration.ofMillis(1000));
}
return records;
}
});
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.topic() + " - " + record.partition() + " - " + record.offset() + " - " + record.key() + " - " + record.value());
}
其中,KafkaTemplate 的 execute 方法可以执行一个 kafka 操作,这里使用了 ProducerCallback 接口的 doInKafka 方法来执行消费操作。在 doInKafka 方法中,先等待一段时间,然后使用 KafkaConsumer 来获取消费的消息。最后,遍历 ConsumerRecords 中的消息即可。
需要注意的是,在使用 KafkaTemplate 接收消息时,需要手动创建一个 KafkaConsumer,并调用其 poll 方法来获取消息。另外,这种方式不支持并发消费,因为每个请求都会创建一个 KafkaConsumer。如果需要支持并发消费,可以使用 Spring 的 ConcurrentMessageListenerContainer。
原文地址: https://www.cveoy.top/t/topic/bZbd 著作权归作者所有。请勿转载和采集!