gRPC实现多发布者多订阅者模式:详细示例与代码解析
以下是一个使用gRPC实现多发布者多订阅者模式的简单示例程序:
'publisher.proto':
syntax = 'proto3';
package publisher;
service Publisher {
rpc Publish (PublishRequest) returns (PublishResponse) {}
}
message PublishRequest {
string topic = 1;
string message = 2;
}
message PublishResponse {
bool success = 1;
string error_message = 2;
}
'subscriber.proto':
syntax = 'proto3';
package subscriber;
service Subscriber {
rpc Subscribe (SubscribeRequest) returns (stream SubscribeResponse) {}
}
message SubscribeRequest {
repeated string topics = 1;
}
message SubscribeResponse {
string topic = 1;
string message = 2;
}
'publisher_server.go':
package main
import (
"context"
"fmt"
"log"
"net"
"google.golang.org/grpc"
publisherpb "path/to/publisher/proto"
)
type publisherServer struct{}
func (s *publisherServer) Publish(ctx context.Context, req *publisherpb.PublishRequest) (*publisherpb.PublishResponse, error) {
// 在这里实现发布消息的逻辑
topic := req.Topic
message := req.Message
fmt.Printf("收到来自主题 %s 的消息:%s\n", topic, message)
return &publisherpb.PublishResponse{
Success: true,
},
nil
}
func main() {
lis, err := net.Listen("tcp", ":50051")
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
s := grpc.NewServer()
publisherpb.RegisterPublisherServer(s, &publisherServer{})
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}
'subscriber_server.go':
package main
import (
"context"
"fmt"
"log"
"net"
"sync"
"google.golang.org/grpc"
subscriberpb "path/to/subscriber/proto"
)
type subscriberServer struct {
subscriptions map[string][]chan<- *subscriberpb.SubscribeResponse
mu sync.Mutex
}
func (s *subscriberServer) Subscribe(req *subscriberpb.SubscribeRequest, stream subscriberpb.Subscriber_SubscribeServer) error {
s.mu.Lock()
for _, topic := range req.Topics {
ch := make(chan *subscriberpb.SubscribeResponse)
s.subscriptions[topic] = append(s.subscriptions[topic], ch)
}
s.mu.Unlock()
// 在这里实现订阅消息的逻辑
for {
select {
case <-stream.Context().Done():
return nil
default:
for _, topic := range req.Topics {
channels := s.subscriptions[topic]
message := <-channels[0]
if err := stream.Send(message); err != nil {
return err
}
}
}
}
}
func main() {
lis, err := net.Listen("tcp", ":50052")
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
s := grpc.NewServer()
subscriberpb.RegisterSubscriberServer(s, &subscriberServer{
subscriptions: make(map[string][]chan<- *subscriberpb.SubscribeResponse),
})
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}
'publisher_client.go':
package main
import (
"context"
"log"
"os"
"time"
"google.golang.org/grpc"
publisherpb "path/to/publisher/proto"
)
func main() {
conn, err := grpc.Dial("localhost:50051", grpc.WithInsecure())
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()
c := publisherpb.NewPublisherClient(conn)
topic := os.Args[1]
message := os.Args[2]
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
_, err = c.Publish(ctx, &publisherpb.PublishRequest{
Topic: topic,
Message: message,
})
if err != nil {
log.Fatalf("Publish failed: %v", err)
}
log.Printf("Message published successfully")
}
'subscriber_client.go':
package main
import (
"context"
"fmt"
"io"
"log"
"os"
"google.golang.org/grpc"
subscriberpb "path/to/subscriber/proto"
)
func main() {
conn, err := grpc.Dial("localhost:50052", grpc.WithInsecure())
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()
c := subscriberpb.NewSubscriberClient(conn)
topics := os.Args[1:]
ctx := context.Background()
stream, err := c.Subscribe(ctx, &subscriberpb.SubscribeRequest{
Topics: topics,
})
if err != nil {
log.Fatalf("Subscribe failed: %v", err)
}
for {
message, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Fatalf("Error receiving message: %v", err)
}
fmt.Printf("收到来自主题 %s 的消息:%s\n", message.Topic, message.Message)
}
}
这个示例中,我们定义了两个gRPC服务,一个用于发布消息,另一个用于订阅消息。发布者通过gRPC调用Publish方法来发布消息,订阅者通过gRPC调用Subscribe方法来订阅感兴趣的主题。订阅者通过流式RPC接收发布者发布的消息。
在'publisher_server.go'中,我们实现了Publish方法,用于处理发布者发布的消息。在'subscriber_server.go'中,我们实现了Subscribe方法,用于处理订阅者的订阅请求,并通过通道将订阅者和发布者连接起来。在'publisher_client.go'中,我们向发布者发送一个消息。在'subscriber_client.go'中,我们向订阅者发送订阅请求,并通过流式RPC接收发布者发布的消息。
你可以根据自己的需求修改这个示例程序,并在其中实现你自己的业务逻辑。
更多相关信息:
原文地址: https://www.cveoy.top/t/topic/pcuf 著作权归作者所有。请勿转载和采集!