Kafka Template SendResult: 获取 Topic 名称、分区、Offset 和时间信息
使用 Spring Kafka 的 kafkaTemplate.send() 方法发送消息时,可以通过 ListenableFuture 的 addCallback() 方法添加回调函数,在回调函数中获取 SendResult 对象,从而获取到发送结果信息。
以下是代码示例,展示如何获取 topic 名称、所在 partition 分区数、所在分区的 offset、HTTP 接入层收到消息的时间、消息发送至 Kafka 的时间:
ListenableFuture<SendResult<String, String>> listenableFuture = kafkaTemplate.send(topicName, msg);
listenableFuture.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onSuccess(SendResult<String, String> result) {
String topic = result.getRecordMetadata().topic();
int partition = result.getRecordMetadata().partition();
long offset = result.getRecordMetadata().offset();
long sendTime = result.getRecordMetadata().timestamp();
long receiveTime = System.currentTimeMillis();
// 使用获取到的信息进行其他操作
}
@Override
public void onFailure(Throwable ex) {
// 处理发送失败的情况
}
});
解释:
topic:发送消息的主题名称。partition:消息所在的 Kafka 分区。offset:消息在分区中的偏移量。sendTime:消息发送到 Kafka Broker 的时间戳。receiveTime:HTTP 接入层收到消息的时间戳,可以通过System.currentTimeMillis()获取。
通过以上代码,您可以获取到发送消息的关键信息,并在回调函数中进行相应的处理,例如记录消息发送日志、监控消息发送延迟等。
原文地址: https://www.cveoy.top/t/topic/oa6O 著作权归作者所有。请勿转载和采集!