Java 使用 HttpClient 接收 SSE 流数据
Java 使用 HttpClient 接收 SSE 流数据
在 Java 中,可以使用 java.net.http.HttpClient 类发送 HTTP 请求并接收 SSE(Server-Sent Events)返回的流数据。以下是一个示例代码:
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.net.http.HttpHeaders;
import java.net.http.HttpResponse.BodyHandler;
import java.net.http.HttpResponse.BodyHandlers;
import java.net.http.HttpRequest.BodyPublishers;
import java.net.http.HttpClient.Version;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Flow;
import java.util.function.Consumer;
public class SSEClient {
public static void main(String[] args) {
HttpClient client = HttpClient.newBuilder()
.version(Version.HTTP_1_1)
.build();
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create("http://example.com/sse-endpoint"))
.GET()
.build();
CompletableFuture<Void> future = client.sendAsync(request, new SSEBodyHandler())
.thenApply(HttpResponse::body)
.thenApply(SSEClient::parseSSE)
.thenAccept(SSEClient::processSSE);
try {
future.get(); // 等待请求完成
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
private static void processSSE(SSEEvent event) {
// 处理SSE事件
System.out.println(event.getData());
}
private static SSEEvent parseSSE(String body) {
// 解析SSE消息
String[] lines = body.split("\n");
String eventType = null;
String data = "";
for (String line : lines) {
if (line.startsWith("event:")) {
eventType = line.substring(6).trim();
} else if (line.startsWith("data:")) {
data += line.substring(5).trim() + "\n";
}
}
return new SSEEvent(eventType, data);
}
private static class SSEBodyHandler implements BodyHandler<String> {
private final StringBuilder builder = new StringBuilder();
@Override
public BodyHandler<String> apply(HttpHeaders headers) {
return this;
}
@Override
public Flow.Subscriber<ByteBuffer> apply(Flow.Publisher<ByteBuffer> publisher) {
return new Flow.Subscriber<>() {
@Override
public void onSubscribe(Flow.Subscription subscription) {
subscription.request(Long.MAX_VALUE);
}
@Override
public void onNext(ByteBuffer item) {
builder.append(new String(item.array()));
}
@Override
public void onError(Throwable throwable) {
throwable.printStackTrace();
}
@Override
public void onComplete() {
// SSE流接收完成
System.out.println(builder.toString());
}
};
}
}
private static class SSEEvent {
private final String eventType;
private final String data;
public SSEEvent(String eventType, String data) {
this.eventType = eventType;
this.data = data;
}
public String getEventType() {
return eventType;
}
public String getData() {
return data;
}
}
}
在上述代码中,我们首先创建一个 HttpClient 实例,并使用 sendAsync 方法发送一个 GET 请求到 SSE 的 URL。为了接收 SSE 的流数据,我们自定义了一个 SSEBodyHandler 类,该类实现了 HttpResponse.BodyHandler 接口,并重写了 apply 和 apply 方法。在 apply 方法中,我们返回了自身实例,而在 apply 方法中,我们创建了一个 Flow.Subscriber 实例来处理 SSE 的流数据。在 onNext 方法中,我们将接收到的数据添加到 StringBuilder 中,直到流接收完成(onComplete 方法被调用),我们将 StringBuilder 中的数据打印出来。
在 main 方法中,我们使用 sendAsync 方法发送请求,并通过 thenApply 方法将 HttpResponse 转换为 SSE 的字符串数据,然后通过 thenApply 方法将字符串数据解析为 SSEEvent 对象,最后通过 thenAccept 方法处理 SSEEvent 对象。
请注意,以上示例代码仅用于演示目的,实际使用时可能需要根据具体的需求进行相应的修改。
原文地址: http://www.cveoy.top/t/topic/fUe 著作权归作者所有。请勿转载和采集!