在微服务项目order服务的tackAnOrder方法内,发送RocketMQ消息更新ES库存标志,商品库存表很大,保存时间较长,在search服务消费时,发现库存还是下单扣减前的库存,RocketMQ的延时消息没有延时,如何优化代码?

首先,可以考虑将发送RocketMQ消息的逻辑放在扣减库存的循环之外,这样可以减少发送消息的次数。

修改后的代码如下:

@Transactional(rollbackFor = Exception.class)
@Override
public TackAnOrderVo tackAnOrder(PlaceOrderVo req) {
    //处理订单逻辑

    //扣减库存
    //将需要变更库存的商品ID加进集合,后面发生mq
    List<Long> drugIdList = new ArrayList<>();
    //保持订单

    //发送mq更新es库存标志
    List<DrugChangeMessage> drugChangeMessages = new ArrayList<>();
    for (Long drugId : drugIdList) {
        DrugChangeMessage drugChangeMessage = new DrugChangeMessage();
        drugChangeMessage.setDrugId(drugId);
        drugChangeMessage.setStoreId(req.getStoreId());
        drugChangeMessage.setChangeType(5);
        drugChangeMessage.setMerchantId(Long.parseLong(merchantId));
        drugChangeMessage.setEnterpriseId(Long.parseLong(enterpriseId));
        drugChangeMessage.setPlatformId(Long.parseLong(platformId));
        drugChangeMessages.add(drugChangeMessage);
    }
    drugProducer.sendDrugChangeMsg(drugChangeMessages);
}

然后,在DrugProducer类中修改sendDrugChangeMsg方法,支持批量发送消息:

public Boolean sendDrugChangeMsg(List<DrugChangeMessage> messages) {
    List<Message<DrugChangeMessage>> msgList = new ArrayList<>();
    for (DrugChangeMessage message : messages) {
        Message<DrugChangeMessage> msg = MessageBuilder.
                withPayload(message).
                setHeader(MessageConst.PROPERTY_KEYS, message.getDrugId()).
                setHeader(MessageConst.PROPERTY_TAGS, DRUG_CHANGE_TAG).
                setHeader(MessageConst.PROPERTY_DELAY_TIME_LEVEL, 3).
                build();
        msgList.add(msg);
    }
    return drugChangeSource.drugChangeOutput().send(msgList);
}

这样可以减少发送消息的次数,提高性能。

另外,还可以考虑使用RocketMQ的批量发送消息功能,将多个消息打包成一个消息发送,进一步减少网络开销和提高性能。具体做法是将多个消息放入同一个MessageBatch对象中,然后发送该MessageBatch对象。

修改后的sendDrugChangeMsg方法如下:

public Boolean sendDrugChangeMsg(List<DrugChangeMessage> messages) {
    MessageBatch messageBatch = MessageBuilder.createBatch();
    for (DrugChangeMessage message : messages) {
        Message<DrugChangeMessage> msg = MessageBuilder.
                withPayload(message).
                setHeader(MessageConst.PROPERTY_KEYS, message.getDrugId()).
                setHeader(MessageConst.PROPERTY_TAGS, DRUG_CHANGE_TAG).
                setHeader(MessageConst.PROPERTY_DELAY_TIME_LEVEL, 3).
                build();
        messageBatch.addMessage(msg);
    }
    return drugChangeSource.drugChangeOutput().send(messageBatch);
}

这样可以将多个消息打包成一个消息批量发送,进一步提高性能。

需要注意的是,接收方需要支持批量接收消息,并对消息进行解析处理。

代码示例:

public interface DrugSource {
    String DRUG_CHANGE_OUTPUT='drug-change-output';

    @Output(DRUG_CHANGE_OUTPUT)
    MessageChannel drugChangeOutput();
}

@Service
public class DrugProducer {
    private static final String DRUG_CHANGE_TAG='drugChange';
    /**
     * 发送药品信息更改的消息
     */
    @Resource
    private DrugSource drugChangeSource;

    public Boolean sendDrugChangeMsg(DrugChangeMessage message) {
        Message<DrugChangeMessage> msg = MessageBuilder.
                withPayload(message).
                setHeader(MessageConst.PROPERTY_KEYS, message.getDrugId()).
                setHeader(MessageConst.PROPERTY_TAGS, DRUG_CHANGE_TAG).
                setHeader(MessageConst.PROPERTY_DELAY_TIME_LEVEL, 3).
                build();
        return drugChangeSource.drugChangeOutput().send(msg);
    }
}

通过以上优化措施,可以有效解决RocketMQ延迟消息失效导致ES库存更新问题,并提高系统性能。


原文地址: http://www.cveoy.top/t/topic/phGN 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录