Java Kafka 生产者发送消息静态工具类 - 支持获取回调信息
以下是一个 Java 集成 Kafka 的生产者发消息静态工具类,支持获取回调获取 topic 名称、所在的 topic 分区数、所在分区的 offset、HTTP 接入层收到消息的时间、消息发送至 Kafka 的时间:
import java.util.Date;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class KafkaProducerUtil {
private static final Logger logger = LoggerFactory.getLogger(KafkaProducerUtil.class);
private static KafkaProducer<String, String> producer;
private static String topicName = "test_topic";
static {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producer = new KafkaProducer<>(props);
}
public static void sendMessage(String message) {
ProducerRecord<String, String> record = new ProducerRecord<>(topicName, message);
long startTime = System.currentTimeMillis();
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
logger.error("Error while producing message to topic :" + metadata.topic());
logger.error("Error while producing message to partition :" + metadata.partition());
logger.error("Error while producing message to offset :" + metadata.offset());
logger.error("Error while producing message to timestamp :" + metadata.timestamp());
} else {
logger.info("Message sent to topic :" + metadata.topic());
logger.info("Message sent to partition :" + metadata.partition());
logger.info("Message sent to offset :" + metadata.offset());
logger.info("Message sent to timestamp :" + metadata.timestamp());
long endTime = System.currentTimeMillis();
logger.info("HTTP接入层收到消息的时间: " + new Date(startTime));
logger.info("消息发送至Kafka的时间:" + new Date(endTime));
}
}
});
}
public static void closeProducer() {
producer.close();
}
}
使用方法如下:
KafkaProducerUtil.sendMessage("Hello Kafka!");
其中,topicName 可以根据需要进行修改。sendMessage 方法会将消息发送到指定的 topic,同时在发送完成后获取回调信息,并输出日志。closeProducer 方法用于关闭生产者。
原文地址: https://www.cveoy.top/t/topic/oa2y 著作权归作者所有。请勿转载和采集!