什么是发布订阅
消息的发送者称为发布者(Publisher),消息的接收者称为订阅者(Subscriber);
发布订阅(Publish-Subscribe)是一种常用的分布式通信模式;
它基于消息传递实现了解耦和异步通信。
在发布订阅模式中,发布者和订阅者之间并不直接进行通信,而是通过消息代理进行交互。
消息的生产者(发布者)将消息发送到一个中心地点,称为消息代理(Message Broker),而消费者(订阅者)则从消息代理订阅感兴趣的消息类型。
发布订阅模式的基本工作原理:
发布者(Publisher):
发布者负责产生消息,并将其发送到消息代理。发布者不需要知道哪些订阅者会接收到消息,它只需要将消息发送给消息代理即可。
消息代理(Message Broker):
消息代理是发布订阅模式的核心组件。它接收到发布者发送的消息,并根据订阅者的注册信息将消息分发给相应的订阅者。消息代理可以是中心化的,也可以是分布式的。
订阅者(Subscriber):
订阅者通过注册自己感兴趣的消息类型来接收消息。当有新的消息到达消息代理时,消息代理会将该消息发送给所有订阅了该类型消息的订阅者。
主题(Topic):
主题是发布订阅模式中用于分类和组织消息的概念。发布者将消息发送到特定的主题,而订阅者则根据主题选择感兴趣的消息进行订阅。
发布订阅模式的特点和优势:
- 解耦:
发布者和订阅者之间通过消息代理进行通信,彼此之间不直接依赖,实现了解耦。发布者和订阅者可以独立演化,不需要知道对方的存在。 - 异步通信:
发布订阅模式是异步的,发布者和订阅者可以独立处理消息,提高系统的响应性能。 - 灵活性:
发布订阅模式支持动态增加或移除发布者和订阅者。系统可以根据需求灵活地调整消息的发送和接收。 - 可靠性:
消息代理负责消息的传递和分发,可以进行消息的持久化和可靠性保证,确保消息能够被正确地传递给订阅者。 - 一对多通信:
发布-订阅模式允许一个消息发布者将消息发送给多个订阅者。发布者不需要关心有多少个订阅者,只需将消息发布到相应的主题中,所有订阅该主题的订阅者都可以接收到消息。这种一对多的通信特点使得消息可以被广播到多个订阅者,满足多个接收方的需求。 - 扩展性:
发布订阅模式支持横向扩展,可以通过增加消息代理和订阅者来处理大量的消息和并发请求。
工作流程:
- 发布者将消息发布到消息代理或主题上。
- 消息代理或主题接收到消息后,将消息存储在内部的缓冲区中。
- 订阅者通过订阅特定的主题或者在消息代理上注册自己的兴趣,以表明对某种类型的消息感兴趣。
- 消息代理或主题根据订阅者的注册信息,将消息推送给相应的订阅者。
- 订阅者接收到消息后进行相应的处理。
常用场景:
实时数据处理:
在大规模数据处理系统中,发布-订阅模式可以用于实时数据流的传输和处理。例如,订阅者可以订阅某个主题,接收实时的数据更新。
消息队列:
发布-订阅模式广泛应用于消息队列系统中,用于解耦消息的发送者和接收者。消息发布者将消息发布到队列上,而消息订阅者从队列上接收并处理消息。
事件驱动架构:
发布-订阅模式可以用于实现事件驱动架构,其中发布者产生事件并将其发布到代理或主题上,而订阅者监听事件并根据需要进行相应的响应。
分布式系统通信:
在分布式系统中,不同节点之间需要进行通信和协作。发布-订阅模式可以用于节点之间的消息传递和事件触发。
实时推送:
在Web应用程序中,可以使用发布-订阅模式实现实时推送功能。例如,服务器可以将消息发布到特定主题上,而浏览器作为订阅者接收并展示推送的数据。
代码示例
#include <iostream>
#include <vector>
#include <thread>
#include <functional>
#include <mutex>// 定义一个简单的消息结构
struct Message {std::string topic;std::string content;
};// 定义订阅者类
class Subscriber {
public:Subscriber(const std::string& topic) : topic(topic) {}void notify(const Message& message) {if (message.topic == topic) {std::cout << "Received message on topic '" << topic << "': " << message.content << std::endl;}}private:std::string topic;
};// 定义发布者类
class Publisher {
public:void subscribe(Subscriber* subscriber, const std::string& topic) {std::unique_lock<std::mutex> lock(mutex);subscribers[topic].push_back(subscriber);}void unsubscribe(Subscriber* subscriber, const std::string& topic) {std::unique_lock<std::mutex> lock(mutex);if (subscribers.count(topic) > 0) {auto& topicSubscribers = subscribers[topic];topicSubscribers.erase(std::remove(topicSubscribers.begin(), topicSubscribers.end(), subscriber), topicSubscribers.end());}}void publish(const Message& message) {std::unique_lock<std::mutex> lock(mutex);if (subscribers.count(message.topic) > 0) {auto& topicSubscribers = subscribers[message.topic];for (auto subscriber : topicSubscribers) {subscriber->notify(message);}}}private:std::mutex mutex;std::map<std::string, std::vector<Subscriber*>> subscribers;
}int main() {Publisher publisher;// 创建订阅者并订阅特定主题Subscriber subscriber1("topic1");Subscriber subscriber2("topic2");publisher.subscribe(&subscriber1, "topic1");publisher.subscribe(&subscriber2, "topic2");// 发布消息Message message1{"topic1", "Hello, topic1!"};Message message2{"topic2", "Hello, topic2!"};publisher.publish(message1);publisher.publish(message2);// 取消订阅并删除订阅者publisher.unsubscribe(&subscriber1, "topic1");publisher.unsubscribe(&subscriber2, "topic2");return 0;
}
缺点
- 无法保证消息的顺序性:
在发布-订阅模式中,消息是通过主题进行传递的,而不是直接发送给特定的订阅者。这种间接的消息传递机制可能导致消息的顺序性无法得到保证。如果订阅者对于消息的顺序有严格的要求,那么发布-订阅模式可能无法满足这个需求。 - 可能存在消息丢失风险:
在发布-订阅模式中,消息发布者将消息发布到主题中,而订阅者通过订阅感兴趣的主题来接收消息。如果订阅者未能及时订阅或处理消息,那么可能会出现消息丢失的情况。这种消息丢失风险需要在系统设计和实现时进行充分考虑和处理。 - 可能存在订阅者处理压力:
在发布-订阅模式中,一个主题可以有多个订阅者,而每个订阅者都可能接收到大量的消息。如果某个订阅者的处理能力有限,那么可能会导致该订阅者处理压力过大,影响系统的整体性能和可靠性。在设计时需要考虑如何平衡消息的分发和订阅者的处理能力。 - 需要管理主题和订阅关系:
在发布-订阅模式中,需要管理主题和订阅关系的注册和取消注册过程。这可能需要引入额外的管理机制和复杂性。如果主题和订阅关系的管理不当,可能会导致系统出现混乱和不一致的情况。 - 可能存在消息过滤复杂性:
在发布-订阅模式中,订阅者可以选择订阅感兴趣的主题来接收消息。然而,当系统中存在大量主题和订阅者时,可能会导致消息过滤的复杂性增加。订阅者需要维护自己感兴趣的主题列表,并且必须处理来自多个主题的消息,这可能增加了订阅者的负担。