使用 spring+kafka 并不使用KafkaListener注解 demo
以下是使用 Spring + Kafka 不使用 @KafkaListener 注解的示例代码:
- 首先,在 Spring 配置文件中配置 Kafka 相关的 bean:
<bean id="kafkaProducerFactory" class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="localhost:9092"/>
<entry key="key.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/>
<entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/>
</map>
</constructor-arg>
</bean>
<bean id="kafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
<constructor-arg ref="kafkaProducerFactory"/>
</bean>
<bean id="kafkaConsumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="localhost:9092"/>
<entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
<entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
<entry key="group.id" value="test-group"/>
<entry key="auto.offset.reset" value="earliest"/>
<entry key="enable.auto.commit" value="false"/>
</map>
</constructor-arg>
</bean>
<bean id="kafkaListenerContainerFactory" class="org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory">
<property name="consumerFactory" ref="kafkaConsumerFactory"/>
<property name="batchListener" value="true"/>
<property name="concurrency" value="3"/>
<property name="pollTimeout" value="3000"/>
<property name="ackMode" value="MANUAL_IMMEDIATE"/>
</bean>
- 创建一个 KafkaProducer 和 KafkaConsumer:
public class KafkaDemo {
private final KafkaTemplate<String, String> kafkaTemplate;
private final Consumer<String, String> kafkaConsumer;
public KafkaDemo(KafkaTemplate<String, String> kafkaTemplate, Consumer<String, String> kafkaConsumer) {
this.kafkaTemplate = kafkaTemplate;
this.kafkaConsumer = kafkaConsumer;
}
public void sendMessage(String topic, String message) {
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onSuccess(SendResult<String, String> result) {
System.out.println("Sent message=[" + message + "] with offset=[" + result.getRecordMetadata().offset() + "]");
}
@Override
public void onFailure(Throwable ex) {
System.err.println("Unable to send message=[" + message + "] due to : " + ex.getMessage());
}
});
}
public void consume(String topic) {
kafkaConsumer.subscribe(Collections.singletonList(topic));
while (true) {
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.println("Received message: topic=" + record.topic() + ", partition=" + record.partition()
+ ", offset=" + record.offset() + ", key=" + record.key() + ", value=" + record.value());
kafkaConsumer.commitSync(Collections.singletonMap(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1)));
}
}
}
}
- 在 Spring 配置文件中创建 KafkaDemo 实例:
<bean id="kafkaDemo" class="com.example.KafkaDemo">
<constructor-arg ref="kafkaTemplate"/>
<constructor-arg ref="kafkaConsumer"/>
</bean>
- 在代码中使用 KafkaDemo 发送和接收消息:
@Autowired
private KafkaDemo kafkaDemo;
public void send(String topic, String message) {
kafkaDemo.sendMessage(topic, message);
}
public void receive(String topic) {
kafkaDemo.consume(topic);
}
这样就可以使用 Spring + Kafka 不使用 @KafkaListener 注解来发送和接收消息了。
原文地址: https://www.cveoy.top/t/topic/bZaO 著作权归作者所有。请勿转载和采集!