申请/释放 内存大小 | 申请方式 | 释放方式 |
---|---|---|
x≤256KB(32页) | 向ThreadCache申请 | 释放给ThreadCache |
32页<x≤128页 | 向PageCache申请 | 释放给PageCache |
x>128页 | 向堆申请 | 释放给堆 |
一、解决大于256KB的大块内存申请
(一)申请大于256KB的内存
比如申请257KB的内存大小,我们选择8KB作为对齐数,也就是
将257KB转换为字节:257*1024 = 263168字节;
代入公式计算对齐后的内存大小为:
(257KB+8KB-1)/8KB*8KB = 271359;
PageCache的申请范围为:32页<x≤128页
257KB对齐后的内存大小大约是33.12页,因此可以直接在PageCache中申请内存。
但当申请内存大于128页时,只能直接向操作系统申请内存了。
(二)以32页内存(256KB)和128页内存作为分界线
通过申请内存的大小,判断向PageCache申请还是直接向堆申请,无论哪一种申请,我们都需要进行一下内存对齐,得到对齐后需要申请的内存大小,然后再申请这份实际上需要申请的内存大小。
因此我们仍然需要通过RoundUp()函数中size与alignSize的映射关系来计算得到大于256KB后需要申请的对齐内存,然后再通过除以每页大小,得到页的数量。
通过申请的也得数量判断是:
- 直接调用NewSpan()来申请一个Span。
- 通过操作系统直接申请内存。
这个Span可以直接给大于256KB内存的对象使用,也可以拆分进入ThreadCache给小于256KB的对象使用。
在大于256KB的情况分两种:
当256KB < size < 12881024KB时,直接向PageCache申请内存,申请到合适的Span进行使用;
当size>12881024KB时,超过了PageCache能够提供的内存,因此直接向堆申请。一般情况下,256KB就足够使用了。
// 申请size字节,通过对齐数获取对齐后的字节数static inline size_t _RoundUp(size_t size, int alignNum){// 计算得到对齐后的字节数return (size + alignNum - 1) & ~(alignNum - 1);}static inline size_t RoundUp(size_t size){//assert(size <= MAX_BYTES); // ThreadCache中申请的内存大小最大为256KBif (size <= 128){return _RoundUp(size, 8);}else if (size <= 1024){return _RoundUp(size, 64);}else if (size <= 1024 * 8){return _RoundUp(size, 128);}else if (size <= 1024 * 64){return _RoundUp(size, 1024);}else if (size <= 1024 * 256){return _RoundUp(size, 8 * 1024);}else{/* 这里是申请内存大于256KB时的情况 */return _RoundUp(size,1 << PAGE_SHIFT);}}
这样我们就要修改部分代码,
比如在concurrentAlloc()函数中,我们应该分情况通过不同方式申请内存。
static void* concurrentAlloc(size_t size)
{if (size > MAX_BYTES){// 申请对象内存大于256KB,小于128页(128*8KB),直接向PageCache申请内存// 大于128页内存时,直接向系统申请内存// 无论申请多大的内存,在空间中占据都是对齐后的内存字节数// 因此我们仍然需要将大于256KB的内存进行对齐,而对齐就需要我们之前编写的RoundUp函数size_t alignSize = ClassSize::RoundUp(size); // 记得完善这个函数// 通过获取到的对齐后字节数,我们可以得到它需要多少页,每页是8KBsize_t page = alignSize >> PAGE_SHIFT;/* 访问CentralCache或是PageCache时,都要注意使用锁确保他们结构安全 */PageCache::GetInstance()._pageMtx.lock();Span* span = PageCache::GetInstance().NewSpan(page);PageCache::GetInstance()._pageMtx.unlock();// 通过申请到的Span得到我们申请内存的首地址void* ptr = (void*)(span->_pageId << PAGE_SHIFT);return ptr;}else{/* 申请内存大小为256KB范围内时,向ThreadCache申请内存 */// 1.为每份线程创建属于自己的ThreadCacheif (pTLSThreadCache == nullptr){pTLSThreadCache = new ThreadCache; // 这里的new后续会进行优化,现阶段先使用}return pTLSThreadCache->ThreadAlloc(size);}
}
(三)通过NewSpan()申请K页内存的Span(k可能小于128或大于128)
我们早期的NewSpan()中有判定k是小于128的。但是现在大于128页的申请页要通过NewSpan()来实现。
// 访问PageCache时记得加锁
Span* PageCache::NewSpan(size_t k)
{assert(k > 0);if (k > NPAGES - 1){/* 当申请的内存大于128页 *//* 向系统申请内存 */void* ptr = SystemAlloc(k);Span* span = new Span;// 记得更新Span自己的信息span->_pageId = (PageID)ptr >> PAGE_SHIFT;span->_n = k;// 建立页号与span之间的映射_idSpanMap[span->_pageId] = span;return span;}else{/* 当申请的内存小于等于128页 */// PageCache对应哈希桶中的Span是否存在if (!spanPageLists[k].Empty()){Span* kSpan = spanPageLists[k].PopFront();for (PageID i = 0; i < kSpan->_n; ++i){_idSpanMap[kSpan->_pageId + i] = kSpan;}return kSpan;}else{// 往上找大页,切割for (size_t i = k + 1; i < NPAGES; ++i){if (!spanPageLists[i].Empty()){// 不为空,切割Span* kSpan = new Span;Span* nSpan = spanPageLists[i].PopFront();kSpan->_pageId = nSpan->_pageId;kSpan->_n = k;nSpan->_n -= k;nSpan->_pageId += k;spanPageLists[nSpan->_n].PushFront(nSpan);_idSpanMap[nSpan->_pageId] = nSpan;_idSpanMap[nSpan->_pageId + nSpan->_n - 1] = nSpan;for (PageID i = 0; i < kSpan->_n; ++i){_idSpanMap[kSpan->_pageId + i] = kSpan;}return kSpan;}}// 没有大页,就向系统申请Span* bigSpan = new Span;void* ptr = SystemAlloc(NPAGES - 1);bigSpan->_pageId = (PageID)ptr >> PAGE_SHIFT;bigSpan->_n = NPAGES - 1;spanPageLists[bigSpan->_n].PushFront(bigSpan);return NewSpan(k);}}
}
记得在Span发生变化时对她的成员变量及时更新,首先是对应的页号_pageId和页数_n,其次是页号与span之间的映射关系,还有注意此span中的_isUse的状态。
二、释放大于256KB的内存
(一)释放大于256KB内存
线程释放内存的情况仍然分为三种。
- 释放小于256KB的内存,是一步步通过ThreadCache、CentralCache、PageCache回收内存,达到释放内存目的。
- 释放32页<x≤128页内存时,直接将对象释放在PageCache,为了减少外碎片,PageCache中的Span仍然需要进行合并操作。
- 释放x>128页内存时,因为内存量巨大,我们都是直接从堆申请内存,也直接释放到堆。
在前面释放内存的过程中,我们的RealseSpanToPageCache(span);函数满足将对象释放到PageCache中。
因此当释放对象时,我们需要先找到该对象对应的Span,在释放对象时我们能够得知该对象的起始地址ptr,通过这个起始地址计算出起始页号_pageId,通过在申请对象时建立的映射关系得到该对象对应的Span。
// 封装线程释放内存
static void concurrentFree(void* obj, size_t size)
{if (size <= MAX_BYTES){assert(pTLSThreadCache);// 回收内存对象释放的内存pTLSThreadCache->ThreadFree(obj, size);}else{Span* span = PageCache::GetInstance().MapObjectToSpan(obj);// 访问PageCache中的ReleaseSpanToPageCachePageCache::GetInstance()._pageMtx.lock();PageCache::GetInstance().ReleaseSpanToPageCache(span);PageCache::GetInstance()._pageMtx.lock();}
}
在RealseSpanToPageCache(span)中,我们也要考虑两种情况。
一是释放的内存是32页<x≤128页的。我们可以直接还给PageCache,然后进行合并。
二是释放的内存时x>128页的,这部分内存是我们直接向堆申请的,也就是直接还给堆,然后将这个Span对象给delete掉。
// 回收central cache中的空闲span
void PageCache::RealseSpanToPageCache(Span* span)
{// 释放的内存大于128页时if (span->_n > NPAGES-1){// 找到地址,释放对象 ,delete无用处的spanvoid* ptr = (void*)(span->_pageId << PAGE_SHIFT);SystemFree(ptr);delete span;return;}// 合并span的前后页,解决外碎片问题// 1.向前合并while (1){// span的前一页 对应的span(在page cache中存在)PageID prevId = span->_pageId - 1;auto ret = _idSpanMap.find(prevId);if (ret == _idSpanMap.end()){break; // 没有前一页,无法合并,跳出循环}// 有前一页,判断是否被使用中// span的页号为前一页对应span的首页号,合并页数Span* prevSpan = ret->second;if (prevSpan->_isUse == true){break;}// 有前一页,没有被使用中,合并后是否大于128页if (prevSpan->_n + span->_n > NPAGES - 1){break;}// 有前一页,没有被使用,合并后小于128页-->可以合并span->_pageId = prevSpan->_pageId;span->_n += prevSpan->_n;//将prevSpan从对应的双链表中移除_spanlists[prevSpan->_n].Erase(prevSpan);delete prevSpan;}//2、向后合并while (1){PageID nextId = span->_pageId + span->_n;auto ret = _idSpanMap.find(nextId);// 没有后面的页号(还未向系统申请),停止向后合并if (ret == _idSpanMap.end()){break;}// 后面的页号对应的span正在被使用,停止向后合并Span* nextSpan = ret->second;if (nextSpan->_isUse == true){break;}// 合并出超过128页的span无法进行管理,停止向后合并if (nextSpan->_n + span->_n > NPAGES - 1){break;} // 有后一页,没有被使用,合并后小于128页-->可以合并span->_n += nextSpan->_n;// 将nextSpan从对应的双链表中移除_spanlists[nextSpan->_n].Erase(nextSpan);delete nextSpan;}// 将合并后的span挂到对应的双链表当中_spanlists[span->_n].PushFront(span);// 建立新的映射 -> 该span与其首尾页的映射_idSpanMap[span->_pageId] = span;_idSpanMap[span->_pageId + span->_n - 1] = span;// 将该span设置为未被使用的状态span->_isUse = false;
}
//直接将内存还给堆
inline static void SystemFree(void* ptr)
{
#ifdef _WIN64VirtualFree(ptr, 0, MEM_RELEASE);
#elif _WIN32VirtualFree(ptr, 0, MEM_RELEASE);
#else//linux下sbrk unmmap等
#endif
}
(二)测试–申请释放32页<x≤128页内存
//找page cache申请
void* p1 = concurrentAlloc(257 * 1024); //257KB
ConcurrentFree(p1, 257 * 1024);
申请内存时:
释放内存时:
释放时进入ConcurrentFree()函数中,由于释放内存时大于256KB,小于128页大小的内存,所以我们直接释放到PageCache中。
进入RealseSpanToPageCache()中,小于128页大小的内存,我们要释放在PageCache中。
(三)测试–申请释放 128页<x 内存
//找堆申请
void* p2 = concurrentAlloc(129 * 8 * 1024); //129页
ConcurrentFree(p2, 129 * 8 * 1024);
申请的内存是129页(大于128页):
释放内存:
三、使用定长内存池作为组件优化
tcmalloc是要在高并发场景下替代malloc进行内存申请的,所以在需要使用到malloc的地方,我们都要替换成tcmalloc,因此我们应当避免在tcmalloc实现中再使用malloc函数。如何避免?这就需要使用我们之前的定长内存池了。
定长内存池的函数替换new和delete
将代码中使用到new的地方替换为调用定长内存池当中的New函数,将代码中使用delete的地方替换为调用定长内存池当中的Delete函数。替换代码这里就不一一展示了。
#include "ObjectMemoryPool.h"
// 用定长内存池作为组件脱离new
// 类外创建全局静态对象
ObjectPool<Span> _spanPool;
// 这里可能出现问题,实在无法纠正的话,就使用_head = new Span;
// 或者将这部分代码额外放在其他头文件中
class SpanList
{
private:Span* _head;public:// 桶锁std::mutex _mtx;public:SpanList(){// 初始化_head = _spanPool.New();_head->_next = _head;_head->_prev = _head;}// ……
};
如代码块所示,将上列代码整体(包括头文件)放在了Common.h文件的最下端。
因为定长内存池文件中包含Common.h,要定义静态内存池又需要引用ObjectMemoryPool.h文件,因此为了不影响Common.h其他变量的使用,将这部分代码放置在最下方。
注意PageCache中也需要new Span。
设置静态内存池tcPool
由于ThreadCache是线程私有的,因此将它的内存池tcPool设置为静态的,保持全局只有一个tcPool,让所有线程创建自己的ThreadCache的同时,都从着一个定长内存池中申请内存。
if(pTLSThreadCache == nullptr)
{static ObjectPool<ThreadCache> tcPool;pTLSTreadCache = tcPool.New();
}
该定长内存池中申请内存需要加锁
需要注意的是,该定长内存池中申请内存需要加锁,防止多个线程同时在tcPool 中申请自己的ThreadCache对象,从而导致线程安全问题(可能两个或多个线程申请的空间重复的现象)。
因为是多对一、同时进行的关系,所以需要加锁确保tcPool中的资源被有效利用。
/* 申请内存大小为256KB范围内时,向ThreadCache申请内存 */
// 1.为每份线程创建属于自己的ThreadCache
if (pTLSThreadCache == nullptr)
{// pTLSThreadCache = new ThreadCache; // 这里的new后续会进行优化,现阶段先使用// 使用定长内存池作为组件脱离newstatic ObjectPool<ThreadCache> tcPool;// 加锁 tcPool._objectMtx.lock();// 从定长内存池中申请内存来创建ThreadCache对象,这里假设存在allocate方法用于分配内存 pTLSThreadCache = tcPool.New();// 解锁 tcPool._objectMtx.unlock();// ……
}
如果定长内存池 是基于标准库的容器(如vector等)构建的定长内存池,可能需要对容器的操作进行加锁保护以确保线程安全。
四、设计管理互斥锁,加强_idSpanMap的线程安全
在前面多个ThreadCache申请一个定长内存池时,我们通过加锁增加线程安全。
那么在涉及到多线程环境下对_idSpanMap的操作时,确实存在线程安全隐患。如果不加以处理,可能会导致数据不一致、程序崩溃等问题。
在释放内存的过程中,可能存在存在多个线程同时释放的可能,也就是说这个时候多个线程都会去访问MapObjectToSpan()函数去访问_idSpanMap。
而此时此刻也有线程在申请内存或在释放Span时,都会导致_idSpanMap中的元素发生变化。
当申请Span或释放Span时,都会修改_idSpanMap,多个线程同时进行这些操作可能会相互干扰。例如,一个线程正在修改_idSpanMap的某个元素,而另一个线程同时也在尝试读取或修改相同的元素,这就会产生未定义的行为。
在MapObjectToSpan()函数中读取_idSpanMap也面临同样的风险。如果在读取的过程中,_idSpanMap被其他线程修改,那么读取到的数据可能是不准确的。
我们观察前面的代码,查看哪里在使用_idSpanMap时没有管理互斥锁,并将其修改完善。比如在CentralCache回收内存时,判断内存对象所属的Span中,并没有加锁保护。
Span* span = PageCache::GetInstance().MapObjectToSpan(start);
对MapObjectToSpan()进行管理互斥锁
Span* PageCache::MapObjectToSpan(void* obj)
{PAGEID id = ((PAGEID)obj>>PAGE_SHIFT);std::unique_lock<std::mutex> lock(_pageMtx); // 出了作用域会自动解锁// …………
}
确保我们在读取_idSpanMap时,_idSpanMap不会被其他线程修改。
五、优化concurrengtFree()
当我们使用malloc函数申请内存时,需要指明申请内存的大小;而当我们使用free函数释放内存时,只需要传入指向这块内存的指针即可。
如何通过对象地址获取对应的内存大小?
目前实现的内存池在释放对象时除了需要传入指向该对象的指针,还需要传入该对象的大小。如果我们也想做到在释放对象时不用传入对象的大小,那么我们就需要建立对象地址与对象大小之间的映射关系。
在此之前,我们有 通过对象的地址找到其对应的Span的映射关系(通过_pageId找到对应的Span),因此我们可以在Span结构中再增加一个_objSize成员,该成员代表着这个Span管理的内存块被切成的一个个对象的大小。
在代码中完善Span与_objSize一一对应的代码
所有的Span都是从PageCache中拿出来的,因此每当我们调用NewSpan获取到一个k页的Span时,接下来是就应该将这个Span的_objSize保存下来;
// 这里的size是对齐后的字节数,也就是申请的内存大小 // 这是在GetOneSpan()中Span* span = PageCache::GetInstance().NewSpan(ClassSize::NumMovePage(size));span->_isUse = true;span->_objSize = size;// 这是在concurrentAlloc()函数中// 内存申请大于256KB时Span* span = PageCache::GetInstance().NewSpan(kpage);span->_objSize = size;
注意,当线程申请内存时,无论是256KB以内的对象要将Span的成员补齐,256KB以外的对象,在申请时同样要注意补齐Span中的成员变量。
只输入对象指针即可释放对象
通过MapObjectToSpan()函数获取到对应的Span,再读取到对应的Span中的内存大小,接着进行分类释放。
// 封装线程释放内存
static void concurrentFree(void* obj)
{// 现在通过对应的指针找到对应的Span,通过Span找到对应的size也就是申请的内存大小Span* span = PageCache::GetInstance().MapObjectToSpan(obj);size_t size = span->_objSize;if (size <= MAX_BYTES){assert(pTLSThreadCache);// 回收内存对象释放的内存pTLSThreadCache->ThreadFree(obj, size);}else{// 访问PageCache中的ReleaseSpanToPageCachePageCache::GetInstance()._pageMtx.lock();PageCache::GetInstance().ReleaseSpanToPageCache(span);PageCache::GetInstance()._pageMtx.lock();}
}
六、对比malloc测试
创建一个BenchMark.cpp文件用于性能测试
#include"ConcurrentAlloc.h"// ntimes 一轮申请和释放内存的次数
// rounds 轮次
void BenchmarkMalloc(size_t ntimes, size_t nworks, size_t rounds)
{std::vector<std::thread> vthread(nworks);std::atomic<size_t> malloc_costtime = 0;std::atomic<size_t> free_costtime = 0;for (size_t k = 0; k < nworks; ++k){vthread[k] = std::thread([&, k]() {std::vector<void*> v;v.reserve(ntimes);for (size_t j = 0; j < rounds; ++j){size_t begin1 = clock();for (size_t i = 0; i < ntimes; i++){//v.push_back(malloc(16));v.push_back(malloc((16 + i) % 8192 + 1));}size_t end1 = clock();size_t begin2 = clock();for (size_t i = 0; i < ntimes; i++){free(v[i]);}size_t end2 = clock();v.clear();malloc_costtime += (end1 - begin1);free_costtime += (end2 - begin2);}});}for (auto& t : vthread){t.join();}std::cout << nworks << "个线程并发执行" << rounds << "轮次,每轮次malloc " << ntimes << "次: 花费:" << malloc_costtime << " ms\n";std::cout << nworks << "个线程并发执行" << rounds << "轮次,每轮次free " << ntimes << "次: 花费:" << free_costtime << " ms\n";std::cout << nworks << "个线程并发malloc & free " << nworks * rounds * ntimes << "次,总计花费:" << (malloc_costtime + free_costtime) << " ms\n";
}// 单轮次申请释放次数 线程数 轮次
void BenchmarkConcurrentMalloc(size_t ntimes, size_t nworks, size_t rounds)
{std::vector<std::thread> vthread(nworks);std::atomic<size_t> malloc_costtime = 0;std::atomic<size_t> free_costtime = 0;for (size_t k = 0; k < nworks; ++k){vthread[k] = std::thread([&]() {std::vector<void*> v;v.reserve(ntimes);for (size_t j = 0; j < rounds; ++j){size_t begin1 = clock();for (size_t i = 0; i < ntimes; i++){//v.push_back(ConcurrentAlloc(16));v.push_back(ConcurrentAlloc((16 + i) % 8192 + 1));}size_t end1 = clock();size_t begin2 = clock();for (size_t i = 0; i < ntimes; i++){ConcurrentFree(v[i]);}size_t end2 = clock();v.clear();malloc_costtime += (end1 - begin1);free_costtime += (end2 - begin2);}});}for (auto& t : vthread){t.join();}std::cout << nworks << "个线程并发执行" << rounds << "轮次,每轮次ConcurrentAlloc " << ntimes << "次: 花费:" << malloc_costtime << " ms\n";std::cout << nworks << "个线程并发执行" << rounds << "轮次,每轮次ConcurrentFree " << ntimes << "次: 花费:" << free_costtime << " ms\n";std::cout << nworks << "个线程并发ConcurrentAlloc & ConcurrentFree " << nworks * rounds * ntimes << "次,总计花费:" << (malloc_costtime + free_costtime) << " ms\n";}int main()
{size_t n = 1000;cout << "==========================================================" << endl;BenchmarkConcurrentMalloc(n, 4, 10);cout << endl << endl;BenchmarkMalloc(n, 4, 10);cout << "==========================================================" << endl;return 0;
}
在测试函数中,通过clock函数分别获取到每轮次申请和释放所花费的时间,然后将其对应累加到malloc_costtime和free_costtime上。最后我们就得到了nworks个线程跑rounds轮,每轮申请或释放ntimes次所申请或消耗的时间。
创建线程时让线程执行的是lambda表达式,而我们这里在使用lambda表达式时,以值传递的方式捕捉了变量k,以引用传递的方式捕捉了其他父作用域中的变量,因此我们可以将各个线程消耗的时间累加到一起。
在申请内存和释放内存的过程中,malloc_costtime和free_costtime可能被多个线程同时进行累加操作的,造成线程安全的问题。因此在定义这两个变量时使用atomic类模板,这时对它们的操作就是原子操作了(操作要么完全执行,要么完全不执行,不会处于中间状态)。
无论是固定大小内存的申请和释放,还是不同大小内存的申请和释放,可以发现,本项目实现的内存池比原生的malloc和free的效率要低不少。
七、性能瓶颈分析
将BenchmarkMalloc(n, 4, 10);屏蔽,只观察BenchmarkConcurrentMalloc(n, 4, 10);的性能分析。要在Debug下进行。
通过在VS编译器中带有的性能分析的工具的分析,我们发现,原因主要是MapObjectToSpan函数中的锁导致了性能低下。
因此当前项目的瓶颈点就在锁竞争上面,需要解决调用MapObjectToSpan函数访问映射关系时的加锁问题。
八、使用基数树进行优化
(一)从数据结构来看为什么插入_idSpanMap时不需要加锁,读取时候却需要加锁呢?
- 当线程一和线程二同时运行时,读取_idSpanMap时,我们采用map和unordered_map都是需要加锁的,确保我们在读取映射关系时,映射关系不会被改动。
- 我们了解map的底层数据结构是红黑树,unordered_map的底层数据结构是哈希表,它们的结构在插入数据时都会发生变化,因此插入和读取的操作无法同时进行。
了解基数树可以通过这些:
Trees I: Radix trees [LWN.net]
基数树结构—radix_tree_address space radix树-CSDN博客
tcmalloc当中针对这一点使用了基数树进行优化,使得在读取这个映射关系时可以做到不加锁。
(二)基数树的结构
- 基数树(Radix Tree)是一种用于高效查找和存储键值对的树形数据结构。
可以提高查找效率,是典型的以空间换时间的做法。- 根节点: 基数树的起点,所有的键值对都是从根节点开始进行查找和存储的。
- 内部节点: 内部节点用于表示键的一部分。每个内部节点包含多个指针,指向其子节点。这些指针的数量和键的字符集大小有关。例如,如果键是由8位字节组成的,那么每个内部节点最多可以有256个指针。
- 叶子节点:叶子节点用于存储键值对中的值。每个叶子节点对应一个完整的键,并且包含与该键关联的值。
- 路径压缩:为了节省内存,基数树通常会对路径进行压缩。这意味着如果某个节点只有一个子节点,那么这个节点和它的子节点可以合并成一个节点。这样可以减少树的高度,从而提高查找效率。
- 前缀共享:基数树允许多个键共享相同的前缀。这意味着如果多个键有相同的前缀,那么这些键在树中会共享相同的路径,直到它们的前缀不再相同为止。
单层基数树
单层基数树实际采用的就是直接定址法,每一个页号对应span的地址就存储在数组中在以该页号为下标的位置。
#include"Common.h"
//
// Single-level array
template <int BITS>
class TCMalloc_PageMap1 {
private:static const int LENGTH = 1 << BITS;void** array_;public:typedef uintptr_t Number;//explicit TCMalloc_PageMap1(void* (*allocator)(size_t)) {/* 构造函数TCMalloc_PageMap1 */// 负责富世华array_ explicit TCMalloc_PageMap1() {//array_ = reinterpret_cast<void**>((*allocator)(sizeof(void*) << BITS));size_t size = sizeof(void*) << BITS;size_t alignSize = SizeClass::_RoundUp(size, 1<<PAGE_SHIFT);array_ = (void**)SystemAlloc(alignSize>>PAGE_SHIFT);memset(array_, 0, sizeof(void*) << BITS);}// 用于根据k获取对应的值,但前提是k必须在有效范围内void* get(Number k) const {if ((k >> BITS) > 0) {return NULL;}return array_[k];}// 根据k设置了对应的Span,同样要求k在有效范围内// void set(Number k, void* v) {array_[k] = v;}
};
BITS表示地址空间的位数,LENGTH表示基数树种某一层的节点数量(也就是基于这些位数产生的不同分支数量),BITS在32位下等于32-PAGE_SHIFT,64位下等于64-PAGE_SHIFT.因为32位下,232(32位下内存)/213 (8KB为一页)= 2^19(页数),计算出存储页号LENGTH 需要多少位,同理62位下也是(但是一层或两层用来放置51位的内存是不足的,因此需要三层)。
用数组表示单层基数树,下标对应着页号,数组中的内容是Span的地址BITS对应的是该平台下存储页号占的最大位数,LEHGIH表示的是页数,大小位2^BITS。
/* 构造函数TCMalloc_PageMap1 */// explicit关键字用于修饰构造函数,防止隐式转换。explicit TCMalloc_PageMap1() {//array_ = reinterpret_cast<void**>((*allocator)(sizeof(void*) << BITS));size_t size = sizeof(void*) << BITS;size_t alignSize = SizeClass::_RoundUp(size, 1 << PAGE_SHIFT);array_ = (void**)SystemAlloc(alignSize >> PAGE_SHIFT);memset(array_, 0, sizeof(void*) << BITS);}
构造函数TCMalloc_PageMap1 通过将sizeof(void*)左移BITS位得到的size(内存页的大小),将要申请的内存大小通过_RoundUp函数计算出如果以8KB对齐后的对齐后字节数,也就是我们要向系统申请(SystemAlloc)的内存大小。
最后使用memset函数初始化array_。
二层基数树
二层基数树初始状态只需要为第一层数组开辟空间,第二层数组按需开辟。
// Two-level radix tree
template <int BITS>
class TCMalloc_PageMap2 {
private:// Put 32 entries in the root and (2^BITS)/32 entries in each leaf.static const int ROOT_BITS = 5;static const int ROOT_LENGTH = 1 << ROOT_BITS;static const int LEAF_BITS = BITS - ROOT_BITS;static const int LEAF_LENGTH = 1 << LEAF_BITS;// Leaf nodestruct Leaf {void* values[LEAF_LENGTH];};Leaf* root_[ROOT_LENGTH]; // Pointers to 32 child nodesvoid* (*allocator_)(size_t); // Memory allocatorpublic:typedef uintptr_t Number;//explicit TCMalloc_PageMap2(void* (*allocator)(size_t)) {explicit TCMalloc_PageMap2() {//allocator_ = allocator;memset(root_, 0, sizeof(root_));PreallocateMoreMemory();}// 页号k右移14位,高位的5位值变化到低位,也就可以得到下标// 这个位置是一个指向下一层的指针// 页号k与第二层的页数按位与 得到在第二层的页数void* get(Number k) const {const Number i1 = k >> LEAF_BITS;const Number i2 = k & (LEAF_LENGTH - 1);if ((k >> BITS) > 0 || root_[i1] == NULL) {return NULL;}return root_[i1]->values[i2];}void set(Number k, void* v) {const Number i1 = k >> LEAF_BITS;const Number i2 = k & (LEAF_LENGTH - 1);ASSERT(i1 < ROOT_LENGTH);root_[i1]->values[i2] = v;}bool Ensure(Number start, size_t n) {for (Number key = start; key <= start + n - 1;) {const Number i1 = key >> LEAF_BITS;// Check for overflowif (i1 >= ROOT_LENGTH)return false;// Make 2nd level node if necessaryif (root_[i1] == NULL) {//Leaf* leaf = reinterpret_cast<Leaf*>((*allocator_)(sizeof(Leaf)));//if (leaf == NULL) return false;static ObjectPool<Leaf> leafPool;Leaf* leaf = (Leaf*)leafPool.New();memset(leaf, 0, sizeof(*leaf));root_[i1] = leaf;}// Advance key past whatever is covered by this leaf nodekey = ((key >> LEAF_BITS) + 1) << LEAF_BITS;}return true;}void PreallocateMoreMemory() {// Allocate enough to keep track of all possible pagesEnsure(0, 1 << BITS);}
};
由 ROOT_LENGTH和LEAF_LENGTH可以得知我们固定第一层页数为25页,第二层页数为214页。32位在物理空间中申请的内存与第一层是一样的,不同的是前5位决定了Span在第一层的哪个槽,后14位决定了Span在第二层的哪个槽。
Ensure()用于当需要建立页号与其 Span 之间的映射关系时,如果用于映射该页号的空间没有开辟时,则会为它开辟。
// 页号k右移14位,高位的5位值变化到低位,也就可以得到下标// 这个位置是一个指向下一层的指针// 页号k与第二层的页数按位与 得到在第二层的页数void* get(Number k) const {const Number i1 = k >> LEAF_BITS;const Number i2 = k & (LEAF_LENGTH - 1);if ((k >> BITS) > 0 || root_[i1] == NULL) {return NULL;}return root_[i1]->values[i2];}void set(Number k, void* v) {const Number i1 = k >> LEAF_BITS;const Number i2 = k & (LEAF_LENGTH - 1);ASSERT(i1 < ROOT_LENGTH);root_[i1]->values[i2] = v;}
如果页号 k 是一个32位的整数,右移14位后,原来高位的14位被丢弃,低位的18位保留。然后将原来高位的5位(因为右移14位后,这5位到了低位)的值变化到低位以得到下标。这时这个下标也就是第一层对应的位置,同时这个位置存储着指向下一层(可能是多层页表结构中的下一层)的指针。
在页表结构中,可能存在多层页表。当进行页号 k 与第二层的页数按位与操作时,目的是在第二层页表中确定对应的页数。按位与操作会将两个操作数对应的二进制位进行与运算(只有当对应的两位都为1时,结果位才为1),这样可以根据页号的某些位信息在第二层页表中定位到特定的页数。
因此这个就可以定位到对应的位置插入或查找到对应值了。
三层基数树
与二层基数树原理相同。
将基数树比作一个目录,我们有一条路径k,接下来只要按照单元标题、章节标题、知识点部分一层层的往下找路径k对应的线索,就能找到对应的讲解内存(Span)
// Three-level radix tree
template <int BITS>
class TCMalloc_PageMap3 {
private:// How many bits should we consume at each interior levelstatic const int INTERIOR_BITS = (BITS + 2) / 3; // Round-upstatic const int INTERIOR_LENGTH = 1 << INTERIOR_BITS;// How many bits should we consume at leaf levelstatic const int LEAF_BITS = BITS - 2 * INTERIOR_BITS;static const int LEAF_LENGTH = 1 << LEAF_BITS;// Interior nodestruct Node {Node* ptrs[INTERIOR_LENGTH];};// Leaf nodestruct Leaf {void* values[LEAF_LENGTH];};Node* root_; // Root of radix treevoid* (*allocator_)(size_t); // Memory allocatorNode* NewNode() {Node* result = reinterpret_cast<Node*>((*allocator_)(sizeof(Node)));if (result != NULL) {memset(result, 0, sizeof(*result));}return result;}public:typedef uintptr_t Number;explicit TCMalloc_PageMap3(void* (*allocator)(size_t)) {allocator_ = allocator;root_ = NewNode();}void* get(Number k) const {const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS);const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1);const Number i3 = k & (LEAF_LENGTH - 1);if ((k >> BITS) > 0 ||root_->ptrs[i1] == NULL || root_->ptrs[i1]->ptrs[i2] == NULL) {return NULL;}return reinterpret_cast<Leaf*>(root_->ptrs[i1]->ptrs[i2])->values[i3];}void set(Number k, void* v) {ASSERT(k >> BITS == 0);const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS);const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1);const Number i3 = k & (LEAF_LENGTH - 1);reinterpret_cast<Leaf*>(root_->ptrs[i1]->ptrs[i2])->values[i3] = v;}bool Ensure(Number start, size_t n) {for (Number key = start; key <= start + n - 1;) {const Number i1 = key >> (LEAF_BITS + INTERIOR_BITS);const Number i2 = (key >> LEAF_BITS) & (INTERIOR_LENGTH - 1);// Check for overflowif (i1 >= INTERIOR_LENGTH || i2 >= INTERIOR_LENGTH)return false;// Make 2nd level node if necessaryif (root_->ptrs[i1] == NULL) {Node* n = NewNode();if (n == NULL) return false;root_->ptrs[i1] = n;}// Make leaf node if necessaryif (root_->ptrs[i1]->ptrs[i2] == NULL) {Leaf* leaf = reinterpret_cast<Leaf*>((*allocator_)(sizeof(Leaf)));if (leaf == NULL) return false;memset(leaf, 0, sizeof(*leaf));root_->ptrs[i1]->ptrs[i2] = reinterpret_cast<Node*>(leaf);}// Advance key past whatever is covered by this leaf nodekey = ((key >> LEAF_BITS) + 1) << LEAF_BITS;}return true;}void PreallocateMoreMemory() {}
};
(三)将基数树用于内存池
我们已经创建好了基数树。
现在需要在程序应用到_idSpanMap时将二者结合起来。
// _idSpanMap在创建Span与页数之间的关系时使用set函数//_idSpanMap[kSpan->_pageId + i] = kSpan;_idSpanMap.set(kSpan->_pageId + i, kSpan);// _idSpanMap在读取Span与页数之间的关系时使用get函数/*std::unique_lock<std::mutex> lock(_pageMtx);auto ret = _idSpanMap.find(id);if (ret != _idSpanMap.end()){return ret->second;}else{assert(false);return nullptr;}*/auto ret = (Span*)_idSpanMap.get(id);// PageCache.hprivate://std::unordered_map<PAGE_ID, Span*> _idSpanMap;TCMalloc_PageMap1<32 - PAGE_SHIFT> _idSpanMap;
需要注意的是,我们使用基数树的目的就是为了解决调用MapObjectToSpan函数访问映射关系时的加锁问题,所以一定不要忘记把锁关掉。
(四)使用基数树优化结果测试
优化以后性能要高于malloc。
(1)固定大小时:性能快一倍左右
(2)不固定大小时:
可以自己设置值大一些,不过我的电脑不佳,设置数值太大,跑出来很慢。
九、打包成静态库
实际Google开源的tcmalloc是会直接用于替换malloc的,不同平台替换的方式不同。比如基于Unix的系统上的glibc,使用了weak alias的方式替换;而对于某些其他平台,需要使用hook的钩子技术来做。
对于我们当前实现的项目,可以考虑将其打包成静态库或动态库。
项目源码链接:https://gitee.com/shimir/concurrent-memory-pool/tree/master/ConcurrentMemoryPool