RocketMQ 事务消息优化:解决 ES 库存更新延迟问题

在微服务项目中,order 服务可能会发送 RocketMQ 消息更新 ES 库存标志。当商品库存表很大,保存时间较长时,search 服务消费消息后,可能会发现库存还是下单扣减前的库存,造成数据不一致。为了解决这个问题,可以使用 RocketMQ 提供的事务消息功能,确保 ES 库存与订单数据一致。

问题描述

在 order 服务中,tackAnOrder 方法处理订单逻辑,包括扣减库存和发送 RocketMQ 消息更新 ES 库存标志。由于库存表很大,保存时间较长,search 服务消费消息时,可能会发现库存还是下单扣减前的库存,导致数据不一致。

解决方案

使用 RocketMQ 的事务消息功能,可以确保本地事务(扣减库存)和消息发送同时成功或失败,避免数据不一致的问题。

1. 定义事务消息监听器

@Component
public class DrugChangeMessageListener implements TransactionalMessageListener {

    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        // 在这个方法中执行本地事务,包括扣减库存和保存订单等操作

        // 如果本地事务执行成功,则返回 COMMIT_MESSAGE
        return LocalTransactionState.COMMIT_MESSAGE;
        
        // 如果本地事务执行失败,则返回 ROLLBACK_MESSAGE
        // return LocalTransactionState.ROLLBACK_MESSAGE;
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        // 在这个方法中检查本地事务的状态,如果本地事务成功提交,则返回 COMMIT_MESSAGE
        // 如果本地事务尚未完成或者执行失败,则返回 UNKNOW
        return LocalTransactionState.COMMIT_MESSAGE;
        
        // 如果本地事务执行失败,则返回 ROLLBACK_MESSAGE
        // return LocalTransactionState.ROLLBACK_MESSAGE;
    }
}

2. 发送事务消息

@Transactional(rollbackFor = Exception.class)
@Override
public TackAnOrderVo tackAnOrder(PlaceOrderVo req) {
    // 处理订单逻辑
    
    // 扣减库存
    // 将需要变更库存的商品ID加进集合,后面发生mq
    List<Long> drugIdList = new ArrayList<>();
    // 保持订单
    
    // 发送mq更新es库存标志
    for (Long drugId : drugIdList) {
        DrugChangeMessage drugChangeMessage = new DrugChangeMessage();
        drugChangeMessage.setDrugId(drugId);
        drugChangeMessage.setStoreId(req.getStoreId());
        drugChangeMessage.setChangeType(5);
        drugChangeMessage.setMerchantId(Long.parseLong(merchantId));
        drugChangeMessage.setEnterpriseId(Long.parseLong(enterpriseId));
        drugChangeMessage.setPlatformId(Long.parseLong(platformId));
        Message<DrugChangeMessage> msg = MessageBuilder.
            withPayload(drugChangeMessage).
            setHeader(MessageConst.PROPERTY_KEYS, drugChangeMessage.getDrugId()).
            setHeader(MessageConst.PROPERTY_TAGS, DRUG_CHANGE_TAG).
            build();
        TransactionSendResult sendResult = rocketMQTemplate.sendMessageInTransaction(
            DRUG_CHANGE_GROUP, DRUG_CHANGE_TOPIC, msg, null);
        if (sendResult.getLocalTransactionState() == LocalTransactionState.ROLLBACK_MESSAGE) {
            // 事务消息发送失败,执行回滚操作
            throw new RuntimeException("Failed to send transactional message.");
        }
    }
}

代码说明:

  • DrugChangeMessageListener 类实现了 TransactionalMessageListener 接口,用于处理事务消息的本地事务逻辑和状态检查。
  • executeLocalTransaction 方法执行本地事务,包括扣减库存和保存订单等操作。如果本地事务执行成功,则返回 COMMIT_MESSAGE;如果本地事务执行失败,则返回 ROLLBACK_MESSAGE
  • checkLocalTransaction 方法检查本地事务的状态。如果本地事务成功提交,则返回 COMMIT_MESSAGE;如果本地事务尚未完成或者执行失败,则返回 UNKNOW;如果本地事务执行失败,则返回 ROLLBACK_MESSAGE
  • tackAnOrder 方法使用 rocketMQTemplate.sendMessageInTransaction 方法发送事务消息。如果事务消息发送失败,则执行回滚操作。

代码示例:

public interface DrugSource {
    String DRUG_CHANGE_OUTPUT="drug-change-output";

    @Output(DRUG_CHANGE_OUTPUT)
    MessageChannel drugChangeOutput();
}

@Service
public class DrugProducer {
    private static final String DRUG_CHANGE_TAG="drugChange";
    
    /**
     * 发送药品信息更改的消息
     */
    @Resource
    private DrugSource drugChangeSource;

    public Boolean sendDrugChangeMsg(DrugChangeMessage message) {
        Message<DrugChangeMessage> msg = MessageBuilder.
                withPayload(message).
                setHeader(MessageConst.PROPERTY_KEYS, message.getDrugId()).
                setHeader(MessageConst.PROPERTY_TAGS, DRUG_CHANGE_TAG).
                build();
        return drugChangeSource.drugChangeOutput().send(msg);
    }
}

总结

使用 RocketMQ 的事务消息功能,可以确保本地事务和消息发送同时成功或失败,保证数据一致性。在使用事务消息时,需要注意本地事务的实现和状态检查,以及事务消息发送的成功与失败处理。

注意:

  • 事务消息需要在 RocketMQ 集群中配置开启事务消息功能。
  • 本地事务的实现和状态检查需要根据具体业务逻辑进行编写。
  • 为了提高可靠性,可以考虑使用事务消息的回查机制,确保本地事务和消息发送的一致性。

希望这篇文章对你有所帮助。


原文地址: http://www.cveoy.top/t/topic/phGt 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录