gRPC 实现的多发布者多订阅者 C++ 示例程序
以下是一个基于 gRPC 实现的多发布者多订阅者的 C++ 示例程序:
// 文件: pubsub.proto
syntax = "proto3";

package pubsub;

// google.protobuf.Empty is a well-known type; it must be imported explicitly
// or protoc fails to resolve the name used by the RPCs below.
import "google/protobuf/empty.proto";

// A single published payload.
message Message {
  // Text carried by the message.
  string content = 1;
}

// Publishing side of the service.
service Publisher {
  // Unary RPC: sends one Message and receives an empty acknowledgement.
  rpc Publish(Message) returns (google.protobuf.Empty);
}

// Subscribing side of the service.
service Subscriber {
  // Server-streaming RPC: the response is a stream of Message values.
  rpc Subscribe(google.protobuf.Empty) returns (stream Message);
}
// 文件: pubsub_server.cpp
#include <chrono>
#include <iostream>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <unordered_map>
#include <grpcpp/grpcpp.h>
#include "pubsub.grpc.pb.h"
using grpc::Server;
using grpc::ServerBuilder;
using grpc::ServerContext;
using grpc::Status;
using pubsub::Message;
using pubsub::Publisher;
using pubsub::Subscriber;
// Implements both the Publisher and the Subscriber service in one object so
// that a message received through Publish can be forwarded to every stream
// opened through Subscribe.
//
// Thread-safety: RPC handlers may run concurrently on different server
// threads, so every access to subscribers_ (and every Write on a registered
// stream) happens while holding mutex_.
class PubSubServiceImpl final : public Publisher::Service, public Subscriber::Service {
public:
    // Logs the incoming message and forwards it to all registered subscribers.
    // Always returns Status::OK; a subscriber whose stream can no longer be
    // written to is dropped from the registry instead of failing the publish.
    Status Publish(ServerContext* /*context*/, const Message* request,
                   google::protobuf::Empty* /*response*/) override {
        std::cout << "Received message: " << request->content() << std::endl;

        // Holding the lock for the whole fan-out serialises writes on each
        // stream and guarantees no writer is used after its Subscribe call
        // has unregistered it. A slow subscriber therefore delays other
        // publishers; acceptable for this example.
        std::lock_guard<std::mutex> lock(mutex_);
        for (auto it = subscribers_.begin(); it != subscribers_.end();) {
            if (it->second->Write(*request)) {
                ++it;
            } else {
                // Write() returning false means the stream is broken/closed.
                it = subscribers_.erase(it);
            }
        }
        return Status::OK;
    }

