组合文件锁+共享锁,并RAII 化,保证文件的跨进程线程读写安全。
demo模拟使用多个进程,每个进程包含多个线程对文件进行读写测试。
代码调用开源json库,需要下载到调试机器,编译时手动指定:
g++ -std=c++17 -pthread dbug.cpp -I/root/json/include
#include <cstddef>
#include <cstdlib>
#include <unistd.h>
#include <fstream>
#include <sys/stat.h>
#include <filesystem>
#include <iomanip>#include <sys/wait.h>
#include <iostream>
#include <thread>
#include <vector>
#include <chrono>
#include <unordered_map>
#include <memory>
#include <string>
#include <fcntl.h>
#include <sys/file.h> // 包含文件锁相关的头文件
#include <sys/types.h>
#include <sys/wait.h>
#include <mutex> // 包含mutex头文件
#include <shared_mutex>
#include <nlohmann/json.hpp> // 引入 JSON 库
#include <stdexcept> // 包含标准异常类// #include <memory>
/* 写json文件 */
using json = nlohmann::json;
namespace fs = std::filesystem;
void writeToJsonFile(const std::string &filename, const json &data, int maxRetries = 3,int retryDelayMilliseconds = 100);
/* 读json文件 */
json readJsonFromFile(const std::string &filename, int maxRetries = 3,int retryDelayMs = 100);class FileLock {
//创建双重锁,文件锁实现进程同步,共享锁实现线程同步,设置为配套获取与释放
private:std::string fileName;int fileDesc;mutable std::shared_mutex rwLock; // 实现线程同步
public:FileLock(const std::string& file) : fileName(file) {// 打开文件(如果文件不存在则创建)fileDesc = open(fileName.c_str(), O_CREAT | O_RDWR, 0666);if (fileDesc == -1) {throw std::runtime_error("Failed to open file for locking: " + fileName);}}~FileLock() {if (fileDesc != -1) {close(fileDesc); // 关闭文件描述符}}// 禁止复制构造函数和赋值运算符FileLock(const FileLock&) = delete;FileLock& operator=(const FileLock&) = delete;// 锁定文件进行读取void lockRead() {rwLock.lock_shared(); // 获取共享锁(读锁)if (flock(fileDesc, LOCK_SH) == -1) { // 获取共享锁(读锁)// rwLock.unlock_shared(); // 释放共享锁throw std::runtime_error("Failed to lock file for reading: " + fileName);}}// 释放文件读取锁void unlockRead() {if (flock(fileDesc, LOCK_UN) == -1) { // 释放锁throw std::runtime_error("Failed to unlock file after reading: " + fileName);}rwLock.unlock_shared(); // 释放共享锁}// 锁定文件进行写入void lockWrite() {rwLock.lock(); // 获取独占锁(写锁)if (flock(fileDesc, LOCK_EX) == -1) { // 获取独占锁(写锁)// rwLock.unlock(); // 释放独占锁throw std::runtime_error("Failed to lock file for writing: " + fileName);}}// 释放文件写入锁void unlockWrite() {if (flock(fileDesc, LOCK_UN) == -1) { // 释放锁throw std::runtime_error("Failed to unlock file after writing: " + fileName);}rwLock.unlock(); // 释放独占锁}
};class FileManager {
private:// std::unordered_map<std::string, std::shared_ptr<FileLock>> fileLocks;std::unordered_map<std::string, FileLock> fileLocks;std::mutex managerMtx; // 用于保护 fileLocks 的互斥锁// 私有构造函数,禁止外部创建实例FileManager() = default;// 删除拷贝构造函数和赋值运算符FileManager(const FileManager&) = delete;FileManager& operator=(const FileManager&) = delete;public:// 获取单例实例static FileManager& getInstance() {static FileManager instance; // 线程安全的局部静态变量(C++11 及以上)return instance;}// 获取指定文件的锁FileLock& getFileLock(const std::string& fileName) {// std::shared_ptr<FileLock> getFileLock(const std::string& fileName) {std::lock_guard<std::mutex> guard(managerMtx);// // std::cout << "getFileLock:" << fileName << std::endl;// if (fileLocks.find(fileName) == fileLocks.end()) {// // 如果该文件锁不存在,创建一个新的锁对象// fileLocks[fileName] = FileLock(fileName); // 永久保存FileLock,减少调用// // fileLocks[fileName] = std::make_shared<FileLock>(fileName);//动态释放FileLock// }// return fileLocks[fileName];auto it = fileLocks.find(fileName);if (it == fileLocks.end()) {// 如果该文件锁不存在,创建一个新的锁对象并插入到 map 中it = fileLocks.emplace(fileName, fileName).first;}return it->second;}// 移除指定文件的锁(可选)void removeFileLock(const std::string& fileName) {std::lock_guard<std::mutex> guard(managerMtx);fileLocks.erase(fileName);}
};
//创建单例锁管理,永久存储文件锁
FileManager& manager = FileManager::getInstance();//将双重锁 RAII 化, 用于自动管理 FileLock 的写锁
class WriteLockGuard {
public:WriteLockGuard(FileLock& fileLock) : fileLock(fileLock) {fileLock.lockWrite(); // 加锁}~WriteLockGuard() {fileLock.unlockWrite(); // 自动解锁}private:FileLock& fileLock;
};//将双重锁 RAII 化, 用于自动管理 FileLock 的读锁
class ReadLockGuard {
public:ReadLockGuard(FileLock& fileLock) : fileLock(fileLock) {fileLock.lockRead(); // 加锁}~ReadLockGuard() {fileLock.unlockRead(); // 自动解锁}private:FileLock& fileLock;
};/******************************************************************************** 名称: checkJsonFile* 描述: 创建json文件* 作者: mkx* 参数: void* 返回: void******************************************************************************/
void checkJsonFile(const std::string &fileName) {try{// 检查文件是否存在if (fs::exists(fileName)) {return;}// 获取文件所在目录fs::path filePath(fileName);fs::path directory = filePath.parent_path();// std::cout << "filePath:" << filePath << std::endl;// std::cout << "directory:" << directory << std::endl;// 如果目录不存在,则创建目录if (!directory.empty() && !fs::exists(directory)) {// std::cout << "创建目录: " << directory << std::endl;// debug_log(DLOG_INFO, "Directory created: %s", directory.string().c_str());if (!fs::create_directories(directory)) {std::cerr << "无法创建目录: " << directory << std::endl;// debug_log(DLOG_ERROR, "Directory created fail: %s", directory.string().c_str());return;}}FileLock& fileLock = manager.getFileLock(fileName);//以文件名为KEY获取对应锁//WriteLockGuard加锁时会自动创建文件WriteLockGuard lock(fileLock); // 创建 WriteLockGuard 对象,自动加锁if (fs::exists(fileName)) {// debug_log(DLOG_INFO, "File created: %s", filePath.string().c_str());std::cout << "文件创建成功: " << std::endl;return;}}catch (const std::filesystem::filesystem_error &e){const std::string &errorMessage = e.what();// debug_log(DLOG_ERROR, "Error: %s", errorMessage.c_str());}catch (const std::exception &e){const std::string &errorMessage = e.what();// debug_log(DLOG_ERROR, "Error: %s", errorMessage.c_str());}
}/******************************************************************************** 名称: writeToJsonFile* 描述: 写json文件* 作者: mkx* 参数: void* 返回: void******************************************************************************/
void writeToJsonFile(const std::string &filename, const json &data, int maxRetries,int retryDelayMilliseconds)
{checkJsonFile(filename);// 内有双重锁!FileLock& fileLock = manager.getFileLock(filename);//以文件名为KEY获取对应锁for (int retryCount = 0; retryCount < maxRetries; ++retryCount){try{WriteLockGuard lock(fileLock); // 创建 WriteLockGuard 对象,自动加锁std::this_thread::sleep_for(std::chrono::seconds(1));std::ofstream outputFile(filename); //覆盖写入if (outputFile.is_open()) {// 将 JSON 数据写入文件,格式化输出(缩进为4个空格)std::cerr << data.dump(4) << std::endl;outputFile << data.dump(4) << std::endl; // 换行outputFile.close(); return;}else{throw std::ios_base::failure("Failed to open file for writing.");}}//释放WriteLockGuardcatch (const std::filesystem::filesystem_error &e){const std::string &errorMessage = e.what();// debug_log(DLOG_INFO, "File operation failed: %s", errorMessage.c_str());}catch (const std::ios_base::failure &e){const std::string &errorMessage = e.what();// debug_log(DLOG_INFO, "File operation failed: %s", errorMessage.c_str());}catch (const std::exception &e){const std::string &errorMessage = e.what();// debug_log(DLOG_INFO, "Error: %s", errorMessage.c_str());}// Introduce a delay before retryingstd::this_thread::sleep_for(std::chrono::milliseconds(retryDelayMilliseconds));}// debug_log(DLOG_ERROR, "writeToJsonFile failed, Retry 5 times!!!");
}/******************************************************************************** 名称: readJsonFromFile* 描述: 读取json文件* 作者: mkx* 参数: void* 返回: void******************************************************************************/
json readJsonFromFile(const std::string &filename, int maxRetries, int retryDelayMs)
{checkJsonFile(filename);FileLock& fileLock = manager.getFileLock(filename);//以文件名为KEY获取对应锁for (int attempt = 0; attempt < maxRetries; attempt++){try{ReadLockGuard lock(fileLock); // 创建 ReadLockGuard 对象,自动加锁// std::this_thread::sleep_for(std::chrono::seconds(1));// 打开文件并直接定位到末尾std::ifstream inputFile(filename, std::ios::ate | std::ios::binary); if (inputFile.is_open()){// 检查文件是否为空(避免创建空文件时要写入空json)if (inputFile.tellg() == 0) // 获取文件大小{std::cout << "R" << json().dump(4) << std::endl; // 使用 dump(4) 格式化输出,缩进为 4 个空格return json(); // 返回一个空的 JSON 对象}inputFile.seekg(0, std::ios::beg); // 重置文件指针到文件开头json loadedData;inputFile >> loadedData;inputFile.close();loadedData["WR"] = "R";std::cout << loadedData.dump(4) << std::endl; // 使用 dump(4) 格式化输出,缩进为 4 个空格return loadedData;}else{// debug_log(DLOG_INFO, "File %s not found. ", filename.c_str());std::cout << "else Loaded JSON " << std::endl;}}//释放ReadLockGuardcatch (const std::ios_base::failure &e){const std::string &errorMessage = e.what();// debug_log(DLOG_INFO, "File operation failed: %s", errorMessage.c_str());std::cout << "aaaaaaLoaded JSON " << std::endl;}catch (const std::exception &e){const std::string &errorMessage = e.what();// debug_log(DLOG_INFO, "Error: %s", errorMessage.c_str());std::cout << errorMessage.c_str() << std::endl;}// 重试之前等待一段时间std::this_thread::sleep_for(std::chrono::milliseconds(retryDelayMs));}// debug_log(DLOG_ERROR, "readJsonFromFile failed, Retry 5 times!!!");// 重试次数超过最大限制,返回空的 JSON 对象表示读取失败return json();
}// 使用fork创建进程并执行任务
void createProcess1(const std::string& proID,const std::string& fileName) {pid_t pid = fork();if (pid == 0) {json data = {{"name", fileName},{"age", proID},{"WR", "W"}}; // 在子进程中创建线程进行读写操作std::thread reader1(readJsonFromFile,fileName,1,100);std::thread reader2(readJsonFromFile,fileName,2,100);std::thread writer1(writeToJsonFile,fileName,data,3,100);std::thread writer2(writeToJsonFile,fileName,data,4,100);reader1.join();reader2.join();writer1.join();writer2.join();exit(0); // 退出子进程} else if (pid > 0) {// 父进程wait(NULL); // 等待子进程结束} else {std::cerr << "Fork failed!" << std::endl;}
}int main() {std::string fileName1 = "1.txt";std::string fileName2 = "2.txt";std::string fileName3 = "3.txt";std::string fileName4 = "4.txt";std::string fileName5 = "5.txt";std::string fileName6 = "6.txt";std::string fileName7 = "7.txt";std::string fileName8 = "8.txt";std::string fileName9 = "9.txt";std::string fileName0 = "0.txt";std::string fileNamea = "a.txt";std::string fileNameb = "b.txt";std::string fileNamec = "c.txt";std::string fileNamed = "d.txt";std::string fileNamee = "e.txt";std::string fileNamef = "f.txt";std::string fileNameg = "g.txt";// 创建多个进程进行并行执行std::vector<std::thread> threads;threads.emplace_back(createProcess1,"P1", fileName1);threads.emplace_back(createProcess1,"P2", fileName1);threads.emplace_back(createProcess1,"P3", fileName1);threads.emplace_back(createProcess1,"P4", fileName1);// 等待所有线程完成for (auto& t : threads) {t.join();}return 0;
}