LevelDB 源码阅读:如何优雅地合并写入和删除操作

ops/2025/1/20 4:31:54/

LevelDB 支持写入单个键值对和批量写入多个键值对,这两种操作的处理流程本质上是相同的,都会被封装进一个 WriteBatch 对象中,这样就可以提高写操作的效率。

在 LevelDB 中,WriteBatch 是通过一个简单的数据结构实现的,其中包含了一系列的写入操作。这些操作被序列化(转换为字节流)并存储在内部的一个字符串中。每个操作都包括操作类型(如插入或删除),键和值(对于插入操作)。

当 WriteBatch 被提交给数据库时,其内容被解析并应用到 WAL 日志和 memtable 中。不管 WriteBatch 中包含多少操作,它们都将作为一个整体进行处理和日志记录。

WriteBatch 的实现主要涉及到 4 个文件,接下来一起看看。

  1. include/leveldb/write_batch.h:对外暴露的接口文件,定义了 WriteBatch 类的接口。
  2. db/write_batch_internal.h:内部实现文件,定义了 WriteBatchInternal 类,提供了一些操作 WriteBatch 的方法。
  3. db/write_batch.cc:WriteBatch 类的实现文件,实现了 WriteBatch 类。
  4. db/write_batch_test.cc:WriteBatch 类的测试文件,用于测试 WriteBatch 的功能。

WriteBatch 接口设计

我们先来看 write_batch.h 文件,这里定义了 WriteBatch 类对外暴露的一些接口。 LevelDB 代码中的注释十分清晰,不过这里先省略注释:

class LEVELDB_EXPORT WriteBatch {public:class LEVELDB_EXPORT Handler {public:virtual ~Handler();virtual void Put(const Slice& key, const Slice& value) = 0;virtual void Delete(const Slice& key) = 0;};WriteBatch();// Intentionally copyable.WriteBatch(const WriteBatch&) = default;WriteBatch& operator=(const WriteBatch&) = default;~WriteBatch();void Put(const Slice& key, const Slice& value);void Delete(const Slice& key);void Clear();size_t ApproximateSize() const;void Append(const WriteBatch& source);Status Iterate(Handler* handler) const;private:friend class WriteBatchInternal;std::string rep_;  // See comment in write_batch.cc for the format of rep_
};

其中 WriteBatch::Handler 是一个抽象基类,定义了处理键值对操作的接口,只包括 Put 和 Delete 方法。这样的设计允许 WriteBatch 类实现与具体存储操作解耦,使得 WriteBatch 不必直接知道如何将操作应用到底层存储(如 MemTable)。

通过继承 Handler 类,可以创建多种处理器,它们可以以不同的方式实现这些方法。比如:

  1. MemTableInserter: 定义在 db/write_batch.cc 中,将键值操作存储到 MemTable 中。
  2. WriteBatchItemPrinter:定义在 db/dumpfile.cc 中,将键值操作打印到文件中,可以用来测试。

另外还有一个 friend class WriteBatchInternal 作为 WriteBatch 的友元类,能够访问其私有和受保护成员。WriteBatchInternal 主要用来封装一些内部操作,这些方法不需要对外暴露,只在内部用到。通过将内部操作方法隐藏在 WriteBatchInternal 中,保持了对象的接口清晰,可以自由地修改内部实现而不影响到使用这些对象的代码

WriteBatch 使用方法

在应用层,我们可以通过 WriteBatch 来批量写入多个键值对,然后通过 DB::Write 方法将 WriteBatch 写入到数据库中。

这里 WriteBatch 支持 Put 和 Delete 操作,可以合并多个 WriteBatch。如下使用示例:

WriteBatch batch;
batch.Put("key1", "value1");
batch.Put("key2", "value2");
batch.Delete("key3");// 合并另一个批次
WriteBatch another_batch;
another_batch.Put("key4", "value4");
batch.Append(another_batch);// 写入数据库
db->Write(writeOptions, &batch);

WriteBatch 实现细节

那么 WriteBatch 是怎么实现的呢?关键在 db/write_batch.cc,该类中有一个 private 成员 std::string rep_ 来存储序列化后的键值操作。我们先来看看这里的存储数据协议:

+---------------+---------------+----------------------------------------+
|   Sequence    |     Count     |                Data                    |
|  (8 bytes)    |   (4 bytes)   |                                        |
+---------------+---------------+----------------------------------------+|                 |                   |v                 v                   v+-------+         +-------+          +-------+|Record1|         |Record2|   ...    |RecordN|+-------+         +-------+          +-------+|                 |v                 v+-----------------+ +-----------------+| kTypeValue      | | kTypeDeletion   || Varstring Key   | | Varstring Key   || Varstring Value | |                 |+-----------------+ +-----------------+Varstring (可变长度字符串):
+-------------+-----------------------+
| Length (varint32) | Data (uint8[])  |
+-------------+-----------------------+

