Linux System V - 消息队列与责任链模式

server/2025/2/25 21:45:30/

概念

消息队列是一种以消息为单位的进程间通信机制,允许一个或多个进程向队列中发送消息,同时允许一个或多个进程从队列中接收消息。消息队列由内核维护,具有以下特点:

  • 异步通信:发送方和接收方不需要同时运行,消息会存储在队列中。

  • 消息优先级:支持消息的优先级排序。

  • 消息分类:每条消息都有一个类型,可以根据类型选择性地接收。

  • 持久性:即使发送方或接收方意外退出,消息仍然保留在队列中。

内核结构

在 /usr/include/linux/msg.h 中,可以查看到消息队列的结构如下:

struct msqid_ds {
struct ipc_perm msg_perm;
struct msg msg_first; / first message on queue,unused */
struct msg msg_last; / last message in queue,unused */
__kernel_time_t msg_stime; /* last msgsnd time */
__kernel_time_t msg_rtime; /* last msgrcv time */
__kernel_time_t msg_ctime; /* last change time */
unsigned long msg_lcbytes; /* Reuse junk fields for 32 bit */
unsigned long msg_lqbytes; /* ditto */
unsigned short msg_cbytes; /* current number of bytes on queue */
unsigned short msg_qnum; /* number of messages in queue */
unsigned short msg_qbytes; /* max number of bytes on queue */
__kernel_ipc_pid_t msg_lspid; /* pid of last msgsnd */
__kernel_ipc_pid_t msg_lrpid; /* last receive pid */
};

其中 struct ipc_perm 是 System V IPC(进程间通信)机制中用于描述 IPC 对象(如消息队列、信号量集或共享内存段)权限和所有者信息的数据结构。它定义了谁可以访问和操作这些 IPC 资源,以及它们的访问权限,其中该结构体位于 /usr/include/linux/ipc.h 内,定义如下:

struct ipc_perm {
key_t __key; /* Key supplied to xxxget(2) */
uid_t uid; /* Effective UID of owner */
gid_t gid; /* Effective GID of owner */
uid_t cuid; /* Effective UID of creator */
gid_t cgid; /* Effective GID of creator */
unsigned short mode; /* Permissions */
unsigned short __seq; /* Sequence number */
};

接口说明

以下函数均包含在头文件 <sys/ipc.h> 和 <sys/msg.h> 中

msgget

功能:调用创建或获取一个消息队列

int msgget(key_t key, int msgflg);
  • key:唯一标识消息队列的键值,通常通过 ftok 函数生成。

我们可以看到 key 的类型为 key_t,实际上也是一个整数,不过要保证其数值的唯一性,要获取该参数 key 我们需要使用 ftok 函数,它会根据一个文件路径和一个项目标识符(proj_id)通过一系列算法生成一个几乎唯一的键值。

  • pathname 是一个指向存在的文件路径的指针,实际上可以随便写。
  • proj_id 是一个整数,通常是一个字符常量,其低8位被用于生成键值,实际上可以随便写。

  • msgflg:指定消息队列的权限和行为,由九个权限标志构成,它们的用法和创建文件时使用的mode模式标志是一样的。

其中最为常用的shmflg如下:

  • IPC_CREAT:如果共享内存段不存在,则创建一个新的共享内存段,存在则返回 shmid。
  • IPC_EXCL:与IPC_CREAT一起使用时,确保创建新的共享内存段,如果共享内存段已存在则失败。
  • 权限位:shmflg的低9位用于设置共享内存段的权限,与文件系统的权限位相同。
#include <sys/ipc.h>
#include <sys/msg.h>
#include <iostream>
using namespace std;
int main()
{key_t key = ftok("/home/lbk/lesson27/test.cc", 8888);int msgid = msgget(key, IPC_CREAT);return 0;
}

msgsnd

功能:用于向消息队列中发送消息。

int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);
参数
  • msgid:消息队列的 ID(由 msgget() 返回)。

  • msgp:指向消息结构的指针。消息结构的第一个字段必须是 long 类型,表示消息类型,如下。

struct msgbuf {
long mtype; /* message type, must be > 0 */
char mtext[1]; /* message data */
};
// 以⼀个long int⻓整数开始,接收者函数将利⽤⽤这个⻓整数确定消息的类型
  • msgsz:消息的大小(不包括消息类型字段)。

  • msgflg:标志位,常用值:

    • 0:阻塞模式,直到消息被发送。

    • IPC_NOWAIT:非阻塞模式,如果队列已满,立即返回。

