RocketMQ是一款分布式消息中间件,具有高可靠性、高吞吐量、低延迟等特点,适用于大规模分布式系统中的消息通信、异步通信、事务消息等场景。下面是RocketMQ实战的一些例子:

  1. 安装RocketMQ

安装RocketMQ的步骤包括下载解压、配置环境变量、启动NameServer和Broker等。具体步骤可以参考官方文档。

  1. 发送和接收消息

RocketMQ提供了多种发送和接收消息的方式,包括同步发送、异步发送、单向发送、顺序发送、广播发送等。下面以同步发送和接收消息为例:

发送消息:

DefaultMQProducer producer = new DefaultMQProducer("producer_group");
producer.setNamesrvAddr("localhost:9876");
producer.start();

Message message = new Message("topic", "tag", "hello world".getBytes(StandardCharsets.UTF_8));
SendResult result = producer.send(message);
System.out.println(result);

producer.shutdown();

接收消息:

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("topic", "tag");

consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
        for (MessageExt message : messages) {
            System.out.println(new String(message.getBody(), StandardCharsets.UTF_8));
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});

consumer.start();
  1. 顺序消息

RocketMQ支持顺序消息,即保证同一消息队列中的消息按照发送顺序进行消费。下面是一个顺序消息的例子:

发送消息:

DefaultMQProducer producer = new DefaultMQProducer("producer_group");
producer.setNamesrvAddr("localhost:9876");
producer.start();

List<Message> messages = new ArrayList<>();
for (int i = 0; i < 10; i++) {
    Message message = new Message("topic", "tag", ("message " + i).getBytes(StandardCharsets.UTF_8));
    message.setKeys(String.valueOf(i));
    messages.add(message);
}
producer.send(messages, new MessageQueueSelector() {
    @Override
    public MessageQueue select(List<MessageQueue> queues, Message message, Object arg) {
        Integer index = (Integer) arg;
        return queues.get(index % queues.size());
    }
}, 0);

producer.shutdown();

接收消息:

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("topic", "tag");

consumer.registerMessageListener(new MessageListenerOrderly() {
    @Override
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> messages, ConsumeOrderlyContext context) {
        for (MessageExt message : messages) {
            System.out.println(new String(message.getBody(), StandardCharsets.UTF_8));
        }
        return ConsumeOrderlyStatus.SUCCESS;
    }
});

consumer.start();
  1. 事务消息

RocketMQ支持事务消息,即在发送消息时可以指定事务ID,然后在回调方法中执行本地事务,最后根据本地事务的执行结果提交或回滚事务。下面是一个事务消息的例子:

发送消息:

TransactionMQProducer producer = new TransactionMQProducer("producer_group");
producer.setNamesrvAddr("localhost:9876");
producer.setTransactionListener(new TransactionListener() {
    @Override
    public LocalTransactionState executeLocalTransaction(Message message, Object arg) {
        try {
            // 执行本地事务
            System.out.println("execute local transaction");
            Thread.sleep(5000);
            return LocalTransactionState.COMMIT_MESSAGE;
        } catch (InterruptedException e) {
            e.printStackTrace();
            return LocalTransactionState.UNKNOW;
        }
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt message) {
        // 检查本地事务的状态
        System.out.println("check local transaction");
        return LocalTransactionState.COMMIT_MESSAGE;
    }
});
producer.start();

Message message = new Message("topic", "tag", "hello world".getBytes(StandardCharsets.UTF_8));
TransactionSendResult result = producer.sendMessageInTransaction(message, null);
System.out.println(result);

producer.shutdown();

接收消息:

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("topic", "tag");

consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
        for (MessageExt message : messages) {
            System.out.println(new String(message.getBody(), StandardCharsets.UTF_8));
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});

consumer.start();

以上是RocketMQ实战的一些例子,可以根据具体需求选择不同的发送和接收消息方式,以及使用RocketMQ提供的其他特性

rocketMQ 实战

原文地址: http://www.cveoy.top/t/topic/htNj 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录