go手写Redis(8)之数据库核心层及指令实现

news/2024/10/21 20:33:29/

数据库核心层

前面实现完了处理器的逻辑,现在到了核心的数据层实现了,核心的数据库主要是来执行用户发送的指令并且进行数据存储

1. Database

数据层的顶级接口定义,在 interface/database/database.go 文件中定义,
其中定义了 Database 接口以及 DataEntity 结构体用于包装数据,其中 interface 类型表示可以存放任何类型的数据

//Database 数据库的业务层
type Database interface {//Exec 核心层执行指令,返回一个响应体Exec(client resp.Connection, args [][]byte) resp.Reply//Close 关闭Close()//AfterClientClose 客户端关闭之后需要做的一些操作AfterClientClose(c resp.Connection)
}//DataEntity 数据结构,用于包装任何的数据类型
type DataEntity struct {//数据可以指代任何的数据Data interface{}
}

2. Dict

redis中存储字典的数据结构,我们会实现这个接口来进行数据的存储,在 datastruct/dict/dict.go 中定义接口

/**
Dict redis中存储字典的数据结构
*/
type Dict interface {//Get 返回value值,exist代表是否存在Get(key string) (val interface{}, exist bool)//Len 字典存在的数据Len() int//Put 存入数据,返回存入的个数Put(key string, value interface{}) (result int)//PutIfAbsent 如果不存在则设置PutIfAbsent(key string, value interface{}) (result int)//PutIfExists 如果存在则设置,如果不存在就不设置PutIfExists(key string, value interface{}) (result int)//Remove 移除Remove(key string) (result int)//ForEach 遍历方法ForEach(consumer Consumer)//Keys 返回所有的keyKeys() []string//RandomKeys 需要随机返回多少个keyRandomKeys(limit int) []string//RandomDistinctKeys 返回指定个数不重复的keyRandomDistinctKeys(limit int) []string//Clear 清空字典表Clear()
}//Consumer 自定义的遍历函数
type Consumer func(key string, val interface{}) bool

3. 实现类

3.1 SyncDict

Dict 接口进行实现,正常存储数据的地方,我们采用的 sync.Map 来进行数据的存储线程安全的