返回值
  • 成功时返回 0

  • 失败时返回 -1 并设置 errno

#include <sys/ipc.h>
#include <sys/msg.h>
#include <iostream>
using namespace std;
struct msg_buffer
{long msg_type;char *msg_text;
};
int main()
{key_t key = ftok("/home/lbk/lesson27/test.cc", 8888);int msgid = msgget(key, IPC_CREAT);msg_buffer sendbuf = {1, "hello world!"};msgsnd(msgid, &sendbuf, sizeof(sendbuf.msg_text), 0);return 0;
}

msgrcv

功能:用于从消息队列中接收消息。

ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg);
参数
  • msgid:消息队列的 ID。

  • msgp:指向消息结构的指针。

  • msgsz:消息的最大接收大小(不包括消息类型字段)。

  • msgtyp:指定接收的消息类型:

    • 0:接收队列中第一个消息。

    • >0:接收指定类型的消息。

    • <0:接收类型小于等于该值的最小类型消息。

  • msgflg:标志位:

    • 0:阻塞模式,直到消息可用。

    • IPC_NOWAIT:非阻塞模式,如果队列为空,立即返回。

返回值
  • 成功时返回实际接收的消息大小。

  • 失败时返回 -1 并设置 errno

#include <sys/ipc.h>
#include <sys/msg.h>
#include <iostream>
using namespace std;
struct msg_buffer
{long msg_type;char *msg_text;
};
int main()
{key_t key = ftok("/home/lbk/lesson27/test.cc", 8888);int msgid = msgget(key, IPC_CREAT);msg_buffer sendbuf = {1, "hello world!"};msgsnd(msgid, &sendbuf, sizeof(sendbuf.msg_text), 0);msg_buffer receivebuf;msgrcv(msgid, receivebuf.msg_text, 1024, 2, 0);cout << receivebuf.msg_text << endl;return 0;
}

msgctl

功能:用于对消息队列进行控制操作,如获取状态、设置属性或删除队列。

int msgctl(int msqid, int cmd, struct msqid_ds *buf);
参数
  • msgid:消息队列的 ID。

  • cmd:指定操作命令:

    • IPC_STAT:获取消息队列的状态信息,存储到 buf 中。

    • IPC_SET:设置消息队列的权限和属性。

    • IPC_RMID:删除消息队列。

  • buf:用于存储或修改消息队列的状态信息。

返回值
  • 成功时返回 0

  • 失败时返回 -1 并设置 errno

#include <sys/ipc.h>
#include <sys/msg.h>
#include <iostream>
using namespace std;
struct msg_buffer
{long msg_type;char *msg_text;
};
int main()
{key_t key = ftok("/home/lbk/lesson27/test.cc", 8888);int msgid = msgget(key, IPC_CREAT);msg_buffer sendbuf = {1, "hello world!"};msgsnd(msgid, &sendbuf, sizeof(sendbuf.msg_text), 0);msg_buffer receivebuf;msgrcv(msgid, receivebuf.msg_text, 1024, 2, 0);cout << receivebuf.msg_text << endl;// 删除消息队列if (receivebuf.msg_text == "quit"){msgctl(msgid, IPC_RMID, nullptr) == -1;}return 0;
}

基本通信示例

MessageQueue.hpp

#include <sys/ipc.h>
#include <sys/msg.h>
#include <iostream>
#include <string>
#include <cstring>
#include <unistd.h>
using namespace std;#define SIZE 1024
#define PATH "/home/lbk/lesson27/MessageQueue.hpp"
#define PROJETID 6666
#define CREATE_MESSAGEQUEUE (IPC_CREAT | IPC_EXCL | 0666)
#define GET_MESSAGEQUEUE (IPC_CREAT)
#define SERVERTYPE 1
#define CLIENTTYPE 2
enum
{KEY_ERROR = 1,MSGGET_ERRO,
};
struct msg_buffer
{long msg_type;char msg_text[SIZE];
};
class MessageQueue
{
public:void Create(int flag){key_t key = ftok(PATH, PROJETID);if (key < 0){cerr << "ftok error" << endl;exit(KEY_ERROR);}_msgid = msgget(key, flag);if (_msgid < 0){cerr << "msgget error" << endl;exit(MSGGET_ERRO);}}bool SendMessage(const string &in, long type){msg_buffer buf;buf.msg_type = type;strcpy(buf.msg_text, in.c_str());int n = msgsnd(_msgid, &buf, SIZE, 0);if (n < 0){cout << "SendMessage error" << endl;return false;}return true;}bool ReceiveMessage(string *out, long type){msg_buffer buf;int n = msgrcv(_msgid, &buf, SIZE, type, 0);if (n < 0){cout << "ReceiveMessage error" << endl;return false;}buf.msg_text[n] = 0;*out = buf.msg_text;return true;}void Destroy(){msgctl(_msgid, IPC_RMID, nullptr);}private:int _msgid;
};class MessageQueueServer : public MessageQueue
{
public:MessageQueueServer(){Create(CREATE_MESSAGEQUEUE);}~MessageQueueServer(){Destroy();}
};class MessageQueueClient : public MessageQueue
{
public:MessageQueueClient(){Create(GET_MESSAGEQUEUE);}
};

