@Slf4j @Component public class RedisMessageListenerListener implements MessageListener { @Resource private RedisTemplate<String, Object> redisTemplate; @Autowired private AsyncInstDataCapture asyncInstDataCapture;

private AtomicInteger counter = new AtomicInteger(0);

/**
 * 消息处理
 *
 * @param message
 * @param pattern
 */
@Override
public void onMessage(Message message, byte[] pattern) {
    String channel = new String(pattern);
    //log.info('onMessage --> 消息通道是:', channel);

    RedisSerializer<?> valueSerializer = redisTemplate.getValueSerializer();
    Object deserialize = valueSerializer.deserialize(message.getBody());
    //log.info('反序列化的结果:', deserialize);
    if (deserialize == null) return;
    String md5DigestAsHex = DigestUtils.md5DigestAsHex(deserialize.toString().getBytes(StandardCharsets.UTF_8));
    //log.info('计算得到的key: ', md5DigestAsHex);

    Boolean result = redisTemplate.opsForValue().setIfAbsent(md5DigestAsHex, '1', 20, TimeUnit.SECONDS);
    if (Boolean.TRUE.equals(result)) {
        // redis消息进行处理
        log.info('接收的结果:', deserialize.toString());
        HashMap hashMap = JSON.parseObject(deserialize.toString(), HashMap.class);
        String instid = hashMap.get('instid').toString();
        String bar = hashMap.get('bar').toString();
        switch (bar) {
            case '1m':
                asyncInstDataCapture.dataCapture1m(instid);
                break;
            case '5m':
                asyncInstDataCapture.dataCapture5m(instid);
                break;
            case '15m':
                asyncInstDataCapture.dataCapture15m(instid);
                break;
            case '30m':
                asyncInstDataCapture.dataCapture30m(instid);
                break;
            case '1h':
                asyncInstDataCapture.dataCapture1h(instid);
                break;
            case '4h':
                asyncInstDataCapture.dataCapture4h(instid);
                break;
            default:
        }
    } else {
        log.info('其他服务处理中');
    }

    try {
        log.info('进入休眠');
        Thread.sleep(5000);
    } catch (InterruptedException interruptedException) {
        interruptedException.printStackTrace();
    }

}

} 休眠没有生效内容:在RedisMessageListenerListener中,休眠的代码应该放在处理完消息后的逻辑之后。当前的代码是先处理完消息,再休眠。可以将休眠的代码放在消息处理的逻辑之后,如下所示:

@Override
public void onMessage(Message message, byte[] pattern) {
    // 消息处理逻辑

    try {
        // 休眠逻辑
        log.info('进入休眠');
        Thread.sleep(5000);
    } catch (InterruptedException interruptedException) {
        interruptedException.printStackTrace();
    }
}

这样休眠的代码就会在消息处理完后执行,实现休眠的效果。


原文地址: https://www.cveoy.top/t/topic/piyT 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录