该字符串前 12 个字节是头部元数据部分,包括 8 个字节的序列号和 4 个字节的 count 数。接下来是一个或多个操作记录,每个记录包含一个操作类型和键值对。操作类型是一个字节,可以是 Put 或者 Delete 操作。键和值都是可变长度的字符串,格式为 varstring。

LevelDB 的序列号机制

rep_ 头部 8 个字节代表64位的数字 sequence(序列号),WriteBatchInternal 友元类提供了两个方法来获取和设置 sequence number,内部是用 EncodeFixed64 和 DecodeFixed64 方法来编解码 64 位的序列号。

SequenceNumber WriteBatchInternal::Sequence(const WriteBatch* b) {return SequenceNumber(DecodeFixed64(b->rep_.data()));
}void WriteBatchInternal::SetSequence(WriteBatch* b, SequenceNumber seq) {EncodeFixed64(&b->rep_[0], seq);
}

序列号是 LevelDB 中的全局递增标识符,用于实现版本控制和操作排序。每个 WriteBatch 在执行时会获得一段连续的序列号,批次内的每个操作(Put/Delete)都会分配到其中的一个序列号。序列号在 LevelDB 中有三个核心作用:

  1. 版本控制:LevelDB 中的每个 key 可以有多个版本,每个版本都对应一个序列号。在读取时,通过比较序列号来确定应该返回哪个版本的值。较大的序列号表示更新的版本。
  2. 多版本并发控制(MVCC):写操作获取新的序列号,创建 key 的新版本。读操作可以指定序列号,访问该序列号时间点的数据快照。这种机制使得读写操作可以并发执行,无需互相阻塞。
  3. 故障恢复:WAL(预写日志)中记录了操作的序列号。系统重启时,通过序列号可以准确重建崩溃时的数据状态,避免重复应用已持久化的操作。

这种设计让 LevelDB 既保证了数据一致性,又实现了高效的并发控制。

设置序列号的逻辑在 DBImpl::Write 方法中,首先获取当前最大序列号,然后为 WriteBatch 分配一个新的序列号。

Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {// ...uint64_t last_sequence = versions_->LastSequence();// ...if (status.ok() && updates != nullptr) {  // nullptr batch is for compactionsWriteBatch* write_batch = BuildBatchGroup(&last_writer);WriteBatchInternal::SetSequence(write_batch, last_sequence + 1);last_sequence += WriteBatchInternal::Count(write_batch);// ...}
}

如果 WriteBatch 包含多个操作,那么这些操作会连续地分配序列号。在写入 WAL 日志时,会将 WriteBatch 的序列号写入到日志中,这样在恢复时可以根据序列号来恢复操作的顺序。写入 memtable 之后,会更新当前最大序列号,以便下次分配。

count 记录操作数

头部还有 4 个字节的 count,用于记录 WriteBatch 中包含的操作数。这里每次 put 或者 delete 操作都会增加 count 的值。如下示例:

WriteBatch batch;
batch.Put("key1", "value1");  // count = 1
batch.Put("key2", "value2");  // count = 2
batch.Delete("key3");         // count = 3
int num_ops = WriteBatchInternal::Count(&batch);  // = 3

在合并两个 WriteBatch 的时候,也会累计两部分的 count 的值,如下 WriteBatchInternal::Append 方法:

void WriteBatchInternal::Append(WriteBatch* dst, const WriteBatch* src) {SetCount(dst, Count(dst) + Count(src));assert(src->rep_.size() >= kHeader);dst->rep_.append(src->rep_.data() + kHeader, src->rep_.size() - kHeader);
}

使用 count 的地方主要有两个,一个是在迭代这里每个记录的时候,会用 count 来做完整性检查,确保没有遗漏操作。

Status WriteBatch::Iterate(Handler* handler) const {Slice input(rep_);...if (found != WriteBatchInternal::Count(this)) {return Status::Corruption("WriteBatch has wrong count");} else {return Status::OK();}
}

另一个是在 db 写入的时候,根据 count 可以预先知道需要分配多少序列号,保证序列号连续性。如下 DBImpl::Write:

WriteBatchInternal::SetSequence(write_batch, last_sequence + 1);
last_sequence += WriteBatchInternal::Count(write_batch);

