以下是一个 Java 集成 Kafka 的生产者发消息静态工具类,支持获取回调获取 topic 名称、所在的 topic 分区数、所在分区的 offset、HTTP 接入层收到消息的时间、消息发送至 Kafka 的时间:

import java.util.Date;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaProducerUtil {

    private static final Logger logger = LoggerFactory.getLogger(KafkaProducerUtil.class);
    private static KafkaProducer<String, String> producer;
    private static String topicName = "test_topic";

    static {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer<>(props);
    }

    public static void sendMessage(String message) {
        ProducerRecord<String, String> record = new ProducerRecord<>(topicName, message);
        long startTime = System.currentTimeMillis();
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception != null) {
                    logger.error("Error while producing message to topic :" + metadata.topic());
                    logger.error("Error while producing message to partition :" + metadata.partition());
                    logger.error("Error while producing message to offset :" + metadata.offset());
                    logger.error("Error while producing message to timestamp :" + metadata.timestamp());
                } else {
                    logger.info("Message sent to topic :" + metadata.topic());
                    logger.info("Message sent to partition :" + metadata.partition());
                    logger.info("Message sent to offset :" + metadata.offset());
                    logger.info("Message sent to timestamp :" + metadata.timestamp());
                    long endTime = System.currentTimeMillis();
                    logger.info("HTTP接入层收到消息的时间: " + new Date(startTime));
                    logger.info("消息发送至Kafka的时间:" + new Date(endTime));
                }
            }
        });
    }

    public static void closeProducer() {
        producer.close();
    }

}

使用方法如下:

KafkaProducerUtil.sendMessage("Hello Kafka!");

其中,topicName 可以根据需要进行修改。sendMessage 方法会将消息发送到指定的 topic,同时在发送完成后获取回调信息,并输出日志。closeProducer 方法用于关闭生产者。


原文地址: https://www.cveoy.top/t/topic/oa2y 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录