WebSocket 客户端类 - OKX API 连接和数据订阅
@Slf4j
@Component
public class WebSocketClient {
private static List<WebSocket> webSockets = new ArrayList<>();
private static Boolean flag = false;
private static Boolean isConnect = false;
private static String sign;
private final static HashFunction crc32 = Hashing.crc32();
private final static ObjectReader objectReader = new ObjectMapper().readerFor(OrderBookData.class);
private static Map<String, Optional<SpotOrderBook>> bookMap = new HashMap<>();
public WebSocketClient() {
}
@Resource
private OrderSubscribeService orderSubscribeService;
private static WebSocketClient socketClient;
@PostConstruct
public void init() {
socketClient = this;
socketClient.orderSubscribeService = this.orderSubscribeService;
}
public static void setRedis(String data) {
socketClient.orderSubscribeService.onMessage(data);
}
//与服务器建立连接,参数为服务器的URL
public static WebSocket connection(final String url) {
OkHttpClient client = new OkHttpClient.Builder()
.readTimeout(5, TimeUnit.SECONDS)
.build();
Request request = new Request.Builder()
.url(url)
.build();
WebSocket webSocket = client.newWebSocket(request, new WebSocketListener() {
ScheduledExecutorService service;
@Override
public void onOpen(final WebSocket webSocket, final Response response) {
//连接成功后,设置定时器,每隔25s,自动向服务器发送心跳,保持与服务器连接
isConnect = true;
webSockets.add(webSocket);
System.out.println(Instant.now().toString() + ' Connected to the server success!');
Runnable runnable = new Runnable() {
@Override
public void run() {
// task to run goes here
sendMessage('ping');
}
};
service = Executors.newSingleThreadScheduledExecutor();
// 第二个参数为首次执行的延时时间,第三个参数为定时执行的间隔时间
service.scheduleAtFixedRate(runnable, 25, 25, TimeUnit.SECONDS);
}
@Override
public void onClosing(WebSocket webSocket, int code, String reason) {
System.out.println('Connection is about to disconnect!');
webSocket.close(1000, 'Long time no message was sent or received!');
}
@Override
public void onClosed(final WebSocket webSocket, final int code, final String reason) {
webSockets.remove(webSocket);
System.out.println('Connection dropped!');
}
@Override
public void onFailure(final WebSocket webSocket, final Throwable t, final Response response) {
System.out.println('Connection failed,Please reconnect!');
if (Objects.nonNull(service)) {
service.shutdown();
reConnect();
}
}
@Override
public void onMessage(final WebSocket webSocket, final String s) {
setRedis(s);
if(s.contains('pong')){
System.out.println(DateFormatUtils.format(new Date(), DateUtils.TIME_STYLE_S4) + ' Receive: ' + s);
}
if (null != s && s.contains('login')) {
if (s.endsWith('true}')) {
flag = true;
}
}
}
});
return webSocket;
}
public static void reConnect() {
log.info('重连:{};{}');
try {
WebSocketConfig.publicConnect(socketClient);
//开启订阅
// socketClient.orderSubscribeService.subscribe();
} catch (Exception e) {
e.printStackTrace();
}
}
private static void isLogin(String s) {
if (null != s && s.contains('login')) {
if (s.endsWith('true}')) {
flag = true;
}
}
}
//获得sign
private static String sha256_HMAC(String message, String secret) {
String hash = '';
try {
Mac sha256_HMAC = Mac.getInstance('HmacSHA256');
SecretKeySpec secret_key = new SecretKeySpec(secret.getBytes(CharsetEnum.UTF_8.charset()), 'HmacSHA256');
sha256_HMAC.init(secret_key);
byte[] bytes = sha256_HMAC.doFinal(message.getBytes(CharsetEnum.UTF_8.charset()));
hash = Base64.getEncoder().encodeToString(bytes);
} catch (Exception e) {
System.out.println('Error HmacSHA256 ===========' + e.getMessage());
}
return hash;
}
private static String listToJson(List<Map> list) {
JSONArray jsonArray = new JSONArray();
for (Map map : list) {
jsonArray.add(JSONObject.fromObject(map));
}
return jsonArray.toJSONString();
}
//登录
public static void login(String apiKey, String passPhrase, String secretKey) {
String timestamp = System.currentTimeMillis()/1000 + '';
String message = timestamp + 'GET' + '/users/self/verify';
sign = sha256_HMAC(message, secretKey);
List<Map<String, Object>> args=new ArrayList<>();
Map<String, Object> data=new HashMap<>();
data.put('apiKey',apiKey);
data.put('passphrase',passPhrase);
data.put('timestamp',timestamp);
data.put('sign',sign);
args.add(data);
Map<String, Object> op=new HashMap<>();
op.put('op','login');
op.put('args',args);
sendMessage(JSON.toJSONString(op));
}
//订阅,参数为频道组成的集合
public static void subscribe(List<Map> list) {
String s = listToJson(list);
String str = '{\'op\': \'subscribe\', \'args\':' + s + '}';
for (WebSocket webSocket : webSockets) {
if (null != webSocket) {
sendMessage(webSocket, str);
}
}
}
//取消订阅,参数为频道组成的集合
public static void unsubscribe(List<Map> list) {
String s = listToJson(list);
String str = '{\'op\': \'unsubscribe\', \'args\':' + s + '}';
for (WebSocket webSocket : webSockets) {
if (null != webSocket) {
sendMessage(webSocket, str);
}
}
}
private static void sendMessage(WebSocket webSocket, String str) {
if (null != webSocket) {
try {
Thread.sleep(1300);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println(DateFormatUtils.format(new Date(), DateUtils.TIME_STYLE_S4) + 'Send a message to the server:' + str);
webSocket.send(str);
} else {
System.out.println('Please establish the connection before you operate it!');
}
}
//断开连接
public static void closeConnection() {
for (WebSocket webSocket : webSockets) {
if (null != webSocket) {
webSocket.close(1000, 'User actively closes the connection');
}
}
}
public boolean getIsLogin() {
return flag;
}
public boolean getIsConnect() {
return isConnect;
}
public static <T extends OrderBookItem> int checksum(List<T> asks, List<T> bids) {
System.out.println('深度');
StringBuilder s = new StringBuilder();
for (int i = 0; i < 25; i++) {
if (i < bids.size()) {
s.append(bids.get(i).getPrice().toString());
s.append(':');
s.append(bids.get(i).getSize());
s.append(':');
}
if (i < asks.size()) {
s.append(asks.get(i).getPrice().toString());
s.append(':');
s.append(asks.get(i).getSize());
s.append(':');
}
}
final String str;
if (s.length() > 0) {
str = s.substring(0, s.length() - 1);
} else {
str = '';
}
return crc32.hashString(str, StandardCharsets.UTF_8).asInt();
}
private static <T extends OrderBookItem> String getStr(List<T> asks, List<T> bids) {
StringBuilder s = new StringBuilder();
for (int i = 0; i < 25; i++) {
if (i < bids.size()) {
s.append(bids.get(i).getPrice().toString());
s.append(':');
s.append(bids.get(i).getSize());
s.append(':');
}
if (i < asks.size()) {
s.append(asks.get(i).getPrice().toString());
s.append(':');
s.append(asks.get(i).getSize());
s.append(':');
}
}
final String str;
if (s.length() > 0) {
str = s.substring(0, s.length() - 1);
} else {
str = '';
}
return str;
}
public static Optional<SpotOrderBook> parse(String json) {
try {
OrderBookData data = objectReader.readValue(json);
List<SpotOrderBookItem> asks =
data.getAsks().stream().map(x -> new SpotOrderBookItem(new String(x.get(0)), x.get(1), x.get(2), x.get(3)))
.collect(Collectors.toList());
List<SpotOrderBookItem> bids =
data.getBids().stream().map(x -> new SpotOrderBookItem(new String(x.get(0)), x.get(1), x.get(2), x.get(3)))
.collect(Collectors.toList());
return Optional.of(new SpotOrderBook(asks, bids, data.getTs(), data.getChecksum()));
} catch (Exception e) {
return Optional.empty();
}
}
@Data
public static class OrderBookData {
private List<List<String>> asks;
private List<List<String>> bids;
private String ts;
private int checksum;
public List<List<String>> getAsks() {
return asks;
}
public void setAsks(List<List<String>> asks) {
this.asks = asks;
}
public List<List<String>> getBids() {
return bids;
}
public void setBids(List<List<String>> bids) {
this.bids = bids;
}
public String getTs() {
return ts;
}
public void setTs(String ts) {
this.ts = ts;
}
public int getChecksum() {
return checksum;
}
public void setChecksum(int checksum) {
this.checksum = checksum;
}
}
}
WebSocketConfig 类
@Component
public class WebSocketConfig {
private static String SERVICE_URL_PUBLIC;
private static String SERVICE_URL_PRIVATE;
@Value('${okx.webSocket.public}')
private String webSocketPublic;
@Value('${okx.webSocket.private}')
private String webSocketPrivate;
@PostConstruct
public void getByWebSocket(){
SERVICE_URL_PUBLIC=this.webSocketPublic;
SERVICE_URL_PRIVATE=this.webSocketPrivate;
}
public static void publicConnect(WebSocketClient webSocketClient) {
System.out.println(SERVICE_URL_PUBLIC);
WebSocketClient.connection(SERVICE_URL_PUBLIC);
}
public static void loginConnect(WebSocketClient webSocketClient,String apiKey,String passphrase,String secretKey) {
System.out.println(SERVICE_URL_PRIVATE);
//与服务器建立连接
WebSocketClient.connection(SERVICE_URL_PRIVATE);
//登录账号,用户需提供 api-key,passphrase,secret—key 不要随意透漏 ^_^
WebSocketClient.login(apiKey,passphrase,secretKey);
}
public static void closeConnection(WebSocketClient webSocketClient) {
WebSocketClient.closeConnection();
}
}
/**
* 系统初始化runner
*/
@Component
@Order(value = 1)
@Slf4j
public class SystemInitRunner implements ApplicationRunner {
@Resource
private UserAccountMapper userAccountMapper;
@Resource
private OrderSubscribeService orderSubscribeService;
@Resource
private IUserAccountPositionsService iUserAccountPositionsService;
@Override
public void run(ApplicationArguments args) {
log.info('==服务启动后,订单频道==');
//查询账户信息
List<UserAccount> userAccountList = userAccountMapper.getAccoutList();
for(UserAccount userAccount: userAccountList){
if(userAccount.getBindingType()==1){
orderSubscribeService.orderSubscribe(userAccount.getApikey(),userAccount.getSecretkey(),userAccount.getPassphrase());
log.info('==账户ID:【'+userAccount.getId()+'】订单频道订阅完成==');
orderSubscribeService.subscribe(userAccount.getApikey(),userAccount.getSecretkey(),userAccount.getPassphrase());
log.info('==账户ID:【'+userAccount.getId()+'】账户余额和持仓频道订阅完成==');
}
}
//更新持仓信息
iUserAccountPositionsService.getPositions();
log.info('初始化--更新持仓信息');
log.info('初始订单数据完成...');
}
}
@Slf4j
@Service
public class OrderSubscribeServiceImpl implements OrderSubscribeService {
private static final WebSocketClient webSocketClient = new WebSocketClient();
@Value('${okx.url}')
private String okxUrl;
@Resource
private AiPlanDetailedMapper aiPlanDetailedMapper;
@Resource
private AiPlanMapper aiPlanMapper;
@Autowired
private UserAccountMapper userAccountMapper;
@Autowired
private UserAccountBalanceMapper userAccountBalanceMapper;
@Autowired
private UserToInstMapper userToInstMapper;
@Autowired
private InstListMapper instListMapper;
@Override
public void orderSubscribe(String apikey, String secretkey, String passphrase) {
log.info('订单频道');
WebSocketConfig.loginConnect(webSocketClient, apikey, passphrase, secretkey);
//添加订阅频道
ArrayList<Map> channelLists = new ArrayList<>();
Map spotTickerMap = new HashMap();
spotTickerMap.put('channel', 'orders');
spotTickerMap.put('instType', 'SWAP');
channelLists.add(spotTickerMap);
//调用订阅方法
WebSocketClient.subscribe(channelLists);
}
@Override
public void subscribe(String apikey, String secretkey, String passphrase) {
log.info('账户余额和持仓频道');
WebSocketConfig.loginConnect(webSocketClient, apikey, passphrase, secretkey);
ArrayList<Map> channelLists = new ArrayList<>();
Map spotTickerMap = new HashMap();
spotTickerMap.put('channel', 'balance_and_position');
channelLists.add(spotTickerMap);
//调用订阅方法
WebSocketClient.subscribe(channelLists);
}
}
问题:账户余额和持仓频道的 WebSocket 需要登录验证;有多个账户订阅订单时,为什么只有最后一个账户建立了连接?请给出详细的代码说明。回答:给出的代码中,WebSocketClient类是一个WebSocket客户端类,用于与服务器建立连接,并发送和接收消息。它包含了连接服务器、登录、订阅和取消订阅等功能。
WebSocketConfig类是WebSocket的配置类,用于获取WebSocket的URL,并调用WebSocketClient类的连接方法。
SystemInitRunner类是一个系统初始化的Runner,在系统启动后,会自动执行一些初始化操作,包括订阅订单频道和更新持仓信息。
OrderSubscribeServiceImpl类是订单订阅的实现类,它调用WebSocketClient类的订阅方法,订阅订单频道和账户余额和持仓频道。
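根据给出的代码,每个账户都会调用一次订阅订单频道和订阅账户余额和持仓频道的方法,每次调用也都会通过WebSocketConfig类发起一次连接。但是WebSocketClient类中的连接和登录状态都保存在静态(static)变量中,被所有账户共享;如果只用一个静态变量保存webSocket,每次调用连接方法时都会覆盖之前的连接,之后的登录和订阅消息只会发送到最后一次建立的连接上,因此看起来只有最后一个账户建立了连接。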
如果要实现多个账户同时建立连接,可以对WebSocketClient类进行修改,将webSocket变量改为一个List,每次连接时添加到List中,这样就可以保存多个连接了。在订阅和取消订阅方法中,也需要修改为遍历List,分别发送消息给每个连接。
另外,需要注意的是,WebSocket是一种长连接,如果要同时建立多个连接,可能会导致系统负载过高。建议根据实际需求和系统资源进行调整。
原文地址: https://www.cveoy.top/t/topic/o9Dt 著作权归作者所有。请勿转载和采集!