微服务项目order服务优化:解决RocketMQ延迟消息失效导致ES库存更新问题
在微服务项目order服务的tackAnOrder方法内,发送RocketMQ消息更新ES库存标志,商品库存表很大,保存时间较长,在search服务消费时,发现库存还是下单扣减前的库存,RocketMQ的延时消息没有延时,如何优化代码?
首先,可以考虑将发送RocketMQ消息的逻辑放在扣减库存的循环之外,这样可以减少发送消息的次数。
修改后的代码如下:
@Transactional(rollbackFor = Exception.class)
@Override
public TackAnOrderVo tackAnOrder(PlaceOrderVo req) {
//处理订单逻辑
//扣减库存
//将需要变更库存的商品ID加进集合,后面发生mq
List<Long> drugIdList = new ArrayList<>();
//保持订单
//发送mq更新es库存标志
List<DrugChangeMessage> drugChangeMessages = new ArrayList<>();
for (Long drugId : drugIdList) {
DrugChangeMessage drugChangeMessage = new DrugChangeMessage();
drugChangeMessage.setDrugId(drugId);
drugChangeMessage.setStoreId(req.getStoreId());
drugChangeMessage.setChangeType(5);
drugChangeMessage.setMerchantId(Long.parseLong(merchantId));
drugChangeMessage.setEnterpriseId(Long.parseLong(enterpriseId));
drugChangeMessage.setPlatformId(Long.parseLong(platformId));
drugChangeMessages.add(drugChangeMessage);
}
drugProducer.sendDrugChangeMsg(drugChangeMessages);
}
然后,在DrugProducer
类中修改sendDrugChangeMsg
方法,支持批量发送消息:
public Boolean sendDrugChangeMsg(List<DrugChangeMessage> messages) {
List<Message<DrugChangeMessage>> msgList = new ArrayList<>();
for (DrugChangeMessage message : messages) {
Message<DrugChangeMessage> msg = MessageBuilder.
withPayload(message).
setHeader(MessageConst.PROPERTY_KEYS, message.getDrugId()).
setHeader(MessageConst.PROPERTY_TAGS, DRUG_CHANGE_TAG).
setHeader(MessageConst.PROPERTY_DELAY_TIME_LEVEL, 3).
build();
msgList.add(msg);
}
return drugChangeSource.drugChangeOutput().send(msgList);
}
这样可以减少发送消息的次数,提高性能。
另外,还可以考虑使用RocketMQ的批量发送消息功能,将多个消息打包成一个消息发送,进一步减少网络开销和提高性能。具体做法是将多个消息放入同一个MessageBatch
对象中,然后发送该MessageBatch
对象。
修改后的sendDrugChangeMsg
方法如下:
public Boolean sendDrugChangeMsg(List<DrugChangeMessage> messages) {
MessageBatch messageBatch = MessageBuilder.createBatch();
for (DrugChangeMessage message : messages) {
Message<DrugChangeMessage> msg = MessageBuilder.
withPayload(message).
setHeader(MessageConst.PROPERTY_KEYS, message.getDrugId()).
setHeader(MessageConst.PROPERTY_TAGS, DRUG_CHANGE_TAG).
setHeader(MessageConst.PROPERTY_DELAY_TIME_LEVEL, 3).
build();
messageBatch.addMessage(msg);
}
return drugChangeSource.drugChangeOutput().send(messageBatch);
}
这样可以将多个消息打包成一个消息批量发送,进一步提高性能。
需要注意的是,接收方需要支持批量接收消息,并对消息进行解析处理。
代码示例:
public interface DrugSource {
String DRUG_CHANGE_OUTPUT='drug-change-output';
@Output(DRUG_CHANGE_OUTPUT)
MessageChannel drugChangeOutput();
}
@Service
public class DrugProducer {
private static final String DRUG_CHANGE_TAG='drugChange';
/**
* 发送药品信息更改的消息
*/
@Resource
private DrugSource drugChangeSource;
public Boolean sendDrugChangeMsg(DrugChangeMessage message) {
Message<DrugChangeMessage> msg = MessageBuilder.
withPayload(message).
setHeader(MessageConst.PROPERTY_KEYS, message.getDrugId()).
setHeader(MessageConst.PROPERTY_TAGS, DRUG_CHANGE_TAG).
setHeader(MessageConst.PROPERTY_DELAY_TIME_LEVEL, 3).
build();
return drugChangeSource.drugChangeOutput().send(msg);
}
}
通过以上优化措施,可以有效解决RocketMQ延迟消息失效导致ES库存更新问题,并提高系统性能。
原文地址: http://www.cveoy.top/t/topic/phGN 著作权归作者所有。请勿转载和采集!