以下是一个简单的示例代码,用于消费RocketMQ中的dev_anlog主题数据,并检查时间戳是否超过15分钟:

import time
from rocketmq.client import PushConsumer, ConsumeStatus, MessageModel

consumer = PushConsumer('consumer_group_name')
consumer.set_name_server_address('localhost:9876')
consumer.set_message_model(MessageModel.CLUSTERING)

def callback(msg):
    # 解析消息中的时间戳
    timestamp = int(msg.properties['timestamp'])
    # 计算当前时间与消息时间戳之间的差距
    delta = time.time() * 1000 - timestamp
    if delta > 900000: # 超过15分钟
        print(f"异常:消息时间戳为{timestamp},当前时间戳为{time.time()*1000}")
    else:
        print("正常:", msg.body.decode())

consumer.subscribe('dev_anlog', callback)
consumer.start()

首先,我们创建一个PushConsumer对象,并设置RocketMQ的NameServer地址和消息模式。然后,我们定义一个回调函数,该函数将在每次接收到消息时被调用。

在回调函数中,我们首先解析消息中的时间戳,并计算当前时间与消息时间戳之间的差距。如果差距超过15分钟,我们将打印一条异常消息;否则,我们将打印消息的内容。

最后,我们订阅dev_anlog主题,并启动PushConsumer对象。这将开始消费RocketMQ中的消息,并将它们传递给我们的回调函数进行处理

python消费rocket mq中dev_anlog这个topic的数据然后根据数据解析数据中的时间与当前时间对比相差超过15分钟记录异常如果消费不到数据同样记录异常

原文地址: https://www.cveoy.top/t/topic/dpmZ 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录