SpringBoot 整合 Kafka:自定义 KafkaTemplate 打印访问地址

本文将介绍 Spring Boot 整合 Kafka 的基本步骤,并讲解如何通过自定义 KafkaTemplate 类,在发送消息时在控制台打印实际访问 Kafka 的地址 IP 和端口等信息。

1. 添加依赖

pom.xml 文件中添加 Kafka 和 Spring Kafka 的依赖:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.5.5.RELEASE</version>
</dependency>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.5.0</version>
</dependency>

2. 配置 Kafka

application.yml 文件中添加 Kafka 相关的配置:

spring:
  kafka:
    bootstrap-servers: localhost:9092

3. 创建 KafkaProducerConfig

创建一个 KafkaProducerConfig 类,用于配置 KafkaTemplate:

@Configuration
@EnableKafka
public class KafkaProducerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

4. 发送消息

在需要发送消息的地方注入 KafkaTemplate,并使用 send() 方法发送消息:

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

public void sendMessage(String topic, String message) {
    kafkaTemplate.send(topic, message);
}

5. 自定义 KafkaTemplate

如果想在发送消息的同时在控制台打印实际访问 Kafka 的地址 IP 和端口等信息,可以自定义 KafkaTemplate。创建一个 MyKafkaTemplate 类,继承 KafkaTemplate,并重写 doSend() 方法:

public class MyKafkaTemplate<K, V> extends KafkaTemplate<K, V> {

    private static final Logger LOGGER = LoggerFactory.getLogger(MyKafkaTemplate.class);

    public MyKafkaTemplate(ProducerFactory<K, V> producerFactory) {
        super(producerFactory);
    }

    @Override
    protected ListenableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V> producerRecord) {
        LOGGER.info('Sending message to topic '{}' at {}', producerRecord.topic(), ((DefaultKafkaProducerFactory<?, ?>) getProducerFactory()).getConfigurationProperties().get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
        return super.doSend(producerRecord);
    }
}

6. 修改 KafkaProducerConfig

修改 KafkaProducerConfig 类中的 kafkaTemplate() 方法,返回 MyKafkaTemplate 类的实例:

@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
    return new MyKafkaTemplate<>(producerFactory());
}

通过以上步骤,您就可以在发送消息时在控制台打印实际访问 Kafka 的地址 IP 和端口等信息。

SpringBoot 整合 Kafka:自定义 KafkaTemplate 打印访问地址

原文地址: https://www.cveoy.top/t/topic/mQ3b 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录