NATS的发布订阅功能代码是怎么实现的?点对点模式的通信是怎么实现的?
NATS的发布订阅功能通过主题(subject)来实现。发布者(publisher)将消息发布到特定的主题上,订阅者(subscriber)则通过订阅这个主题来接收相应的消息。NATS支持不同的主题层级结构,例如"foo.bar.baz"和"foo.*",这使得订阅者可以订阅多个主题或者特定的子主题。
下面是一个基本的发布订阅模式的代码示例:
// 创建连接
nc, _ := nats.Connect("nats://localhost:4222")
// 发布消息
nc.Publish("foo.bar", []byte("Hello, World!"))
// 订阅主题
nc.Subscribe("foo.*", func(m *nats.Msg) {
fmt.Printf("Received message: %s\n", string(m.Data))
})
在点对点模式中,NATS支持两种通信方式:请求-响应(request-reply)和队列组(queue groups)。
请求-响应模式中,客户端(requestor)向服务端(responder)发送一个请求,服务端处理请求并返回响应。客户端会等待响应返回,因此这种方式通常用于需要同步处理的场景。下面是一个请求-响应模式的代码示例:
// 创建连接
nc, _ := nats.Connect("nats://localhost:4222")
// 发送请求并等待响应
msg, _ := nc.Request("foo", []byte("Hello, World!"), 100*time.Millisecond)
fmt.Printf("Received response: %s\n", string(msg.Data))
队列组模式中,多个消费者(workers)订阅同一个队列组,当消息发布到这个组时,只有其中一个消费者会处理该消息。这种方式通常用于平衡工作负载或实现高可用性。下面是一个队列组模式的代码示例:
// 创建连接
nc, _ := nats.Connect("nats://localhost:4222")
// 订阅队列组
nc.QueueSubscribe("foo", "worker_group", func(m *nats.Msg) {
fmt.Printf("Received message: %s\n", string(m.Data))
// 处理消息
m.Respond([]byte("OK"))
})
``
原文地址: https://www.cveoy.top/t/topic/hfH4 著作权归作者所有。请勿转载和采集!