Redis 消息消费限速实现 - 每秒只消费 10 条数据
Redis 消息消费限速实现 - 每秒只消费 10 条数据
本文介绍如何使用 Java 代码实现 Redis 消息消费限速,确保每秒只消费 10 条数据。通过 AtomicInteger 和 Thread.sleep() 方法控制消费速度,并使用 log4j 记录日志信息。
private AtomicInteger count = new AtomicInteger(0);
@Override
public void onMessage(Message message, byte[] pattern) {
String channel = new String(pattern);
log.info('onMessage --> 消息通道是:' + channel);
RedisSerializer<?> valueSerializer = redisTemplate.getValueSerializer();
Object deserialize = valueSerializer.deserialize(message.getBody());
log.info('反序列化的结果:' + deserialize);
if (deserialize == null) return;
String md5DigestAsHex = DigestUtils.md5DigestAsHex(deserialize.toString().getBytes(StandardCharsets.UTF_8));
log.info('计算得到的key: ' + md5DigestAsHex);
Boolean result = redisTemplate.opsForValue().setIfAbsent(md5DigestAsHex, '1', 20, TimeUnit.SECONDS);
if (Boolean.TRUE.equals(result)) {
// redis消息进行处理
log.info('接收的结果:' + deserialize.toString());
if (count.incrementAndGet() >= 10) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
log.error('线程休眠异常: ' + e.getMessage());
} finally {
count.set(0);
}
}
} else {
log.info('其他服务处理中');
}
}
代码解析:
- AtomicInteger count: 用于记录每秒消费的 message 数量。
- count.incrementAndGet() >= 10: 当 count 的值大于等于 10 时,执行限速逻辑。
- Thread.sleep(1000): 让线程休眠 1 秒,控制每秒消费不超过 10 条消息。
- count.set(0): 重置 count 为 0,准备下一秒的计数。
注意:
- 该代码使用 log4j 记录日志信息,请根据实际情况修改日志配置。
- 您可以根据需要调整每秒消费的数量和休眠时间。
- 这种限速方法只能保证在大多数情况下每秒消费不超过 10 条消息,在极端情况下可能会有延迟。
其他方法:
除了使用 AtomicInteger 和 Thread.sleep(),还可以使用其他方法实现 Redis 消息消费限速,例如:
- 使用 Spring 的 RateLimiter 来实现限速。
- 使用 Redis 的 Lua 脚本实现限速。
- 使用第三方工具实现限速。
选择合适的限速方法取决于您的具体需求和场景。
原文地址: https://www.cveoy.top/t/topic/piek 著作权归作者所有。请勿转载和采集!