    // Registers the caller's response stream, then keeps the RPC open until
    // the client cancels or disconnects. Messages are written to the stream
    // by Publish, not by this function.
    Status Subscribe(ServerContext* context, const google::protobuf::Empty* /*request*/,
                     grpc::ServerWriter<Message>* writer) override {
        // The writer's address is unique for the lifetime of this call, so it
        // doubles as the subscriber id.
        const std::string subscriber_id = std::to_string(reinterpret_cast<int64_t>(writer));
        {
            std::lock_guard<std::mutex> lock(mutex_);
            subscribers_[subscriber_id] = writer;
        }

        // Poll for cancellation with a short sleep instead of spinning, so an
        // idle subscriber does not occupy a CPU core.
        while (!context->IsCancelled()) {
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
        }

        {
            // Erasing under the lock ensures Publish is not in the middle of
            // writing to this stream when the handler returns. Erasing an id
            // that Publish already removed is a harmless no-op.
            std::lock_guard<std::mutex> lock(mutex_);
            subscribers_.erase(subscriber_id);
        }
        return Status::OK;
    }

private:
    std::mutex mutex_;  // guards subscribers_ and all writes to the streams in it
    std::unordered_map<std::string, grpc::ServerWriter<Message>*> subscribers_;
};
void RunServer() {
std::string server_address("0.0.0.0:50051");
PubSubServiceImpl service;
ServerBuilder builder;
builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
builder.RegisterService(&service);
std::unique_ptr<Server> server(builder.BuildAndStart());
std::cout << "Server listening on " << server_address << std::endl;
server->Wait();
}
// Server entry point: hands control to RunServer() and exits with status 0
// once it returns.
int main() {
    RunServer();
    // Falling off the end of main is equivalent to `return 0;`.
}
// 文件: pubsub_client.cpp
#include <iostream>
#include <memory>
#include <string>
#include <grpcpp/grpcpp.h>
#include "pubsub.grpc.pb.h"
using grpc::Channel;
using grpc::ClientContext;
using grpc::Status;
using pubsub::Message;
using pubsub::Publisher;
using pubsub::Subscriber;
class PublisherClient {
public:
PublisherClient(std::shared_ptr<Channel> channel)
: stub_(Publisher::NewStub(channel)) {}
void Publish(const std::string& content) {
Message request;
request.set_content(content);
google::protobuf::Empty response;
ClientContext context;
Status status = stub_->Publish(&context, request, &response);
if (status.ok()) {
std::cout << "Message published: " << content << std::endl;
} else {
std::cout << "Failed to publish message: " << status.error_message() << std::endl;
}
}
private:
std::unique_ptr<Publisher::Stub> stub_;
};
class SubscriberClient {
public:
SubscriberClient(std::shared_ptr<Channel> channel)
: stub_(Subscriber::NewStub(channel)) {}
void Subscribe() {
google::protobuf::Empty request;
ClientContext context;
std::unique_ptr<grpc::ClientReader<Message>> reader(stub_->Subscribe(&context, request));
Message message;
while (reader->Read(&message)) {
std::string content = message.content();
std::cout << "Received message: " << content << std::endl;
}
Status status = reader->Finish();
if (status.ok()) {
std::cout << "Subscription finished" << std::endl;
} else {
std::cout << "Failed to subscribe: " << status.error_message() << std::endl;
}
}
private:
std::unique_ptr<Subscriber::Stub> stub_;
};
// Client entry point.
//
// Usage:
//   pubsub_client                 publish "Hello" and "World", then subscribe
//   pubsub_client sub             subscribe only and print received messages
//   pubsub_client pub [msg ...]   publish each given argument as one message
//
// Subscribe() blocks while the stream is open, and anything published before
// the subscription exists is not delivered to it. Running several processes
// in `sub` and `pub` mode is therefore the way to observe multiple publishers
// and subscribers exchanging messages. The argument-less form keeps the
// original demo sequence.
int main(int argc, char* argv[]) {
    const std::string server_address("localhost:50051");
    const std::string mode = (argc > 1) ? argv[1] : "";

    if (mode == "sub") {
        SubscriberClient subscriber(
            grpc::CreateChannel(server_address, grpc::InsecureChannelCredentials()));
        subscriber.Subscribe();
        return 0;
    }

    if (mode == "pub") {
        PublisherClient publisher(
            grpc::CreateChannel(server_address, grpc::InsecureChannelCredentials()));
        for (int i = 2; i < argc; ++i) {
            publisher.Publish(argv[i]);
        }
        return 0;
    }

    if (!mode.empty()) {
        std::cerr << "Usage: " << argv[0] << " [sub | pub <message>...]" << std::endl;
        return 1;
    }

    // Default demo: same sequence as before.
    PublisherClient publisher(
        grpc::CreateChannel(server_address, grpc::InsecureChannelCredentials()));
    SubscriberClient subscriber(
        grpc::CreateChannel(server_address, grpc::InsecureChannelCredentials()));

    // Publish messages.
    publisher.Publish("Hello");
    publisher.Publish("World");

    // Subscribe to messages (blocks until the stream ends).
    subscriber.Subscribe();
    return 0;
}
在终端中，首先编译并运行由 pubsub_server.cpp 生成的服务器程序，然后编译并运行由 pubsub_client.cpp 生成的客户端程序来发布和订阅消息。可以在终端中看到相应的输出。
这个示例程序实现了一个简单的发布-订阅系统,支持多个发布者和多个订阅者。发布者可以发布消息,订阅者可以订阅消息并接收到发布者发送的消息。
原文地址: https://www.cveoy.top/t/topic/o98N 著作权归作者所有。请勿转载和采集!