一、发布/订阅
发布/订阅(Pub/Sub)方式,在互联网中应用是极其广泛的,可以不负责任的说,只要上过网就用到过这种消息通信模式。发布/订阅基于异步通信模式,允许消息的生产者(Publisher)和消费者(subscriber)通过中间代理(Broker)进行通信 。发布者和订阅者解耦,它们互相无感,订阅者只关注自己的主题(Topic)即可。这种方式可以实现高效、灵活的消息分发和控制。
大家都知道,只要解耦,就意味着整个消息机制的可扩展性大大增强,这也是发布/订阅应用广泛的一个重要原因。
发布/订阅方式的主要核心模块包括:
1、发布者(Publisher):其实就是提到的消息的生产者,它可以把消息分配到主题(Topic)
2、订阅者(Subscriber):就是消息的消费者,它可以根据自己的爱好来接收某个主题下的消息
3、消息代理(Broker):消息的分发者,控制着消息的路由,维护发布者和订阅者的对应关系
4、主题(Topic):主题可以认为是收音机的频道,它是一个逻辑概念,是发布者和订阅者间消息分类组织的交互通道
这种方式的优越性很高,它实现生产者和消费者间的完全解耦,并且使用了异步的通信机制,消除了同步的等待,提高了吞吐量且可以实现削峰添谷。同时,主题机制又可以使订阅者灵活的选择和删除相关的消息接收。而中间代理的出现则增加了消息的广播,即发布许/订阅方式实现了多对多的场景。在其内部又利用缓存机制,保证了历史数据的安全消费,达到了消息离线和在线的整体逻辑控制。
一般来说,常见的MQ框架等都支持这种方式,如Kafka、Pulsar、ActiveMQ、RocketMQ、ZeroMQ、RabbitMQ等等吧,对于互联网开发者来说这种方式已经是司空见惯了。
二、与推拉机制的关系
如果思考一下,发现发布/订阅方式与推拉机制有着很相似的应用场景,而其实现的逻辑也大抵相同。其实实际情况也差不多,大家可以看发布/订阅模式当成一种更高级的推拉模式的封装应用。不过发布/订阅方式由于抽象层更高,实现了更高层次的解耦并可以在分布式系统中实现动态路由。同时,在发布/订阅方式中,由于使用了borker中间代理人的角色,而不是简单的服务端和客户端的直接联系,这也是为什么这种方式解耦更好的原因所在。
虽然说一般情况下是broker将消息推送到订阅者,但也允许订阅者自行拉取相关的消息,这个就看设计实现者的意图了。典型的如Kafka就是如此。另外,在很多的发布/订阅方式中,采用了长轮询的方式,其实这就一种混合模式。看到这里,应该就明白了设计不是千篇一律的,它一定是动态的、发展的和可适应性的。实际的需求和条件决定了一切。
大家可以看推拉模式当成发布/订阅方式的基础实现,而后者是前者更高层次的抽象。这也是设计原则中经常提到的,面向抽象编程的一个重要体现。
三、应用场景
现在送牛奶的其实就有点类似这个行为,消费者可以订不同的奶(鲜奶、酸奶等Topic),也可以随时取消或增加。而生产者则会让送货员按时按期送达消费者而不用二者见面(异步)。而一个奶箱可以一个单元共享(多播、缓存)。所以说,计算机中应用的技术,基本都可以在现实世界上进行映射,毕竟计算机的出现就是为了解决现实中的问题。因此大家不要迷信技术。
明白了发布/订阅方式的主要原理就可以想象得到其应用的主要场景了:
1、通知系统:如订单更新、新闻推送、物流变化等等
2、日志处理系统:将日志分类到主题,然后交订阅者进行处理分析
3、IOT控制:这个大家用得很多,但感觉很少。它主要是对传感器数据进行主题划分然后交到感 兴趣的订阅方处理,比如马路上的监控探头
4、微服务通信:这个就和开发者比较接近了,特别是互联网后台的开发者,此处不展开
其实大家明白了发布/订阅方式后,一些细节在设计中非常重要,比如消息的传递形式,是使用JSON还是流或者其它;另外如何设置消息的过滤;需不需要消息的持久化以及是否支持重试保证消息的准确送达。另外,对一些安全等级高的,是不是要划分可靠性的等级,如何处理一些异常的状态,如消息无法正常投递等等。
它们当中相当多的问题是互相牵制的,比如消息内容形式就影响消息的过滤,重试又可能影响可靠性。所以在实际应用中,就需要设计者根据应用的特点来进行取舍,不能教条。
发布/订阅方式多用于互联网的分布式系统中,但不代表在本地无法使用。当然,本地使用常见的一些框架可能有些重或者说得不偿失,那么就可以自己利用一些设计模式(如观察者模式等)进行模拟实现。实现的手段不是关键,关键在设计者的目的是否达到。
四、实例分析
下面利用ZeroMQ实现一个简单的发布/订阅的例子:
//Pub
#include <zmq.hpp>
#include <iostream>
#include <string>
#include <chrono>
#include <thread>int main() {zmq::context_t context(1);zmq::socket_t pub(context, ZMQ_PUB);pub.bind("tcp://*:5555"); sleep(1); int count = 0;while (true) {std::string topic = "news"; std::string message = "msg " + std::to_string(++count);// subject+delimiter+contentzmq::message_t zmqTopic(topic.size());memcpy(zmqTopic.data(), topic.c_str(), topic.size());zmq::message_t zmqMsg(message.size());memcpy(zmqMsg.data(), message.c_str(), message.size());publisher.send(zmqTopic, zmq::send_flags::sndmore);publisher.send(zmqMsg, zmq::send_flags::none);sleep(1);}return 0;
}//Sub
#include <zmq.hpp>
#include <iostream>
#include <string>int main() {zmq::context_t context(1);zmq::socket_t sub(context, ZMQ_SUB);sub.connect("tcp://localhost:5555"); const std::string topicFilter = "news";sub.set(zmq::sockopt::subscribe, topicFilter);while (true) {zmq::message_t rTopic;zmq::message_t rMsg;sub.recv(rTopic);sub.recv(rMsg);std::string topic(static_cast<char*>(rTopic.data()), rTopic.size());std::string msg(static_cast<char*>(rMsg.data()), rMsg.size());std::cout << "recv msg: [" << topic << "] " << msg << std::endl;}return 0;
}
代码不复杂,就不展开说明了。
五、总结
目前,智能驾驶这个赛道火得很,但其中的车联网系统中就广泛使用了这种机制。所以说,掌握好原理才能更好的拓展一项技术的应用,而不只是机械的模仿。需要说明的是,本篇不是分析发布/订阅机制内部实现的文章,而重点在于将其如何应用于设计当中。尽管篇幅不大,但希望给大家一个另外的角度做为切入点,也就是说,要从不同的角度来看待一个技术问题,才能更好更全面的明白其在整个技术框架中的作用。