使用 Semaphore 控制 Redis 消息消费速度 - 每秒仅处理 10 条消息
@Override public void onMessage(Message message, byte[] pattern) { String channel = new String(pattern); log.info('onMessage --> 消息通道是:', channel);
RedisSerializer<?> valueSerializer = redisTemplate.getValueSerializer();
Object deserialize = valueSerializer.deserialize(message.getBody());
log.info('反序列化的结果:', deserialize);
if (deserialize == null) return;
String md5DigestAsHex = DigestUtils.md5DigestAsHex(deserialize.toString().getBytes(StandardCharsets.UTF_8));
log.info('计算得到的key:', md5DigestAsHex);
Boolean result = redisTemplate.opsForValue().setIfAbsent(md5DigestAsHex, '1', 20, TimeUnit.SECONDS);
if (Boolean.TRUE.equals(result)) {
// redis消息进行处理
log.info('接收的结果:', deserialize.toString());
} else {
log.info('其他服务处理中');
}
}
每秒只消费10条数据,不抛异常内容:可以使用 Semaphore 来控制每秒只消费 10 条数据。具体实现如下:
private final Semaphore semaphore = new Semaphore(10);
@Override
public void onMessage(Message message, byte[] pattern) {
try {
semaphore.acquire();
// 消息处理逻辑
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error('Semaphore acquire interrupted', e);
} finally {
semaphore.release();
}
}
在处理消息之前,使用 semaphore.acquire() 来获取许可证,如果没有可用的许可证则阻塞线程。处理消息完成后,使用 semaphore.release() 释放许可证。这样每秒只会有 10 个线程同时处理消息,超过 10 个的请求将被阻塞等待。
原文地址: https://www.cveoy.top/t/topic/pieh 著作权归作者所有。请勿转载和采集!