RocketMQ 顺序消息发送和消费指南 - 保证消息顺序性
RocketMQ 顺序消息发送和消费指南
RocketMQ 提供了顺序发送和消费消息的功能,可以保证消息的顺序性。
顺序发送消息
- 创建一个顺序生产者对象,设置生产者组名和 NameServer 地址。
- 启动生产者。
- 使用顺序生产者发送消息,指定发送的消息顺序属性。
- 关闭生产者。
顺序消费消息
- 创建一个顺序消费者对象,设置消费者组名和 NameServer 地址。
- 注册消息监听器,实现
MessageListenerConcurrently接口的consumeMessage方法,用于处理接收到的消息。 - 启动消费者。
- 关闭消费者。
注意事项
- 顺序发送和消费消息需要保证消息的顺序属性一致。
- 顺序发送和消费消息需要使用相同的消费者组名。
- 顺序发送和消费消息需要在同一个 Topic 下进行。
- 顺序消费消息在同一个消息队列上是串行执行的。
示例代码
顺序发送消息
DefaultMQProducer producer = new DefaultMQProducer('group_name');
producer.setNamesrvAddr('127.0.0.1:9876');
producer.start();
Message message1 = new Message('topic', 'order', '1'.getBytes());
message1.setKeys('1');
message1.setTags('order');
message1.putUserProperty('sequenceId', '1');
SendResult sendResult1 = producer.send(message1);
producer.shutdown();
顺序消费消息
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer('group_name');
consumer.setNamesrvAddr('127.0.0.1:9876');
consumer.subscribe('topic', 'order');
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
for (MessageExt message : messages) {
// 处理消息
System.out.println(new String(message.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
原文地址: https://www.cveoy.top/t/topic/qyP8 著作权归作者所有。请勿转载和采集!