以下是一个基于 gRPC 实现多发布者多订阅者的 C++ 示例程序:

// 服务端实现
#include <iostream>
#include <memory>
#include <string>
#include <unordered_map>
#include <vector>

#include <grpcpp/grpcpp.h>

#include "pubsub.grpc.pb.h"

using grpc::Server;
using grpc::ServerBuilder;
using grpc::ServerContext;
using grpc::Status;

using pubsub::PubSub;
using pubsub::PublishRequest;
using pubsub::SubscribeRequest;
using pubsub::SubscribeResponse;

class PubSubServiceImpl final : public PubSub::Service {
public:
    Status Publish(ServerContext* context, const PublishRequest* request,
                   pubsub::PublishResponse* response) override {
        std::string topic = request->topic();
        std::string message = request->message();

        std::cout << "Received message: " << message << " for topic: " << topic << std::endl;

        // 将消息添加到相应的订阅者的消息队列中
        for (const auto& subscriber : subscribers_[topic]) {
            subscriber->add_message(message);
        }

        return Status::OK;
    }

    Status Subscribe(ServerContext* context, const SubscribeRequest* request,
                     grpc::ServerWriter<SubscribeResponse>* writer) override {
        std::string topic = request->topic();

        std::cout << "New subscriber for topic: " << topic << std::endl;

        // 创建一个新的 Subscriber 对象并添加到订阅者列表中
        std::shared_ptr<Subscriber> subscriber = std::make_shared<Subscriber>(writer);
        subscribers_[topic].push_back(subscriber);

        // 持续向客户端发送消息直到客户端关闭连接
        while (true) {
            std::string message = subscriber->get_message();
            if (!message.empty()) {
                SubscribeResponse response;
                response.set_message(message);
                writer->Write(response);
            } else {
                break;
            }
        }

        // 从订阅者列表中移除该订阅者
        subscribers_[topic].remove(subscriber);

        return Status::OK;
    }

private:
    // 订阅者类,用于保存订阅者的消息队列和写入器
    class Subscriber {
    public:
        explicit Subscriber(grpc::ServerWriter<SubscribeResponse>* writer) : writer_(writer) {}

        void add_message(const std::string& message) {
            messages_.push_back(message);
        }

        std::string get_message() {
            if (!messages_.empty()) {
                std::string message = messages_.front();
                messages_.pop_front();
                return message;
            } else {
                return "";
            }
        }

    private:
        grpc::ServerWriter<SubscribeResponse>* writer_;
        std::deque<std::string> messages_;
    };

    // 使用 unordered_map 保存订阅者列表
    std::unordered_map<std::string, std::list<std::shared_ptr<Subscriber>>> subscribers_;
};

void RunServer() {
    std::string server_address("0.0.0.0:50051");
    PubSubServiceImpl service;

    ServerBuilder builder;
    builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
    builder.RegisterService(&service);

    std::unique_ptr<Server> server(builder.BuildAndStart());
    std::cout << "Server listening on " << server_address << std::endl;

    server->Wait();
}

int main() {
    RunServer();

    return 0;
}
// 客户端实现
#include <iostream>
#include <memory>
#include <string>

#include <grpcpp/grpcpp.h>

#include "pubsub.grpc.pb.h"

using grpc::Channel;
using grpc::ClientContext;
using grpc::ClientReader;
using grpc::Status;

using pubsub::PubSub;
using pubsub::PublishRequest;
using pubsub::PublishResponse;
using pubsub::SubscribeRequest;
using pubsub::SubscribeResponse;

class PubSubClient {
public:
    PubSubClient(std::shared_ptr<Channel> channel) : stub_(PubSub::NewStub(channel)) {}

    void Publish(const std::string& topic, const std::string& message) {
        PublishRequest request;
        request.set_topic(topic);
        request.set_message(message);

        PublishResponse response;

        ClientContext context;

        Status status = stub_->Publish(&context, request, &response);

        if (status.ok()) {
            std::cout << "Message published successfully" << std::endl;
        } else {
            std::cout << "Failed to publish message: " << status.error_code() << ": " << status.error_message()
                      << std::endl;
        }
    }

    void Subscribe(const std::string& topic) {
        SubscribeRequest request;
        request.set_topic(topic);

        ClientContext context;

        std::shared_ptr<ClientReader<SubscribeResponse>> reader(stub_->Subscribe(&context, request));

        SubscribeResponse response;

        while (reader->Read(&response)) {
            std::cout << "Received message: " << response.message() << std::endl;
        }

        Status status = reader->Finish();

        if (status.ok()) {
            std::cout << "Subscription finished" << std::endl;
        } else {
            std::cout << "Failed to subscribe: " << status.error_code() << ": " << status.error_message() << std::endl;
        }
    }

private:
    std::unique_ptr<PubSub::Stub> stub_;
};

int main() {
    std::string server_address("localhost:50051");

    PubSubClient client(grpc::CreateChannel(server_address, grpc::InsecureChannelCredentials()));

    // 发布消息
    client.Publish("topic1", "message1");
    client.Publish("topic2", "message2");

    // 订阅消息
    client.Subscribe("topic1");
    client.Subscribe("topic2");

    return 0;
}

以上示例程序演示了一个简单的发布-订阅系统,其中包含一个服务端和一个客户端。服务端实现了 PublishSubscribe 两个 gRPC 接口,用于发布消息和订阅消息。客户端通过调用服务端的接口来发布和订阅消息。

服务端使用 unordered_map 来保存不同主题的订阅者列表。当收到一个发布请求时,服务端将消息添加到相应主题的订阅者的消息队列中。当有新的订阅请求时,服务端将创建一个新的订阅者对象并将其添加到相应主题的订阅者列表中,然后持续向客户端发送消息直到客户端关闭连接。

客户端通过调用 Publish 接口来发布消息,通过调用 Subscribe 接口来订阅消息。订阅消息时,客户端会持续从服务端接收消息并打印出来,直到订阅结束。

你可以根据自己的需求修改和扩展以上示例程序。


原文地址: https://www.cveoy.top/t/topic/o98S 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录