以下是使用 Spring + Kafka 不使用 @KafkaListener 注解的示例代码:

  1. 首先,在 Spring 配置文件中配置 Kafka 相关的 bean:
<bean id="kafkaProducerFactory" class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
    <constructor-arg>
        <map>
            <entry key="bootstrap.servers" value="localhost:9092"/>
            <entry key="key.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/>
            <entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/>
        </map>
    </constructor-arg>
</bean>

<bean id="kafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
    <constructor-arg ref="kafkaProducerFactory"/>
</bean>

<bean id="kafkaConsumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
    <constructor-arg>
        <map>
            <entry key="bootstrap.servers" value="localhost:9092"/>
            <entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
            <entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
            <entry key="group.id" value="test-group"/>
            <entry key="auto.offset.reset" value="earliest"/>
            <entry key="enable.auto.commit" value="false"/>
        </map>
    </constructor-arg>
</bean>

<bean id="kafkaListenerContainerFactory" class="org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory">
    <property name="consumerFactory" ref="kafkaConsumerFactory"/>
    <property name="batchListener" value="true"/>
    <property name="concurrency" value="3"/>
    <property name="pollTimeout" value="3000"/>
    <property name="ackMode" value="MANUAL_IMMEDIATE"/>
</bean>
  1. 创建一个 KafkaProducer 和 KafkaConsumer:
public class KafkaDemo {

    private final KafkaTemplate<String, String> kafkaTemplate;
    private final Consumer<String, String> kafkaConsumer;

    public KafkaDemo(KafkaTemplate<String, String> kafkaTemplate, Consumer<String, String> kafkaConsumer) {
        this.kafkaTemplate = kafkaTemplate;
        this.kafkaConsumer = kafkaConsumer;
    }

    public void sendMessage(String topic, String message) {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onSuccess(SendResult<String, String> result) {
                System.out.println("Sent message=[" + message + "] with offset=[" + result.getRecordMetadata().offset() + "]");
            }

            @Override
            public void onFailure(Throwable ex) {
                System.err.println("Unable to send message=[" + message + "] due to : " + ex.getMessage());
            }
        });
    }

    public void consume(String topic) {
        kafkaConsumer.subscribe(Collections.singletonList(topic));
        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("Received message: topic=" + record.topic() + ", partition=" + record.partition()
                        + ", offset=" + record.offset() + ", key=" + record.key() + ", value=" + record.value());
                kafkaConsumer.commitSync(Collections.singletonMap(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1)));
            }
        }
    }
}
  1. 在 Spring 配置文件中创建 KafkaDemo 实例:
<bean id="kafkaDemo" class="com.example.KafkaDemo">
    <constructor-arg ref="kafkaTemplate"/>
    <constructor-arg ref="kafkaConsumer"/>
</bean>
  1. 在代码中使用 KafkaDemo 发送和接收消息:
@Autowired
private KafkaDemo kafkaDemo;

public void send(String topic, String message) {
    kafkaDemo.sendMessage(topic, message);
}

public void receive(String topic) {
    kafkaDemo.consume(topic);
}

这样就可以使用 Spring + Kafka 不使用 @KafkaListener 注解来发送和接收消息了。

使用 spring+kafka 并不使用KafkaListener注解 demo

原文地址: https://www.cveoy.top/t/topic/bZaO 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录