//SyncDict 使用并发安全的字典,为什么需要包装一层,因为底层的数据接口可能会进行更换
type SyncDict struct {//m 并发安全的mapm sync.Map
}//Get get获取方法
func (dict *SyncDict) Get(key string) (val interface{}, exist bool) {return dict.m.Load(key)
}//Len 默认使用 map 的range方法来进行遍历个数
func (dict *SyncDict) Len() int {length := 0dict.m.Range(func(key, value any) bool {length++return true})return length
}//Put 存入数据
func (dict *SyncDict) Put(key string, value interface{}) (result int) {_, existed := dict.m.Load(key)dict.m.Store(key, value)if existed {return 0}return 1
}//PutIfAbsent 如果不存在则存入数据
func (dict *SyncDict) PutIfAbsent(key string, value interface{}) (result int) {_, existed := dict.m.Load(key)if !existed {dict.m.Store(key, value)return 1}return 0
}//PutIfExists 如果存在则插入数据
func (dict *SyncDict) PutIfExists(key string, value interface{}) (result int) {_, existed := dict.m.Load(key)if existed {dict.m.Store(key, value)return 1}return 0
}//Remove 移除数据
func (dict *SyncDict) Remove(key string) (result int) {_, existed := dict.m.Load(key)dict.m.Delete(key)if existed {return 1}return 0
}//ForEach 遍历数据
func (dict *SyncDict) ForEach(consumer Consumer) {dict.m.Range(func(key, value any) bool {consumer(key.(string), value)return true})
}//Keys 获取所有的key
func (dict *SyncDict) Keys() []string {//创建一个切片,长度为当前字典的长度keys := make([]string, dict.Len())dict.m.Range(func(key, _ any) bool {keys = append(keys, key.(string))return true})return keys
}//RandomKeys 随机返回指定的key值
func (dict *SyncDict) RandomKeys(limit int) []string {keys := make([]string, limit)//遍历传进来的次数,每一次进入dict都只作用一个key上面,因为map每次遍历是无序的for i := 0; i < limit; i++ {dict.m.Range(func(key, _ any) bool {keys[i] = key.(string)//直接返回false,关闭当前遍历return false})}return keys
}//RandomDistinctKeys 随机取不重复的值
func (dict *SyncDict) RandomDistinctKeys(limit int) []string {keys := make([]string, limit)i := 0dict.m.Range(func(key, _ any) bool {keys[i] = key.(string)i++if i == limit {//直接返回false,关闭当前遍历return false}return true})return keys
}//clear 直接换一个新的即可
func (dict *SyncDict) Clear() {dict = MakeSyncDict()
}func MakeSyncDict() *SyncDict {return &SyncDict{}
}

3.2 DB

DB作为最底层支持的数据库存储层,DB对数据存储的 Dict 进行了一层包装对外提供了支持

字段说明

  • index:数据库的索引,每个数据库都会有一个对应的索引id用于进行切换标记
  • data:数据存储的地方,这里我们定义了一个 Dict 的结构用于底层数据的存储
  • addAof:定义的一个aof操作的匿名函数,后续来进行实现
//DB 每一个redis的分数据库
type DB struct {//数据库的索引index int//数据存储的实现data dict.Dict//赋值一个addAof的方法,可以让指令执行的时候写入数据addAof func(line CmdLine)
}//Close 关闭数据库进行清空
func (db *DB) Close() {db.data.Clear()
}//AfterClientClose 关闭数据库需要做的操作
func (db *DB) AfterClientClose(c resp.Connection) {}//ExecFunc 后续所有的指令执行都要实现当前函数
type ExecFunc func(db *DB, args [][]byte) resp.Reply//CmdLine 给二维字节组取一个别名
type CmdLine = [][]byte//makeDB 创建数据库
func makeDB() *DB {return &DB{data:   dict.MakeSyncDict(),addAof: func(line CmdLine) {}, //设置一个空的默认方法,防止初始化开始的时候出现问题}
}//Exec 在数据库上进行执行
func (db *DB) Exec(c resp.Connection, args [][]byte) resp.Reply {//获取到一个数据,存储的应该是指令 例如:setnx或者set;这里统一一下按照小写指令进行处理cmdName := strings.ToLower(string(args[0]))//获取到执行器cmd, ok := cmdTable[cmdName]if !ok {//如果没有实现这个命令,恢复一个错误return reply.MakeStandardErrReply("ERR unknown command " + cmdName)}//校验一下参数的个数是否合法,例如:set k,缺少了valueif !validateArity(cmd.arity, args) {return reply.MakeArgNumErrReply(cmdName)}//获取到执行器fun := cmd.executor//set k,v --> 这里传入的参数就是 k,vreturn fun(db, args[1:])
}//validateArity 验证参数,参数分为两种情况 :set K v --> arity = 3 ; 如果是 exists k1 k2 k3 ---> arity 的参数最少都需要2个(属于变长的指令),统一定义为 -2,负号执行标记变长的
func validateArity(arity int, cmdLine CmdLine) bool {argNum := len(cmdLine)//指令为定长的if arity >= 0 {return argNum == arity}//这里负arity可以将变长的参数个数转换为正数进行判断,在注册指令的时候就指定好变长的参数return argNum >= -arity
}//GetEntity 公共的方法获取数据,db将dict进行包装了一层
func (db *DB) GetEntity(key string) (*database.DataEntity, bool) {//获取到最原始的值raw, ok := db.data.Get(key)if !ok {return nil, false}//将raw原始的格式需要转换为 DataEntitydataEntity, _ := raw.(*database.DataEntity)return dataEntity, true
}//PutEntity 存入entity对象
func (db *DB) PutEntity(key string, entity *database.DataEntity) int {return db.data.Put(key, entity)
}//PutIfExists 存入entity对象,如果存在
func (db *DB) PutIfExists(key string, entity *database.DataEntity) int {return db.data.PutIfExists(key, entity)
}//PutIfAbsent 如果不存在进行存入
func (db *DB) PutIfAbsent(key string, entity *database.DataEntity) int {return db.data.PutIfAbsent(key, entity)
}//Remove 移除
func (db *DB) Remove(key string) int {return db.data.Remove(key)
}//Removes 批量移除
func (db *DB) Removes(keys ...string) int {result := 0for _, key := range keys {_, ok := db.data.Get(key)if ok {result += db.Remove(key)}}return result
}//Flush 清空
func (db *DB) Flush() {db.data.Clear()
}

3.3 Database

database/database.go 文件中定义的一个 Database 结构体主要是对 DB 进行了一层封装

字段说明

  • dbSet:创建的数据库,根据配置文件中的初始化数据库的数量进行创建
  • aofHandler:自定义的 AofHandler 接口,后续用于消息持久化
type Database struct {//存储多个数据库dbSet []*DB//aof处理器aofHandler *aof.AofHandler
}//NewDatabase 创建一个db数据库内核,初始化数据,根据配置文件中的数据库数量进行初始化
func NewDatabase() *Database {database := &Database{}if config.Properties.Databases == 0 {config.Properties.Databases = 16}//初始化16个数据库dbdatabase.dbSet = make([]*DB, config.Properties.Databases)for i := range database.dbSet {db := makeDB()db.index = idatabase.dbSet[i] = db}//判断是否开启消息落盘的功能if config.Properties.AppendOnly {aofHandler, err := aof.NewAofHandler(database)if err != nil {panic(err)}database.aofHandler = aofHandler//初始化db里面的匿名处理器的方法for _, db := range database.dbSet {tmpDb := dbdb.addAof = func(line CmdLine) {/**匿名函数会出现闭包的问题,如果使用 db.index 会一直是15索引*/database.aofHandler.AddAof(tmpDb.index, line)}}}return database
}//Exec 内核层进行执行
func (d *Database) Exec(client resp.Connection, args [][]byte) resp.Reply {defer func() {if err := recover(); err != nil {logger.Error(err)}}()//需要处理 select 1 命令来切换数据库,后面的db核心数据库不需要处理selectcmdName := strings.ToLower(string(args[0]))if cmdName == "select" {//select命令if len(args) != 2 {return reply.MakeArgNumErrReply("select")}return execSelect(client, d, args)}dbIndex := client.GetDBIndex()db := d.dbSet[dbIndex]if db == nil {return reply.MakeStandardErrReply("ERR DB is nil")}return db.Exec(client, args)
}//Close 关闭数据库
func (d *Database) Close() {for _, db := range d.dbSet {db.Close()}
}//AfterClientClose 关闭连接之后需要执行的回调
func (d *Database) AfterClientClose(c resp.Connection) {}//execSelect 根据用户发送的指令,来修改数据库的索引,例如:select 1
func execSelect(c resp.Connection, database *Database, args [][]byte) resp.Reply {//参数进行转换为数字dbIndex, err := strconv.Atoi(string(args[1]))if err != nil {return reply.MakeStandardErrReply("ERR invalid DB index")}if dbIndex >= len(database.dbSet) {return reply.MakeStandardErrReply("ERR DB index is out of range")}c.SelectDB(dbIndex)return reply.MakeOkReply()
}

以上就是数据库层的实现,主要是 Database --> DB --> Dict 这么一个层次的结构,Database获取到用户需要在哪个数据库进行指令的执行,然后交给对应索引的 DB 进行执行,DB然后去回去到对应的指令执行器进行执行,下面是指令的实现

4. 指令实现

4.1 command

这里定义一个 command 的命令结构体,主要用于对执行命令的函数进行包装

参数说明

  • executor :执行器函数,这个函数的定义是在 database/db.go 中进行定义就是上面定义 DB 结构体文件中的函数,后续我们只需要将对应的函数注册到一个 map 中跟命令进行关联上就可以方便的取到
//ExecFunc 后续所有的指令执行都要实现当前函数
type ExecFunc func(db *DB, args [][]byte) resp.Reply
  • arity:当前指令需要几个参数,例如:set key value,就是需要3个参数;这里的 arity 有正负两种情况,负数代表变长指令,例如:keys key1 key2,那么arity就是 -2 也就是说指令最少都得两个参数

方法说明

  • RegisterCommand:提供的一个公共方法可以将执行的函数注册到map中,跟命令相互关联上
//用于记录所有指令跟 command 的关系
var cmdTable = make(map[string]*command)//command 用于区分指令的类型以及执行方式
type command struct {//执行器executor ExecFunc//需要几个参数arity int
}//RegisterCommand 注册指令的执行方法,存储的时候就是 一个指令对应一个执行器
func RegisterCommand(name string, executor ExecFunc, arity int) {name = strings.ToLower(name)cmdTable[name] = &command{executor: executor,arity:    arity,}
}

4.2 keys指令集

database/keys.go 中我们定义了 keys相关指令集的实现,其中方法名称随意定义,主要是在下面的 init() 方法中进行注册,go在启动时,会自动执行每个文件中的 init() 方法,这样就可以将执行的函数跟命令相互关联上了。每个方法都需要 *DBargs [][]bytes 两个参数,上面说了 DB 是对 Dict 进行封装的,Dict 又是真正存储数据的地方,所以命令执行的方法只需要去调用 DB 中对应的方法即可

/**
实现以下keys指令集:DELEXISTSKEYSFLUSHDBTYPERENAMERENAMENX
*///execDel DEL k1 k2 k3,外面就已经切掉了DEL
func execDel(db *DB, args [][]byte) resp.Reply {keys := make([]string, len(args))for i, v := range keys {keys[i] = string(v)}deleted := db.Removes(keys...)if deleted > 0 {//前面指令是被切掉了,这里需要恢复回来db.addAof(utils.ToCmdLine2("del", args...))}return reply.MakeIntReply(int64(deleted))
}//execExists Exists存在几个key
func execExists(db *DB, args [][]byte) resp.Reply {keys := make([]string, len(args))result := int64(0)for _, v := range keys {_, exists := db.GetEntity(v)if exists {result++}}return reply.MakeIntReply(result)
}//execKeys keys k1 k2 k3
func execKeys(db *DB, args [][]byte) resp.Reply {//获取到第一个参数是否是通配符pattern := wildcard.CompilePattern(string(args[0]))//用于所有匹配的keyresult := make([][]byte, 0)db.data.ForEach(func(key string, val interface{}) bool {match := pattern.IsMatch(key)if match {result = append(result, []byte(key))}return true})return reply.MakeMultiBulkReply(result)
}//execType 查询key的类型,例如:type key1
func execType(db *DB, args [][]byte) resp.Reply {key := string(args[0])entity, ok := db.GetEntity(key)if !ok {return reply.MakeStatusReply("none") //tcp报文里面就是 :none\r\n}//类型断言switch entity.Data.(type) {case []byte:return reply.MakeStatusReply("string")//todo: 判断后续其它的类型}return reply.MakeUnknownErrReply()
}//execRename 改名,例如:rename old newKey
func execRename(db *DB, args [][]byte) resp.Reply {old := string(args[0])newKey := string(args[1])entity, exists := db.GetEntity(old)if !exists {return reply.MakeStandardErrReply("no such key")}//将新的key存入进行db.PutEntity(newKey, entity)//删除老的keydb.Remove(old)db.addAof(utils.ToCmdLine2("rename", args...))return reply.MakeOkReply()
}//execRenameNx renamenx:在改到新名称的时候,判断新的名称会不会把原来已经存在的key给干掉,例如:renamenx K1 K2 ,需要判断原来K2是否存在
func execRenameNx(db *DB, args [][]byte) resp.Reply {old := string(args[0])newKey := string(args[1])//判断新的key是否存在了,如果存在就什么都不做_, ok := db.GetEntity(newKey)if ok {//如果什么都没有操作就返回0return reply.MakeIntReply(0)}//继续判断原来的逻辑entity, exists := db.GetEntity(old)if !exists {return reply.MakeStandardErrReply("no such key")}//将新的key存入进行db.PutEntity(newKey, entity)//删除老的keydb.Remove(old)db.addAof(utils.ToCmdLine2("renamenx", args...))//如果执行了就返回一个1return reply.MakeIntReply(1)
}//execFlushDb
func execFlushDb(db *DB, args [][]byte) resp.Reply {db.Flush()db.addAof(utils.ToCmdLine2("flushdb", args...))return reply.MakeOkReply()
}//init 初始化注册命令
func init() {RegisterCommand("del", execDel, -2)RegisterCommand("exists", execExists, -2)RegisterCommand("flushdb", execFlushDb, -1) //-1的参数无论 flushdb后面跟随什么都直接忽略RegisterCommand("type", execType, 2)RegisterCommand("rename", execRename, 3)RegisterCommand("renamenx", execRenameNx, 3)RegisterCommand("keys", execKeys, -2)
}

4.3 string指令集

string 指令集在 database\string.go 中进行定义,跟上面的 keys集一样的实现方式

/**
实现string类型的指令集GETSETSETNXGETSETSTRLEN
*///init go语言在启动的时候就会执行init方法
func init() {RegisterCommand("get", execGet, 2)RegisterCommand("set", execSet, 3)RegisterCommand("setnx", execSetnx, 3)RegisterCommand("getset", execGetset, 3)RegisterCommand("strlen", execStrLen, 2)
}//execGet 获取数据
func execGet(db *DB, args [][]byte) resp.Reply {key := string(args[0])entity, ok := db.GetEntity(key)if !ok {return reply.MakeNullBulkReply()}//目前存入数据都是存入 []bytebytes, b := entity.Data.([]byte)//如果存的类型有其它的,所以需要判断是否转换成功if !b {//todo:转换其它的类型}return reply.MakeBulkReply(bytes)
}//execSet set key value
func execSet(db *DB, args [][]byte) resp.Reply {key := string(args[0])//按照字节数组的方式存储数据value := args[1]data := &database.DataEntity{Data: value, //数据存储依照字节数组的方式进行存储}result := db.PutEntity(key, data)db.addAof(utils.ToCmdLine2("set", args...))return reply.MakeIntReply(int64(result))
}//execSetnx setnx key value
func execSetnx(db *DB, args [][]byte) resp.Reply {key := string(args[0])//按照字节数组的方式存储数据value := args[1]data := &database.DataEntity{Data: value, //数据存储依照字节数组的方式进行存储}result := db.PutIfAbsent(key, data)db.addAof(utils.ToCmdLine2("setnx", args...))return reply.MakeIntReply(int64(result))
}//execGetset getset key value
func execGetset(db *DB, args [][]byte) resp.Reply {key := string(args[0])value := args[1]//先获取原来的值,再设置现在的值entity, exists := db.GetEntity(key)db.PutEntity(key, &database.DataEntity{Data: value})if !exists {return reply.MakeNullBulkReply()}db.addAof(utils.ToCmdLine2("getset", args...))return reply.MakeBulkReply(entity.Data.([]byte))
}//execStrLen strlen key 获取到的key的value长度
func execStrLen(db *DB, args [][]byte) resp.Reply {key := string(args[0])//先获取原来的值,再设置现在的值entity, exists := db.GetEntity(key)if !exists {return reply.MakeNullBulkReply()}return reply.MakeIntReply(int64(len(entity.Data.([]byte))))
}

4.4 ping命令

ping 命令实现比较简单

//Ping ping的命令
func Ping(db *DB, args [][]byte) resp.Reply {return reply.MakePongReply()
}//init 随便写在哪个包下面,go语言在启动的时候都会调用这个方法
func init() {RegisterCommand("ping", Ping, 1)
}

http://www.ppmy.cn/news/77069.html

相关文章

[C++/PTA] 立方体类的实现

[C/PTA] 立方体类的实现 题目要求解题思路代码总结 题目要求 立方体类Box的实现&#xff0c;完成计算体积、计算表面积、输出结果等功能。其中给定的主函数为&#xff1a; int main( ){float ab;cin>>ab;Box obj;obj.seta( ab );obj.getvolume( );obj.getarea( );obj…

齐聚手机赛道:小度朝左,蔚来向右

经过多年的发展&#xff0c;智能手机可以说已经发展到了人手一台的地步了&#xff0c;普及率之高可见一斑。然而&#xff0c;如今的智能手机却没有延续高增长态势&#xff0c;反而出现了销量下滑的情况。据Canalys公布的数据显示&#xff0c;2022年全球智能手机出货量不足12亿部…

ov2640子设备视频操作详细分析

ov2640子设备视频操作详细分析 文章目录 ov2640子设备视频操作详细分析ov2640_subdev_video_ops视频操作ov2640_s_stream开始流ov2640_g_fmt 获取格式ov2640_s_fmt设置格式ov2640_try_fmt尝试格式ov2640_cropcap裁剪能力ov2640_g_crop获取裁剪ov2640_enum_fmt枚举格式ov2640_g_…

充实你的Android开发工具箱:无效数据处理的方案

&#x1f604;&#x1f604;个人介绍 光子郎.进行开发工作七年以上&#xff0c;目前涉及全栈领域并进行开发。会经常跟小伙伴分享前沿技术知识&#xff0c;java后台、web前端、移动端&#xff08;Android&#xff0c;uniapp&#xff0c;小程序&#xff09;相关的知识以及经验体…

Mit6.006-lecture09-Breadth-First-Search

一、新单元&#xff1a;图 Quiz 1包含lecture01到lecture08&#xff0c;关注数据结构和排序 今天开始新单元&#xff0c;lecture09-lecture14&#xff0c;关注图算法 二、图应用 图无处不在 任何网络系统都存在有向连接图 比如&#xff1a;路网、计算机网络、社交网络 任…

实现取关和关注功能

将关注过的用户id存如数据库中 //关注或者取关 Override public Result follow(Long id, Boolean flag) { //1.获取当前登录用户的id UserDTO user UserHolder.getUser(); if(usernull){ return Result.fail("请先登录"); } Long userId user.getId(); //2.判断是关…

【Java EE 初阶】网络初识

目录 1.网络互连 1.局域网&#xff1a; 2.广域网WAN 2.网络通信基础 3.IP地址&#xff1a;端口号 4.协议 1.五元组 2.协议分层 1.为什么要用网络分层&#xff1f; 3.OSI七层模型 4.TCP/IP五层&#xff08;或四层&#xff09;模型 5.封装和分用 1.应用层 2.传输层A…

栈和队列 - C语言实现

目录 栈 栈的概念 栈的实现 队列 队列的概念 队列的实现 栈 栈的概念 栈是一种后进先出 (LIFO - last in first out) 的数据结构&#xff0c;通常利用数组或链表实现。栈只允许在固定的一端进行插入和删除元素操作。进行数据插入和删除操作的一端称为栈顶&#xff0c;另…