C++使用开源ConcurrentQueue库处理自定义业务数据类

news/2024/11/16 0:12:58/

ConcurrentQueue_0">ConcurrentQueue开源库介绍

ConcurrentQueue是一个高性能的、线程安全的并发队列库。它旨在提供高效、无锁的数据结构,适用于多线程环境中的数据交换。concurrentqueue 支持多个生产者和多个消费者,并且提供了多种配置选项来优化性能和内存使用。

ConcurrentQueue_3">ConcurrentQueue使用

0x01 使用场景说明

我的数据平台在接收到四种不同的业务数据时,需要按数据分类写进RocketMQ

0x02 自定义类用来存放和区分数据流

  • 设计BusinessFlowMsg
  1. 该类有定义消息的类型
  2. 该类中设计ST_BusinessSign结构体消息头,用来区分消息和获取消息体的长度
  3. BusinessFlowMsg类可以存放的数据长度为512KB
#ifndef BUSINESSFLOWMSG_HPP
#define BUSINESSFLOWMSG_HPP#include <string.h>#define MSG_ROCKETMQ_PNG 0x01
#define MSG_ROCKETMQ_AIS 0x02
#define MSG_ROCKETMQ_ROUTE 0x03
#define MSG_ROCKETMQ_VOYAGE 0x04#pragma pack(push)
#pragma pack(1)// 消息头
typedef struct s_BusinessSign
{int sign; // 业务标识unsigned int length; // 消息体的长度
}ST_BusinessSign;#pragma pack(pop)class BusinessFlowMsg
{
public:BusinessFlowMsg() = default;~BusinessFlowMsg() = default;char* get_data(){return _data;}int data_size(){return businessSign.length;}char* get_body(){return _data + sizeof(ST_BusinessSign);}int body_size(){return data_size() - sizeof(ST_BusinessSign);}ST_BusinessSign* header(){return &businessSign;}bool set_data(const char* data, int length, int sign){if(length > (max_body_len + sizeof(ST_BusinessSign))){return false;}businessSign.sign = sign;businessSign.length = length;memcpy(_data + sizeof(ST_BusinessSign), data, length);return true;}private:enum{max_body_len = 512 * 1024 // 512KB};ST_BusinessSign businessSign;char _data[max_body_len];
};#endif // BUSINESSFLOWMSG_HPP

0x03 创建PngUnit类模拟接到不同的业务数据

  • PngUnit类型创建了四个线程来模拟不同的数据流。
  • PngUnit类多线程中并未使用互斥锁,因为ConcurrentQueue是一个线程安全的并发队列库,事实证明确实如此。
#ifndef PNGUNIT_H
#define PNGUNIT_H#include <thread>
#include <mutex>class PngUnit
{
public:PngUnit();~PngUnit() = default;void start();void sendPNG(int sign);void sendAIS(int sign);void sendRoute(int sign);void sendVoyage(int sign);private:std::thread m_th_png;std::thread m_th_ais;std::thread m_th_route;std::thread m_th_voyage;// std::mutex queue_mutex;  // 互斥锁
};#endif // PNGUNIT_H

#include "pngunit.h"
#include <unistd.h>
#include "rocketmqutils.h"
#include "BusinessFlowMsg.hpp"
#include "json11/json11.hpp"PngUnit::PngUnit()
{}void PngUnit::start()
{// m_th = std::thread([this](){//     sendPNG(100);// });if(m_th_png.joinable()){printf("[%s:%d] %s\n", __FILE__, __LINE__, "m_th_png is running");return;}m_th_png = std::thread(std::bind(&PngUnit::sendPNG, this, MSG_ROCKETMQ_PNG));m_th_ais= std::thread(std::bind(&PngUnit::sendAIS, this, MSG_ROCKETMQ_AIS));m_th_route= std::thread(std::bind(&PngUnit::sendRoute, this, MSG_ROCKETMQ_ROUTE));m_th_voyage= std::thread(std::bind(&PngUnit::sendVoyage, this, MSG_ROCKETMQ_VOYAGE));
}void PngUnit::sendPNG(int sign)
{while (true){BusinessFlowMsg pngMsg;const char* pngFile = "1234567890ABCDEF";int fileLen = strlen(pngFile) + 1;pngMsg.set_data(pngFile, fileLen, sign);{// std::lock_guard<std::mutex> lock(queue_mutex);if(!RocketMQUtils::Instance()->g_businessQueue.enqueue(pngMsg)){printf("Failed to set PNG message data");}}sleep(1);}
}void PngUnit::sendAIS(int sign)
{json11::Json::object obj = {{"message", "AIS"},{"response", "success"}};std::string jsonStr = json11::Json(obj).dump();while (true){BusinessFlowMsg aisMsg;int jsonStrLen = jsonStr.size();aisMsg.set_data(jsonStr.c_str(), jsonStrLen, sign);{// std::lock_guard<std::mutex> lock(queue_mutex);if(!RocketMQUtils::Instance()->g_businessQueue.enqueue(aisMsg)){printf("Failed to set AIS message data");}}sleep(2);}
}void PngUnit::sendRoute(int sign)
{json11::Json::object obj = {{"message", "Route"},{"response", "success"}};std::string jsonStr = json11::Json(obj).dump();while (true){BusinessFlowMsg routeMsg;int jsonStrLen = jsonStr.size();routeMsg.set_data(jsonStr.c_str(), jsonStrLen, sign);{// std::lock_guard<std::mutex> lock(queue_mutex);if(!RocketMQUtils::Instance()->g_businessQueue.enqueue(routeMsg)){printf("Failed to set ROUTE message data");}}sleep(3);}
}void PngUnit::sendVoyage(int sign)
{json11::Json::object obj = {{"message", "Voyage"},{"response", "success"}};std::string jsonStr = json11::Json(obj).dump();while (true){BusinessFlowMsg voyageMsg;int jsonStrLen = jsonStr.size();voyageMsg.set_data(jsonStr.c_str(), jsonStrLen, sign);{// std::lock_guard<std::mutex> lock(queue_mutex);if(!RocketMQUtils::Instance()->g_businessQueue.enqueue(voyageMsg)){printf("Failed to set VOYAGE message data");}}sleep(4);}
}

RocketMQUtilsConcurrentQueueRocketMQ_261">0x04 创建RocketMQUtils类,在ConcurrentQueue队列中获取数据写进RocketMQ

#ifndef ROCKETMQUTILS_H
#define ROCKETMQUTILS_H#include <thread>
#include <concurrentqueue/moodycamel/concurrentqueue.h>
#include "BusinessFlowMsg.hpp"class RocketMQUtils
{
public:static RocketMQUtils* Instance();private:RocketMQUtils();~RocketMQUtils()=default;RocketMQUtils(const RocketMQUtils &) = delete;RocketMQUtils& operator=(const RocketMQUtils &) = delete;RocketMQUtils(RocketMQUtils &&) = delete;RocketMQUtils& operator=(RocketMQUtils &&) = delete;public:void start();void push();void poll();bool write(char *data, int len, int sign);public:moodycamel::ConcurrentQueue<BusinessFlowMsg> g_businessQueue;private:static RocketMQUtils* _instance;std::thread _pushThread;
};#endif // ROCKETMQUTILS_H

#include "rocketmqutils.h"
#include <unistd.h>RocketMQUtils * RocketMQUtils::_instance = nullptr;RocketMQUtils *RocketMQUtils::Instance()
{if(_instance == nullptr){_instance = new RocketMQUtils();}return _instance;
}RocketMQUtils::RocketMQUtils()
{}void RocketMQUtils::start()
{if(_pushThread.joinable()){return;}_pushThread = std::thread(&RocketMQUtils::push, this);
}void RocketMQUtils::push()
{while (true){BusinessFlowMsg busiMsg;if(g_businessQueue.try_dequeue(busiMsg)){write(busiMsg.get_body(), busiMsg.header()->length, busiMsg.header()->sign);}else{printf("[%s:%d] %s\n", __FILE__, __LINE__, "g_businessQueue is empty");sleep(2);}}
}void RocketMQUtils::poll()
{}bool RocketMQUtils::write(char *data, int len, int sign)
{std::string msg(data, len);if (sign == MSG_ROCKETMQ_PNG){printf("[%s:%d] [%d] %s\n", __FILE__, __LINE__, sign, msg.c_str());}else if (sign == MSG_ROCKETMQ_AIS){printf("[%s:%d] [%d] %s\n", __FILE__, __LINE__, sign, msg.c_str());}else if (sign == MSG_ROCKETMQ_ROUTE){printf("[%s:%d] [%d] %s\n", __FILE__, __LINE__, sign, msg.c_str());}else if (sign == MSG_ROCKETMQ_VOYAGE){printf("[%s:%d] [%d] %s\n", __FILE__, __LINE__, sign, msg.c_str());}else{printf("[%s:%d] data sign error\n", __FILE__, __LINE__);}
}

0x05 使用演示

#include <iostream>
#include <memory>
#include "rocketmqutils.h"
#include "pngunit.h"using namespace std;int main()
{cout << "==Start==" << endl;RocketMQUtils* rocketmq = RocketMQUtils::Instance();rocketmq->start();std::shared_ptr<PngUnit> ptrPngUnit = std::make_shared<PngUnit>();ptrPngUnit->start();getchar();cout << "==Over==" << endl;return 0;
}

在这里插入图片描述


http://www.ppmy.cn/news/1547306.html

相关文章

Flink使用SQL Gateway提交SQL Job到远程集群

从Flink 1.16.0开始集成了SQL Gateway功能&#xff0c;提供了多种客户端远程并发执行SQL的能力。不用再使用提交jar包的方式来创建任务了。 我是使用filnk 1.17.1版本。 官网关于SQL Gateway的讲解&#xff1a;https://nightlies.apache.org/flink/flink-docs-release-1.17/z…

单片机 外部中断实验 实验三

实验三 外部中断实验 实验目的 1、掌握MCS-51单片机外部中断的原理。 2、掌握MCS-51单片机外部中断程序的设计方法及其过程。 3、掌握MCS-51单片机外部中断的电路应用。 实验任务 利用外部中断方式&#xff0c;实现通过按键切换流水灯的流向。流水灯形式自定&#xff…

Rust实战项目与未来发展——跨平台应用开发项目实践

第十章&#xff1a;实战项目与未来发展 第一节&#xff1a;跨平台应用开发项目实践 随着移动设备、桌面设备和Web平台之间界限的模糊&#xff0c;跨平台应用开发已成为开发者日常工作中不可或缺的一部分。随着技术栈的不断演进&#xff0c;开发者有更多选择来构建高效、易维护…

[AI] 从游戏到现实:强化学习的应用与挑战

随着AI技术的快速发展,强化学习(Reinforcement Learning, RL)逐渐成为人工智能领域的一个重要分支。尤其是在游戏领域,RL展示了极大的潜力:它可以在没有预先标记的数据情况下,通过智能体的互动和反馈自主学习。然而,强化学习的影响力远远超越了游戏本身,它的理念和方法…

风险数据集市整体架构及技术实现

引言 在当今大数据时代&#xff0c;风险数据集市作为金融机构的核心基础设施之一&#xff0c;扮演着至关重要的角色。它不仅为银行、保险等金融机构提供了全面、准确的风险数据支持&#xff0c;还帮助这些机构实现了风险管理的精细化和智能化。本文将深入探讨一种基于大数据La…

Leetcode 791 Custom Sort String

题意&#xff1a;给定两个字符串&#xff0c;第一个字符串order&#xff0c;给定字符出现的先后顺序。 第二个字符串需要按照第一个字符串的顺序重新排列。没有在order字符串中出现的数组随意排列 https://leetcode.com/problems/custom-sort-string/ 解答&#xff1a;先根据…

IP数据云 识别和分析tor、proxy等各类型代理

在网络上使用代理&#xff08;tor、proxy、relay等&#xff09;进行访问的目的是为了规避网络的限制、隐藏真实身份或进行其他的不正当行为。 对代理进行识别和分析可以防止恶意攻击、监控和防御僵尸网络和提高防火墙效率等&#xff0c;同时也可以对用户行为进行分析&#xff…

【系统设计】理解带宽延迟积(BDP)、吞吐量、延时(RTT)与TCP发送窗口的关系:优化网络性能的关键

在设计和优化网络性能时&#xff0c;理解 带宽延迟积&#xff08;BDP&#xff09;、吞吐量、延时&#xff08;RTT&#xff09; 和 TCP发送窗口 之间的关系至关重要。这些概念相互影响&#xff0c;决定了网络连接的性能上限&#xff0c;尤其是在高带宽、高延迟的环境中&#xff…