@Component public class MQDeadLetterListener {

private final Logger logger = LoggerFactory.getLogger(getClass());

private static final int MAX_RETRY_TIMES = 3; // 最大重试次数

@Autowired
private AmqpTemplate amqpTemplate;

@RabbitListener(queuesToDeclare = @Queue('movie.type.list.dead'))
public void consumer(Message message, Channel channel) throws Exception {
    String msgId = message.getMessageProperties().getCorrelationId();
    int retryTimes = (int) message.getMessageProperties().getHeaders().getOrDefault('retryTimes', 0);
    try {
        logger.info('消息发送到死信队列. msgId={}, body={}', msgId, new String(message.getBody()));
        // 发送确认消息
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    } catch (Exception e) {
        if (retryTimes < MAX_RETRY_TIMES) {
            logger.error('消息消费失败,将重新发送. msgId={}, body={}, retryTimes={}', msgId, new String(message.getBody()), retryTimes);
            // 发送拒绝消息,将消息重新发送到原队列
            message.getMessageProperties().getHeaders().put('retryTimes', retryTimes + 1);
            amqpTemplate.convertAndSend('movie.type.list', message.getBody(), msg -> {
                msg.getMessageProperties().setExpiration(String.valueOf((retryTimes + 1) * 1000)); // 设置延时时间
                msg.getMessageProperties().setCorrelationId(msgId);
                return msg;
            });
        } else {
            logger.error('消息消费失败,已达到最大重试次数,将丢弃消息. msgId={}, body={}, retryTimes={}', msgId, new String(message.getBody()), retryTimes);
            // 发送确认消息,将消息从队列中删除
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            // TODO: 可以将丢弃的消息进入人工干预流程
        }
    }
}

}

RabbitMQ 死信队列监听器优化:避免消息循环发送

原文地址: https://www.cveoy.top/t/topic/noVq 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录