SpringBoot 整合 Kafka:自定义 KafkaTemplate 打印访问地址
SpringBoot 整合 Kafka:自定义 KafkaTemplate 打印访问地址
本文将介绍 Spring Boot 整合 Kafka 的基本步骤,并讲解如何通过自定义 KafkaTemplate 类,在发送消息时在控制台打印实际访问 Kafka 的地址 IP 和端口等信息。
1. 添加依赖
在 pom.xml 文件中添加 Kafka 和 Spring Kafka 的依赖:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.5.5.RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.5.0</version>
</dependency>
2. 配置 Kafka
在 application.yml 文件中添加 Kafka 相关的配置:
spring:
kafka:
bootstrap-servers: localhost:9092
3. 创建 KafkaProducerConfig
创建一个 KafkaProducerConfig 类,用于配置 KafkaTemplate:
@Configuration
@EnableKafka
public class KafkaProducerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
@Bean
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
4. 发送消息
在需要发送消息的地方注入 KafkaTemplate,并使用 send() 方法发送消息:
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String topic, String message) {
kafkaTemplate.send(topic, message);
}
5. 自定义 KafkaTemplate
如果想在发送消息的同时在控制台打印实际访问 Kafka 的地址 IP 和端口等信息,可以自定义 KafkaTemplate。创建一个 MyKafkaTemplate 类,继承 KafkaTemplate,并重写 doSend() 方法:
public class MyKafkaTemplate<K, V> extends KafkaTemplate<K, V> {
private static final Logger LOGGER = LoggerFactory.getLogger(MyKafkaTemplate.class);
public MyKafkaTemplate(ProducerFactory<K, V> producerFactory) {
super(producerFactory);
}
@Override
protected ListenableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V> producerRecord) {
LOGGER.info('Sending message to topic '{}' at {}', producerRecord.topic(), ((DefaultKafkaProducerFactory<?, ?>) getProducerFactory()).getConfigurationProperties().get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
return super.doSend(producerRecord);
}
}
6. 修改 KafkaProducerConfig
修改 KafkaProducerConfig 类中的 kafkaTemplate() 方法,返回 MyKafkaTemplate 类的实例:
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new MyKafkaTemplate<>(producerFactory());
}
通过以上步骤,您就可以在发送消息时在控制台打印实际访问 Kafka 的地址 IP 和端口等信息。
原文地址: https://www.cveoy.top/t/topic/mQ3b 著作权归作者所有。请勿转载和采集!