1.整体框架的设计
首先我们要来大概的梳理一下我们的高并发内存池的整体框架设计
在现代很多开发环境其实都是多核多线程,在申请内存的情况下,就必然会存在激烈的锁竞争问题。如果我们需要要实现内存池,必须要考虑以下几方面的问题。
1.性能问题
2.在多线程的环境下,锁竞争的问题
3.内存碎片问题
而整个内存池就由3个部分组成
1.thread cache:线程缓存是每个线程独有的,用于小于256KB的内存的分配,线程从这里申请内存不需要加锁,每一个线程独享一个cache,这也就是这个并发线程池高效的地方。
2.central cache:中心缓存是所有thread cache(线程)共享的,thread cache是按需从central cache中获取对象。而当thread cache中有剩余的对象的时候,central cache就会在合适的时机从thread cache中回收对象,避免一个线程占用太多的内存,而其它线程的内存吃紧的时候,central cache就会扮演一个中心按需调度对象的角色。由于central cache是各种thread cache的中心缓存,所以这里势必会有thread cache同时申请调度内存,所以会存在线程的竞争,所以从这里取内存对象是需要加锁的,首先这里使用的是桶锁,凄恻只有thread cache没有内存对象的时候才会招central cache。
3.page cache:页缓存是在central cache上面的一层缓存,春促的内存是以页为单位存储及分配的,central cache没有内存对象时候,从page cache分配出一定的数量page,并切割成定长大小的小块内存,分配给central cache。当一个span的几个跨度页的对象都回收以后,page cache会回收central cache满足条件的span对象,并且合并相邻的页,组成更大的页,环节内存碎片的问题
2.thread cache的设计
首先要明白thread cache 是一个哈希桶的结构,每个桶是按一个桶的位置来映射大小的内存块的自由链表。每个线程都有一个自己的thread cache,这样在申请内存的时候就考研直接从thread cache上申请,这种操作是无锁的。
从图中可以看到,每一个自由链表的后面都挂着一块与之对应字节的小对象,比如8byte的后面就挂着8字节的对象,16byte的后面就挂着16字节的对象,当然,这个小对象是从central cache中申请过来的,当他的后面是NULL的时候,就会从central cache中来申请。
这也就可以引出我们的自由链表的小对象使用法则:
当一个线程需要某个字节数的空间的时候,比如7字节的空间,就会通过某种计算的规则,计算出自由链表的下标,然后从该下标的thread cache中寻找内存块,如果当前的自由链表里面不是空的,就会头删一块空间给线程来用,否则,就向central cache中申请对象。
我们可以把自由链表的操作方法放在一个FreeList类当中,而在自由链表中,挂起的对象的存储方式是以一个对象的前4/8个字节来存储后面一个对象的地址,因此我们在插入对象的时候,就可以使用之前的方法
static const size_t MAX_BYTES = 256 * 1024;static void*& NextObj(void* obj)
{return *(void**)obj;
}//切分好的小对象的自由链表
class FreeList
{
public:void push(void* obj){NextObj(obj) = _freeList;_freeList = obj;}void pop(){assert(_freeList);void* obj = _freeList;_freeList = NextObj(obj);}bool empty(){return _freeList == nullptr;}size_t& maxSize(){return _maxSize;}void pushRange(void* start, void* end){NextObj(end) = _freeList;_freeList = start;}
private:void* _freeList = nullptr;size_t _maxSize = 1;
};
从代码中可以看到,我在里面设置了一个静态变量MAX_BYTES,借着上面的图中我们可以看到,自由链表的字节大小是从8字节开始,然后递增,最多到256KB的,这是一个居中的取法,如果从1字节开始存,那如果要存到256KB上限,是需要25万6千多个指针来进行存储。虽然指针的数量还好,但是还是比较麻烦,我在这里参考了高手们的对齐规则写法,就直接写出来了。
1.自由链表指针的对齐规则
虽然对齐产生的内碎片会引起一定程度的空间浪费,但按照上面的对齐规则,我们可以将浪费率控制到百分之十左右。需要说明的是,1~128这个区间我们不做讨论,因为1字节就算是对齐到2字节也有百分之五十的浪费率,这里我们就从第二个区间开始进行计算。
根据上面的公式,我们要得到某个区间的最大浪费率,就应该让分子取到最大,让分母取到最小。比如129~1024这个区间,该区域的对齐数是16,那么最大浪费的字节数就是15,而最小对齐后的字节数就是这个区间内的前16个数所对齐到的字节数,也就是144,那么该区间的最大浪费率也就是15 ÷ 144 ≈ 10.42 % 。同样的道理,后面两个区间的最大浪费率分别是127 ÷ 1152 ≈ 11.02 %
// 整体控制在最多10%左右的内碎⽚浪费
// [1,128] 8byte对⻬ freelist[0,16)
// [128+1,1024] 16byte对⻬ freelist[16,72)
// [1024+1,8*1024] 128byte对⻬ freelist[72,128)
// [8*1024+1,64*1024] 1024byte对⻬ freelist[128,184)
// [64*1024+1,256*1024] 8*1024byte对⻬ freelist[184,208)
我们可以创建一个SizeList来存放对齐规则
//对齐规则
class SizeList
{
public:// 整体控制在最多10%左右的内碎⽚浪费// [1,128] 8byte对⻬ freelist[0,16)// [128+1,1024] 16byte对⻬ freelist[16,72)// [1024+1,8*1024] 128byte对⻬ freelist[72,128)// [8*1024+1,64*1024] 1024byte对⻬ freelist[128,184)// [64*1024+1,256*1024] 8*1024byte对⻬ freelist[184,208)//alignNum对齐数static inline size_t _RoundUp(size_t byte, size_t alignNum){return (((byte + alignNum - 1)) & ~(alignNum - 1));}//size需要的空间大小static inline size_t RoundUp(size_t size){if (size <= 128){return _RoundUp(size, 8);}else if (size <= 1024){return _RoundUp(size, 16);}else if (size <= 8 * 1024){return _RoundUp(size, 128);}else if (size <= 64 * 1024){return _RoundUp(size, 1024);}else if (size <= 256 * 1024){return _RoundUp(size, 8 * 1024);}else{assert(false);return -1;}}static inline size_t _Index(size_t bytes, size_t align_shift){//(8,3) 下标是0 自由链表后面挂着的是8字节的对象//(15,3) 下标是1 自由链表后面挂着的是16字节的对象//(127,3) 下标是15 自由链表后面挂着的是128字节的对象return ((bytes + (1 << align_shift) - 1) >> align_shift) - 1;}static inline size_t Index(size_t bytes){assert(bytes <= MAX_BYTES);// 每个区间有多少个链static int group_array[4] = { 16, 56, 56, 56 };if (bytes <= 128) {return _Index(bytes, 3);}else if (bytes <= 1024) {return _Index(bytes - 128, 4) + group_array[0];}else if (bytes <= 81024) {return _Index(bytes - 1024, 7) + group_array[1] + group_array[0];}else if (bytes <= 64 * 1024) {return _Index(bytes - 8 * 1024, 10) + group_array[2] +group_array[1] + group_array[0];}else if (bytes <= 256 * 1024) {return _Index(bytes - 64 * 1024, 13) + group_array[3] +group_array[2] + group_array[1] + group_array[0];}else {assert(false);}return -1;}static size_t NumMoveSize(size_t size){assert(size > 0);//size是一个对象大小int num = MAX_BYTES / size;if (num < 2){num = 2;}if (num > 512){num = 512;}return num;}};
我们从这个对齐的规则就可以看出,一共是需要208个自由指针来链接挂起对象,而对齐规则我大概来说一下。
对齐规则以8字节对齐是最合适的,而不是4字节或者6字节,为什么这么说呢?
因为如果有空间挂在自由链表后面的话,他前面一个对象要存储后面对象的指针,如果是4字节来对齐的话,那么如果环境是32位的话是可以存储的,那如果是64位的话就会崩溃了。不管怎么说,他都至少是要能存储后面的指针才行。
我们把代码拆开来看,我们计算对齐数是按照对齐规则来计算的
对齐规则:
可以看到,如果我们需要[1,128]之内大小的对象,对齐就按照八字节来对齐,如果我们需要的大小是8字节,所以就走第一个逻辑,传给_RoundUp的参数是(8,8),通过里面的计算,返回的对齐数就是8。对齐数有什么用呢?对齐数的作用是从central cache中申请内存的。
———————————————————————————————————————————
如果给了一定size的对象,那么我们该怎么去计算他从那个thread cache中获取对象呢?
也就是获取数组的下标,我们该怎么做呢?
先来看看这段代码
static inline size_t _Index(size_t bytes, size_t align_shift){//(8,3) 下标是0 自由链表后面挂着的是8字节的对象//(15,3) 下标是1 自由链表后面挂着的是16字节的对象//(127,3) 下标是15 自由链表后面挂着的是128字节的对象return ((bytes + (1 << align_shift) - 1) >> align_shift) - 1;}static inline size_t Index(size_t bytes){assert(bytes <= MAX_BYTES);// 每个区间有多少个链static int group_array[4] = { 16, 56, 56, 56 };if (bytes <= 128) {return _Index(bytes, 3);}else if (bytes <= 1024) {return _Index(bytes - 128, 4) + group_array[0];}else if (bytes <= 81024) {return _Index(bytes - 1024, 7) + group_array[1] + group_array[0];}else if (bytes <= 64 * 1024) {return _Index(bytes - 8 * 1024, 10) + group_array[2] +group_array[1] + group_array[0];}else if (bytes <= 256 * 1024) {return _Index(bytes - 64 * 1024, 13) + group_array[3] +group_array[2] + group_array[1] + group_array[0];}else {assert(false);}return -1;}static size_t NumMoveSize(size_t size){assert(size > 0);//size是一个对象大小int num = MAX_BYTES / size;if (num < 2){num = 2;}if (num > 512){num = 512;}return num;}
来截取一部分的代码来讲解
就比如我取的bytes是15,那通过 _Index中的计算,可以得出下标是1;如果取的是127,则计算出的下标就是15
以上不管是计算数组下标还是进行对齐,都设计成了static inline,这是因为每一次进程运行的时候,会经常来申请空间,而每一次申请,就会访问一次这几个函数,使用static inline可以使得程序更加高效
2.thread cache类的整体设计
目前来看thread cache的方法肯定是有一个自己的"开空间"和"释放空间",而thread cache他下面又挂着许多的自由链表,我们可以用一个FreeList类型的数组来存放这些自由链表。
所以我们可以这样设计thread cache类
#include "Common.h"class ThreadCache
{
public:void* Allocate(size_t size);void Deallocate(void* ptr, size_t size);
private:FreeList _FreeList[NFREELIST];
};//tls来解决高并发的问题
static _declspec(thread) ThreadCache* pTLSThreadCache = nullptr;
可以看到thread cache里面有三个接口,一个是Allocate,一个是Deallocate
1.Allocate()
这个接口是从_FreeList数组中找到某个映射到的下标,然后再去取对象出来。
void* ThreadCache::Allocate(size_t size)
{assert(size <= MAX_BYTES);size_t alignSize = SizeList::RoundUp(size);//对齐数size_t index = SizeList::Index(size);//哈希桶数组下标if (!(_FreeList[index].empty())){//如果自由链表不是空的,就头删一块空间还给系统调用_FreeList[index].pop();}else{//否则就向中心Cache来申请空间return FetchFromCentralCache(index, alignSize);}
}
简单解释一下这个代码。
首先我们已经知道这个申请空间是需要传递一个所需要空间大小的值的,也就是参数size,然后首先计算一下对齐数,然后计算对应的哈希桶下标,比如我传进来的大小是127,那么他计算出来的index就是15,所以我们就在下标为15里面找有没有挂起的对象,如果有,就头删,给程序用,否则就从Central cache中申请对象,由于先前已经计算出了对象所映射的哈希桶和对齐数,所以我们在申请的时候,只需要传入所映射的哈希桶的一个下标和对齐数即可
2.Deallocate()
这个函数的主要功能就是回收空间。
void ThreadCache::Deallocate(void* ptr, size_t size)
{assert(ptr);assert(size <= MAX_BYTES);size_t index = SizeList::Index(size);_FreeList[index].push(ptr);
}
他的主要逻辑就是首先计算需要释放的对应的桶下标,然后把ptr插入到index的位置。
3.threadcacheTLS无锁访问
我们在创建thread cache对象的时候,是希望每个线程都独享一个自己的thread cache对象的,那为什么要这样设计呢?
这是因为如果thread cache对象作为一个全局变量或者静态变量的时候,就可能会导致多个线程同时初始化thread cache对象,就会进阶导致内存泄漏(多次new ThreadCache),旧实例没有被释放,或者可能导致链表损坏(多个线程操作同一链表的头节点,引发指针错乱)
所以我们需要在thread cache对象前面加上一个 static _declspec(thread) 来保证每个线程都会独享一个自己的thread cache对象,就不会出现同时来对其进行初始化和操作,从而避免了并发的问题。
//tls来解决高并发的问题
static _declspec(thread) ThreadCache* pTLSThreadCache = nullptr;
但不是每个线程被创建时就立马有了属于自己的thread cache,而是当该线程调用相关申请内存的接口时才会创建自己的thread cache,因此在申请内存的函数中会包含以下逻辑。
//申请空间
static void* ConcurrentAlloc(size_t size)
{if (pTLSThreadCache == nullptr){pTLSThreadCache = new ThreadCache;}return pTLSThreadCache->Allocate(size);
}//回收空间
static void* ConcurrentFree(void* ptr)
{assert(pTLSThreadCache);pTLSThreadCache->Deallocate(ptr, 0);
}
(我们可以在这里举一个例子,比如我的thread cache是都有一个独享的thread cache对象,如果对象空间不够用的话,就会从central cache中申请,如果thread cache的两个线程的对象都计算的index假设是1,那就同时向central cache中申请空间,那这样的话就会存在线程竞争,就会出现并发的现象)
3.central cache的设计
与thread cache的结构类似,central cache也是一个哈希桶的结构,映射关系是和thread cache一样的,不同的是他每个哈希桶都挂着的是Span List结构,不过每个映射桶下面的Span中的大内存被按照映射关系切割成了小块内存对象被悬挂在span的自由链表中
如下图,如果是8byte的话,后面挂着的span都会被切割成8byte
1.申请内存
1.当thread cache中没有内存的时候,就会批量向central cache申请一些内存对象,这里批量获取对象的数量使用了类似网络tcp协议拥塞控制的慢开始算法,central cache也有一个哈希映射的spanlist,spanlist中挂着span,从span中取出对象给thread cache,这个过程是需要加锁的,不过这里使用的是一个桶锁,尽可能提高效率。
2.central cache映射的spanlist中所有的span都没有内存以后,则需要向span cache申请一个新的span对象,拿到span以后将span管理的内存按大小切好作为自由链表连接到一起,然后从span中取对象给thread cache
3.central cache中挂着的span中使用了use_count来记录分配了多少歌对象出去,分配一个span对象给了thread cache,这个use_count就++
2.释放内存
当thread cache过长或者线程销毁,有些对象用不到的时候,则会将内存释放回central cache中,释放回来的时候,use_count就回--。当use_count == 0的时候就表示所有对象都回到了span,则将span释放回page cache,page cache中会对前后相邻的空闲页进行合并。
3.管理连续多个页大块内存跨度结构(span)
对于span结构体,我们首先要搞明白一个叫做页号的东西
在32位的环境下,他的一个size_t就是2^32次方,一页大概就是4G或者8G,如果是按照8K来算算,那么就得是2^32/2^13=2^19个页,其实这还好,但如果是64位的环境下,那就是2^64/2^13=2^51个页,这太多了,代码就会溢出,那么想解决这个问题,我们就可以使用条件编译。
#ifdef _WIN64typedef unsigned long long PAGE_ID;#elif _WIN32typedef size_t PAGE_ID;
当环境是64位的时候,PAGE_ID就是longlong型的,在32位下,就是size_t 类型的,这样就很好的解决了不同环境下的PAGE_ID会溢出的问题。
整体结构设计
struct Span
{PAGE_ID _pageID = 0;size_t _n = 0;Span* _next = nullptr;Span* _prve = nullptr;size_t _useCount = 0;;void* _freeList = nullptr;
};
值得一提的是这个span设计成了一个双向链表的形式
那既然是链表,就得把他们链接起来,所以接下来的设计就非常简单了。
struct SpanList
{
public:mutex _mtx;SpanList(){_head = new Span;_head->_next = _head;_head->_prve = _head;}void insert(Span* pos, Span* newspan){assert(pos);assert(newspan);//prve newspan posSpan* _prve = pos->_prve;_prve->_next = newspan;newspan->_prve = _prve;newspan->_next = pos;pos->_prve = newspan;}void eraser(Span* pos){assert(pos);assert(pos != _head);//prve pos nextSpan* prve = pos->_prve;Span* next = pos->_next;prve->_next = next;next->_prve = prve;}private:Span* _head = nullptr;};
4.central cache的代码设计
为了保证central cache的单一性,在设计central cache类的时候,我们就可以设置为单例模式,也就是设置成全局唯一的对象。单例模式分成懒汉模式和饿汉模式,我在这里设置成的是一开是就有,就是饿汉模式
//设计成单例模式
class CentralCache
{
public:static CentralCache* getInst(){return &_sInst;}//从中心缓存获取一定数量的对象给thread cachesize_t FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size);private:SpanList _spanList[NFREELIST];private:CentralCache(){}CentralCache(const CentralCache&) = delete;static CentralCache _sInst;};
这边的自由链表的底层逻辑和前面的一样,都是设置一个相应的对象类的数组来存储自由链表。
如果是我们在申请对象的时候,怎么来控制central cache给他多少对象才合适呢?如果thread cache只要1个,那我给他10个,就会造成浪费,如果要100个,我给他10个,又会不够用。
为此,这里使用了一种算法来进行控制。
static size_t NumMoveSize(size_t size){assert(size > 0);//size是一个对象大小int num = MAX_BYTES / size;if (num < 2){num = 2;}if (num > 512){num = 512;}return num;}
这里就是设置成了一个给出的对象的上下限。
这里可以大概的描述一下,已知MAX_BYTES = 256*1024(256K),如果我想要一个8字节的,那就除8,大概就是3w多个,那给3w多个就很浪费,所以上限就给了个512.
5.FetchFromCentralCache()
然后我们就可以设计从central cache申请对象到thread cache的函数了。
现在我们来思考一下thread cache怎么从central cache申请空间。
如果我thread cache此时需要一个对象,那你就给一个对象吗?需要一个就给一个?这不太好吧?如此一来,频繁地申请必定会带来效率的损失。那如果一次给10个呢?那我只需要一个是不是就浪费了,我们也得达成一种小对象给多一点,大对象给少一点的想法,所以,为了平衡这两者之间的差异,就有这一种办法。
这就是慢开始的调节算法
直接来看代码
//从Central Cache中申请内存到thread Cache
void* ThreadCache::FetchFromCentralCache(size_t index, size_t size)
{//慢开始反馈调节算法//size越小,一次向centralcache要的batchNum就越大//size越大,一次向centralcache要的batchNum就越小size_t batchNum = std::min(_FreeList[NFREELIST].maxSize(),SizeList::NumMoveSize(size));if (batchNum == _FreeList[NFREELIST].maxSize()){_FreeList[NFREELIST].maxSize() += 1;}void* start = nullptr;void* end = nullptr;size_t actrualNum = CentralCache::getInst()->FetchRangeObj(start, end, batchNum, size);if (actrualNum == 1){assert(start == end);return start;}else{_FreeList[index].pushRange(NextObj(start),end);return start;}}
总体思路:这个是一个从central cache申请对象到thread cache的功能,batchNum就是一个来确定要给多少的对象,这里使用了一个上下限的方法,下限是1,上限就是用MAX_BYTES/size
的数,这个之前就讲过,现在不赘述。
之前我们提出的问题就是你如果需要1个那我就给你一个吗?显然不行,所以我们在这里加入了if来判断,如果这个batchNum == 1的话,那么这个maxSize() += 1,这么做的目的就是在下次进入这个函数的时候,能够直接给个2,因为你又从central cache申请空间,如果这次不多给你一点,你用完了又来申请,这就势必会导致效率的降低,直到一直申请他一直++,直到与NumMoveSize相等的时候,这个时候batchNum就与NumMoveSize相等了,size对象越小,NumMoveSize越高,反之,NumMoveSize越小。
而如果central cache里面没有我想要的那么多个对象,那该怎么办?
所以我们要创建一个实际数字,他接收的是FetchRangeObj()这个函数里面的数值,有多少就给多少,此时我们要断言一下,他最少都要给1个,有没有可能一个都没有呢?这个是不可能的,如果没有的话,会在前面抛出异常,这一步根本走不到
如果他不是1的话,就走下面的else那部分了这里是干嘛的呢?
继续看下面
6.FetchRangeObj()
这个函数是从中心缓存获取一定数量的对象给thread cache
size_t CentralCache::FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size)
{size_t index = SizeList::Index(size);_spanList[index]._mtx.lock();Span* span = getOneSpan(_spanList[index], size);assert(span);assert(span->_freeList);start = span->_freeList;end = start;size_t actrualNum = 1;//给1的原因是因为跟end实际走的步伐是差一的//batchNum = 4// start// 1 -> 2 -> 3 -> 4// endsize_t i = 0;while (i < batchNum - 1 && NextObj(end) != nullptr){end = NextObj(end);i++;actrualNum++;}span->_freeList = NextObj(end);NextObj(end) = nullptr;_spanList[index]._mtx.unlock();return actrualNum;
}
这里是代码的讲解,首先我们要计算出我们要的是几号桶的对象
由于这是从中心缓存申对象到threadcache的,很大概率会出现并发现象,所以必须要加上互斥锁来解决这个并发问题。
然后我们创建一个span对象,通过getOneSpan(后面会讲)来进行获取一个span传过来
所以自然而然地,start指针就指向了span的头指针,这里我们把end一开始也指向start
由于我们需要传过去的对象大小就是start指针到end指针指向的这么多的对象,所以这个时候我们需要找到end到底指向的是什么
所以我们在while这个逻辑里面规定的i < batchNum - 1并且nextobj(end) 不是空指针,就让我们的end指针一直往后走,然后actrualNum实际走的步数就++
最后end就走到了我们所需要去的对象大小的最后一个位置,这个时候span->_freeList就指向了end的下一个位置,也就是说,这个时候我们已经取到了strat到end的对象大小,那这个时候原本的span的头节点是不是要重新指向空闲的对象?所以这就是被分割成了两个链表,一个是我们所需要的start ---- end ,一个是span
然后就是我们end后面就设置成空指针
4.PageCache的设计
由于全局只有一个PageCache,所以跟central cache一样,它也被设计成单例模式
直接来看代码
//设计的PageCache一共是有128个桶,为什么要这样设计呢?
#pragma once
#include "Common.h"class PageCache
{
public:mutex _pageMtx;static PageCache* getInst(){return &_sInst;}Span* NewSpan(size_t k);private:SpanList _spanLists[NPAGELIST];//thread cache用完了对象去找central cache,central cache用完了就去找PageCache,所以也要设计成单例模式PageCache() {};PageCache(const PageCache&) = delete;static PageCache _sInst;
};
我们来分析一下pagecache的结构
其实跟central cache的类设计差不多,因为都设置成了单例模式,就不必多说了
解析来就可以看我们上面出现过但并未讲解过的代码:getOneSpan
1.getOneSpan()
顾名思义,就是获取一个新的Span,我们要清楚的知道,central cache的构造就是每个哈希桶挂着的是SpanList链表结构,但是呢,每个映射桶下面的span中的大块内存被按照映射关系切成了一个个小内存块对象挂在span的自由链表中。比如第一个桶是8byte,那他后面挂着的span就是按照8byte来切分的,如果是16byte,那就是按照16byte来切分,从下面这张图我们可以看到,每个span中又挂着许许多多的小块内存
当一个thread cache向central cache申请对象,且central cache中没有对象的时候,此时central cache就会向page cache申请对象
直接来看看代码
Span* getOneSpan(SpanList& _spanList, size_t size)
{Span* iterator = _spanList.begin();while (iterator != _spanList.end()){if (iterator->_freeList != NULL){return iterator;}else{ iterator = iterator->_next;}//cout << iterator->_next;}//先把central cache的桶锁解掉,这样如果其他线程释放内存对象回来就不会阻塞_spanList._mtx.unlock();PageCache::getInst()->_pageMtx.lock();Span* span = PageCache::getInst()->NewSpan(SizeList::NumMovePage(size));PageCache::getInst()->_pageMtx.unlock();//计算span的起始地址和大块内存的字节数//起始地址=页号*8*1024 也就是等于 页号 << 13char* start_address = (char*)(span->_pageID << PAGE_SHIFT);size_t bytes = span->_n << PAGE_SHIFT; //计算对象的内存大小char* end = start_address + bytes; //末端地址就是起始地址 + 对象大小span->_freeList = start_address;start_address += size;void* tail = span->_freeList;int i = 1;while (start_address < end){i++;NextObj(tail) = start_address;tail = NextObj(tail);start_address += size;}_spanList._mtx.lock();_spanList.pushfront(span);return span;
}
在前面我们知道了他传的参数是这样的
从这里传递的是_spanList[index],代表的是传递几号桶需要一个可用的Span,函数的作用就是从_spanList中获取一个可用的Span,用于分配内存块
首先函数开始用iterator来便利_spanList,检查每个Span的ferrList是否不为null,如果找到了可用的Span就立刻返回,如果没有找到,就需要Page Cache重新申请一个新的span了,我们在这里假设他能找到一个可用的Span,接着往下看代码。
此时我们已经知道了一个可用的Span,我们就得计算一下他的其实地址起始地址的计算方法iu是页号 左移 13 位,这里我们以8K为一页,如果以4K为一页的话,就是左移12,我们这里选用前者。
然后计算Span的大小也是,用有多少页来左移13位,就得出了他有多少字节
那末端地址就很简单了,直接用起始地址+字计数就是到了末端地址
此时我们让span的_freeList指向起始地址,然后我们再让起始地址往后加一个size的大小,因为如果他这按照的是以size来切分的话,那他后面一块的地址就是起始地址+size
然后我们用tail来指向span的freeList,用tail来走这个隐式链表,接下来在循环里面,就让tail一直走,走到end的位置,然后把这一段span插入到_spanList里面去。
最后返回他
2.NewSpan()
上面的情况是我们假设把page cache申请好的,然后直接来使用,略过了申请的过程,现在我们来谈谈申请的过程,也就是newspan
这个函数就是当central cache需要从page cache中申请对象的时候,才会被调用,具体是怎么申请的呢?
我们来直接看看代码
//获取一个K
Span* PageCache::NewSpan(size_t k)
{assert(k > 0 && k < NPAGELIST);//首先检查_spanLists里面有没空闲的span,如果有,就直接返回if (!(_spanLists[k].empty())){return _spanLists->popFront();}//如果没有,就往后面的span寻找,然后切分else{for (size_t i = k + 1; i < NPAGELIST; i++){if (!(_spanLists[i].empty())){Span* nSpan = _spanLists[i].popFront();Span* kSpan = new Span;kSpan->_pageID = nSpan->_pageID;kSpan->_n += k;nSpan->_pageID += k;nSpan->_n -= k;_spanLists[nSpan->_n].pushfront(nSpan);return kSpan;}}}Span* bigSpan = new Span;void* ptr = SystemAlloc(NPAGELIST - 1);bigSpan->_pageID = (PAGE_ID)ptr >> PAGE_SHIFT;bigSpan->_n = PAGE_SHIFT - 1;_spanLists[bigSpan->_n].pushfront(bigSpan);return NewSpan(k);
}
接下来直接讲解代码。
这个函数的作用就是获取一个K页的page,比如说我想获取一个4页的page,那我进来这个函数的时候首先就先检查一下当前_spanList[k],也就是_spanList[4]号桶是否为空,如果他不是空,就证明有空闲的span,直接把这个span使用popFront方法把空闲的span弹出,然后直接返回给上一层函数(getOneSpan),那如果当前的桶,假设是_spanList[4]没有的话,那就往下一个桶来寻找,已知我们当前的k=4,那么我们就从5号桶开始寻找,假设我们找到了10号桶不是空的话,那么接下来的做法就是把10号桶中的10页page切分成一个有4页的page和一个有6页的page,这个4页的page就继承当前的10页的page的id,然后返回给central cache,剩下的6页的page,就挂到相对应的6页的桶上面去。这么说有点抽象,来画个图理解一下。
以上是page cache中存在空闲的span,那如果说找完了128个桶都没有呢?
那这个时候就需要向系统来申请了,使用virtualalloc来申请对象,此时获取一个申请完成之后的指针,把这个指针右移13位,就成了页号,然后把他插入到一个大的span里面。
———————————————————————————————————————————
以上是内存申请的部分,已经完结了,接下来就是内存释放的部分了
———————————————————————————————————————————
5.thread cache的内存回收
我们都知道,thread cache在被释放空间的时候,首先会计算一个桶,然后把那个空间插入到相对应的桶就行了,但是随着被释放的空间越来越多,那相应的桶后面挂着的对象就会越来越长,在tcmalloc里面,他会判断当前的内存是否是大于2M,如果大于的话,就会进行回收的动作。
而我们这里就搞简单点,就算他当后面挂着的对象长度大于一次性批量申请的长度的时候,我就走释放逻辑
void ThreadCache::Deallocate(void* ptr, size_t size)
{assert(ptr);assert(size <= MAX_BYTES);size_t index = SizeList::Index(size);_FreeList[index].push(ptr);//当链表的长度大于一次批量申请的内存的时候,就开始释放if (_FreeList[index].Size() >= _FreeList[index].maxSize()){ListTooLong(_FreeList[index], size);}}
此时我们需要实现一个size()的功能,这个很简单,多加一个_size变量,然后当插入一个对象的时候就++size,pop一个对象的时候就--size,如果说是插入一段对象?那就直接+=n(插入对象的个数)就行了
这边还有一个接口是ListTooLong需要实现,接下来我们来实现一下
1.ListTooLong()
void ThreadCache:: ListTooLong(FreeList& list, size_t size)
{void* start = nullptr;void* end = nullptr;list.popRange(start, end, list.maxSize());CentralCache::getInst()->ReleaseListToSpan(start, size);
}
这个函数的逻辑就是显而易见的,首先把当前的freelist一部分的对象pop出来,然后把他回收到central cache中
2.popRange()
这个就是pop掉一部分的对象,直接来看代码
void popRange(void*& start, void*& end, size_t n){assert(n >= _size);start = _freeList;end = start;for (size_t i = 0; i < n - 1; i++){end = NextObj(end);}_freeList = NextObj(end);NextObj(end) = nullptr;_size -= n;}
我们这个时候就可用让end来走,一直走到后面
假如我们要释放掉4个对象,那我们就走三步
刚好就到我们所需要释放的最后一个对象,然后把end的下一个位置置空,让_freeList指向新的对象
6.central cache的内存回收
对于central cache的回收,我们重新来捋一下central cache的结构
可以看到,每一个桶后面都挂着很多个span,然后span下面又挂着许多的小内存块。
那这么说,有些内存块不被使用的时候,就会被还回来,那么我怎么知道他是哪个span的内存块呢?
这边参考大佬的想法后,得出一个结论,就是在分配对象出去之前,把对象和每一个span做一个映射
这里考虑使用unordered_map来进行构建映射
把每一个span都对应一个id,放在pagecache.h里面
所以,我们可以在切分一个span的时候,就把这个映射关系拉上去
//建立id和kspan的映射,方便central cache回收时查找对应的spanfor (size_t i = 0; i < kSpan->_n; i++){_idSpanMap[kSpan->_pageID + i] = kSpan;}
kSpan一共有_n页,所以我们从页号开始,把kSpan里面的所有的页全都映射到kSpan里面,让我们知道这些页是属于kSpan的
然后我们的page cache还需要一个函数
1.MapObjectToSpan()
这个函数就是从map里面来寻找一个相对应的span的
直接来看代码
Span* PageCache::MapObjectToSpan(void* obj)
{PAGE_ID id = ((PAGE_ID)obj >> PAGE_SHIFT);auto ret = _idSpanMap.find(id);if (ret != _idSpanMap.end()){return ret->second;}else{assert(false);return nullptr;}
}
前面说了,如果我们想要通过页号来计算地址的话,就右移13来计算,反之,如果想要通过地址来寻找页号的话,那就左移13,通过计算我们算出一个id,然后通过find来查找是否有id这个的span,如果有,就返回map<id,span*>中的span指针,否则就返回空
2.ReleaseListToSpan()
这个函数就是把对象回收到Span里面
首先来看看代码
size_t CentralCache::ReleaseListToSpan(void*& start, size_t size)
{size_t index = SizeList::Index(size);_spanList->_mtx.lock();while (start){void* next = NextObj(start);Span* span = PageCache::getInst()->MapObjectToSpan(start);NextObj(start) = span->_freeList;span->_freeList = start;span->_useCount--;if (span->_useCount == 0){_spanList[index].eraser(span);span->_freeList = nullptr;span->_next = nullptr;span->_prve = nullptr;_spanList[index]._mtx.unlock();PageCache::getInst()->_pageMtx.lock();PageCache::getInst()->RelaseSpantoPageCache(span);PageCache::getInst()->_pageMtx.unlock();_spanList[index]._mtx.lock();}start = next;}_spanList->_mtx.unlock();return 0;
}
首先一进来就先计算我们需要回收的几号桶,然后上个锁,避免别的其它进程来使用或者归还
(也就是说,我们在对线程的使用或者归还的时候,是需要加互斥锁来规避并发行为的)
接着把我们传过来的start传入到MapObjectToSpan(),找到了这个span之后,就对当前的span的自由链表进行头插,然后usecount--
当usecount == 0的时候,就说明所有切分出去的小块内存全都回收回来了,这个span就可以再回收给page cache,使他进行前后合并
所以这个时候我们就可以把这块大的span给eraser掉,然后把该清空的清空,再把解了了
接下来,就是把归还到central cache的对象归还到page cache了
7.page cache的内存回收
在page cache中的内存回收,他回收过来的span并不能发直接挂上去,如果有许许多多被切分成1页2页这种的,那他如果直接挂上去,下次申请的时候,肯定不会再有更大更连续的空间了,所以,对于回收回来的span,我们更期望的是尝试把他进行前后合并,合并之后再挂上去
合并有三个原则,第一个原则就是找不到的id不合并,第二个原则就是正在使用的span不合并,第三个原则就是如果页数加起来大于128的页不合并
那怎么才能知道当前的span有没有正在被使用呢?
多加一个_isUse变量,那为什么不直接使用usecount来作为标记呢?
举个例子,当我们的central cache 刚申请到一个span的时候,还未被切分,此时的usecount就是0,那么就会被认为是还未被使用的span,那你还没切分就把他拿回去回收合并了,这肯定不合适
我们使用isuse在什么时候才标记为被使用的状态呢?
当我们从page cache中获取一个新的span到central cache的时候,在被切分之前,此时就应该标记成被使用的状态了。
怎么合并呢?我们来看下代码
void PageCache:: RelaseSpantoPageCache(Span* span)
{while (1){PAGE_ID prve_id = span->_pageID - 1;auto ret = _idSpanMap.find(prve_id);//如果找不到这个id,就不合并if (ret != _idSpanMap.end()){break;}Span* prveSpan = ret->second;//说明前一个span正在被人使用,不合并if (prveSpan->_isUse == true){break;}if (prveSpan->_n + span->_n > NPAGELIST-1){break;}span->_pageID = prveSpan->_pageID;span->_n += prveSpan->_n;_spanLists[prveSpan->_n].eraser(prveSpan);delete prveSpan;}//向后合并while (1){PAGE_ID nextid = span->_pageID + span->_n;auto ret = _idSpanMap.find(nextid);if (ret != _idSpanMap.end()){break;}Span* nextSpan = ret->second;if (nextSpan->_n + span->_n > NPAGELIST - 1){break;}nextSpan->_pageID = span->_pageID;span->_n += nextSpan->_pageID;_spanLists[nextSpan->_n].eraser(nextSpan);delete nextSpan;}_spanLists[span->_n].pushfront(span);span->_isUse = false;_idSpanMap[span->_pageID] = span;_idSpanMap[span->_pageID + span->_n] = span;}
首先我们需要尝试找到当前页号的前一个页,比如我现在的id是1000页,那么我们就需要尝试寻找前面是否有id为999的页,所以我们用一个prve_id来记录前一个页的id,如果找不到,那就不要合并,所以此时的页号就变成了前一页的页号,然后n就是当前的span和前一个span相加
由于此时的prvespan和span已经合并了,那么他有可能在某个地方挂着,所以我们需要把他给eraser掉
接下来的向后合并页是一个道理,需要注意的是,他的下一个页号就是当前页号+一共有多少页
合并完成之后,就需要把合并好的一个span挂到当前的_spanList里面,然后映射一下起始页号和末端页号,方便进行下一次的合并
8.大于256KB的大块内存申请问题
1.内存申请
在这个标题以上,我们都只是考虑了申请的内存块是低于256K,那么如果当我们需要申请的内存块是大于256K的时候,我们该如何去处理呢?
如果说我们申请或释放超过256K的内存块,我们这个时候就可以直接向堆来申请内存。
那我们在申请空间的函数里面就可以这样写
if (size > MAXBYTE){size_t alignSize = SizeList::RoundUp(size);size_t kPage = alignSize >> PAGE_SHIFT;PageCache::getInst()->_pageMtx.lock();Span* span = PageCache::getInst()->NewSpan(kPage);PageCache::getInst()->_pageMtx.unlock();void* ptr = (void*)(span->_pageID << PAGE_SHIFT);return ptr;}
总体上的流程还是跟普通的内存块申请一样,但是针对大于256K的地方要做特殊处理
首先就是计算对齐数,我们进到RoundUp函数里面看看是什么样子
我们可以看到,在RoundUp函数里面,我多加了一个条件语句,当他走到这里的时候,就说明此时申请的页已经是大于256K了,然后就计算出对齐数,刚好是257*8*1024,说明了就是按页来对齐,总共有257页
那这个时候就要进入NewSpan里面了,这边是大于了256K,那该怎么去创建一个新的span呢?
if (k > NPAGELIST - 1){void* ptr = SystemAlloc(k);Span* span = new Span; span->_pageID = (PAGE_ID)ptr >> PAGE_SHIFT;span->_n = k;_idSpanMap[span->_pageID] = span;return span;}
也没什么好讲的,就是单独拎出来处理,然后拿map来映射一下
最后就计算一下起始地址,很简单,还是用id来左移13位就可以计算出起始地址了,这就是申请的过程
2.内存回收
内存申请是从堆中申请的,那么释放也就是放回堆里面
if (size > MAXBYTE){PageCache::getInst()->_pageMtx.lock();PageCache::getInst()->RelaseSpantoPageCache(span);PageCache::getInst()->_pageMtx.unlock();}
还是一样,当要操作span的时候,上个锁
跳到了RelaseSpantoPageCache
//大于128页直接还给堆if (span->_n > NPAGELIST - 1){void* ptr = (void*)(span->_pageID << PAGE_SHIFT);SystemFree(ptr);//delete span;_spanPool.Delete(span);return;}
以下是SystemFree的方法
inline static void SystemFree(void* ptr)
{
#ifdef _WIN32VirtualFree(ptr, 0, MEM_RELEASE);
#else// sbrk unmmap等
#endif
}
9.使用定长内存池配合脱离使用new
一开始我们在写定长内存池的时候,我们就提到过是可以在后面使用到的,那么该怎么把他们联动起来呢?
我们可以在pagecache中加一个这个,里面的类型是Span
然后把所有的new换成spanpool.new()
所有的delete换成_spanPool.delete()
10.释放对象时优化为不传对象大小
一开始我们要释放对象的时候,是需要传一个对象的大小的,这样还是太麻烦了,其实我们可以直接在span里面再加一个变量用来记录span的大小,然后在这里直接获取就行
然后在切分对象的时候获取这个东西
最后在释放的的时候也可以获取