server.cc

#include "MessageQueue.hpp"
int main()
{MessageQueueServer server;string message;while (message != "quit"){server.ReceiveMessage(&message, CLIENTTYPE);cout << "server receive: " << message << endl;}return 0;
}

client.cc

#include "MessageQueue.hpp"
int main()
{MessageQueueClient client;string message;while (message != "quit"){getline(cin, message);client.SendMessage(message, CLIENTTYPE);}return 0;
}

makefil

责任链模式

新需求:
  • client 发送给 server 的输⼊内容,拼接上时间,进程pid信息
  • server 收到的内容持久化保存到⽂件中
  • ⽂件的内容如果过⼤,要进⾏切⽚保存并在指定的⽬录下打包保存,命令⾃定义
解决⽅案:责任链模式
⼀种⾏为设计模式,它允许你将请求沿着处理者链进⾏传递。每个处理者都对请求进⾏检查,以决定是否处理它。如果处理者能够处理该请求,它就处理它;否则,它将请求传递给链中的下⼀个处理者。这个模式使得多个对象都有机会处理请求,从⽽避免了请求的发送者和接收者之间的紧耦合。

主要特点

  • 解耦请求和处理:请求的发送者不需要知道具体的处理者是谁,只需要将请求发送到链上即可。

  • 动态组合处理逻辑:可以在运行时动态地添加或修改处理者,灵活地调整处理逻辑。

  • 职责分离:每个处理者只负责处理自己能够处理的请求,无法处理的请求则传递给下一个处理者。

代码示例

HandleText.hpp