支持的各种操作

在头部的 sequence 和 count 之后,rep_ 紧跟着的是一系列的记录,每个记录包含一个操作类型和键值。这里记录可以通过 Put 和 Delete 方法来添加,其中 Put 方法的实现如下:

void WriteBatch::Put(const Slice& key, const Slice& value) {WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);rep_.push_back(static_cast<char>(kTypeValue));PutLengthPrefixedSlice(&rep_, key);PutLengthPrefixedSlice(&rep_, value);
}

这里更新了 count,然后添加了 kTypeValue 操作类型,接着是添加 key 和 value。Delete 操作类似,count 计数也是要加 1,然后操作类型是 kTypeDeletion,最后只用添加 key 即可。

void WriteBatch::Delete(const Slice& key) {WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);rep_.push_back(static_cast<char>(kTypeDeletion));PutLengthPrefixedSlice(&rep_, key);
}

上面是往 rep_ 中添加记录,那么如何从 rep_ 中解析出这些记录呢?这里 WriteBatch 类中提供了一个 Iterate 方法,该方法遍历 rep_ 中的每条记录,然后通过传入的 Handler 接口来灵活处理这些记录。

此外该方法的实现中还有数据格式验证,会检查头部大小、操作类型、操作数量是否匹配。可以返回 Corruption 错误,表示数据格式不正确等。Iterate 核心代码如下:

Status WriteBatch::Iterate(Handler* handler) const {Slice input(rep_);if (input.size() < kHeader) {return Status::Corruption("malformed WriteBatch (too small)");}input.remove_prefix(kHeader);Slice key, value;int found = 0;while (!input.empty()) {found++;char tag = input[0];input.remove_prefix(1);switch (tag) {case kTypeValue:if (GetLengthPrefixedSlice(&input, &key) &&GetLengthPrefixedSlice(&input, &value)) {handler->Put(key, value);} else {return Status::Corruption("bad WriteBatch Put");}break;case kTypeDeletion:if (GetLengthPrefixedSlice(&input, &key)) {handler->Delete(key);} else {return Status::Corruption("bad WriteBatch Delete");}break;default:return Status::Corruption("unknown WriteBatch tag");}}// ...
}

前面提过 Handler 是 WriteBatch 的抽象基类,可以传入不同的实现。在 LevelDB 写数据的时候,这里传入的是 MemTableInserter 类,该类将操作数据存储到 MemTable 中。具体可以调用这里的实现:

Status WriteBatchInternal::InsertInto(const WriteBatch* b, MemTable* memtable) {MemTableInserter inserter;inserter.sequence_ = WriteBatchInternal::Sequence(b);inserter.mem_ = memtable;return b->Iterate(&inserter);
}

整体上看 WriteBatch 负责存储键值操作的数据,进行编码解码等,而 Handler 负责具体处理里面的每条数据。这样 WriteBatch 的操作就可以被灵活地应用到不同场景中,方便扩展。

测试用例分析

最后再来看看 write_batch_test.cc,这里提供了一些测试用例,用于测试 WriteBatch 的功能。

首先定义了一个 PrintContents 函数,用来输出 WriteBatch 中的所有操作记录。这里用 MemTableInserter 将 WriteBatch 中的操作记录存储到 MemTable 中,然后通过 MemTable 的迭代器遍历所有记录,并保存到字符串中。

这里测试用例覆盖了下面这些情况:

  1. Empty:测试空的 WriteBatch 是否正常;
  2. Multiple:测试多个 Put 和 Delete 操作,验证总的 count 数目和每个操作的序列号是否正确;
  3. Corruption:先写进去数据,然后故意截断部分记录,测试能读取尽量多的正常数据;
  4. Append:测试合并两个 WriteBatch,验证合并后序列号的正确性,以及合并空 WriteBatch;
  5. ApproximateSize:测试 ApproximateSize 方法,计算 WriteBatch 的近似大小;

这里通过测试用例,基本就能知道怎么使用 WriteBatch 了。比较有意思的是,前面在看 Append 代码的时候,没太留意到合并后这里序列号是用谁的。这里结合测试用例,才发现取的目标 WriteBatch 的序列号。

TEST(WriteBatchTest, Append) {WriteBatch b1, b2;WriteBatchInternal::SetSequence(&b1, 200);WriteBatchInternal::SetSequence(&b2, 300);b1.Append(b2);ASSERT_EQ("", PrintContents(&b1));b2.Put("a", "va");b1.Append(b2);ASSERT_EQ("Put(a, va)@200", PrintContents(&b1));// ...
}

