@Bean注解的KafkaTemplate创建与消息发送IP/端口打印
这段代码的作用是创建一个KafkaTemplate实例,用于发送消息到Kafka。如果当前没有KafkaTemplate bean存在,则会创建一个。
要在KafkaTemplate发送消息时打印消息实际发往的Kafka的ip和端口,可以在创建KafkaProducerFactory时设置一个自定义的ProducerInterceptor。在该Interceptor中可以获取到ProducerRecord对象,从而获取到消息实际发送到的Kafka的ip和端口信息。具体操作如下:
-
创建一个ProducerInterceptor实现类,实现'onSend()'方法,该方法中可以获取到ProducerRecord对象,从而获取到消息实际发送到的Kafka的ip和端口信息。
-
在创建KafkaProducerFactory时设置该Interceptor,如下所示:
@Bean
public ProducerFactory<Object, Object> kafkaProducerFactory() {
Map<String, Object> producerConfigs = new HashMap<>();
// 配置producerConfigs,省略...
DefaultKafkaProducerFactory<Object, Object> producerFactory =
new DefaultKafkaProducerFactory<>(producerConfigs);
// 设置自定义的ProducerInterceptor
producerFactory.setInterceptors(Collections.singletonList(new MyProducerInterceptor()));
return producerFactory;
}
- 在KafkaTemplate bean的创建方法中,将ProducerFactory注入到KafkaTemplate中,如下所示:
@Bean
@ConditionalOnMissingBean(KafkaTemplate.class)
public KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<Object, Object> kafkaProducerFactory, ProducerListener<Object, Object> kafkaProducerListener) {
KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory);
if (this.messageConverter != null) {
kafkaTemplate.setMessageConverter(this.messageConverter);
}
kafkaTemplate.setProducerListener(kafkaProducerListener);
kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());
return kafkaTemplate;
}
通过以上操作,就可以在KafkaTemplate发送消息时打印消息实际发往的Kafka的ip和端口信息了。
原文地址: https://www.cveoy.top/t/topic/mRi3 著作权归作者所有。请勿转载和采集!