以下是一个基于 gRPC 实现的多发布者多订阅者的 C++ 示例程序:

// 文件: pubsub.proto
syntax = "proto3";

package pubsub;

message Message {
  string content = 1;
}

service Publisher {
  rpc Publish(Message) returns (google.protobuf.Empty);
}

service Subscriber {
  rpc Subscribe(google.protobuf.Empty) returns (stream Message);
}
// 文件: pubsub_server.cpp
#include <iostream>
#include <memory>
#include <string>
#include <unordered_map>

#include <grpcpp/grpcpp.h>

#include "pubsub.grpc.pb.h"

using grpc::Server;
using grpc::ServerBuilder;
using grpc::ServerContext;
using grpc::Status;
using pubsub::Message;
using pubsub::Publisher;
using pubsub::Subscriber;

class PubSubServiceImpl final : public Publisher::Service, public Subscriber::Service {
public:
  Status Publish(ServerContext* context, const Message* request, google::protobuf::Empty* response) override {
    std::string content = request->content();
    std::cout << "Received message: " << content << std::endl;

    // 将消息发送给所有订阅者
    for (const auto& subscriber : subscribers_) {
      grpc::ServerWriter<Message>* writer = subscriber.second;
      writer->Write(*request);
    }

    return Status::OK;
  }

  Status Subscribe(ServerContext* context, const google::protobuf::Empty* request, grpc::ServerWriter<Message>* writer) override {
    std::string subscriber_id = std::to_string(reinterpret_cast<int64_t>(writer));
    subscribers_[subscriber_id] = writer;

    // 保持长连接,等待订阅者断开连接
    while (!context->IsCancelled()) {
      // 等待新消息到达
    }

    subscribers_.erase(subscriber_id);

    return Status::OK;
  }

private:
  std::unordered_map<std::string, grpc::ServerWriter<Message>*> subscribers_;
};

void RunServer() {
  std::string server_address("0.0.0.0:50051");
  PubSubServiceImpl service;

  ServerBuilder builder;
  builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
  builder.RegisterService(&service);

  std::unique_ptr<Server> server(builder.BuildAndStart());
  std::cout << "Server listening on " << server_address << std::endl;
  server->Wait();
}

int main() {
  RunServer();
  return 0;
}
// 文件: pubsub_client.cpp
#include <iostream>
#include <memory>
#include <string>

#include <grpcpp/grpcpp.h>

#include "pubsub.grpc.pb.h"

using grpc::Channel;
using grpc::ClientContext;
using grpc::Status;
using pubsub::Message;
using pubsub::Publisher;
using pubsub::Subscriber;

class PublisherClient {
public:
  PublisherClient(std::shared_ptr<Channel> channel)
    : stub_(Publisher::NewStub(channel)) {}

  void Publish(const std::string& content) {
    Message request;
    request.set_content(content);

    google::protobuf::Empty response;
    ClientContext context;

    Status status = stub_->Publish(&context, request, &response);
    if (status.ok()) {
      std::cout << "Message published: " << content << std::endl;
    } else {
      std::cout << "Failed to publish message: " << status.error_message() << std::endl;
    }
  }

private:
  std::unique_ptr<Publisher::Stub> stub_;
};

class SubscriberClient {
public:
  SubscriberClient(std::shared_ptr<Channel> channel)
    : stub_(Subscriber::NewStub(channel)) {}

  void Subscribe() {
    google::protobuf::Empty request;
    ClientContext context;

    std::unique_ptr<grpc::ClientReader<Message>> reader(stub_->Subscribe(&context, request));
    Message message;

    while (reader->Read(&message)) {
      std::string content = message.content();
      std::cout << "Received message: " << content << std::endl;
    }

    Status status = reader->Finish();
    if (status.ok()) {
      std::cout << "Subscription finished" << std::endl;
    } else {
      std::cout << "Failed to subscribe: " << status.error_message() << std::endl;
    }
  }

private:
  std::unique_ptr<Subscriber::Stub> stub_;
};

int main() {
  std::string server_address("localhost:50051");
  PublisherClient publisher(grpc::CreateChannel(server_address, grpc::InsecureChannelCredentials()));
  SubscriberClient subscriber(grpc::CreateChannel(server_address, grpc::InsecureChannelCredentials()));

  // 发布消息
  publisher.Publish("Hello");
  publisher.Publish("World");

  // 订阅消息
  subscriber.Subscribe();

  return 0;
}

在终端中,首先运行 pubsub_server.cpp 启动服务器,然后运行 pubsub_client.cpp 来发布和订阅消息。可以在终端中看到相应的输出。

这个示例程序实现了一个简单的发布-订阅系统,支持多个发布者和多个订阅者。发布者可以发布消息,订阅者可以订阅消息并接收到发布者发送的消息。

gRPC 实现的多发布者多订阅者 C++ 示例程序

原文地址: https://www.cveoy.top/t/topic/o98N 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录