@Slf4j @Component public class RedisMessageListenerListener implements MessageListener { private AtomicInteger counter = new AtomicInteger(0); private Timer timer = new Timer(); @Resource private RedisTemplate<String, Object> redisTemplate; @Autowired private AsyncInstDataCapture asyncInstDataCapture;

/**
 * 消息处理
 *
 * @param message
 * @param pattern
 */
@Override
public void onMessage(Message message, byte[] pattern) {
    // 增加计数器
    counter.incrementAndGet();

    // 判断计数器是否超过限制
    if (counter.get() &lt;= 10) {
        // 处理消息
        String channel = new String(pattern);
        //log.info('onMessage --&gt; 消息通道是:{}', channel);

        RedisSerializer&lt;?&gt; valueSerializer = redisTemplate.getValueSerializer();
        Object deserialize = valueSerializer.deserialize(message.getBody());
        //log.info('反序列化的结果:{}', deserialize);
        if (deserialize == null) return;
        String md5DigestAsHex = DigestUtils.md5DigestAsHex(deserialize.toString().getBytes(StandardCharsets.UTF_8));
        //log.info('计算得到的key: {}', md5DigestAsHex);

        Boolean result = redisTemplate.opsForValue().setIfAbsent(md5DigestAsHex, '1', 20, TimeUnit.SECONDS);
        if (Boolean.TRUE.equals(result)) {
            // redis消息进行处理
            log.info('接收的结果:{}', deserialize.toString());
            if (StringUtil.isNotEmpty(deserialize.toString())) {
                HashMap hashMap = JSON.parseObject(deserialize.toString(), HashMap.class);
                String instid = hashMap.get('instid').toString();
                String bar = hashMap.get('bar').toString();
                switch (bar) {
                    case '1m':
                        asyncInstDataCapture.dataCapture1m(instid);
                        break;
                    case '5m':
                        asyncInstDataCapture.dataCapture5m(instid);
                        break;
                    case '15m':
                        asyncInstDataCapture.dataCapture15m(instid);
                        break;
                    case '30m':
                        asyncInstDataCapture.dataCapture30m(instid);
                        break;
                    case '1h':
                        asyncInstDataCapture.dataCapture1h(instid);
                        break;
                    case '4h':
                        asyncInstDataCapture.dataCapture4h(instid);
                        break;
                    default:
                }
            }
        } else {
            log.info('其他服务处理中');
        }
    } else {
        // 超过限制的消息存储到队列中
        redisTemplate.opsForList().leftPush('messageQueue', message.getBody());
    }

    // 定时器每秒重置计数器
    timer.schedule(new TimerTask() {
        @Override
        public void run() {
            counter.set(0);
        }
    }, 1200);
}

@Async
public void processMessageQueue() {
    while (true) {
        byte[] message = redisTemplate.opsForList().rightPop('messageQueue');
        if (message == null) {
            // 队列中没有消息,退出循环
            break;
        }

        // 处理消息
        // ...
    }
}

@PostConstruct
public void init() {
    processMessageQueue();
}

}


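// Original source: http://www.cveoy.top/t/topic/pjfh — copyright belongs to the author; do not repost or scrape.
//
// Free AI, click here; no registration or login required.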