Redis 消息队列消费 - 处理数据采集任务
Redis 消息队列消费 - 处理数据采集任务
该代码示例展示了如何使用 Redis 队列实现消息消费,并根据消息内容触发不同的数据采集任务。
private void consumeMessages() {
while (true) {
String message = redisTemplate.opsForList().rightPop('messageQueue', 1, TimeUnit.SECONDS);
if (message != null) {
// 处理消息
System.out.println('Received message: ' + message);
HashMap hashMap = JSON.parseObject(message, HashMap.class);
String instid = hashMap.get('instid').toString();
String bar = hashMap.get('bar').toString();
switch (bar) {
case '1m':
asyncInstDataCapture.dataCapture1m(instid);
break;
case '5m':
asyncInstDataCapture.dataCapture5m(instid);
break;
case '15m':
asyncInstDataCapture.dataCapture15m(instid);
break;
case '30m':
asyncInstDataCapture.dataCapture30m(instid);
break;
case '1h':
asyncInstDataCapture.dataCapture1h(instid);
break;
case '4h':
asyncInstDataCapture.dataCapture4h(instid);
break;
default:
}
try {
// 暂停120ms
Thread.sleep(120);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
Received message: '{"instid":"1-AAVE-USDT-SWAP","bar":"1m"}'
21:51:07.058 [main] INFO c.a.c.s.SentinelWebMvcConfigurer - [addInterceptors,52] - [Sentinel Starter] register SentinelWebInterceptor with urlPatterns: [/**].
Exception in thread "TaskExecutor-1" com.alibaba.fastjson.JSONException: syntax error, expect {, actual string, pos 50, line 1, column 51'{"instid":"1-AAVE-USDT-SWAP","bar":"1m"}'
at com.alibaba.fastjson.parser.DefaultJSONParser.parseObject(DefaultJSONParser.java:197)
at com.alibaba.fastjson.parser.deserializer.MapDeserializer.deserialze(MapDeserializer.java:64)
at com.alibaba.fastjson.parser.deserializer.MapDeserializer.deserialze(MapDeserializer.java:41)
at com.alibaba.fastjson.parser.DefaultJSONParser.parseObject(DefaultJSONParser.java:671)
at com.alibaba.fastjson.JSON.parseObject(JSON.java:368)
at com.alibaba.fastjson.JSON.parseObject(JSON.java:272)
at com.alibaba.fastjson.JSON.parseObject(JSON.java:491)
at com.ai.datacapture.redis.MessageConsumer.consumeMessages(MessageConsumer.java:38)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
## 报错分析
根据报错信息来看,出现了`com.alibaba.fastjson.JSONException`异常,原因是 JSON 解析出错。具体地,在解析`message`时发生了错误。从错误信息来看,似乎是`message`的格式不符合 JSON 的语法。请检查`message`的值是否正确,并确保它是一个合法的 JSON 字符串。
解决方案
- 检查 message 的值: 确认 message 的值是否是一个合法的 JSON 字符串。
- 调试解析过程: 使用调试工具查看解析过程中
message的值,以及解析器遇到的具体错误。 - 验证 JSON 库版本: 检查 FastJSON 版本是否过旧,尝试更新到最新版本。
- 使用其他 JSON 库: 如果问题无法解决,可以尝试使用其他的 JSON 库,例如 Gson 或 Jackson。
总结
本示例代码展示了使用 Redis 队列实现消息消费,并通过消息内容触发不同的数据采集任务的典型流程。需要注意的是,在使用 JSON 库解析消息内容时,需要确保消息内容的格式正确,并及时解决解析过程中遇到的异常。
原文地址: https://www.cveoy.top/t/topic/pkMb 著作权归作者所有。请勿转载和采集!