@Override @Nonnull public Mono<Void> handle(@Nonnull WebSocketSession session) { Map<String, Disposable> subs = new ConcurrentHashMap<>(); return filter.handle(session).switch…
要去除 token 验证,可以将 handle 方法中与 auth 相关的代码删除,这样就不再需要获取用户认证信息;也可以让过滤器的 handle 方法直接返回一个空的 Mono,表示不做任何认证校验。需要注意的是,下面的 handle 方法对空结果使用了 switchIfEmpty(发送认证错误消息并关闭连接),因此采用后一种做法时还需要同时去掉该分支。具体来说,可以将 handle 方法修改如下:
/**
 * Serves a single WebSocket session.
 *
 * <p>The session is first passed to {@code filter.handle(session)}. When that
 * yields nothing, an auth-error message is sent and the session is closed with
 * {@link CloseStatus#BAD_DATA}. Otherwise every inbound frame is processed:
 * <ul>
 *   <li>WebSocket PONG frames are ignored, PING frames are answered with a PONG frame;</li>
 *   <li>any other payload is parsed as a {@code MessagingRequest} and handled as
 *       an application-level ping, a {@code sub} or an {@code unsub};</li>
 *   <li>any other request type, or any exception while handling a frame, is
 *       answered with an error message.</li>
 * </ul>
 * Subscriptions opened by the session are tracked by request id and disposed
 * when the returned {@code Mono} terminates for any reason.
 *
 * @param session the WebSocket session to serve
 * @return a {@code Mono} that completes once the inbound stream has ended
 */
@Override
public Mono<Void> handle(@Nonnull WebSocketSession session) {
    // Live subscriptions opened on this session, keyed by the id of the request that opened them.
    Map<String, Disposable> subs = new ConcurrentHashMap<>();

    return filter
        .handle(session)
        // Nothing emitted by the filter: report the auth error, then close the session.
        .switchIfEmpty(session
            .send(Mono.just(session.textMessage(JSON.toJSONString(Message.authError()))))
            .then(session.close(CloseStatus.BAD_DATA))
            .then(Mono.empty()))
        .flatMap(auth -> session
            .receive()
            .doOnNext(frame -> {
                try {
                    log.info("<<<<<<< 进入try方法 <<<<<<<");

                    // Protocol-level keep-alive frames carry no request payload.
                    WebSocketMessage.Type frameType = frame.getType();
                    if (frameType == WebSocketMessage.Type.PONG) {
                        return;
                    }
                    if (frameType == WebSocketMessage.Type.PING) {
                        session
                            .send(Mono.just(session.pongMessage(DataBufferFactory::allocateBuffer)))
                            .subscribe();
                        return;
                    }

                    MessagingRequest req = JSON.parseObject(frame.getPayloadAsText(), MessagingRequest.class);
                    if (req == null) {
                        return;
                    }

                    MessagingRequest.Type kind = req.getType();

                    // Application-level ping: reply with a pong carrying the same id.
                    if (kind == MessagingRequest.Type.ping) {
                        session
                            .send(Mono.just(session.textMessage(JSON.toJSONString(
                                Message.pong(req.getId())
                            ))))
                            .subscribe();
                        return;
                    }

                    // Every remaining request type needs an id.
                    if (StringUtils.isEmpty(req.getId())) {
                        session
                            .send(Mono.just(session.textMessage(JSON.toJSONString(
                                Message.error(kind.name(), null, "id不能为空")
                            ))))
                            .subscribe();
                        return;
                    }

                    if (kind == MessagingRequest.Type.sub) {
                        // Duplicate subscription: keep the one that is still running.
                        Disposable existing = subs.get(req.getId());
                        boolean stillRunning = existing != null && !existing.isDisposed();
                        if (stillRunning) {
                            return;
                        }

                        Disposable started = messagingManager
                            .subscribe(SubscribeRequest.of(req, auth))
                            .doOnEach(ReactiveLogger.onError(error -> log.error("{}", error.getMessage(), error)))
                            // A failing subscription is reported to the client as a final error message.
                            .onErrorResume(error -> Mono.just(Message.error(req.getId(), req.getTopic(), error)))
                            .map(outbound -> session.textMessage(JSON.toJSONString(outbound)))
                            .doOnComplete(() -> {
                                log.debug("complete subscription:{}", req.getTopic());
                                subs.remove(req.getId());
                                // Tell the client that this subscription has finished.
                                session
                                    .send(Mono.just(session.textMessage(JSON.toJSONString(Message.complete(req.getId())))))
                                    .subscribe();
                            })
                            .doOnCancel(() -> {
                                log.debug("cancel subscription:{}", req.getTopic());
                                subs.remove(req.getId());
                            })
                            .transform(session::send)
                            .subscribe();

                        // Only track the subscription if it has not already terminated.
                        if (!started.isDisposed()) {
                            subs.put(req.getId(), started);
                        }
                        return;
                    }

                    if (kind == MessagingRequest.Type.unsub) {
                        Disposable removed = subs.remove(req.getId());
                        if (removed != null) {
                            removed.dispose();
                        }
                        return;
                    }

                    // Any other request type is rejected.
                    session.send(Mono.just(session.textMessage(JSON.toJSONString(
                        Message.error(req.getId(), req.getTopic(), "不支持的类型:" + kind)
                    )))).subscribe();
                } catch (Exception e) {
                    log.warn(e.getMessage(), e);
                    session.send(Mono.just(session.textMessage(JSON.toJSONString(
                        Message.error("illegal_argument", null, "消息格式错误")
                    )))).subscribe();
                }
            })
            .then())
        .doFinally(signal -> {
            // Release everything this session subscribed to, however the pipeline ended.
            for (Disposable active : subs.values()) {
                active.dispose();
            }
            subs.clear();
        });
}
/**
 * Authentication step that performs no verification and never yields an
 * {@code Authentication}.
 *
 * <p>NOTE(review): the session handler shown alongside this method chains
 * {@code switchIfEmpty(...)} onto {@code filter.handle(session)} and, on an
 * empty result, sends an auth error and closes the session. If this is that
 * filter method, confirm an empty result is really what is wanted.
 *
 * @param session the WebSocket session being authenticated (not inspected)
 * @return an empty {@code Mono}
 */
@Override
public Mono<Authentication> handle(WebSocketSession session) {
    // Return an empty Mono directly: no authentication check is carried out.
    Mono<Authentication> noAuthentication = Mono.empty();
    return noAuthentication;
}
``
原文地址: https://www.cveoy.top/t/topic/g0A2 著作权归作者所有。请勿转载和采集!