Redis Pub/Sub 队列实现数据采集任务调度

本文将介绍使用Redis Pub/Sub 队列实现数据采集任务调度的方案,并提供代码示例。该方案可以有效地将数据采集任务与主业务逻辑解耦,提高系统的可扩展性和性能。

代码示例

发送消息

Map map = new HashMap();
map.put('bar', '1m');
map.put('instid', instid);
pubSubRedisQueue.sendMessage(JSON.toJSONString(map));

接收消息

@Service
public class PubSubRedisQueue {
    //队列名
    public static final String KEY = 'messageQueue';

    @Resource
    private RedisTemplate<String, Object> redisTemplate;


    public void sendMessage(String message) {
        redisTemplate.opsForList().leftPush(KEY, message);
    }

}

private void consumeMessages() {
    while (true) {
        String message = redisTemplate.opsForList().rightPop('messageQueue', 1, TimeUnit.SECONDS);
        if (message != null) {
            // 处理消息
            System.out.println('Received message: ' + message);
            HashMap hashMap = JSON.parseObject(message, HashMap.class);
            String instid = hashMap.get('instid').toString();
            String bar = hashMap.get('bar').toString();
            switch (bar) {
                case '1m':
                    asyncInstDataCapture.dataCapture1m(instid);
                    break;
                case '5m':
                    asyncInstDataCapture.dataCapture5m(instid);
                    break;
                case '15m':
                    asyncInstDataCapture.dataCapture15m(instid);
                    break;
                case '30m':
                    asyncInstDataCapture.dataCapture30m(instid);
                    break;
                case '1h':
                    asyncInstDataCapture.dataCapture1h(instid);
                    break;
                case '4h':
                    asyncInstDataCapture.dataCapture4h(instid);
                    break;
                default:
            }
            try {
                // 暂停120ms
                Thread.sleep(120);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

报错分析

报错信息:

Received message: {"instid":"1-AAVE-USDT-SWAP","bar":"1m"}
21:51:07.058 [main] INFO  c.a.c.s.SentinelWebMvcConfigurer - [addInterceptors,52] - [Sentinel Starter] register SentinelWebInterceptor with urlPatterns: [/**].
Exception in thread "TaskExecutor-1" com.alibaba.fastjson.JSONException: syntax error, expect {, actual string, pos 50, line 1, column 51"{"instid":"1-AAVE-USDT-SWAP","bar":"1m"}"
	at com.alibaba.fastjson.parser.DefaultJSONParser.parseObject(DefaultJSONParser.java:197)
	at com.alibaba.fastjson.parser.deserializer.MapDeserializer.deserialze(MapDeserializer.java:64)
	at com.alibaba.fastjson.parser.deserializer.MapDeserializer.deserialze(MapDeserializer.java:41)
	at com.alibaba.fastjson.parser.DefaultJSONParser.parseObject(DefaultJSONParser.java:671)
	at com.alibaba.fastjson.JSON.parseObject(JSON.java:368)
	at com.alibaba.fastjson.JSON.parseObject(JSON.java:272)
	at com.alibaba.fastjson.JSON.parseObject(JSON.java:491)
	at com.ai.datacapture.redis.MessageConsumer.consumeMessages(MessageConsumer.java:38)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

报错内容:报错信息显示是在使用Fastjson解析JSON字符串时出现了语法错误。具体原因是字符串中存在非法的JSON格式字符。根据报错提示,问题出现在字符串的第51个字符位置,即在"{"后面的引号处。

根据代码和报错信息,可以推断出可能是在发送消息时,JSON字符串的格式出现了问题。建议检查发送消息的地方,确保生成的JSON字符串格式正确,没有额外的引号或转义字符。

另外,也可以尝试使用其他的JSON解析库,例如Jackson或Gson,看是否能解析该字符串。这样可以确认是Fastjson的问题还是字符串本身的问题。

总结

本文介绍了使用Redis Pub/Sub 队列实现数据采集任务调度的方案,并提供了代码示例。该方案可以有效地将数据采集任务与主业务逻辑解耦,提高系统的可扩展性和性能。在实际应用中,需要根据具体情况选择合适的JSON解析库,并确保发送的消息格式正确。


原文地址: https://www.cveoy.top/t/topic/pkMh 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录