Redis 消息监听器 - 使用 @Slf4j 和 @Component 注解
@Slf4j @Component public class RedisMessageListenerListener implements MessageListener { private AtomicInteger counter = new AtomicInteger(0); private Timer timer = new Timer(); @Resource private RedisTemplate<String, Object> redisTemplate; @Autowired private AsyncInstDataCapture asyncInstDataCapture;
/**
* 消息处理
*
* @param message
* @param pattern
*/
@Override
public void onMessage(Message message, byte[] pattern) {
// 增加计数器
counter.incrementAndGet();
// 判断计数器是否超过限制
if (counter.get() <= 10) {
// 处理消息
String channel = new String(pattern);
//log.info('onMessage --> 消息通道是:' + channel);
RedisSerializer<?> valueSerializer = redisTemplate.getValueSerializer();
Object deserialize = valueSerializer.deserialize(message.getBody());
//log.info('反序列化的结果:' + deserialize);
if (deserialize == null) return;
String md5DigestAsHex = DigestUtils.md5DigestAsHex(deserialize.toString().getBytes(StandardCharsets.UTF_8));
//log.info('计算得到的key: ' + md5DigestAsHex);
Boolean result = redisTemplate.opsForValue().setIfAbsent(md5DigestAsHex, '1', 20, TimeUnit.SECONDS);
if (Boolean.TRUE.equals(result)) {
// redis消息进行处理
log.info('接收的结果:' + deserialize.toString());
if (StringUtil.isNotEmpty(deserialize.toString())) {
HashMap hashMap = JSON.parseObject(deserialize.toString(), HashMap.class);
String instid = hashMap.get('instid').toString();
String bar = hashMap.get('bar').toString();
switch (bar) {
case '1m':
asyncInstDataCapture.dataCapture1m(instid);
break;
case '5m':
asyncInstDataCapture.dataCapture5m(instid);
break;
case '15m':
asyncInstDataCapture.dataCapture15m(instid);
break;
case '30m':
asyncInstDataCapture.dataCapture30m(instid);
break;
case '1h':
asyncInstDataCapture.dataCapture1h(instid);
break;
case '4h':
asyncInstDataCapture.dataCapture4h(instid);
break;
default:
}
}
} else {
log.info('其他服务处理中');
}
}
// 定时器每秒重置计数器
timer.schedule(new TimerTask() {
@Override
public void run() {
counter.set(0);
}
}, 1200);
}
}
消息频率非常高,没有进入判断的数据都丢失了内容:根据你的描述,似乎是由于消息频率非常高,导致没有进入判断的数据丢失了。为了解决这个问题,你可以考虑以下两种方案:
-
调整计数器的限制:将计数器的限制提高,使其能够处理更多的消息。你可以根据实际情况调整计数器的限制值。
-
使用消息队列:将消息存储到一个消息队列中,然后在后台异步处理消息。这样可以避免消息丢失的问题,并且能够更好地控制消息处理的速度。
你可以根据实际情况选择合适的方案来解决问题。
原文地址: http://www.cveoy.top/t/topic/pjge 著作权归作者所有。请勿转载和采集!