Redis 消息消费限速实现 - 每秒只消费 10 条数据

本文介绍如何使用 Java 代码实现 Redis 消息消费限速,确保每秒只消费 10 条数据。通过 AtomicInteger 和 Thread.sleep() 方法控制消费速度,并使用 log4j 记录日志信息。

private AtomicInteger count = new AtomicInteger(0);

@Override
public void onMessage(Message message, byte[] pattern) {
    String channel = new String(pattern);
    log.info('onMessage --> 消息通道是:' + channel);

    RedisSerializer<?> valueSerializer = redisTemplate.getValueSerializer();
    Object deserialize = valueSerializer.deserialize(message.getBody());
    log.info('反序列化的结果:' + deserialize);
    if (deserialize == null) return;
    String md5DigestAsHex = DigestUtils.md5DigestAsHex(deserialize.toString().getBytes(StandardCharsets.UTF_8));
    log.info('计算得到的key: ' + md5DigestAsHex);
    Boolean result = redisTemplate.opsForValue().setIfAbsent(md5DigestAsHex, '1', 20, TimeUnit.SECONDS);
    if (Boolean.TRUE.equals(result)) {
        // redis消息进行处理
        log.info('接收的结果:' + deserialize.toString());
        if (count.incrementAndGet() >= 10) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                log.error('线程休眠异常: ' + e.getMessage());
            } finally {
                count.set(0);
            }
        }
    } else {
        log.info('其他服务处理中');
    }
}

代码解析:

  1. AtomicInteger count: 用于记录每秒消费的 message 数量。
  2. count.incrementAndGet() >= 10: 当 count 的值大于等于 10 时,执行限速逻辑。
  3. Thread.sleep(1000): 让线程休眠 1 秒,控制每秒消费不超过 10 条消息。
  4. count.set(0): 重置 count 为 0,准备下一秒的计数。

注意:

  • 该代码使用 log4j 记录日志信息,请根据实际情况修改日志配置。
  • 您可以根据需要调整每秒消费的数量和休眠时间。
  • 这种限速方法只能保证在大多数情况下每秒消费不超过 10 条消息,在极端情况下可能会有延迟。

其他方法:

除了使用 AtomicInteger 和 Thread.sleep(),还可以使用其他方法实现 Redis 消息消费限速,例如:

  • 使用 Spring 的 RateLimiter 来实现限速。
  • 使用 Redis 的 Lua 脚本实现限速。
  • 使用第三方工具实现限速。

选择合适的限速方法取决于您的具体需求和场景。


原文地址: https://www.cveoy.top/t/topic/piek 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录