WebSocket 客户端 - OKX API 实时数据订阅指南

本指南详细介绍如何使用 WebSocketClient 类连接 OKX API 并获取实时数据,包括账户余额、持仓、订单等频道的订阅方法。

WebSocketClient 类

@Slf4j
@Component
public class WebSocketClient {
    private static WebSocket webSocket = null;
    private static Boolean flag = false;
    private static Boolean isConnect = false;
    private static String sign;
    private final static HashFunction crc32 = Hashing.crc32();
    private final static ObjectReader objectReader = new ObjectMapper().readerFor(OrderBookData.class);
    private static Map<String, Optional<SpotOrderBook>> bookMap = new HashMap<>();

    public WebSocketClient() {
    }


    @Resource
    private OrderSubscribeService orderSubscribeService;

    private static WebSocketClient socketClient;

    @PostConstruct
    public void init() {
        socketClient = this;
        socketClient.orderSubscribeService = this.orderSubscribeService;
    }

    public static void setRedis(String data) {
        socketClient.orderSubscribeService.onMessage(data);
    }


    // 与服务器建立连接,参数为服务器的 URL
    public static WebSocket connection(final String url) {


        OkHttpClient client = new OkHttpClient.Builder()
                .readTimeout(5, TimeUnit.SECONDS)
                .build();
        Request request = new Request.Builder()
                .url(url)
                .build();

        webSocket = client.newWebSocket(request, new WebSocketListener() {
            ScheduledExecutorService service;


            @Override
            public void onOpen(final WebSocket webSocket, final Response response) {
                // 连接成功后,设置定时器,每隔 25s,自动向服务器发送心跳,保持与服务器连接
                isConnect = true;
                System.out.println(Instant.now().toString() + ' Connected to the server success!');
                Runnable runnable = new Runnable() {
                    @Override
                    public void run() {
                        // task to run goes here
                        sendMessage('ping');
                    }
                };
                service = Executors.newSingleThreadScheduledExecutor();
                // 第二个参数为首次执行的延时时间,第三个参数为定时执行的间隔时间
                service.scheduleAtFixedRate(runnable, 25, 25, TimeUnit.SECONDS);
            }

            @Override
            public void onClosing(WebSocket webSocket, int code, String reason) {
                System.out.println('Connection is about to disconnect!');
                webSocket.close(1000, 'Long time no message was sent or received!');
                webSocket = null;
            }

            @Override
            public void onClosed(final WebSocket webSocket, final int code, final String reason) {
                System.out.println('Connection dropped!');
            }

            @Override
            public void onFailure(final WebSocket webSocket, final Throwable t, final Response response) {
                System.out.println('Connection failed,Please reconnect!');
                if (Objects.nonNull(service)) {
                    service.shutdown();
                    reConnect();
                }
            }

            @Override
            public void onMessage(final WebSocket webSocket, final String s) {
                setRedis(s);
                if (s.contains('pong')) {
                    System.out.println(DateFormatUtils.format(new Date(), DateUtils.TIME_STYLE_S4) + ' Receive: ' + s);
                }
                if (null != s && s.contains('login')) {
                    if (s.endsWith('true}')) {
                        flag = true;
                    }
                }

            }
        });
        return webSocket;



    }