总结

通过深入分析 LevelDB 的 WriteBatch 实现,我们可以清晰地看到其设计精妙之处。WriteBatch 通过将多个写入和删除操作封装在一起,不仅提高了写操作的效率,还简化了并发控制和故障恢复的实现。有几个亮点值得借鉴:

  1. 批量操作:WriteBatch 允许将多个 Put 和 Delete 操作合并为一个批次,减少了频繁的 I/O 操作,提升了写入性能。
  2. 序列号机制:通过全局递增的序列号,LevelDB 实现了多版本并发控制(MVCC),确保了读写操作的一致性。
  3. Handler 抽象:通过 Handler 接口,WriteBatch 将操作的具体实现与存储逻辑解耦,使得代码更加灵活和可扩展。
  4. 数据格式验证:在解析 WriteBatch 时,LevelDB 会进行严格的数据格式验证,确保数据的完整性和正确性。

当然本篇只是分析 WriteBatch 的实现,并没有串起 LevelDB 的整个写入流程,后续文章我们会继续分析,写入一个 key 的完整流程。


http://www.ppmy.cn/ops/151569.html

相关文章

使用 Python 开发一个 AI Agent 自媒体助手示例

1. 项目背景 随着自媒体行业的快速发展&#xff0c;内容创作者需要处理大量重复性任务&#xff0c;例如撰写文章、生成标题、优化关键词、分析数据等。通过开发一个 AI Agent 自媒体助手&#xff0c;可以帮助创作者高效完成这些任务&#xff0c;节省时间并提升内容质量。 本文…

远程接口调用

目录 GET请求 案例1&#xff1a; 案例2&#xff1a; 案例3&#xff1a;查询新闻列表 POST请求 PUT请求 DELETE请求 通用 传json参数 在服务端使用java语言&#xff0c;向远程接口发起请求&#xff0c;得到响应数据的方法。实现远程接口调用方法很多&#xff0c;这里…

周末总结(2024/01/18)

工作 人际关系核心实践&#xff1a; 要学会随时回应别人的善意&#xff0c;执行时间控制在5分钟以内 坚持每天早会打招呼 遇到接不住的话题时拉低自己&#xff0c;抬高别人(无阴阳气息) 朋友圈点赞控制在5min以内&#xff0c;职场社交不要放在5min以外 职场的人际关系在面对利…

SpringBoot中Get请求和POST请求接收参数详解

1、Get请求 1.1 方法形参接收参数 这种方式一般适用参数比较少的情况&#xff0c;并且前后端参数名称必须保持一致 RestController RequestMapping(“/user”) Slf4j public class DemoController { GetMapping("/query") public void getStudent(String name,Strin…

CentOS 9 Stream 上安装飞书客户端

要在 CentOS 9 Stream 上安装飞书客户端&#xff0c;你可以通过以下步骤来实现&#xff1a; 步骤 1: 下载飞书的 Linux 客户端 飞书客户端没有直接在官方仓库中提供 CentOS 版本的软件包&#xff0c;但你可以从飞书官方网站下载 Linux 客户端。 打开浏览器&#xff0c;访问飞…

信安大赛-应急响应

Ubuntu应急响应 1 提交攻击者的IP地址 2 识别攻击者使用的操作系统 3 找出攻击者资产收集所使用的平台 4 提交攻击者目录扫描所使用的工具名称 5 提交攻击者首次攻击成功的时间&#xff0c;格式&#xff1a;DD /MM/YY:HH:MM:SS 6 找到攻击者写入的恶意后门文件&…

今天你学C++了吗?——C++中的STL

♥♥♥~~~~~~欢迎光临知星小度博客空间~~~~~~♥♥♥ ♥♥♥零星地变得优秀~也能拼凑出星河~♥♥♥ ♥♥♥我们一起努力成为更好的自己~♥♥♥ ♥♥♥如果这一篇博客对你有帮助~别忘了点赞分享哦~♥♥♥ ♥♥♥如果有什么问题可以评论区留言或者私信我哦~♥♥♥ ✨✨✨✨✨✨ 个…

AI 音频工具合集

&#x1f423;个人主页 可惜已不在 &#x1f424;这篇在这个专栏AI_可惜已不在的博客-CSDN博客 &#x1f425;有用的话就留下一个三连吧&#x1f63c; 目录 前言: 正文: ​ 前言: 在科技蓬勃发展的时代&#xff0c;AI 音频工具宛如璀璨之星闪耀登场。它是声音的魔法师&a…