RabbitMQ 死信队列监听器优化:避免消息循环发送
@Component public class MQDeadLetterListener {
private final Logger logger = LoggerFactory.getLogger(getClass());
private static final int MAX_RETRY_TIMES = 3; // 最大重试次数
@Autowired
private AmqpTemplate amqpTemplate;
@RabbitListener(queuesToDeclare = @Queue('movie.type.list.dead'))
public void consumer(Message message, Channel channel) throws Exception {
String msgId = message.getMessageProperties().getCorrelationId();
int retryTimes = (int) message.getMessageProperties().getHeaders().getOrDefault('retryTimes', 0);
try {
logger.info('消息发送到死信队列. msgId={}, body={}', msgId, new String(message.getBody()));
// 发送确认消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
if (retryTimes < MAX_RETRY_TIMES) {
logger.error('消息消费失败,将重新发送. msgId={}, body={}, retryTimes={}', msgId, new String(message.getBody()), retryTimes);
// 发送拒绝消息,将消息重新发送到原队列
message.getMessageProperties().getHeaders().put('retryTimes', retryTimes + 1);
amqpTemplate.convertAndSend('movie.type.list', message.getBody(), msg -> {
msg.getMessageProperties().setExpiration(String.valueOf((retryTimes + 1) * 1000)); // 设置延时时间
msg.getMessageProperties().setCorrelationId(msgId);
return msg;
});
} else {
logger.error('消息消费失败,已达到最大重试次数,将丢弃消息. msgId={}, body={}, retryTimes={}', msgId, new String(message.getBody()), retryTimes);
// 发送确认消息,将消息从队列中删除
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
// TODO: 可以将丢弃的消息进入人工干预流程
}
}
}
}
原文地址: https://www.cveoy.top/t/topic/noVq 著作权归作者所有。请勿转载和采集!