@Slf4j @Service public class InstHistoryCandlesServiceImpl implements InstHistoryCandlesService { @Value("${okx.url}") private String okxUrl;

@Autowired
private ProcedureMapper procedureMapper;
@Autowired
private InstListMapper instListMapper;
@Autowired
private InstHistoryCandles1hMapper instHistoryCandles1hMapper;
@Autowired
private InstHistoryCandles4hMapper instHistoryCandles4hMapper;
@Autowired
private AsyncInstDataCapture asyncInstDataCapture;
@Autowired
private RateLimiterUtil rateLimiterUtil;


 @Override
public void instHistoryCandles1mBefore() {
    log.info('采集1m产品数据(实时数据)');
    //查询开启采集的币种
    InstList list = new InstList();
    list.setOpenDatacapture(1);
    List<InstList> instLists = instListMapper.selectInstListList(list);
    for (int i = 0; i < instLists.size(); i++) {
        InstList instList = instLists.get(i);
        //获取产品ID
        String instid = instList.getInstid();
        // 限速
        rateLimiterUtil.acquire();
        //异步方法
        asyncInstDataCapture.dataCapture1m(instid);

// if ((i + 1) % 9 == 0) { // try { // Thread.sleep(1500); // } catch (InterruptedException e) { // e.printStackTrace(); // } // } // if ((i + 1) % 9 != 0 && i == instLists.size() - 1) { // try { // Thread.sleep(1500); // } catch (InterruptedException e) { // e.printStackTrace(); // } // } } }

@Override
public void instHistoryCandles5mBefore() {
    log.info('采集5m产品数据(实时数据)');
    //查询开启采集的币种
    InstList list = new InstList();
    list.setOpenDatacapture(1);
    List<InstList> instLists = instListMapper.selectInstListList(list);
    for (int i = 0; i < instLists.size(); i++) {
        InstList instList = instLists.get(i);
        //获取产品ID
        String instid = instList.getInstid();
        // 限速
        rateLimiterUtil.acquire();
        //异步方法
        asyncInstDataCapture.dataCapture5m(instid);

// if ((i + 1) % 5 == 0) { // try { // Thread.sleep(2000); // } catch (InterruptedException e) { // e.printStackTrace(); // } // } // if ((i + 1) % 5 != 0 && i == instLists.size() - 1) { // try { // Thread.sleep(2000); // } catch (InterruptedException e) { // e.printStackTrace(); // } // } } }

@Override
public void instHistoryCandles15mBefore() {
    log.info('采集15m产品数据(实时数据)');
    //查询开启采集的币种
    InstList list = new InstList();
    list.setOpenDatacapture(1);
    List<InstList> instLists = instListMapper.selectInstListList(list);
    for (int i = 0; i < instLists.size(); i++) {
        InstList instList = instLists.get(i);
        //获取产品ID
        String instid = instList.getInstid();
        // 限速
        rateLimiterUtil.acquire();
        //异步方法
        asyncInstDataCapture.dataCapture15m(instid);

// if ((i + 1) % 5 == 0) { // try { // Thread.sleep(2000); // } catch (InterruptedException e) { // e.printStackTrace(); // } // } // if ((i + 1) % 5 != 0 && i == instLists.size() - 1) { // try { // Thread.sleep(2000); // } catch (InterruptedException e) { // e.printStackTrace(); // } // } } }

@Override
public void instHistoryCandles30mBefore() {
    log.info('采集30m产品数据(实时数据)');
    //查询开启采集的币种
    InstList list = new InstList();
    list.setOpenDatacapture(1);
    List<InstList> instLists = instListMapper.selectInstListList(list);
    for (int i = 0; i < instLists.size(); i++) {
        InstList instList = instLists.get(i);
        //获取产品ID
        String instid = instList.getInstid();
        // 限速
        rateLimiterUtil.acquire();
        //异步方法
        asyncInstDataCapture.dataCapture30m(instid);

// if ((i + 1) % 5 == 0) { // try { // Thread.sleep(2000); // } catch (InterruptedException e) { // e.printStackTrace(); // } // } // if ((i + 1) % 5 != 0 && i == instLists.size() - 1) { // try { // Thread.sleep(2000); // } catch (InterruptedException e) { // e.printStackTrace(); // } // } } }

@Override
public void instHistoryCandles1hBefore() {
    //log.info('采集1h产品数据(实时数据)');
    //查询开启采集的币种
    InstList list = new InstList();
    list.setOpenDatacapture(1);
    List<InstList> instLists = instListMapper.selectInstListList(list);
    for (int i = 0; i < instLists.size(); i++) {
        InstList instList = instLists.get(i);
        //获取产品ID
        String instid = instList.getInstid();
        // 限速
        rateLimiterUtil.acquire();
        //异步方法
        asyncInstDataCapture.dataCapture1h(instid);

// if ((i + 1) % 5 == 0) { // try { // Thread.sleep(2000); // } catch (InterruptedException e) { // e.printStackTrace(); // } // } // if ((i + 1) % 5 != 0 && i == instLists.size() - 1) { // try { // Thread.sleep(2000); // } catch (InterruptedException e) { // e.printStackTrace(); // } // } } }

@Override
public void instHistoryCandles4hBefore() {
    //log.info('采集4m产品数据(实时数据)');
    //查询开启采集的币种
    InstList list = new InstList();
    list.setOpenDatacapture(1);
    List<InstList> instLists = instListMapper.selectInstListList(list);
    for (int i = 0; i < instLists.size(); i++) {
        InstList instList = instLists.get(i);
        //获取产品ID
        String instid = instList.getInstid();
        // 限速
        rateLimiterUtil.acquire();
        //异步方法
        asyncInstDataCapture.dataCapture4h(instid);

// if ((i + 1) % 5 == 0) { // try { // Thread.sleep(2000); // } catch (InterruptedException e) { // e.printStackTrace(); // } // } // if ((i + 1) % 5 != 0 && i == instLists.size() - 1) { // try { // Thread.sleep(2000); // } catch (InterruptedException e) { // e.printStackTrace(); // } // } } } }

asyncInstDataCapture.dataCapture4h(instid);是外部接口,限速20次/2s,如果优化限速内容:如果要优化限速,可以使用以下方法:

  1. 将限速逻辑移动到asyncInstDataCapture.dataCapture4h(instid)方法中,而不是在InstHistoryCandlesServiceImpl中进行限速。这样可以确保每个异步方法都受到限速限制。

  2. 使用一个计数器来跟踪已经发送的请求数量,并在达到限速阈值时进行暂停等待。可以使用AtomicInteger来实现计数器。

  3. asyncInstDataCapture.dataCapture4h(instid)方法中,检查计数器的值是否达到限速阈值。如果达到了限速阈值,则使用Thread.sleep()方法进行暂停等待。

下面是修改后的代码示例:

@Slf4j
@Service
public class InstHistoryCandlesServiceImpl implements InstHistoryCandlesService {
    @Value("${okx.url}")
    private String okxUrl;

    @Autowired
    private ProcedureMapper procedureMapper;
    @Autowired
    private InstListMapper instListMapper;
    @Autowired
    private InstHistoryCandles1hMapper instHistoryCandles1hMapper;
    @Autowired
    private InstHistoryCandles4hMapper instHistoryCandles4hMapper;
    @Autowired
    private AsyncInstDataCapture asyncInstDataCapture;
    @Autowired
    private RateLimiterUtil rateLimiterUtil;

    private AtomicInteger requestCounter = new AtomicInteger(0);

    @Override
    public void instHistoryCandles1mBefore() {
        log.info('采集1m产品数据(实时数据)');
        //查询开启采集的币种
        InstList list = new InstList();
        list.setOpenDatacapture(1);
        List<InstList> instLists = instListMapper.selectInstListList(list);
        for (int i = 0; i < instLists.size(); i++) {
            InstList instList = instLists.get(i);
            //获取产品ID
            String instid = instList.getInstid();
            //异步方法
            asyncInstDataCapture.dataCapture1m(instid);
            // 增加计数器
            int counter = requestCounter.incrementAndGet();
            // 检查是否达到限速阈值
            if (counter % 9 == 0) {
                try {
                    Thread.sleep(1500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    // 其他方法类似,只需要修改相应的异步方法和限速阈值即可
}

asyncInstDataCapture.dataCapture4h(instid)方法中,可以添加类似的限速逻辑:

public void dataCapture4h(String instid) {
    // 限速逻辑
    int counter = requestCounter.incrementAndGet();
    if (counter % 5 == 0) {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    // 其他业务逻辑
}

以上代码示例中,计数器requestCounter使用AtomicInteger来确保线程安全。每次发送请求时,计数器增加1,并检查是否达到限速阈值。如果达到限速阈值,则进行暂停等待。这样可以确保每个请求都受到限速限制。

OKX 币种历史 K 线数据采集服务实现

原文地址: https://www.cveoy.top/t/topic/pbRm 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录