RocketMQ 事务消息优化:解决 ES 库存更新延迟问题
RocketMQ 事务消息优化:解决 ES 库存更新延迟问题
在微服务项目中,order 服务可能会发送 RocketMQ 消息更新 ES 库存标志。当商品库存表很大,保存时间较长时,search 服务消费消息后,可能会发现库存还是下单扣减前的库存,造成数据不一致。为了解决这个问题,可以使用 RocketMQ 提供的事务消息功能,确保 ES 库存与订单数据一致。
问题描述
在 order 服务中,tackAnOrder
方法处理订单逻辑,包括扣减库存和发送 RocketMQ 消息更新 ES 库存标志。由于库存表很大,保存时间较长,search 服务消费消息时,可能会发现库存还是下单扣减前的库存,导致数据不一致。
解决方案
使用 RocketMQ 的事务消息功能,可以确保本地事务(扣减库存)和消息发送同时成功或失败,避免数据不一致的问题。
1. 定义事务消息监听器
@Component
public class DrugChangeMessageListener implements TransactionalMessageListener {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// 在这个方法中执行本地事务,包括扣减库存和保存订单等操作
// 如果本地事务执行成功,则返回 COMMIT_MESSAGE
return LocalTransactionState.COMMIT_MESSAGE;
// 如果本地事务执行失败,则返回 ROLLBACK_MESSAGE
// return LocalTransactionState.ROLLBACK_MESSAGE;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// 在这个方法中检查本地事务的状态,如果本地事务成功提交,则返回 COMMIT_MESSAGE
// 如果本地事务尚未完成或者执行失败,则返回 UNKNOW
return LocalTransactionState.COMMIT_MESSAGE;
// 如果本地事务执行失败,则返回 ROLLBACK_MESSAGE
// return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
2. 发送事务消息
@Transactional(rollbackFor = Exception.class)
@Override
public TackAnOrderVo tackAnOrder(PlaceOrderVo req) {
// 处理订单逻辑
// 扣减库存
// 将需要变更库存的商品ID加进集合,后面发生mq
List<Long> drugIdList = new ArrayList<>();
// 保持订单
// 发送mq更新es库存标志
for (Long drugId : drugIdList) {
DrugChangeMessage drugChangeMessage = new DrugChangeMessage();
drugChangeMessage.setDrugId(drugId);
drugChangeMessage.setStoreId(req.getStoreId());
drugChangeMessage.setChangeType(5);
drugChangeMessage.setMerchantId(Long.parseLong(merchantId));
drugChangeMessage.setEnterpriseId(Long.parseLong(enterpriseId));
drugChangeMessage.setPlatformId(Long.parseLong(platformId));
Message<DrugChangeMessage> msg = MessageBuilder.
withPayload(drugChangeMessage).
setHeader(MessageConst.PROPERTY_KEYS, drugChangeMessage.getDrugId()).
setHeader(MessageConst.PROPERTY_TAGS, DRUG_CHANGE_TAG).
build();
TransactionSendResult sendResult = rocketMQTemplate.sendMessageInTransaction(
DRUG_CHANGE_GROUP, DRUG_CHANGE_TOPIC, msg, null);
if (sendResult.getLocalTransactionState() == LocalTransactionState.ROLLBACK_MESSAGE) {
// 事务消息发送失败,执行回滚操作
throw new RuntimeException("Failed to send transactional message.");
}
}
}
代码说明:
DrugChangeMessageListener
类实现了TransactionalMessageListener
接口,用于处理事务消息的本地事务逻辑和状态检查。executeLocalTransaction
方法执行本地事务,包括扣减库存和保存订单等操作。如果本地事务执行成功,则返回COMMIT_MESSAGE
;如果本地事务执行失败,则返回ROLLBACK_MESSAGE
。checkLocalTransaction
方法检查本地事务的状态。如果本地事务成功提交,则返回COMMIT_MESSAGE
;如果本地事务尚未完成或者执行失败,则返回UNKNOW
;如果本地事务执行失败,则返回ROLLBACK_MESSAGE
。tackAnOrder
方法使用rocketMQTemplate.sendMessageInTransaction
方法发送事务消息。如果事务消息发送失败,则执行回滚操作。
代码示例:
public interface DrugSource {
String DRUG_CHANGE_OUTPUT="drug-change-output";
@Output(DRUG_CHANGE_OUTPUT)
MessageChannel drugChangeOutput();
}
@Service
public class DrugProducer {
private static final String DRUG_CHANGE_TAG="drugChange";
/**
* 发送药品信息更改的消息
*/
@Resource
private DrugSource drugChangeSource;
public Boolean sendDrugChangeMsg(DrugChangeMessage message) {
Message<DrugChangeMessage> msg = MessageBuilder.
withPayload(message).
setHeader(MessageConst.PROPERTY_KEYS, message.getDrugId()).
setHeader(MessageConst.PROPERTY_TAGS, DRUG_CHANGE_TAG).
build();
return drugChangeSource.drugChangeOutput().send(msg);
}
}
总结
使用 RocketMQ 的事务消息功能,可以确保本地事务和消息发送同时成功或失败,保证数据一致性。在使用事务消息时,需要注意本地事务的实现和状态检查,以及事务消息发送的成功与失败处理。
注意:
- 事务消息需要在 RocketMQ 集群中配置开启事务消息功能。
- 本地事务的实现和状态检查需要根据具体业务逻辑进行编写。
- 为了提高可靠性,可以考虑使用事务消息的回查机制,确保本地事务和消息发送的一致性。
希望这篇文章对你有所帮助。
原文地址: http://www.cveoy.top/t/topic/phGt 著作权归作者所有。请勿转载和采集!