WebSocket 客户端 - OKX API 实时数据订阅指南
WebSocket 客户端 - OKX API 实时数据订阅指南
本指南详细介绍使用 WebSocketClient 类连接 OKX API 获取实时数据,包括账户余额、持仓、订单等信息的订阅方法。
WebSocketClient 类
@Slf4j
@Component
public class WebSocketClient {
private static WebSocket webSocket = null;
private static Boolean flag = false;
private static Boolean isConnect = false;
private static String sign;
private final static HashFunction crc32 = Hashing.crc32();
private final static ObjectReader objectReader = new ObjectMapper().readerFor(OrderBookData.class);
private static Map<String, Optional<SpotOrderBook>> bookMap = new HashMap<>();
public WebSocketClient() {
}
@Resource
private OrderSubscribeService orderSubscribeService;
private static WebSocketClient socketClient;
@PostConstruct
public void init() {
socketClient = this;
socketClient.orderSubscribeService = this.orderSubscribeService;
}
public static void setRedis(String data) {
socketClient.orderSubscribeService.onMessage(data);
}
// 与服务器建立连接,参数为服务器的 URL
public static WebSocket connection(final String url) {
OkHttpClient client = new OkHttpClient.Builder()
.readTimeout(5, TimeUnit.SECONDS)
.build();
Request request = new Request.Builder()
.url(url)
.build();
webSocket = client.newWebSocket(request, new WebSocketListener() {
ScheduledExecutorService service;
@Override
public void onOpen(final WebSocket webSocket, final Response response) {
// 连接成功后,设置定时器,每隔 25s,自动向服务器发送心跳,保持与服务器连接
isConnect = true;
System.out.println(Instant.now().toString() + ' Connected to the server success!');
Runnable runnable = new Runnable() {
@Override
public void run() {
// task to run goes here
sendMessage('ping');
}
};
service = Executors.newSingleThreadScheduledExecutor();
// 第二个参数为首次执行的延时时间,第三个参数为定时执行的间隔时间
service.scheduleAtFixedRate(runnable, 25, 25, TimeUnit.SECONDS);
}
@Override
public void onClosing(WebSocket webSocket, int code, String reason) {
System.out.println('Connection is about to disconnect!');
webSocket.close(1000, 'Long time no message was sent or received!');
webSocket = null;
}
@Override
public void onClosed(final WebSocket webSocket, final int code, final String reason) {
System.out.println('Connection dropped!');
}
@Override
public void onFailure(final WebSocket webSocket, final Throwable t, final Response response) {
System.out.println('Connection failed,Please reconnect!');
if (Objects.nonNull(service)) {
service.shutdown();
reConnect();
}
}
@Override
public void onMessage(final WebSocket webSocket, final String s) {
setRedis(s);
if (s.contains('pong')) {
System.out.println(DateFormatUtils.format(new Date(), DateUtils.TIME_STYLE_S4) + ' Receive: ' + s);
}
if (null != s && s.contains('login')) {
if (s.endsWith('true}')) {
flag = true;
}
}
}
});
return webSocket;
}
public static void reConnect() {
log.info('重连:{};{}');
try {
WebSocketConfig.publicConnect(socketClient);
// 开启订阅
// socketClient.orderSubscribeService.subscribe();
} catch (Exception e) {
e.printStackTrace();
}
}
private static void isLogin(String s) {
if (null != s && s.contains('login')) {
if (s.endsWith('true}')) {
flag = true;
}
}
}
// 获得 sign
private static String sha256_HMAC(String message, String secret) {
String hash = '';
try {
Mac sha256_HMAC = Mac.getInstance('HmacSHA256');
SecretKeySpec secret_key = new SecretKeySpec(secret.getBytes(CharsetEnum.UTF_8.charset()), 'HmacSHA256');
sha256_HMAC.init(secret_key);
byte[] bytes = sha256_HMAC.doFinal(message.getBytes(CharsetEnum.UTF_8.charset()));
hash = Base64.getEncoder().encodeToString(bytes);
} catch (Exception e) {
System.out.println('Error HmacSHA256 ===========' + e.getMessage());
}
return hash;
}
private static String listToJson(List<Map> list) {
JSONArray jsonArray = new JSONArray();
for (Map map : list) {
jsonArray.add(JSONObject.fromObject(map));
}
return jsonArray.toJSONString();
}
// 登录
public static void login(String apiKey, String passPhrase, String secretKey) {
String timestamp = System.currentTimeMillis() / 1000 + '';
String message = timestamp + 'GET' + '/users/self/verify';
sign = sha256_HMAC(message, secretKey);
List<Map<String, Object>> args = new ArrayList<>();
Map<String, Object> data = new HashMap<>();
data.put('apiKey', apiKey);
data.put('passphrase', passPhrase);
data.put('timestamp', timestamp);
data.put('sign', sign);
args.add(data);
Map<String, Object> op = new HashMap<>();
op.put('op', 'login');
op.put('args', args);
sendMessage(JSON.toJSONString(op));
}
// 订阅,参数为频道组成的集合
public static void subscribe(List<Map> list) {
String s = listToJson(list);
String str = '{\'op\': \'subscribe\', \'args\':' + s + '}';
if (null != webSocket) {
sendMessage(str);
}
}
// 取消订阅,参数为频道组成的集合
public static void unsubscribe(List<Map> list) {
String s = listToJson(list);
String str = '{\'op\': \'unsubscribe\', \'args\':' + s + '}';
if (null != webSocket) {
sendMessage(str);
}
}
private static void sendMessage(String str) {
if (null != webSocket) {
try {
Thread.sleep(1300);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println(DateFormatUtils.format(new Date(), DateUtils.TIME_STYLE_S4) + 'Send a message to the server:' + str);
webSocket.send(str);
} else {
System.out.println('Please establish the connection before you operate it!');
}
}
// 断开连接
public static void closeConnection() {
if (null != webSocket) {
webSocket.close(1000, 'User actively closes the connection');
} else {
System.out.println('Please establish the connection before you operate it!');
}
}
public boolean getIsLogin() {
return flag;
}
public boolean getIsConnect() {
return isConnect;
}
public static <T extends OrderBookItem> int checksum(List<T> asks, List<T> bids) {
System.out.println('深度');
StringBuilder s = new StringBuilder();
for (int i = 0; i < 25; i++) {
if (i < bids.size()) {
s.append(bids.get(i).getPrice().toString());
s.append(':');
s.append(bids.get(i).getSize());
s.append(':');
}
if (i < asks.size()) {
s.append(asks.get(i).getPrice().toString());
s.append(':');
s.append(asks.get(i).getSize());
s.append(':');
}
}
final String str;
if (s.length() > 0) {
str = s.substring(0, s.length() - 1);
} else {
str = '';
}
return crc32.hashString(str, StandardCharsets.UTF_8).asInt();
}
private static <T extends OrderBookItem> String getStr(List<T> asks, List<T> bids) {
StringBuilder s = new StringBuilder();
for (int i = 0; i < 25; i++) {
if (i < bids.size()) {
s.append(bids.get(i).getPrice().toString());
s.append(':');
s.append(bids.get(i).getSize());
s.append(':');
}
if (i < asks.size()) {
s.append(asks.get(i).getPrice().toString());
s.append(':');
s.append(asks.get(i).getSize());
s.append(':');
}
}
final String str;
if (s.length() > 0) {
str = s.substring(0, s.length() - 1);
} else {
str = '';
}
return str;
}
public static Optional<SpotOrderBook> parse(String json) {
try {
OrderBookData data = objectReader.readValue(json);
List<SpotOrderBookItem> asks =
data.getAsks().stream().map(x -> new SpotOrderBookItem(new String(x.get(0)), x.get(1), x.get(2), x.get(3)))
.collect(Collectors.toList());
List<SpotOrderBookItem> bids =
data.getBids().stream().map(x -> new SpotOrderBookItem(new String(x.get(0)), x.get(1), x.get(2), x.get(3)))
.collect(Collectors.toList());
return Optional.of(new SpotOrderBook(asks, bids, data.getTs(), data.getChecksum()));
} catch (Exception e) {
return Optional.empty();
}
}
@Data
public static class OrderBookData {
private List<List<String>> asks;
private List<List<String>> bids;
private String ts;
private int checksum;
public List<List<String>> getAsks() {
return asks;
}
public void setAsks(List<List<String>> asks) {
this.asks = asks;
}
public List<List<String>> getBids() {
return bids;
}
public void setBids(List<List<String>> bids) {
this.bids = bids;
}
public String getTs() {
return ts;
}
public void setTs(String ts) {
this.ts = ts;
}
public int getChecksum() {
return checksum;
}
public void setChecksum(int checksum) {
this.checksum = checksum;
}
}
}
WebSocketConfig 类
@Component
public class WebSocketConfig {
private static String SERVICE_URL_PUBLIC;
private static String SERVICE_URL_PRIVATE;
@Value('${okx.webSocket.public}')
private String webSocketPublic;
@Value('${okx.webSocket.private}')
private String webSocketPrivate;
@PostConstruct
public void getByWebSocket(){
SERVICE_URL_PUBLIC=this.webSocketPublic;
SERVICE_URL_PRIVATE=this.webSocketPrivate;
}
public static void publicConnect(WebSocketClient webSocketClient) {
System.out.println(SERVICE_URL_PUBLIC);
WebSocketClient.connection(SERVICE_URL_PUBLIC);
}
public static void loginConnect(WebSocketClient webSocketClient,String apiKey,String passphrase,String secretKey) {
System.out.println(SERVICE_URL_PRIVATE);
// 与服务器建立连接
WebSocketClient.connection(SERVICE_URL_PRIVATE);
// 登录账号,用户需提供 api-key,passphrase,secret—key 不要随意透漏 ^_^
WebSocketClient.login(apiKey,passphrase,secretKey);
}
public static void closeConnection(WebSocketClient webSocketClient) {
WebSocketClient.closeConnection();
}
}
@Slf4j
@Service
public class OrderSubscribeServiceImpl implements OrderSubscribeService {
private static final WebSocketClient webSocketClient = new WebSocketClient();
@Value('${okx.url}')
private String okxUrl;
@Resource
private AiPlanDetailedMapper aiPlanDetailedMapper;
@Resource
private AiPlanMapper aiPlanMapper;
@Autowired
private UserAccountMapper userAccountMapper;
@Autowired
private UserAccountBalanceMapper userAccountBalanceMapper;
@Autowired
private UserToInstMapper userToInstMapper;
@Autowired
private InstListMapper instListMapper;
@Override
public void orderSubscribe(String apikey, String secretkey, String passphrase) {
log.info('订单频道');
WebSocketConfig.loginConnect(webSocketClient, apikey, passphrase, secretkey);
// 添加订阅频道
ArrayList<Map> channelLists = new ArrayList<>();
Map spotTickerMap = new HashMap();
spotTickerMap.put('channel', 'orders');
spotTickerMap.put('instType', 'SWAP');
channelLists.add(spotTickerMap);
// 调用订阅方法
WebSocketClient.subscribe(channelLists);
}
@Override
public void subscribe(String apikey, String secretkey, String passphrase) {
log.info('账户余额和持仓频道');
WebSocketConfig.loginConnect(webSocketClient, apikey, passphrase, secretkey);
ArrayList<Map> channelLists = new ArrayList<>();
Map spotTickerMap = new HashMap();
spotTickerMap.put('channel', 'balance_and_position');
channelLists.add(spotTickerMap);
// 调用订阅方法
WebSocketClient.subscribe(channelLists);
}
}
问题:账户余额和持仓频道的 WebSocket 需要登录验证;为多个账户分别订阅时,为什么只有最后一个账户建立了连接?
回答:在代码中,WebSocketClient 由 Spring 以 @Component 方式管理。@PostConstruct 标注的 init 方法把当前实例保存到静态变量 socketClient 中;收到服务器消息时,静态方法 setRedis 通过 socketClient 把消息转交给注入的 OrderSubscribeService 的 onMessage 方法处理。
在OrderSubscribeServiceImpl中,通过调用WebSocketConfig的loginConnect方法来建立连接,然后通过调用WebSocketClient的subscribe方法来订阅频道。
问题出在 WebSocketClient 类中的静态变量 webSocket 上。每次调用 connection 方法都会把新建的连接赋给这个静态变量,而 login、subscribe 和 sendMessage 都只通过它发送消息,因此后建立的连接会覆盖之前连接的引用。每条连接的心跳任务调用的也是 sendMessage("ping"),所以所有心跳都只发给最后建立的那条连接;较早建立的连接收不到心跳,可能因长时间没有消息而被服务器断开。此外,连接失败后 reConnect 方法只调用 WebSocketConfig 的 publicConnect 重连公共地址,既不会重新登录,也不会重新订阅。
解决方法是不要用静态变量保存连接状态,而是让每个账户拥有独立的连接:把 webSocket、flag、isConnect 改为实例字段,把 connection、login、subscribe、sendMessage 改为实例方法,并为每个账户创建一个客户端实例,例如:
WebSocketClient client = new WebSocketClient();
// 之后的连接、登录、订阅和心跳都通过 client 自己持有的 webSocket 进行
这样每个账户的登录、订阅和心跳都发送到各自的连接上,互不覆盖;断线重连时也应针对该账户的连接重新登录并重新订阅。
原文地址: https://www.cveoy.top/t/topic/o9Dm 著作权归作者所有。请勿转载和采集!