微服务项目:RocketMQ 消息更新 ES 库存失败的优化方案
在订单服务的 'tackAnOrder' 方法内发送 RocketMQ 消息更新 ES 库存标志时,你发现在搜索服务消费时,库存还是下单扣减前的库存。这可能是由于消息的处理顺序导致的。
为了解决这个问题,你可以尝试以下优化代码的方法:
-
确保消息的顺序性:在发送 RocketMQ 消息时,可以设置消息的顺序属性,确保消息按照发送的顺序进行消费。可以在发送消息时设置消息的顺序标识,例如订单 ID,让消费端按照订单 ID 的顺序进行消费。
-
异步处理消息:可以将发送 RocketMQ 消息的逻辑改为异步处理,将消息发送到消息队列后立即返回,不等待消息消费的结果。这样可以提高订单服务的处理速度,并且不会因为等待消息消费导致库存更新延迟。
-
使用分布式事务:可以将订单服务和搜索服务的库存更新操作放在一个分布式事务中。在订单服务的事务提交后,再发送 RocketMQ 消息,确保订单服务和搜索服务的库存更新在一个事务中进行。
-
检查消费端的代码:检查搜索服务的消费端代码,确保消费端正确处理 RocketMQ 消息,并且按照正确的顺序进行消费和更新库存。
请根据你的具体业务和代码情况选择适合的优化方法,并进行相应的调整和测试。
public interface DrugSource {
String DRUG_CHANGE_OUTPUT = 'drug-change-output';
@Output(DRUG_CHANGE_OUTPUT)
MessageChannel drugChangeOutput();
}
@Service
public class DrugProducer {
private static final String DRUG_CHANGE_TAG = 'drugChange';
'
* 发送药品信息更改的消息
*'
@Resource
private DrugSource drugChangeSource;
public Boolean sendDrugChangeMsg(DrugChangeMessage message) {
Message<DrugChangeMessage> msg = MessageBuilder.
withPayload(message).
setHeader(MessageConst.PROPERTY_KEYS, message.getDrugId()).
setHeader(MessageConst.PROPERTY_TAGS, DRUG_CHANGE_TAG).
build();
return drugChangeSource.drugChangeOutput().send(msg);
}
}
@Transactional(rollbackFor = Exception.class)
@Override
public TackAnOrderVo tackAnOrder(PlaceOrderVo req) {
//处理订单逻辑
//扣减库存
//将需要变更库存的商品ID加进集合,后面发生mq
List<Long> drugIdList = new ArrayList<>();
//保持订单
//发送mq更新es库存标志
for (Long drugId : drugIdList) {
DrugChangeMessage drugChangeMessage = new DrugChangeMessage();
drugChangeMessage.setDrugId(drugId);
drugChangeMessage.setStoreId(req.getStoreId());
drugChangeMessage.setChangeType(5);
drugChangeMessage.setMerchantId(Long.parseLong(merchantId));
drugChangeMessage.setEnterpriseId(Long.parseLong(enterpriseId));
drugChangeMessage.setPlatformId(Long.parseLong(platformId));
drugProducer.sendDrugChangeMsg(drugChangeMessage);
}
}```
原文地址: https://www.cveoy.top/t/topic/phGl 著作权归作者所有。请勿转载和采集!