    public static void reConnect() {
        log.info('重连:{};{}');
        try {
            WebSocketConfig.publicConnect(socketClient);
            // 开启订阅
            // socketClient.orderSubscribeService.subscribe();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }


    private static void isLogin(String s) {
        if (null != s && s.contains('login')) {
            if (s.endsWith('true}')) {
                flag = true;
            }
        }
    }

    // 获得 sign
    private static String sha256_HMAC(String message, String secret) {
        String hash = '';
        try {
            Mac sha256_HMAC = Mac.getInstance('HmacSHA256');
            SecretKeySpec secret_key = new SecretKeySpec(secret.getBytes(CharsetEnum.UTF_8.charset()), 'HmacSHA256');
            sha256_HMAC.init(secret_key);
            byte[] bytes = sha256_HMAC.doFinal(message.getBytes(CharsetEnum.UTF_8.charset()));
            hash = Base64.getEncoder().encodeToString(bytes);
        } catch (Exception e) {
            System.out.println('Error HmacSHA256 ===========' + e.getMessage());
        }
        return hash;
    }

    private static String listToJson(List<Map> list) {
        JSONArray jsonArray = new JSONArray();
        for (Map map : list) {
            jsonArray.add(JSONObject.fromObject(map));
        }
        return jsonArray.toJSONString();
    }

    // 登录
    public static void login(String apiKey, String passPhrase, String secretKey) {
        String timestamp = System.currentTimeMillis() / 1000 + '';
        String message = timestamp + 'GET' + '/users/self/verify';
        sign = sha256_HMAC(message, secretKey);
        List<Map<String, Object>> args = new ArrayList<>();
        Map<String, Object> data = new HashMap<>();
        data.put('apiKey', apiKey);
        data.put('passphrase', passPhrase);
        data.put('timestamp', timestamp);
        data.put('sign', sign);
        args.add(data);
        Map<String, Object> op = new HashMap<>();
        op.put('op', 'login');
        op.put('args', args);
        sendMessage(JSON.toJSONString(op));
    }


    // 订阅,参数为频道组成的集合
    public static void subscribe(List<Map> list) {
        String s = listToJson(list);
        String str = '{\'op\': \'subscribe\', \'args\':' + s + '}';
        if (null != webSocket) {
            sendMessage(str);
        }
    }

    // 取消订阅,参数为频道组成的集合
    public static void unsubscribe(List<Map> list) {
        String s = listToJson(list);
        String str = '{\'op\': \'unsubscribe\', \'args\':' + s + '}';
        if (null != webSocket) {
            sendMessage(str);
        }
    }

    private static void sendMessage(String str) {
        if (null != webSocket) {
            try {
                Thread.sleep(1300);
            } catch (Exception e) {
                e.printStackTrace();
            }

            System.out.println(DateFormatUtils.format(new Date(), DateUtils.TIME_STYLE_S4) + 'Send a message to the server:' + str);
            webSocket.send(str);
        } else {
            System.out.println('Please establish the connection before you operate it!');
        }
    }

    // 断开连接
    public static void closeConnection() {
        if (null != webSocket) {
            webSocket.close(1000, 'User actively closes the connection');
        } else {
            System.out.println('Please establish the connection before you operate it!');
        }
    }

    public boolean getIsLogin() {
        return flag;
    }

    public boolean getIsConnect() {
        return isConnect;
    }

    public static <T extends OrderBookItem> int checksum(List<T> asks, List<T> bids) {
        System.out.println('深度');
        StringBuilder s = new StringBuilder();
        for (int i = 0; i < 25; i++) {
            if (i < bids.size()) {
                s.append(bids.get(i).getPrice().toString());
                s.append(':');
                s.append(bids.get(i).getSize());
                s.append(':');
            }
            if (i < asks.size()) {
                s.append(asks.get(i).getPrice().toString());
                s.append(':');
                s.append(asks.get(i).getSize());
                s.append(':');
            }
        }
        final String str;
        if (s.length() > 0) {
            str = s.substring(0, s.length() - 1);
        } else {
            str = '';
        }

        return crc32.hashString(str, StandardCharsets.UTF_8).asInt();
    }

    private static <T extends OrderBookItem> String getStr(List<T> asks, List<T> bids) {
        StringBuilder s = new StringBuilder();
        for (int i = 0; i < 25; i++) {
            if (i < bids.size()) {
                s.append(bids.get(i).getPrice().toString());
                s.append(':');
                s.append(bids.get(i).getSize());
                s.append(':');
            }
            if (i < asks.size()) {
                s.append(asks.get(i).getPrice().toString());
                s.append(':');
                s.append(asks.get(i).getSize());
                s.append(':');
            }
        }
        final String str;
        if (s.length() > 0) {
            str = s.substring(0, s.length() - 1);
        } else {
            str = '';
        }
        return str;
    }

    public static Optional<SpotOrderBook> parse(String json) {

        try {
            OrderBookData data = objectReader.readValue(json);
            List<SpotOrderBookItem> asks =
                    data.getAsks().stream().map(x -> new SpotOrderBookItem(new String(x.get(0)), x.get(1), x.get(2), x.get(3)))
                            .collect(Collectors.toList());

            List<SpotOrderBookItem> bids =
                    data.getBids().stream().map(x -> new SpotOrderBookItem(new String(x.get(0)), x.get(1), x.get(2), x.get(3)))
                            .collect(Collectors.toList());

            return Optional.of(new SpotOrderBook(asks, bids, data.getTs(), data.getChecksum()));
        } catch (Exception e) {
            return Optional.empty();
        }
    }

    @Data
    public static class OrderBookData {
        private List<List<String>> asks;
        private List<List<String>> bids;
        private String ts;
        private int checksum;

        public List<List<String>> getAsks() {
            return asks;
        }

        public void setAsks(List<List<String>> asks) {
            this.asks = asks;
        }

        public List<List<String>> getBids() {
            return bids;
        }

        public void setBids(List<List<String>> bids) {
            this.bids = bids;
        }

        public String getTs() {
            return ts;
        }

        public void setTs(String ts) {
            this.ts = ts;
        }

        public int getChecksum() {
            return checksum;
        }

        public void setChecksum(int checksum) {
            this.checksum = checksum;
        }
    }

}

WebSocketConfig 类

@Component
public class WebSocketConfig {

    private static  String SERVICE_URL_PUBLIC;
    private static  String SERVICE_URL_PRIVATE;
    @Value('${okx.webSocket.public}')
    private String webSocketPublic;
    @Value('${okx.webSocket.private}')
    private String webSocketPrivate;

    @PostConstruct
    public void getByWebSocket(){
        SERVICE_URL_PUBLIC=this.webSocketPublic;
        SERVICE_URL_PRIVATE=this.webSocketPrivate;
    }


    public static void publicConnect(WebSocketClient webSocketClient) {
        System.out.println(SERVICE_URL_PUBLIC);
        WebSocketClient.connection(SERVICE_URL_PUBLIC);
    }

    public static void loginConnect(WebSocketClient webSocketClient,String apiKey,String passphrase,String secretKey) {
        System.out.println(SERVICE_URL_PRIVATE);
        // 与服务器建立连接
        WebSocketClient.connection(SERVICE_URL_PRIVATE);
        // 登录账号,用户需提供 api-key,passphrase,secret—key 不要随意透漏 ^_^ 
        WebSocketClient.login(apiKey,passphrase,secretKey);
    }

    public static void closeConnection(WebSocketClient webSocketClient) {
        WebSocketClient.closeConnection();
    }
}

@Slf4j
@Service
public class OrderSubscribeServiceImpl implements OrderSubscribeService {
    private static final WebSocketClient webSocketClient = new WebSocketClient();
    @Value('${okx.url}')
    private String okxUrl;
    @Resource
    private AiPlanDetailedMapper aiPlanDetailedMapper;
    @Resource
    private AiPlanMapper aiPlanMapper;
    @Autowired
    private UserAccountMapper userAccountMapper;
    @Autowired
    private UserAccountBalanceMapper userAccountBalanceMapper;
    @Autowired
    private UserToInstMapper userToInstMapper;
    @Autowired
    private InstListMapper instListMapper;


    @Override
    public void orderSubscribe(String apikey, String secretkey, String passphrase) {
        log.info('订单频道');
        WebSocketConfig.loginConnect(webSocketClient, apikey, passphrase, secretkey);
        // 添加订阅频道
        ArrayList<Map> channelLists = new ArrayList<>();
        Map spotTickerMap = new HashMap();
        spotTickerMap.put('channel', 'orders');
        spotTickerMap.put('instType', 'SWAP');
        channelLists.add(spotTickerMap);
        // 调用订阅方法
        WebSocketClient.subscribe(channelLists);
    }

    @Override
    public void subscribe(String apikey, String secretkey, String passphrase) {
        log.info('账户余额和持仓频道');
        WebSocketConfig.loginConnect(webSocketClient, apikey, passphrase, secretkey);
        ArrayList<Map> channelLists = new ArrayList<>();
        Map spotTickerMap = new HashMap();
        spotTickerMap.put('channel', 'balance_and_position');
        channelLists.add(spotTickerMap);
        // 调用订阅方法
        WebSocketClient.subscribe(channelLists);
    }
}
问题:账户余额和持仓频道的 WebSocket 需要登录验证;为多个账户订阅时,为什么只有最后一个账户建立了连接?

回答:在代码中,WebSocketClient 是以静态方式使用的:连接对象 webSocket、登录标志 flag、连接标志 isConnect 都是静态字段,所有账户共用同一份。Spring 创建 WebSocketClient 这个 Bean 时,会通过 @PostConstruct 注解的 init 方法把当前实例保存到静态变量 socketClient 中;收到服务器消息后,setRedis 方法再通过 socketClient 把消息转交给 OrderSubscribeService 的 onMessage 方法处理。

在OrderSubscribeServiceImpl中,通过调用WebSocketConfig的loginConnect方法来建立连接,然后通过调用WebSocketClient的subscribe方法来订阅频道。

问题出在 WebSocketClient 类中的静态变量 webSocket 上。每次调用 connection 方法,都会用新建的连接覆盖这个静态变量,而登录、订阅以及每隔 25 秒发送一次的心跳(sendMessage)都只会发往 webSocket 当前指向的连接。因此,为多个账户依次调用 loginConnect 之后,只有最后一个账户的连接能持续收到心跳,之前账户的连接很可能因为长时间没有消息而被断开。另外,连接失败后 reConnect 方法调用的是 WebSocketConfig 的 publicConnect 方法,它只会重新连接公共频道地址,不会重新登录,也不会重新订阅。

解决方法是让每个连接只操作自己的 WebSocket 对象,而不是共用的静态变量。例如,在 onOpen 中让心跳直接发往回调参数传入的连接:

service.scheduleAtFixedRate(() -> webSocket.send("ping"), 25, 25, TimeUnit.SECONDS);

同时按账户(例如以 apiKey 为键)分别保存各自的连接,登录和订阅时使用对应账户的连接。需要注意的是,仅仅在 reConnect 方法中重新 new 一个 WebSocketClient 并赋值给 socketClient 并不能解决问题:连接保存在静态变量中,与具体实例无关,而且新实例的 orderSubscribeService 没有经过 Spring 注入,值为 null。

这样就可以保证每个账户的连接各自保持心跳,互不覆盖。
WebSocket 客户端 - OKX API 实时数据订阅指南

原文地址: https://www.cveoy.top/t/topic/o9Dm 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录