在使用 spring+kafka+springMvc 的情况下,如果不想使用 @KafkaListener 注解来监听 Kafka 消息,可以使用 KafkaTemplate 来发送和接收消息。

首先,在 Spring 配置文件中配置 KafkaTemplate:

<bean id="kafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
    <constructor-arg>
        <bean class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
            <constructor-arg>
                <props>
                    <prop key="bootstrap.servers">localhost:9092</prop>
                    <prop key="key.serializer">org.apache.kafka.common.serialization.StringSerializer</prop>
                    <prop key="value.serializer">org.apache.kafka.common.serialization.StringSerializer</prop>
                </props>
            </constructor-arg>
        </bean>
    </constructor-arg>
</bean>

然后,在 SpringMvc 的 Controller 中注入 KafkaTemplate:

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

接下来,可以使用 KafkaTemplate 发送和接收消息。比如,发送消息可以使用以下代码:

kafkaTemplate.send("test-topic", "test-message");

其中,第一个参数是消息发送的主题,第二个参数是消息的内容。

接收消息可以使用以下代码:

ConsumerRecords<String, String> records = kafkaTemplate.execute(new KafkaOperations.ProducerCallback<String, String, ConsumerRecords<String, String>>() {
    @Override
    public ConsumerRecords<String, String> doInKafka(Producer<String, String> producer) {
        ConsumerRecords<String, String> records = null;
        while (records == null || records.isEmpty()) {
            producer.flush();
            records = kafkaConsumer.poll(Duration.ofMillis(1000));
        }
        return records;
    }
});
for (ConsumerRecord<String, String> record : records) {
    System.out.println(record.topic() + " - " + record.partition() + " - " + record.offset() + " - " + record.key() + " - " + record.value());
}

其中,KafkaTemplate 的 execute 方法可以执行一个 kafka 操作,这里使用了 ProducerCallback 接口的 doInKafka 方法来执行消费操作。在 doInKafka 方法中,先等待一段时间,然后使用 KafkaConsumer 来获取消费的消息。最后,遍历 ConsumerRecords 中的消息即可。

需要注意的是,在使用 KafkaTemplate 接收消息时,需要手动创建一个 KafkaConsumer,并调用其 poll 方法来获取消息。另外,这种方式不支持并发消费,因为每个请求都会创建一个 KafkaConsumer。如果需要支持并发消费,可以使用 Spring 的 ConcurrentMessageListenerContainer。

使用 spring+kafka+springMvc 并不使用KafkaListener注解 demo

原文地址: https://www.cveoy.top/t/topic/bZbd 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录