Redis消息消费控制:每秒限速10条数据不抛异常

在处理Redis消息时,为了避免过快的消费速度导致系统异常,可以通过控制消费频率来确保稳定性。本文将介绍一种使用计数器和定时器来限制每秒消费数据数量的方法,并确保不会抛出异常。

代码示例:

@Override
public void onMessage(Message message, byte[] pattern) {
    String channel = new String(pattern);
    log.info('onMessage --> 消息通道是:' + channel);

    RedisSerializer<?> valueSerializer = redisTemplate.getValueSerializer();
    Object deserialize = valueSerializer.deserialize(message.getBody());
    log.info('反序列化的结果:' + deserialize);
    if (deserialize == null) return;
    String md5DigestAsHex = DigestUtils.md5DigestAsHex(deserialize.toString().getBytes(StandardCharsets.UTF_8));
    log.info('计算得到的key: ' + md5DigestAsHex);
    Boolean result = redisTemplate.opsForValue().setIfAbsent(md5DigestAsHex, '1', 20, TimeUnit.SECONDS);
    if (Boolean.TRUE.equals(result)) {
        // redis消息进行处理
        log.info('接收的结果:' + deserialize.toString());
    } else {
        log.info('其他服务处理中');
    }
}

private AtomicInteger counter = new AtomicInteger(0);

// 每秒定时重置计数器的值为0
@Scheduled(fixedRate = 1000)
public void resetCounter() {
    counter.set(0);
}

@Override
public void onMessage(Message message, byte[] pattern) {
    // 判断计数器的值是否小于10
    if (counter.get() < 10) {
        // 进行消费
        // ...
        // 计数器加1
        counter.incrementAndGet();
    } else {
        // 等待下次定时器触发时重置计数器的值
    }
}

解释:

  1. 计数器: 使用AtomicInteger类型作为计数器,保证线程安全。
  2. 定时器: 使用@Scheduled注解,每秒钟执行一次resetCounter方法,将计数器重置为0。
  3. 消费控制:onMessage方法中判断计数器的值是否小于10,如果小于10,则进行消息消费,否则等待下次定时器重置计数器。

优点:

  • 限制每秒消费数据数量,避免消费过快导致系统异常。
  • 使用计数器和定时器,代码简洁易懂。
  • 不会抛出异常,保证系统稳定运行。

注意:

  • 调整fixedRate参数可以改变每秒消费数据的数量。
  • 可以根据实际情况,调整消费控制逻辑。

通过使用计数器和定时器,可以轻松实现Redis消息消费的限速,确保系统稳定运行。


原文地址: https://www.cveoy.top/t/topic/pief 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录