Java 使用 HttpClient 接收 SSE 流数据

在 Java 中,可以使用 java.net.http.HttpClient 类发送 HTTP 请求并接收 SSE(Server-Sent Events)返回的流数据。以下是一个示例代码:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.net.http.HttpHeaders;
import java.net.http.HttpResponse.BodyHandler;
import java.net.http.HttpResponse.BodyHandlers;
import java.net.http.HttpRequest.BodyPublishers;
import java.net.http.HttpClient.Version;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Flow;
import java.util.function.Consumer;

public class SSEClient {

    public static void main(String[] args) {
        HttpClient client = HttpClient.newBuilder()
                .version(Version.HTTP_1_1)
                .build();

        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://example.com/sse-endpoint"))
                .GET()
                .build();

        CompletableFuture<Void> future = client.sendAsync(request, new SSEBodyHandler())
                .thenApply(HttpResponse::body)
                .thenApply(SSEClient::parseSSE)
                .thenAccept(SSEClient::processSSE);

        try {
            future.get(); // 等待请求完成
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }

    private static void processSSE(SSEEvent event) {
        // 处理SSE事件
        System.out.println(event.getData());
    }

    private static SSEEvent parseSSE(String body) {
        // 解析SSE消息
        String[] lines = body.split("\n");
        String eventType = null;
        String data = "";

        for (String line : lines) {
            if (line.startsWith("event:")) {
                eventType = line.substring(6).trim();
            } else if (line.startsWith("data:")) {
                data += line.substring(5).trim() + "\n";
            }
        }

        return new SSEEvent(eventType, data);
    }

    private static class SSEBodyHandler implements BodyHandler<String> {
        private final StringBuilder builder = new StringBuilder();

        @Override
        public BodyHandler<String> apply(HttpHeaders headers) {
            return this;
        }

        @Override
        public Flow.Subscriber<ByteBuffer> apply(Flow.Publisher<ByteBuffer> publisher) {
            return new Flow.Subscriber<>() {
                @Override
                public void onSubscribe(Flow.Subscription subscription) {
                    subscription.request(Long.MAX_VALUE);
                }

                @Override
                public void onNext(ByteBuffer item) {
                    builder.append(new String(item.array()));
                }

                @Override
                public void onError(Throwable throwable) {
                    throwable.printStackTrace();
                }

                @Override
                public void onComplete() {
                    // SSE流接收完成
                    System.out.println(builder.toString());
                }
            };
        }
    }

    private static class SSEEvent {
        private final String eventType;
        private final String data;

        public SSEEvent(String eventType, String data) {
            this.eventType = eventType;
            this.data = data;
        }

        public String getEventType() {
            return eventType;
        }

        public String getData() {
            return data;
        }
    }
}

在上述代码中,我们首先创建一个 HttpClient 实例,并使用 sendAsync 方法发送一个 GET 请求到 SSE 的 URL。为了接收 SSE 的流数据,我们自定义了一个 SSEBodyHandler 类,该类实现了 HttpResponse.BodyHandler 接口,并重写了 applyapply 方法。在 apply 方法中,我们返回了自身实例,而在 apply 方法中,我们创建了一个 Flow.Subscriber 实例来处理 SSE 的流数据。在 onNext 方法中,我们将接收到的数据添加到 StringBuilder 中,直到流接收完成(onComplete 方法被调用),我们将 StringBuilder 中的数据打印出来。

main 方法中,我们使用 sendAsync 方法发送请求,并通过 thenApply 方法将 HttpResponse 转换为 SSE 的字符串数据,然后通过 thenApply 方法将字符串数据解析为 SSEEvent 对象,最后通过 thenAccept 方法处理 SSEEvent 对象。

请注意,以上示例代码仅用于演示目的,实际使用时可能需要根据具体的需求进行相应的修改。

Java 使用 HttpClient 接收 SSE 流数据

原文地址: http://www.cveoy.top/t/topic/fUe 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录