目录
第一节:线程私有ThreadCache
第二节:线程申请/释放内存的函数
2-1.ConcurrentAlloc
2-2.ConcurrentFree
第三节:测试优化
第四节:基数树优化
第五节:再次测试
第六节:下期预告
第一节:线程私有ThreadCache
之前说过每个线程都应该有一个tc,那么如何让每一个线程私有一个tc呢?这就需要使用
_declspec了。
使用它在ThreadCache.h中定义一个tc指针:
// 每个线程独有一份该指针
_declspec(thread) static ThreadCache* pTlsTC = nullptr;
这样做的话不同线程访问的指针 pTlsTC 都是自己私有的那一份指针。
第二节:线程申请/释放内存的函数
虽然线程可以使用tc申请内存,但是还需要封装一层,再嵌套一层申请内存的函数。
首先创建一个名为ConcurrentAlloc.h的头文件,然后包含头文件:
#include"ThreadCache.h"
#include"PageCache.h"
2-1.ConcurrentAlloc
这个函数用来申请内存,首先要检查 pTlsTC 是否已经实例化,如果没有实例化一个tc:
// 线程申请私有的tc+申请size字节内存
static void* ConcurrentAlloc(size_t size) {if (pTlsTC == nullptr) { // 该线程没有tc就创建一个// 使用一个定长内存池管理所有tcstatic FixedLengthMemoryPool<ThreadCache> cmpool;pTlsTC = cmpool.New();}
}
然后调用tc的Allocate即可:
// 线程申请私有的tc+申请size字节内存
static void* ConcurrentAlloc(size_t size) {if (pTlsTC == nullptr) {static FixedLengthMemoryPool<ThreadCache> cmpool;pTlsTC = cmpool.New();}return pTlsTC->Allocate(size);
}
2-2.ConcurrentFree
这个函数用来释放线程申请的内存,同样的,调用tc的Deallocate即可:
static void ConcurrentFree(void* ptr) {assert(pTlsTC);Span* span = PageCache::GetInst()->MapObjToSpan(ptr);size_t size = span->_objSize;pTlsTC->Deallocate(ptr, size);
}
第三节:测试优化
测试时要与malloc进行对比, 多个线程并发执行多次申请内存和释放内存:
#pragma once
#include"ConcurrentAlloc.h"
#include<thread>
#include<iostream>
#include<vector>std::atomic<size_t> poolCostTime = 0; // 内存池花费时间
std::atomic<size_t> mallocCostTime = 0; // malloc花费时间
size_t nTimes = 10;void _ConWithMalloc() {std::vector<void*> vPtr1;std::vector<void*> vPtr2;vPtr1.reserve(nTimes);vPtr2.reserve(nTimes);size_t begin1 = clock();for (size_t i = 0; i < nTimes; i++) {vPtr1.push_back(ConcurrentAlloc(16 + i));}size_t end1 = clock();size_t begin2 = clock();for (size_t i = 0; i < nTimes; i++) {ConcurrentFree(vPtr1[i]);}size_t end2 = clock();begin1 = clock();for (size_t i = 0; i < nTimes; i++) {vPtr2.push_back(malloc(16 + i));}end1 = clock();begin2 = clock();for (size_t i = 0; i < nTimes; i++) {free(vPtr2[i]);}end2 = clock();mallocCostTime += (end1 - begin1) + (end2 - begin2);poolCostTime += (end1 - begin1) + (end2 - begin2);
}void ConWithMalloc(size_t nWorks) {// nWorks个线程,每个线程执行nTimes次申请和释放内存std::vector<std::thread> vThread(nWorks);for (size_t k = 0; k < nWorks; k++) {vThread[k] = std::thread(_ConWithMalloc);}for (auto& t : vThread)t.join();std::cout << "内存池花费:" << poolCostTime << " malloc花费:" << mallocCostTime << std::endl;// 重置时间poolCostTime = 0;mallocCostTime = 0;
}int main() {for (size_t i = 0; i < 100; i++) // 循环100次ConWithMalloc(100);return 0;
}
可以看到内存池的花销还是比较大的,这是因为pc中每次通过地址找span时都需要上锁,锁的花销太大了,所以需要找一种方式代替哈希映射,而且这种方式不需要上锁。
这种更好的方式就是基数树。
第四节:基数树优化
基数树就是一种多级映射,但是每个span都有自己专属的位置,这样访问一个span的时候其他span的位置就不会受到影响,也就不需要上锁了。
我的电脑是32位系统,那么就有2^32B空间,然后一页设置为8K,即2^13B,那么电脑中最多存在2^(32-13)个span,故需要2^19个位置来保存所有span。
在实际的容器中,对应位置保存一个指针来指向该span,又因为2^19这个数量级太大了,所以使用二级基数树即可。
创建一个名为RadixTree.h的头文件,放入以下内容:
#pragma once
#include"Common.h"
#include"FLMP.h"// Two-level radix tree
template <int BITS>
class TCMalloc_PageMap2 {
private:// Put 32 entries in the root and (2^BITS)/32 entries in each leaf.static const int ROOT_BITS = 5;static const int ROOT_LENGTH = 1 << ROOT_BITS;static const int LEAF_BITS = BITS - ROOT_BITS;static const int LEAF_LENGTH = 1 << LEAF_BITS;// Leaf nodestruct Leaf {void* values[LEAF_LENGTH];};Leaf* root_[ROOT_LENGTH]; // Pointers to 32 child nodesvoid* (*allocator_)(size_t); // Memory allocatorpublic:typedef uintptr_t Number;//explicit TCMalloc_PageMap2(void* (*allocator)(size_t)) {explicit TCMalloc_PageMap2() {//allocator_ = allocator;memset(root_, 0, sizeof(root_));PreallocateMoreMemory();}void* find(Number k) const {const Number i1 = k >> LEAF_BITS;const Number i2 = k & (LEAF_LENGTH - 1);if ((k >> BITS) > 0 || root_[i1] == NULL) {return NULL;}return root_[i1]->values[i2];}void set(Number k, void* v) {const Number i1 = k >> LEAF_BITS;const Number i2 = k & (LEAF_LENGTH - 1);assert(i1 < ROOT_LENGTH);root_[i1]->values[i2] = v;}bool Ensure(Number start, size_t n) {for (Number key = start; key <= start + n - 1;) {const Number i1 = key >> LEAF_BITS;// Check for overflowif (i1 >= ROOT_LENGTH)return false;// Make 2nd level node if necessaryif (root_[i1] == NULL) {//Leaf* leaf = reinterpret_cast<Leaf*>((*allocator_)(sizeof(Leaf)));//if (leaf == NULL) return false;static FixedLengthMemoryPool<Leaf> leafPool;Leaf* leaf = (Leaf*)leafPool.New();memset(leaf, 0, sizeof(*leaf));root_[i1] = leaf;}// Advance key past whatever is covered by this leaf nodekey = ((key >> LEAF_BITS) + 1) << LEAF_BITS;}return true;}void PreallocateMoreMemory() {// Allocate enough to keep track of all possible pagesEnsure(0, 1 << BITS);}
};
然后在pc中将_idSpanMap的类型修改为:
TCMalloc_PageMap2<32-PAGE_SHIFT> _idSpanMap;
然后函数PageCache::NewSpan记录id与其span的地方也替换成set,这里需要勘误一点,就是之前retSpan只保存了它的开头和结尾页,实际上中间页也需要保存,因为中间的内存也会被使用,有两处需要修改成以下代码:
// -----------记录id与span-------------
for (PAGE_ID i = 0; i < retSpan->_n; i++) {_idSpanMap.set(retSpan->_pageId + i,retSpan);
}
// ------------------------------------
其次MapObjToSpan修改成:
Span* PageCache::MapObjToSpan(void* obj) {PAGE_ID id = (PAGE_ID)obj >> PAGE_SHIFT;auto ret = (Span*)_idSpanMap.find(id);assert(ret);return ret;
}
其他的细节根据错误列表一一修改即可。
第五节:再次测试
使用如下代码进行精细的测试:
std::atomic<size_t> poolCostTime(0); // 内存池花费时间
std::atomic<size_t> mallocCostTime(0); // malloc花费时间
const size_t nTimes = 100; // 每个线程执行的内存分配/释放次数
const size_t allocationSize = 16; // 固定的内存分配大小void measureMemoryPerformance() {std::vector<void*> vPtr1;std::vector<void*> vPtr2;vPtr1.reserve(nTimes);vPtr2.reserve(nTimes);// 使用更高精度的时间测量auto start = std::chrono::high_resolution_clock::now();for (size_t i = 0; i < nTimes; i++) {vPtr1.push_back(ConcurrentAlloc(allocationSize));}for (size_t i = 0; i < nTimes; i++) {ConcurrentFree(vPtr1[i]);}auto end = std::chrono::high_resolution_clock::now();poolCostTime += std::chrono::duration_cast<std::chrono::microseconds>(end - start).count();start = std::chrono::high_resolution_clock::now();for (size_t i = 0; i < nTimes; i++) {vPtr2.push_back(malloc(allocationSize));}for (size_t i = 0; i < nTimes; i++) {free(vPtr2[i]);}end = std::chrono::high_resolution_clock::now();mallocCostTime += std::chrono::duration_cast<std::chrono::microseconds>(end - start).count();
}void ConWithMalloc(size_t nWorks) {// nWorks个线程,每个线程执行measureMemoryPerformance函数std::vector<std::thread> vThread;vThread.reserve(nWorks);for (size_t k = 0; k < nWorks; k++) {vThread.emplace_back(measureMemoryPerformance);}for (auto& t : vThread) {t.join();}
}int main() {const size_t numIterations = 10; // 循环次数for (size_t i = 0; i < numIterations; i++) {poolCostTime = 0;mallocCostTime = 0;ConWithMalloc(100); // 使用100个线程进行测试std::cout << "内存池花费时间(微秒):" << poolCostTime << " malloc花费时间(微秒):" << mallocCostTime << std::endl;}return 0;
}
可以看见,内存池的速度已经超过malloc了。
第六节:下期预告
效率优化完成后,下一次将完善功能——大于MAX_BYTES(256kb)的内存申请功能。