以下是一个使用gRPC实现多发布者多订阅者模式的简单示例程序:

'publisher.proto':

syntax = 'proto3';

package publisher;

service Publisher {
  rpc Publish (PublishRequest) returns (PublishResponse) {}
}

message PublishRequest {
  string topic = 1;
  string message = 2;
}

message PublishResponse {
  bool success = 1;
  string error_message = 2;
}

'subscriber.proto':

syntax = 'proto3';

package subscriber;

service Subscriber {
  rpc Subscribe (SubscribeRequest) returns (stream SubscribeResponse) {}
}

message SubscribeRequest {
  repeated string topics = 1;
}

message SubscribeResponse {
  string topic = 1;
  string message = 2;
}

'publisher_server.go':

package main

import (
	"context"
	"fmt"
	"log"
	"net"

	"google.golang.org/grpc"

	publisherpb "path/to/publisher/proto"
)

type publisherServer struct{}

func (s *publisherServer) Publish(ctx context.Context, req *publisherpb.PublishRequest) (*publisherpb.PublishResponse, error) {
	// 在这里实现发布消息的逻辑
	topic := req.Topic
	message := req.Message

	fmt.Printf("收到来自主题 %s 的消息:%s\n", topic, message)

	return &publisherpb.PublishResponse{
		Success: true,
	},
	 nil
}

func main() {
	lis, err := net.Listen("tcp", ":50051")
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}

	s := grpc.NewServer()
	publisherpb.RegisterPublisherServer(s, &publisherServer{})

	if err := s.Serve(lis); err != nil {
		log.Fatalf("failed to serve: %v", err)
	}
}

'subscriber_server.go':

package main

import (
	"context"
	"fmt"
	"log"
	"net"
	"sync"

	"google.golang.org/grpc"

	subscriberpb "path/to/subscriber/proto"
)

type subscriberServer struct {
	subscriptions map[string][]chan<- *subscriberpb.SubscribeResponse
	mu            sync.Mutex
}

func (s *subscriberServer) Subscribe(req *subscriberpb.SubscribeRequest, stream subscriberpb.Subscriber_SubscribeServer) error {
	s.mu.Lock()
	for _, topic := range req.Topics {
		ch := make(chan *subscriberpb.SubscribeResponse)
		s.subscriptions[topic] = append(s.subscriptions[topic], ch)
	}

	s.mu.Unlock()

	// 在这里实现订阅消息的逻辑
	for {
		select {
		case <-stream.Context().Done():
			return nil
		default:
			for _, topic := range req.Topics {
				channels := s.subscriptions[topic]
				message := <-channels[0]
				if err := stream.Send(message); err != nil {
					return err
				}
			}
		}
	}
}

func main() {
	lis, err := net.Listen("tcp", ":50052")
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}

	s := grpc.NewServer()
	subscriberpb.RegisterSubscriberServer(s, &subscriberServer{
		subscriptions: make(map[string][]chan<- *subscriberpb.SubscribeResponse),
	})

	if err := s.Serve(lis); err != nil {
		log.Fatalf("failed to serve: %v", err)
	}
}

'publisher_client.go':

package main

import (
	"context"
	"log"
	"os"
	"time"

	"google.golang.org/grpc"

	publisherpb "path/to/publisher/proto"
)

func main() {
	conn, err := grpc.Dial("localhost:50051", grpc.WithInsecure())
	if err != nil {
		log.Fatalf("did not connect: %v", err)
	}
	defer conn.Close()

	c := publisherpb.NewPublisherClient(conn)

	topic := os.Args[1]
	message := os.Args[2]

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	_, err = c.Publish(ctx, &publisherpb.PublishRequest{
		Topic:   topic,
		Message: message,
	})
	if err != nil {
		log.Fatalf("Publish failed: %v", err)
	}

	log.Printf("Message published successfully")
}

'subscriber_client.go':

package main

import (
	"context"
	"fmt"
	"io"
	"log"
	"os"

	"google.golang.org/grpc"

	subscriberpb "path/to/subscriber/proto"
)

func main() {
	conn, err := grpc.Dial("localhost:50052", grpc.WithInsecure())
	if err != nil {
		log.Fatalf("did not connect: %v", err)
	}
	defer conn.Close()

	c := subscriberpb.NewSubscriberClient(conn)

	topics := os.Args[1:]

	ctx := context.Background()
	stream, err := c.Subscribe(ctx, &subscriberpb.SubscribeRequest{
		Topics: topics,
	})
	if err != nil {
		log.Fatalf("Subscribe failed: %v", err)
	}

	for {
		message, err := stream.Recv()
		if err == io.EOF {
			break
		}
		if err != nil {
			log.Fatalf("Error receiving message: %v", err)
		}

		fmt.Printf("收到来自主题 %s 的消息:%s\n", message.Topic, message.Message)
	}
}

这个示例中,我们定义了两个gRPC服务,一个用于发布消息,另一个用于订阅消息。发布者通过gRPC调用Publish方法来发布消息,订阅者通过gRPC调用Subscribe方法来订阅感兴趣的主题。订阅者通过流式RPC接收发布者发布的消息。

在'publisher_server.go'中,我们实现了Publish方法,用于处理发布者发布的消息。在'subscriber_server.go'中,我们实现了Subscribe方法,用于处理订阅者的订阅请求,并通过通道将订阅者和发布者连接起来。在'publisher_client.go'中,我们向发布者发送一个消息。在'subscriber_client.go'中,我们向订阅者发送订阅请求,并通过流式RPC接收发布者发布的消息。

你可以根据自己的需求修改这个示例程序,并在其中实现你自己的业务逻辑。

更多相关信息:

gRPC实现多发布者多订阅者模式:详细示例与代码解析

原文地址: https://www.cveoy.top/t/topic/pcuf 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录