这段代码的作用是创建一个KafkaTemplate实例,用于发送消息到Kafka。如果当前没有KafkaTemplate bean存在,则会创建一个。

要在KafkaTemplate发送消息时打印消息实际发往的Kafka的ip和端口,可以在创建KafkaProducerFactory时设置一个自定义的ProducerInterceptor。在该Interceptor中可以获取到ProducerRecord对象,从而获取到消息实际发送到的Kafka的ip和端口信息。具体操作如下:

  1. 创建一个ProducerInterceptor实现类,实现'onSend()'方法,该方法中可以获取到ProducerRecord对象,从而获取到消息实际发送到的Kafka的ip和端口信息。

  2. 在创建KafkaProducerFactory时设置该Interceptor,如下所示:

@Bean
public ProducerFactory<Object, Object> kafkaProducerFactory() {
    Map<String, Object> producerConfigs = new HashMap<>();
    // 配置producerConfigs,省略...
    DefaultKafkaProducerFactory<Object, Object> producerFactory =
            new DefaultKafkaProducerFactory<>(producerConfigs);
    // 设置自定义的ProducerInterceptor
    producerFactory.setInterceptors(Collections.singletonList(new MyProducerInterceptor()));
    return producerFactory;
}
  1. 在KafkaTemplate bean的创建方法中,将ProducerFactory注入到KafkaTemplate中,如下所示:
@Bean
@ConditionalOnMissingBean(KafkaTemplate.class)
public KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<Object, Object> kafkaProducerFactory, ProducerListener<Object, Object> kafkaProducerListener) {
    KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory);
    if (this.messageConverter != null) {
        kafkaTemplate.setMessageConverter(this.messageConverter);
    }
    kafkaTemplate.setProducerListener(kafkaProducerListener);
    kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());
    return kafkaTemplate;
}

通过以上操作,就可以在KafkaTemplate发送消息时打印消息实际发往的Kafka的ip和端口信息了。

@Bean注解的KafkaTemplate创建与消息发送IP/端口打印

原文地址: https://www.cveoy.top/t/topic/mRi3 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录