OKX 币种历史 K 线数据采集服务实现
@Slf4j @Service public class InstHistoryCandlesServiceImpl implements InstHistoryCandlesService { @Value("${okx.url}") private String okxUrl;
@Autowired
private ProcedureMapper procedureMapper;
@Autowired
private InstListMapper instListMapper;
@Autowired
private InstHistoryCandles1hMapper instHistoryCandles1hMapper;
@Autowired
private InstHistoryCandles4hMapper instHistoryCandles4hMapper;
@Autowired
private AsyncInstDataCapture asyncInstDataCapture;
@Autowired
private RateLimiterUtil rateLimiterUtil;
@Override
public void instHistoryCandles1mBefore() {
log.info('采集1m产品数据(实时数据)');
//查询开启采集的币种
InstList list = new InstList();
list.setOpenDatacapture(1);
List<InstList> instLists = instListMapper.selectInstListList(list);
for (int i = 0; i < instLists.size(); i++) {
InstList instList = instLists.get(i);
//获取产品ID
String instid = instList.getInstid();
// 限速
rateLimiterUtil.acquire();
//异步方法
asyncInstDataCapture.dataCapture1m(instid);
// if ((i + 1) % 9 == 0) { // try { // Thread.sleep(1500); // } catch (InterruptedException e) { // e.printStackTrace(); // } // } // if ((i + 1) % 9 != 0 && i == instLists.size() - 1) { // try { // Thread.sleep(1500); // } catch (InterruptedException e) { // e.printStackTrace(); // } // } } }
@Override
public void instHistoryCandles5mBefore() {
log.info('采集5m产品数据(实时数据)');
//查询开启采集的币种
InstList list = new InstList();
list.setOpenDatacapture(1);
List<InstList> instLists = instListMapper.selectInstListList(list);
for (int i = 0; i < instLists.size(); i++) {
InstList instList = instLists.get(i);
//获取产品ID
String instid = instList.getInstid();
// 限速
rateLimiterUtil.acquire();
//异步方法
asyncInstDataCapture.dataCapture5m(instid);
// if ((i + 1) % 5 == 0) { // try { // Thread.sleep(2000); // } catch (InterruptedException e) { // e.printStackTrace(); // } // } // if ((i + 1) % 5 != 0 && i == instLists.size() - 1) { // try { // Thread.sleep(2000); // } catch (InterruptedException e) { // e.printStackTrace(); // } // } } }
@Override
public void instHistoryCandles15mBefore() {
log.info('采集15m产品数据(实时数据)');
//查询开启采集的币种
InstList list = new InstList();
list.setOpenDatacapture(1);
List<InstList> instLists = instListMapper.selectInstListList(list);
for (int i = 0; i < instLists.size(); i++) {
InstList instList = instLists.get(i);
//获取产品ID
String instid = instList.getInstid();
// 限速
rateLimiterUtil.acquire();
//异步方法
asyncInstDataCapture.dataCapture15m(instid);
// if ((i + 1) % 5 == 0) { // try { // Thread.sleep(2000); // } catch (InterruptedException e) { // e.printStackTrace(); // } // } // if ((i + 1) % 5 != 0 && i == instLists.size() - 1) { // try { // Thread.sleep(2000); // } catch (InterruptedException e) { // e.printStackTrace(); // } // } } }
@Override
public void instHistoryCandles30mBefore() {
log.info('采集30m产品数据(实时数据)');
//查询开启采集的币种
InstList list = new InstList();
list.setOpenDatacapture(1);
List<InstList> instLists = instListMapper.selectInstListList(list);
for (int i = 0; i < instLists.size(); i++) {
InstList instList = instLists.get(i);
//获取产品ID
String instid = instList.getInstid();
// 限速
rateLimiterUtil.acquire();
//异步方法
asyncInstDataCapture.dataCapture30m(instid);
// if ((i + 1) % 5 == 0) { // try { // Thread.sleep(2000); // } catch (InterruptedException e) { // e.printStackTrace(); // } // } // if ((i + 1) % 5 != 0 && i == instLists.size() - 1) { // try { // Thread.sleep(2000); // } catch (InterruptedException e) { // e.printStackTrace(); // } // } } }
@Override
public void instHistoryCandles1hBefore() {
//log.info('采集1h产品数据(实时数据)');
//查询开启采集的币种
InstList list = new InstList();
list.setOpenDatacapture(1);
List<InstList> instLists = instListMapper.selectInstListList(list);
for (int i = 0; i < instLists.size(); i++) {
InstList instList = instLists.get(i);
//获取产品ID
String instid = instList.getInstid();
// 限速
rateLimiterUtil.acquire();
//异步方法
asyncInstDataCapture.dataCapture1h(instid);
// if ((i + 1) % 5 == 0) { // try { // Thread.sleep(2000); // } catch (InterruptedException e) { // e.printStackTrace(); // } // } // if ((i + 1) % 5 != 0 && i == instLists.size() - 1) { // try { // Thread.sleep(2000); // } catch (InterruptedException e) { // e.printStackTrace(); // } // } } }
@Override
public void instHistoryCandles4hBefore() {
//log.info('采集4m产品数据(实时数据)');
//查询开启采集的币种
InstList list = new InstList();
list.setOpenDatacapture(1);
List<InstList> instLists = instListMapper.selectInstListList(list);
for (int i = 0; i < instLists.size(); i++) {
InstList instList = instLists.get(i);
//获取产品ID
String instid = instList.getInstid();
// 限速
rateLimiterUtil.acquire();
//异步方法
asyncInstDataCapture.dataCapture4h(instid);
// if ((i + 1) % 5 == 0) { // try { // Thread.sleep(2000); // } catch (InterruptedException e) { // e.printStackTrace(); // } // } // if ((i + 1) % 5 != 0 && i == instLists.size() - 1) { // try { // Thread.sleep(2000); // } catch (InterruptedException e) { // e.printStackTrace(); // } // } } } }
asyncInstDataCapture.dataCapture4h(instid);是外部接口,限速20次/2s,如果优化限速内容:如果要优化限速,可以使用以下方法:
-
将限速逻辑移动到
asyncInstDataCapture.dataCapture4h(instid)方法中,而不是在InstHistoryCandlesServiceImpl中进行限速。这样可以确保每个异步方法都受到限速限制。 -
使用一个计数器来跟踪已经发送的请求数量,并在达到限速阈值时进行暂停等待。可以使用
AtomicInteger来实现计数器。 -
在
asyncInstDataCapture.dataCapture4h(instid)方法中,检查计数器的值是否达到限速阈值。如果达到了限速阈值,则使用Thread.sleep()方法进行暂停等待。
下面是修改后的代码示例:
@Slf4j
@Service
public class InstHistoryCandlesServiceImpl implements InstHistoryCandlesService {
@Value("${okx.url}")
private String okxUrl;
@Autowired
private ProcedureMapper procedureMapper;
@Autowired
private InstListMapper instListMapper;
@Autowired
private InstHistoryCandles1hMapper instHistoryCandles1hMapper;
@Autowired
private InstHistoryCandles4hMapper instHistoryCandles4hMapper;
@Autowired
private AsyncInstDataCapture asyncInstDataCapture;
@Autowired
private RateLimiterUtil rateLimiterUtil;
private AtomicInteger requestCounter = new AtomicInteger(0);
@Override
public void instHistoryCandles1mBefore() {
log.info('采集1m产品数据(实时数据)');
//查询开启采集的币种
InstList list = new InstList();
list.setOpenDatacapture(1);
List<InstList> instLists = instListMapper.selectInstListList(list);
for (int i = 0; i < instLists.size(); i++) {
InstList instList = instLists.get(i);
//获取产品ID
String instid = instList.getInstid();
//异步方法
asyncInstDataCapture.dataCapture1m(instid);
// 增加计数器
int counter = requestCounter.incrementAndGet();
// 检查是否达到限速阈值
if (counter % 9 == 0) {
try {
Thread.sleep(1500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
// 其他方法类似,只需要修改相应的异步方法和限速阈值即可
}
在asyncInstDataCapture.dataCapture4h(instid)方法中,可以添加类似的限速逻辑:
public void dataCapture4h(String instid) {
// 限速逻辑
int counter = requestCounter.incrementAndGet();
if (counter % 5 == 0) {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 其他业务逻辑
}
以上代码示例中,计数器requestCounter使用AtomicInteger来确保线程安全。每次发送请求时,计数器增加1,并检查是否达到限速阈值。如果达到限速阈值,则进行暂停等待。这样可以确保每个请求都受到限速限制。
原文地址: https://www.cveoy.top/t/topic/pbRm 著作权归作者所有。请勿转载和采集!