文章目录
- 0. 引言
- 1. 设计概要
- 1.1 主要组件
- 1.2 类关系图
- 1.3 工作流程
- 2. 代码实现
- 2.1. 定义数据结构
- 2.2. 实现 DataVisitor
- 2.3. 实现 DataDispatcher
- 2.4. 实现 Receiver
- 2.5. 实现具体的 DataVisitor
- 2.6. 示例主程序
- 2.7. 编译和运行
0. 引言
使用 C++ 实现一个类似CyberRT 架构的 DataVisitor
和 DataDispatcher
。在 CyberRT 中:
- Receiver 接收到消息后,会触发回调。
- 回调中调用 DataDispatcher(消息分发器)发布消息。
- DataDispatcher 是一个单例,负责所有的数据分发,并将数据放入对应的缓存中。
- 然后,DataDispatcher 会通知对应的协程(在此简化为线程)去处理消息。
- DataVisitor(消息访问器)是辅助类,用于管理数据处理过程,包括注册通知机制和绑定回调函数。
1. 设计概要
1.1 主要组件
-
- 单例模式。
- 管理所有
DataVisitor
。 - 分发数据到对应的
DataVisitor
的缓冲区。 - 通知
DataVisitor
处理数据。
-
- 负责特定类型数据的处理。
- 包含一个线程,等待
DataDispatcher
的通知。 - 绑定一个回调函数用于处理数据。
- 管理自己的数据缓冲区。
-
Receiver:
- 模拟消息接收器,接收到消息后调用
DataDispatcher
发布数据。
- 模拟消息接收器,接收到消息后调用
1.2 类关系图
以下类关系图,反映了 DataDispatcher
作为单例管理多个 DataVisitor
,并与 Receiver
交互的关系。
1.3 工作流程
- Receiver 接收到消息后,调用
DataDispatcher::Instance()->Dispatch(data)
。 - DataDispatcher 将数据放入对应的
DataVisitor
的缓冲区。 - DataDispatcher 通知对应的
DataVisitor
。 - DataVisitor 的线程被唤醒,取出数据并执行绑定的回调函数进行处理。
2. 代码实现
完整代码见:data-visitor-dispatcher
2.1. 定义数据结构
首先,定义一个通用的数据结构 Data
。
// data.h
#ifndef DATA_H
#define DATA_H#include <string>// 定义一个通用的数据类型
struct Data {int id;std::string content;
};#endif // DATA_H
DataVisitor_102">2.2. 实现 DataVisitor
DataVisitor
负责处理特定类型的数据。它包含一个线程,该线程等待 DataDispatcher
的通知,然后处理数据。
// data_visitor.h
#ifndef DATA_VISITOR_H
#define DATA_VISITOR_H#include <atomic>
#include <condition_variable>
#include <functional>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>#include "data.h"class DataVisitor : public std::enable_shared_from_this<DataVisitor> {public:using Callback = std::function<void(std::shared_ptr<Data>)>;// 构造函数,绑定回调函数explicit DataVisitor(Callback callback) : callback_(callback), stop_flag_(false) {worker_thread_ = std::thread(&DataVisitor::ProcessData, this);}// 析构函数,确保线程安全停止~DataVisitor() {Stop();if (worker_thread_.joinable()) {worker_thread_.join();}}// 用于 DataDispatcher 发布数据时调用void Notify(std::shared_ptr<Data> data) {{std::lock_guard<std::mutex> lock(queue_mutex_);data_queue_.push(data);}cv_.notify_one();}private:// 数据处理线程函数void ProcessData() {while (!stop_flag_) {std::unique_lock<std::mutex> lock(queue_mutex_);cv_.wait(lock, [this]() { return stop_flag_ || !data_queue_.empty(); });while (!data_queue_.empty()) {auto data = data_queue_.front();data_queue_.pop();lock.unlock();// 执行绑定的回调函数if (callback_) {try {callback_(data);} catch (const std::exception& e) {// 处理回调中的异常,防止线程终止// 可以记录日志或采取其他措施// 这里简单输出错误信息std::cerr << "Exception in callback: " << e.what() << std::endl;}}lock.lock();}}}// 停止线程void Stop() {stop_flag_ = true;cv_.notify_all();}Callback callback_;std::thread worker_thread_;std::mutex queue_mutex_;std::condition_variable cv_;std::queue<std::shared_ptr<Data>> data_queue_;std::atomic<bool> stop_flag_;
};#endif // DATA_VISITOR_H
DataDispatcher_194">2.3. 实现 DataDispatcher
DataDispatcher
是一个单例,负责管理所有的 DataVisitor
并分发数据。
// data_dispatcher.h
#ifndef DATA_DISPATCHER_H
#define DATA_DISPATCHER_H#include <algorithm>
#include <memory>
#include <mutex>
#include <vector>#include "data.h"
#include "data_visitor.h"// 单例的 DataDispatcher
class DataDispatcher {public:// 获取单例实例static DataDispatcher& Instance() {static DataDispatcher instance;return instance;}// 禁止拷贝和赋值DataDispatcher(const DataDispatcher&) = delete;DataDispatcher& operator=(const DataDispatcher&) = delete;// 注册一个 DataVisitorvoid RegisterVisitor(const std::shared_ptr<DataVisitor>& visitor) {std::lock_guard<std::mutex> lock(mutex_);visitors_.emplace_back(visitor);}// 注销一个 DataVisitorvoid UnregisterVisitor(const std::shared_ptr<DataVisitor>& visitor) {std::lock_guard<std::mutex> lock(mutex_);visitors_.erase(std::remove(visitors_.begin(), visitors_.end(), visitor), visitors_.end());}// 分发数据到所有注册的 DataVisitorvoid Dispatch(const std::shared_ptr<Data>& data) {std::lock_guard<std::mutex> lock(mutex_);for (auto& visitor : visitors_) {if (visitor) {visitor->Notify(data);}}}private:// 私有构造函数DataDispatcher() = default;~DataDispatcher() = default;std::vector<std::shared_ptr<DataVisitor>> visitors_;std::mutex mutex_;
};#endif // DATA_DISPATCHER_H
2.4. 实现 Receiver
模拟消息接收器,每当接收到消息时,调用 DataDispatcher
分发数据。
// receiver.h
#ifndef RECEIVER_H
#define RECEIVER_H#include <functional>
#include <memory>
#include <string>#include "data.h"
#include "data_dispatcher.h"// Receiver,模拟消息接收器
class Receiver {public:using Callback = std::function<void(std::shared_ptr<Data>)>;// 构造函数,绑定回调函数explicit Receiver(Callback callback) : callback_(callback) {}// 模拟接收消息void ReceiveMessage(int id, const std::string& content) {auto data = std::make_shared<Data>();data->id = id;data->content = content;// 触发回调if (callback_) {callback_(data);}}private:Callback callback_;
};#endif // RECEIVER_H
DataVisitor_302">2.5. 实现具体的 DataVisitor
例如,创建 LoggingVisitor
和 ProcessingVisitor
,它们各自有不同的处理逻辑。
// logging_visitor.h
#ifndef LOGGING_VISITOR_H
#define LOGGING_VISITOR_H#include <iostream>
#include <memory>
#include "data_visitor.h"// LoggingVisitor,负责记录数据
class LoggingVisitor {public:// 创建一个 LoggingVisitor 的 DataVisitor 实例static std::shared_ptr<DataVisitor> Create() {return std::make_shared<DataVisitor>([](std::shared_ptr<Data> data) {std::cout << "[LoggingVisitor] Received data: ID=" << data->id << ", Content=\"" << data->content << "\""<< std::endl;});}
};#endif // LOGGING_VISITOR_H
// processing_visitor.h
#ifndef PROCESSING_VISITOR_H
#define PROCESSING_VISITOR_H#include <iostream>
#include <memory>
#include "data_visitor.h"// ProcessingVisitor,负责处理数据
class ProcessingVisitor {public:// 创建一个 ProcessingVisitor 的 DataVisitor 实例static std::shared_ptr<DataVisitor> Create() {return std::make_shared<DataVisitor>([](std::shared_ptr<Data> data) {// 简单示例:打印数据长度std::cout << "[ProcessingVisitor] Processed data ID=" << data->id << ", Length=" << data->content.length()<< std::endl;});}
};#endif // PROCESSING_VISITOR_H
2.6. 示例主程序
展示如何使用上述组件,实现数据接收、分发和处理。
// main.cpp
#include <chrono>
#include <iostream>
#include <memory>
#include <thread>#include "data_dispatcher.h"
#include "logging_visitor.h"
#include "processing_visitor.h"
#include "receiver.h"int main() {// 创建并注册 DataVisitorauto logger = LoggingVisitor::Create();auto processor = ProcessingVisitor::Create();DataDispatcher::Instance().RegisterVisitor(logger);DataDispatcher::Instance().RegisterVisitor(processor);// 创建 Receiver,并绑定 DataDispatcher 的 Dispatch 方法Receiver receiver([](std::shared_ptr<Data> data) { DataDispatcher::Instance().Dispatch(data); });// 模拟接收消息std::cout << "=== 接收第1条消息 ===" << std::endl;receiver.ReceiveMessage(1, "Hello, CyberRT!");std::cout << "=== 接收第2条消息 ===" << std::endl;receiver.ReceiveMessage(2, "Another data packet.");// 等待一段时间以确保所有消息被处理std::this_thread::sleep_for(std::chrono::seconds(1));// 注销一个 DataVisitorstd::cout << "\n=== 移除 LoggingVisitor ===" << std::endl;DataDispatcher::Instance().UnregisterVisitor(logger);// 再次接收消息std::cout << "=== 接收第3条消息 ===" << std::endl;receiver.ReceiveMessage(3, "Data after removing logger.");// 等待一段时间以确保消息被处理std::this_thread::sleep_for(std::chrono::seconds(1));std::cout << "\n=== 程序结束 ===" << std::endl;return 0;
}
2.7. 编译和运行
假设所有文件在同一目录下,可以使用以下命令进行编译:
g++ -std=c++14 main.cpp -o dispatcher -pthread
运行程序后,您将看到类似如下的输出:
test@pi:~/dataVisitor$ g++ -std=c++14 main.cpp -o dispatcher -pthread
test@pi:~/dataVisitor$ ./dispatcher
=== 接收第1条消息 ===
=== 接收第2条消息 ===
[LoggingVisitor] Received data: ID=[ProcessingVisitor] Processed data ID=11, Content="Hello, CyberRT!"
[LoggingVisitor] Received data: ID=2, Content="Another data packet."
, Length=15
[ProcessingVisitor] Processed data ID=2, Length=20=== 移除 LoggingVisitor ===
=== 接收第3条消息 ===
[ProcessingVisitor] Processed data ID=3, Length=27=== 程序结束 ===
注意,第三条消息只被 ProcessingVisitor
处理,因为 LoggingVisitor
已被移除。