可以通过使用ListenableFuture的addCallback()方法来添加回调函数,回调函数中可以获取到发送结果SendResult,从而获取到topic和partition信息,以及消息发送和接收时间。具体的代码如下:

ListenableFuture<SendResult<String, String>> listenableFuture = kafkaTemplate.send(topicName, msg);

listenableFuture.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
    @Override
    public void onSuccess(SendResult<String, String> result) {
        String topic = result.getRecordMetadata().topic();
        int partition = result.getRecordMetadata().partition();
        long offset = result.getRecordMetadata().offset();
        long sendTime = result.getRecordMetadata().timestamp();
        long receiveTime = System.currentTimeMillis();
        // do something with the results
    }

    @Override
    public void onFailure(Throwable ex) {
        // handle failure
    }
});
``
ListenableFutureSendResultString String listenableFuture = kafkaTemplatesendtopicName msg;怎么同步回调获取topic名称、所在的topic分区数、所在分区的offset、HTTP接入层收到消息的时间、消息发送至Kafka的时间:

原文地址: http://www.cveoy.top/t/topic/fjEX 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录