ListenableFutureSendResultString String listenableFuture = kafkaTemplatesendtopicName msg;怎么同步回调获取topic名称、所在的topic分区数、所在分区的offset、HTTP接入层收到消息的时间、消息发送至Kafka的时间:
可以通过使用ListenableFuture的addCallback()方法来添加回调函数,回调函数中可以获取到发送结果SendResult,从而获取到topic和partition信息,以及消息发送和接收时间。具体的代码如下:
ListenableFuture<SendResult<String, String>> listenableFuture = kafkaTemplate.send(topicName, msg);
listenableFuture.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onSuccess(SendResult<String, String> result) {
String topic = result.getRecordMetadata().topic();
int partition = result.getRecordMetadata().partition();
long offset = result.getRecordMetadata().offset();
long sendTime = result.getRecordMetadata().timestamp();
long receiveTime = System.currentTimeMillis();
// do something with the results
}
@Override
public void onFailure(Throwable ex) {
// handle failure
}
});
``
原文地址: http://www.cveoy.top/t/topic/fjEX 著作权归作者所有。请勿转载和采集!