使用 Spring Kafka 的 kafkaTemplate.send() 方法发送消息时,可以通过 ListenableFutureaddCallback() 方法添加回调函数,在回调函数中获取 SendResult 对象,从而获取到发送结果信息。

以下是代码示例,展示如何获取 topic 名称、所在 partition 分区数、所在分区的 offset、HTTP 接入层收到消息的时间、消息发送至 Kafka 的时间:

ListenableFuture<SendResult<String, String>> listenableFuture = kafkaTemplate.send(topicName, msg);

listenableFuture.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
    @Override
    public void onSuccess(SendResult<String, String> result) {
        String topic = result.getRecordMetadata().topic();
        int partition = result.getRecordMetadata().partition();
        long offset = result.getRecordMetadata().offset();
        long sendTime = result.getRecordMetadata().timestamp();
        long receiveTime = System.currentTimeMillis();
        // 使用获取到的信息进行其他操作
    }

    @Override
    public void onFailure(Throwable ex) {
        // 处理发送失败的情况
    }
});

解释:

  • topic:发送消息的主题名称。
  • partition:消息所在的 Kafka 分区。
  • offset:消息在分区中的偏移量。
  • sendTime:消息发送到 Kafka Broker 的时间戳。
  • receiveTime:HTTP 接入层收到消息的时间戳,可以通过 System.currentTimeMillis() 获取。

通过以上代码,您可以获取到发送消息的关键信息,并在回调函数中进行相应的处理,例如记录消息发送日志、监控消息发送延迟等。

Kafka Template SendResult: 获取 Topic 名称、分区、Offset 和时间信息

原文地址: https://www.cveoy.top/t/topic/oa6O 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录