英文源地址
紧接着上一篇进行b树的实现.
b树中删除操作
删除叶子节点
从叶子节点中删除一个key的代码很像其他nodeReplace*函数
func leafDelete(new BNode, old BNode, idx uint16) {new.setHeader(BNODE_LEAF, old.nkeys() - 1)nodeAppendRange(new, old, 0, 0, idx)nodeAppendRange(new, old, idx, idx+1, old.nkeys()-(idx+1))
}
递归删除
结构很像插入操作
func treeDelete(tree *BTree, node BNode, key []byte) BNode {idx := nodeLookupLE(node, key)switch node.btype() {case BNODE_LEAF:if !bytes.Equal(key, node.getKey(idx)) {return BNode{}}new := BNode{make([]byte, BTREE_PAGE_SIZE)}leafDelete(new, node, idx)return newcase BNODE_NODE:return nodeDelete(tree, node, idx, key)default:panic("bad node!")}
}
处理内部节点
不同之处在于我们需要合并节点而不是分裂节点. 一个节点可以合并到它的左或右兄弟节点中.nodeReplace*函数用于更新链接.
func nodeDelete(tree *BTree, node BNode, idx uint16, key []byte) BNode {kptr := node.getPtr(idx)updated := treeDelete(tree, tree.get(kptr), key)if len(updated.data) == 0 {return BNode{}}tree.del(kptr)new := BNode{make([]byte, BTREE_PAGE_SIZE)}mergeDir, sibling := shouldMerge(tree, node, idx, updated)switch {case mergeDir < 0:merged := BNode{data: make([]byte, BTREE_PAGE_SIZE)}nodeMerge(merged, sibling, updated)tree.del(node.getPtr(idx - 1))nodeReplace2KidN(new, node, idx-1, tree.new(merged), merged.getKey(0))case mergeDir > 0:merged := BNode{data: make([]byte, BTREE_PAGE_SIZE)}nodeMerge(merged, updated, sibling)tree.del(node.getPtr(idx + 1))nodeReplace2KidN(new, node, tree.new(merged), merged.getKey(0))case mergeDir == 0:if updated.nkeys() <= 0 {panic("wrong nkeys")}nodeReplaceKidN(tree, new, node, idx, updated)}return new
}func nodeMerge(new BNode, left BNode, right BNode) {new.setHeader(left.btype(), left.nkeys()+right.nkeys())nodeAppendRange(new, left, 0, 0, left.nkeys())nodeAppendRange(new, right, left.nkeys(), 0, right.nkeys())
}
合并条件
合并的条件是:
- 节点大小小于页大小的1/4(这是随意的)
- 有一个兄弟, 并且合并的结果不超过一个页大小
func shouldMerge(tree *BTree, node BNode, idx uint16, updated BNode) (int, BNode) {if updated.nbytes() > BTREE_PAGE_SIZE/4 {return 0, BNode{}}if idx > 0 {sibling := tree.get(node.getPtr(idx - 1))merged := sibling.nbytes() + updated.nbytes() - HEADERif merged <= BTREE_PAGE_SIZE {return -1, sibling}}if idx+1 < node.nkeys() {sibling := tree.get(node.getPtr(idx + 1))merged := sibling.nbytes() + updated.nbytes() - HEADERif merged <= BTREE_PAGE_SIZE {return +1, sibling}}return 0, BNode{}
}
删除操作就完成了
根节点
我们需要在树生长和收缩时跟踪根节点, 让我们从删除操作开始.
这是b树删除的最终接口. 在以下情况下, 树的高度将减少1.
1.根节点不是叶子节点
2.根节点只有一个子节点.
func (tree *BTree) Delete(key []byte) bool {if len(key) == 0 || len(key) >= BTREE_MAX_KEY_SIZE {panic("wrong key")}if tree.root == 0 {return false}updated := treeDelete(tree, tree.get(tree.root), key)if len(updated.data) == 0 {return false}tree.del(tree.root)if updated.btype() == BNODE_NODE && updated.nkeys() == 1 {tree.root = updated.getPtr(0)} else {tree.root = tree.new(updated)}return true
}
下面是最终插入的接口
func (tree *BTree) Insert(key []byte, val []byte) {if len(key) == 0 || len(key) > BTREE_MAX_KEY_SIZE || len(val) > BTREE_MAX_VAL_SIZE {panic("wrong info")}if tree.root == 0 {root := BNode{data: make([]byte, BTREE_PAGE_SIZE)}root.setHeader(BNODE_LEAF, 2)nodeAppendKV(root, 0, 0, nil, nil)nodeAppendKV(root, 1, 0, key, val)tree.root = tree.new(root)return}node := tree.get(tree.root)tree.del(tree.root)node = treeInsert(tree, node, key, val)nsplit, splitted := nodeSplit3(node)if nsplit > 1 {root := BNode{data: make([]byte, BTREE_PAGE_SIZE)}root.setHeader(BNODE_NODE, nsplit)for i, knode := range splitted[:nsplit] {ptr, key := tree.new(knode), knode.getKey(0)nodeAppendKV(root, uint16(i), ptr, key, nil)}tree.root = tree.new(root)} else {tree.root = tree.new(splitted[0])}
}
这做了2件事
1.当将旧根节点拆分为多个节点时, 将创建一个新的根节点.
2.插入第一个键时, 创建第一个叶节点为根节点.
这里有个小技巧, 在创建第一个节点时, 向树中插入一个空键. 空键的排序顺序是最低的, 它使查找函数nodeLookupLE总是成功, 从而消除找不到包含输入键的节点的情况.
测试B树
由于我们的数据结构是纯数据结构(没有IO), 所以页分配代码通过3个回调函数进行隔离. 下面测试b树的容器代码, 它将页保存在内存中的哈希表中, 而不是持久化到磁盘上. 下一章我们将在不修改b树代码的情况下实现持久化.
type C struct {tree BTreeref map[string]stringpages map[uint64]BNode
}func newC() *C {pages := map[uint64]BNode{}return &C{tree: BTree{get: func(ptr uint64) BNode {node, ok := pages[ptr]if !ok {panic("error")}return node},new: func(node BNode) uint64 {if node.nbytes() > BTREE_PAGE_SIZE {panic("wrong size")}key := uint64(uintptr(unsafe.Pointer(&node.data[0])))if pages[key].data != nil {panic("error")}pages[key] = nodereturn key},del: func(ptr uint64) {_, ok := pages[ptr]if !ok {panic("error")}delete(pages, ptr)},},ref: map[string]string{},pages: pages,}
}
我们使用引用映射来记录每次B树的更新, 以便稍后验证b树的正确性.
func (c *C) add(key string, val string) {c.tree.Insert([]byte(key), []byte(val))c.ref[key] = val
}func (c *C) del(key string) bool {delete(c.ref, key)return c.tree.Delete([]byte(key))
}
测试用例由读者自行完成.
结束语
这个b树的实现非常简单, 但是对于学习来说非常简单. 现实世界的实现可能要复杂的多, 并且包含实际的优化.
我们的b树实现有一些简单的改进:
- 对叶子节点和内部节点使用不同的格式. 叶子节点不需要指针, 内部节点不需要值. 这会节省一些空间.
- 键或值其中的一个长度是冗余的, kv键值对的长度可以从下一个key的偏移量推断出来.
- 不需要节点的第一个键, 因为它是从父节点继承来的.
- 添加校验和以检测数据损坏.
构建kv存储的下一步是将b树持久化到磁盘上, 这是下一章节的主题.