Redis 消息队列消费 - 处理数据采集任务

该代码示例展示了如何使用 Redis 队列实现消息消费,并根据消息内容触发不同的数据采集任务。

private void consumeMessages() {
    while (true) {
        String message = redisTemplate.opsForList().rightPop('messageQueue', 1, TimeUnit.SECONDS);
        if (message != null) {
            // 处理消息
            System.out.println('Received message: ' + message);
            HashMap hashMap = JSON.parseObject(message, HashMap.class);
            String instid = hashMap.get('instid').toString();
            String bar = hashMap.get('bar').toString();
            switch (bar) {
                case '1m':
                    asyncInstDataCapture.dataCapture1m(instid);
                    break;
                case '5m':
                    asyncInstDataCapture.dataCapture5m(instid);
                    break;
                case '15m':
                    asyncInstDataCapture.dataCapture15m(instid);
                    break;
                case '30m':
                    asyncInstDataCapture.dataCapture30m(instid);
                    break;
                case '1h':
                    asyncInstDataCapture.dataCapture1h(instid);
                    break;
                case '4h':
                    asyncInstDataCapture.dataCapture4h(instid);
                    break;
                default:
            }
            try {
                // 暂停120ms
                Thread.sleep(120);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

Received message: '{"instid":"1-AAVE-USDT-SWAP","bar":"1m"}'
21:51:07.058 [main] INFO  c.a.c.s.SentinelWebMvcConfigurer - [addInterceptors,52] - [Sentinel Starter] register SentinelWebInterceptor with urlPatterns: [/**].
Exception in thread "TaskExecutor-1" com.alibaba.fastjson.JSONException: syntax error, expect {, actual string, pos 50, line 1, column 51'{"instid":"1-AAVE-USDT-SWAP","bar":"1m"}'
	at com.alibaba.fastjson.parser.DefaultJSONParser.parseObject(DefaultJSONParser.java:197)
	at com.alibaba.fastjson.parser.deserializer.MapDeserializer.deserialze(MapDeserializer.java:64)
	at com.alibaba.fastjson.parser.deserializer.MapDeserializer.deserialze(MapDeserializer.java:41)
	at com.alibaba.fastjson.parser.DefaultJSONParser.parseObject(DefaultJSONParser.java:671)
	at com.alibaba.fastjson.JSON.parseObject(JSON.java:368)
	at com.alibaba.fastjson.JSON.parseObject(JSON.java:272)
	at com.alibaba.fastjson.JSON.parseObject(JSON.java:491)
	at com.ai.datacapture.redis.MessageConsumer.consumeMessages(MessageConsumer.java:38)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

## 报错分析

根据报错信息来看,出现了`com.alibaba.fastjson.JSONException`异常,原因是 JSON 解析出错。具体地,在解析`message`时发生了错误。从错误信息来看,似乎是`message`的格式不符合 JSON 的语法。请检查`message`的值是否正确,并确保它是一个合法的 JSON 字符串。

解决方案

  1. 检查 message 的值: 确认 message 的值是否是一个合法的 JSON 字符串。
  2. 调试解析过程: 使用调试工具查看解析过程中 message 的值,以及解析器遇到的具体错误。
  3. 验证 JSON 库版本: 检查 FastJSON 版本是否过旧,尝试更新到最新版本。
  4. 使用其他 JSON 库: 如果问题无法解决,可以尝试使用其他的 JSON 库,例如 Gson 或 Jackson。

总结

本示例代码展示了使用 Redis 队列实现消息消费,并通过消息内容触发不同的数据采集任务的典型流程。需要注意的是,在使用 JSON 库解析消息内容时,需要确保消息内容的格式正确,并及时解决解析过程中遇到的异常。


原文地址: https://www.cveoy.top/t/topic/pkMb 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录