SpringBoot + RetryTemplate实现RocketMQ异步发送消息失败重试配置与实现
- 引入依赖
在pom.xml文件中引入SpringBoot和RocketMQ的依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.0</version>
</dependency>
- 配置RocketMQ
在application.yml文件中配置RocketMQ的相关信息,比如namesrv地址、组名、topic等:
rocketmq:
name-server: localhost:9876
producer:
group: default
retries: 3
consumer:
group: default
concurrency: 1
max-reconsume-times: 3
template:
producer:
retry-times-when-send-async-failed: 3
其中producer.retries表示异步发送消息失败的重试次数,template.producer.retry-times-when-send-async-failed表示异步发送消息失败时的重试次数。
- 实现异步发送消息
在代码中使用RocketMQ的RocketMQTemplate实现异步发送消息:
@Autowired
private RocketMQTemplate rocketMQTemplate;
public void sendAsyncMessage(String message) {
rocketMQTemplate.asyncSend("test-topic", message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
// 消息发送成功处理
}
@Override
public void onException(Throwable e) {
// 消息发送失败处理
}
});
}
- 实现重试机制
使用Spring的RetryTemplate实现异步发送消息失败的重试机制。
首先,需要在pom.xml文件中引入Spring Retry的依赖:
<dependency>
<groupId>org.springframework.retry</groupId>
<artifactId>spring-retry</artifactId>
<version>1.3.1.RELEASE</version>
</dependency>
然后,定义重试策略:
@Bean
public RetryTemplate retryTemplate() {
RetryTemplate retryTemplate = new RetryTemplate();
// 重试策略:3次重试,每次重试间隔1秒
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
retryPolicy.setMaxAttempts(3);
FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
backOffPolicy.setBackOffPeriod(1000);
retryTemplate.setRetryPolicy(retryPolicy);
retryTemplate.setBackOffPolicy(backOffPolicy);
return retryTemplate;
}
最后,在异步发送消息的方法中加入RetryCallback和RecoveryCallback:
@Autowired
private RetryTemplate retryTemplate;
public void sendAsyncMessage(String message) {
retryTemplate.execute(new RetryCallback<Void, Exception>() {
@Override
public Void doWithRetry(RetryContext retryContext) throws Exception {
rocketMQTemplate.asyncSend("test-topic", message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
// 消息发送成功处理
}
@Override
public void onException(Throwable e) {
// 消息发送失败处理
throw new RuntimeException(e);
}
});
return null;
}
}, new RecoveryCallback<Void>() {
@Override
public Void recover(RetryContext retryContext) throws Exception {
// 重试失败后的处理
return null;
}
});
}
这样,当异步发送消息失败时,RetryTemplate会自动进行重试,直到达到最大重试次数或消息发送成功为止
原文地址: https://www.cveoy.top/t/topic/eOxs 著作权归作者所有。请勿转载和采集!