以下是基于 RocketMQ 的 Java 操作实现增删改查的代码示例:

  1. 建立连接
DefaultMQProducer producer = new DefaultMQProducer("producerGroup");
producer.setNamesrvAddr("localhost:9876");
producer.start();

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
consumer.start();
  1. 发送消息
Message message = new Message("TopicTest", "TagA", "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(message);
System.out.printf("%s%n", sendResult);
  1. 消费消息
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
consumer.start();
  1. 删除消息
// 删除指定消息
MessageExt msg = ...;
producer.deleteMessage(msg);

// 删除所有消息
producer.deleteMessage(new MessageExt());
  1. 查询消息
// 根据消息ID查询消息
MessageExt msg = consumer.viewMessage("messageId");
System.out.printf("%s%n", msg);

// 根据消息Key查询消息
MessageExt msg = consumer.viewMessage("TopicTest", "messageKey");
System.out.printf("%s%n", msg);

注意:以上代码示例仅为参考,具体实现可能会根据实际需求进行调整

rocketMQ java操作实现增删改查代码

原文地址: http://www.cveoy.top/t/topic/htMv 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录