RocketMQ Java 操作实现增删改查代码
以下是基于 RocketMQ 的 Java 操作实现增删改查的代码示例:
- 建立连接
// Producer: joins the "producerGroup" group, points at the name server on
// localhost:9876 and is started immediately.
DefaultMQProducer producer = new DefaultMQProducer("producerGroup");
producer.setNamesrvAddr("localhost:9876");
producer.start();

// Listener shared by the push consumer below: prints the receiving thread's name
// together with the delivered batch, then returns CONSUME_SUCCESS for that batch.
// Assigning the lambda to a typed local keeps the registerMessageListener overload unambiguous.
MessageListenerConcurrently printingListener = (msgs, context) -> {
    System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
};

// Push consumer: joins "consumerGroup", subscribes to "TopicTest" with the "*"
// tag expression, registers the listener and is started last.
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(printingListener);
consumer.start();
- 发送消息
// Encode the text payload with the charset named by RemotingHelper.DEFAULT_CHARSET.
byte[] payload = "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET);
// Wrap the payload in a message addressed to topic "TopicTest" with tag "TagA".
Message greeting = new Message("TopicTest", "TagA", payload);
// Hand the message to the `producer` created earlier and print whatever it returns.
SendResult sendResult = producer.send(greeting);
System.out.printf("%s%n", sendResult);
- 消费消息
// Callback invoked with each delivered batch: prints the current thread's name and
// the batch itself, and always returns CONSUME_SUCCESS.
MessageListenerConcurrently batchPrinter = new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> batch, ConsumeConcurrentlyContext ctx) {
        String threadName = Thread.currentThread().getName();
        System.out.printf("%s Receive New Messages: %s %n", threadName, batch);
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
};

// Push consumer in group "consumerGroup", connected through the name server on
// localhost:9876 and subscribed to "TopicTest" with the "*" tag expression.
// All configuration is applied before start() is called.
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup");
consumer.registerMessageListener(batchPrinter);
consumer.subscribe("TopicTest", "*");
consumer.setNamesrvAddr("localhost:9876");
consumer.start();
- 删除消息
// NOTE: RocketMQ has no client API for deleting messages. DefaultMQProducer does not
// declare a deleteMessage(...) method, so calls such as producer.deleteMessage(msg)
// do not compile, and passing an empty MessageExt cannot "delete everything".
//
// How messages actually disappear:
//   1. Retention: the broker stores messages in an append-only commit log and removes
//      expired log files on its own schedule, controlled by the broker setting
//      `fileReservedTime` (in hours).
//   2. Dropping a whole topic: use the admin tool, for example
//        sh mqadmin deleteTopic -n localhost:9876 -c DefaultCluster -t TopicTest
//      (replace the cluster name with the one your brokers belong to).
//   3. Skipping instead of deleting: a consumer that does not want to process a message
//      can simply return ConsumeConcurrentlyStatus.CONSUME_SUCCESS for it, so it is not
//      redelivered to that consumer group; the message itself stays in the broker until
//      it expires.
- 查询消息
// Look up one message by its message ID (the single-argument overload expects the
// offset message ID reported by the broker). Requires the `consumer` to be started.
MessageExt msgById = consumer.viewMessage("messageId");
System.out.printf("%s%n", msgById);

// Look up messages by business key. viewMessage(topic, id) searches by message ID, not
// by key, so the key lookup goes through queryMessage instead: it returns up to
// `maxNum` (here 32) messages in "TopicTest" whose key is "messageKey" and whose
// store time falls between `begin` (0 = no lower bound) and `end` (now), in milliseconds.
// Distinct variable names are used because both lookups share one scope.
long queryEnd = System.currentTimeMillis();
for (MessageExt msgByKey : consumer.queryMessage("TopicTest", "messageKey", 32, 0L, queryEnd).getMessageList()) {
    System.out.printf("%s%n", msgByKey);
}
注意：以上代码示例仅供参考，具体实现可能需要根据实际需求进行调整。
原文地址: http://www.cveoy.top/t/topic/htMv 著作权归作者所有。请勿转载和采集!