Redis Pub/Sub 队列实现数据采集任务调度
Redis Pub/Sub 队列实现数据采集任务调度
本文将介绍使用Redis Pub/Sub 队列实现数据采集任务调度的方案,并提供代码示例。该方案可以有效地将数据采集任务与主业务逻辑解耦,提高系统的可扩展性和性能。
代码示例
发送消息
Map map = new HashMap();
map.put('bar', '1m');
map.put('instid', instid);
pubSubRedisQueue.sendMessage(JSON.toJSONString(map));
接收消息
@Service
public class PubSubRedisQueue {
//队列名
public static final String KEY = 'messageQueue';
@Resource
private RedisTemplate<String, Object> redisTemplate;
public void sendMessage(String message) {
redisTemplate.opsForList().leftPush(KEY, message);
}
}
private void consumeMessages() {
while (true) {
String message = redisTemplate.opsForList().rightPop('messageQueue', 1, TimeUnit.SECONDS);
if (message != null) {
// 处理消息
System.out.println('Received message: ' + message);
HashMap hashMap = JSON.parseObject(message, HashMap.class);
String instid = hashMap.get('instid').toString();
String bar = hashMap.get('bar').toString();
switch (bar) {
case '1m':
asyncInstDataCapture.dataCapture1m(instid);
break;
case '5m':
asyncInstDataCapture.dataCapture5m(instid);
break;
case '15m':
asyncInstDataCapture.dataCapture15m(instid);
break;
case '30m':
asyncInstDataCapture.dataCapture30m(instid);
break;
case '1h':
asyncInstDataCapture.dataCapture1h(instid);
break;
case '4h':
asyncInstDataCapture.dataCapture4h(instid);
break;
default:
}
try {
// 暂停120ms
Thread.sleep(120);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
报错分析
报错信息:
Received message: {"instid":"1-AAVE-USDT-SWAP","bar":"1m"}
21:51:07.058 [main] INFO c.a.c.s.SentinelWebMvcConfigurer - [addInterceptors,52] - [Sentinel Starter] register SentinelWebInterceptor with urlPatterns: [/**].
Exception in thread "TaskExecutor-1" com.alibaba.fastjson.JSONException: syntax error, expect {, actual string, pos 50, line 1, column 51"{"instid":"1-AAVE-USDT-SWAP","bar":"1m"}"
at com.alibaba.fastjson.parser.DefaultJSONParser.parseObject(DefaultJSONParser.java:197)
at com.alibaba.fastjson.parser.deserializer.MapDeserializer.deserialze(MapDeserializer.java:64)
at com.alibaba.fastjson.parser.deserializer.MapDeserializer.deserialze(MapDeserializer.java:41)
at com.alibaba.fastjson.parser.DefaultJSONParser.parseObject(DefaultJSONParser.java:671)
at com.alibaba.fastjson.JSON.parseObject(JSON.java:368)
at com.alibaba.fastjson.JSON.parseObject(JSON.java:272)
at com.alibaba.fastjson.JSON.parseObject(JSON.java:491)
at com.ai.datacapture.redis.MessageConsumer.consumeMessages(MessageConsumer.java:38)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
报错内容:报错信息显示是在使用Fastjson解析JSON字符串时出现了语法错误。具体原因是字符串中存在非法的JSON格式字符。根据报错提示,问题出现在字符串的第51个字符位置,即在"{"后面的引号处。
根据代码和报错信息,可以推断出可能是在发送消息时,JSON字符串的格式出现了问题。建议检查发送消息的地方,确保生成的JSON字符串格式正确,没有额外的引号或转义字符。
另外,也可以尝试使用其他的JSON解析库,例如Jackson或Gson,看是否能解析该字符串。这样可以确认是Fastjson的问题还是字符串本身的问题。
总结
本文介绍了使用Redis Pub/Sub 队列实现数据采集任务调度的方案,并提供了代码示例。该方案可以有效地将数据采集任务与主业务逻辑解耦,提高系统的可扩展性和性能。在实际应用中,需要根据具体情况选择合适的JSON解析库,并确保发送的消息格式正确。
原文地址: https://www.cveoy.top/t/topic/pkMh 著作权归作者所有。请勿转载和采集!