#pragma once
#include <iostream>
#include <string>
#include <memory>
#include <ctime>
#include <sstream>
#include <fstream>
#include <unistd.h>
#include <filesystem>
#include <sys/wait.h>
using namespace std;
class HandleText
{
public:HandleText(): _enable(true){}void Enable(){_enable = true;}void Disenable(){_enable = false;}void SetNextHandler(shared_ptr<HandleText> &handler){_next_handler = handler;}virtual void Excute(string &info) = 0;protected:bool _enable;shared_ptr<HandleText> _next_handler;
};class HandleTextFormat : public HandleText
{
public:void Excute(string &info) override{if (_enable){cout << "Format ..." << endl;stringstream ss;time_t now = time(nullptr);char buffer[80];strftime(buffer, sizeof(buffer), "%Y-%m-%d %H:%M:%S", localtime(&now));ss << buffer << "-" << getpid() << ":" << info << "\n";info = ss.str();}if (_next_handler){_next_handler->Excute(info);}}
};#define DefaultFilePath "./tmp/"
#define DefaultFileName "test.log"
class HandleTextSaveFile : public HandleText
{
public:HandleTextSaveFile(){_path = DefaultFilePath;_name = DefaultFileName;if (!filesystem::exists(_path)){filesystem::create_directories(_path);}}void Excute(string &info) override{if (_enable){cout << "Save ..." << endl;string file = _path + _name;ofstream out(file, ios::app);if (!out.is_open())return;out << info;out.close();}if (_next_handler){_next_handler->Excute(info);}}private:string _path;string _name;
};const int maxline = 5;
class HandleTextBackup : public HandleText
{
public:HandleTextBackup(){_path = DefaultFilePath;_name = DefaultFileName;}void Excute(string &info) override{if (_enable){cout << "Backup ..." << endl;string file = _path + _name;ifstream in(file);if (!in.is_open())return;string line;int linecount = 0;while (getline(in, line)){linecount++;}if (linecount > maxline){cout << "消息⾏数超过" << maxline << ", 触发⽇志备份 " << endl;Backup();}}if (_next_handler){_next_handler->Excute(info);}}void Backup(){int pid = fork();string newfile = _name + "." + to_string(time(nullptr));if (pid == 0){chdir(_path.c_str());filesystem::rename(_name, newfile); // 重命名文件// 打包文件string tarfile = newfile + ".tar";execlp("tar", "tar", "-czf", tarfile.c_str(), newfile.c_str(), nullptr);exit(0);}waitpid(pid, nullptr, 0);string tmpfile = _path + newfile;filesystem::remove(tmpfile);}private:string _path;string _name;
};class HandlerEntry
{
public:HandlerEntry(){_format = make_shared<HandleTextFormat>();_save = make_shared<HandleTextSaveFile>();_backup = make_shared<HandleTextBackup>();_format->SetNextHandler(_save);_save->SetNextHandler(_backup);}void EnableHandler(bool isformat, bool issave, bool isbackup){isformat ? _format->Enable() : _format->Disenable();issave ? _save->Enable() : _save->Disenable();isbackup ? _backup->Enable() : _backup->Disenable();}void Run(string &info){_format->Excute(info);}private:shared_ptr<HandleText> _format;shared_ptr<HandleText> _save;shared_ptr<HandleText> _backup;
};


http://www.ppmy.cn/server/170614.html

相关文章

37、深度学习-自学之路-自己搭建深度学习框架-2、自动梯度计算

import numpy as npclass Tensor(object):import numpy as np&#xff1a;导入 numpy 库&#xff0c;用于处理数组相关操作。class Tensor(object)&#xff1a;定义了一个名为 Tensor 的类&#xff0c;继承自 object。__init__ 方法是类的构造函数&#xff0c;用于初始化 Tenso…

AI时代医疗大健康微服务编程提升路径和具体架构设计

一、引言 1.1 研究背景与意义 随着科技的飞速发展,人工智能(Artificial Intelligence,AI)已逐渐渗透至各个领域,医疗大健康领域亦不例外。人工智能与医疗大健康的融合,正引领着医疗行业迈向智能化、精准化、个性化的新时代,为解决医疗资源分布不均、提升医疗服务效率和…

C++程序员内功修炼——Linux C/C++编程技术汇总

在软件开发的宏大版图中&#xff0c;C 语言宛如一座巍峨的高山&#xff0c;吸引着无数开发者攀登探索。而 Linux 操作系统&#xff0c;以其开源、稳定、高效的特性&#xff0c;成为了众多开发者钟爱的开发平台。将 C 与 Linux 相结合&#xff0c;就如同为开发者配备了一把无坚不…

基于级联前向反向传播神经网络(FCBP)的数据回归预测【MATLAB】

级联前向反向传播神经网络&#xff08;Feedforward and Cascade Backpropagation Propagation Neural Network, FCBP&#xff09;是一种针对传统BP神经网络缺陷改进的深度学习模型。其核心通过级联连接结构和动态传播机制&#xff0c;显著提升了非线性建模能力与训练效率。 一…

TCP fast open

TCP Fast Open 复用 Cookie 快速恢复会话&#xff0c;减少 1 个 RTT 的延迟 传统 TCP 三次握手需 1.5 RTT才能传输应用数据&#xff0c;导致 HTTP 请求延迟较高 TCP Fast Open&#xff1a;为了解决传统 TCP 握手中的延迟问题&#xff0c;通过允许在首次 SYN 握手阶段携带应用数…

【C】堆的应用1 -- 堆排序

之前学习了堆&#xff0c;堆的一棵以顺序结构存储的完全二叉树&#xff0c;堆本身又氛围大根堆和小根堆&#xff0c;假设以大根堆为例&#xff0c;由于堆顶部元素是一棵二叉树里面最大的元素&#xff0c;所以如果每次都取堆顶的元素&#xff0c;那么取出的元素就是一个降序排列…

前端Sass面试题及参考答案

目录 什么是 Sass? Sass 和 CSS 的主要区别是什么? Sass 中如何处理列表? Sass 中如何处理映射(map)? Sass 中如何使用函数? Sass 中如何使用内置函数? Sass 中如何设置默认值? Sass 中的 @function 和 @mixin 有什么区别? Sass 中如何实现模块化? Sass 中…

深度学习之图像分类(二)

前言 文章主要是通过实战项目——食品分类来理解分类项目的整体流程。除此之外&#xff0c;还需要对半监督学习&#xff0c;迁移学习&#xff0c;数据增广&#xff0c;Adam和AdamW进行了解。 数据增广 图片增广&#xff08;Image Data Augmentation&#xff09;是深度学习中一种…