KVstore :键值映射存储服务器

devtools/2024/9/24 11:24:58/

概述:本文介绍kv存储服务,所谓kv即key-value映射,用户存储键值对,提供:1.根据键查找值 2.根据键修改值 3.根据键删除值

效果:kv存储是运行在服务器上的一个进程,客户端通过套接字与服务器上的kvstore进程进行通信,客户端发送由协议规定的请求例如 SET name01 wjq ,kvstore服务器接收到请求并解析,回复结果 SUCCESS; 又例如客户端发送 GET name01 ,接收到服务端的回复 wjq

实现思路:

1.首先我们需要做到kvstore与客户端通信,这里使用tcp,也就是说设计之初kvstore就是一个支持百万级并发连接的tcp服务器:这里使用一个reactor模型,直接附上代码,tcp服务器不在本文讲解范围内

#include <sys/socket.h>
#include <errno.h>
#include <netinet/in.h>#include <stdio.h>
#include <string.h>
#include <unistd.h>#include <pthread.h>
#include <sys/poll.h>
#include <sys/epoll.h>
#include <sys/time.h>#include "kvstore.h"// listenfd
// EPOLLIN --> 
int accept_cb(int fd);
// clientfd
// 
int recv_cb(int fd);
int send_cb(int fd);// conn, fd, buffer, callbackint epfd = 0;
struct conn_item connlist[1048576] = {0}; // 1024  2G     2 * 512 * 1024 * 1024 
// list
struct timeval zvoice_king;
// 
// 1000000#define TIME_SUB_MS(tv1, tv2)  ((tv1.tv_sec - tv2.tv_sec) * 1000 + (tv1.tv_usec - tv2.tv_usec) / 1000)int set_event(int fd, int event, int flag) {if (flag) { // 1 add, 0 modstruct epoll_event ev;ev.events = event ;ev.data.fd = fd;epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev);} else {struct epoll_event ev;ev.events = event;ev.data.fd = fd;epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev);}}int accept_cb(int fd) {struct sockaddr_in clientaddr;socklen_t len = sizeof(clientaddr);int clientfd = accept(fd, (struct sockaddr*)&clientaddr, &len);if (clientfd < 0) {return -1;}set_event(clientfd, EPOLLIN, 1);connlist[clientfd].fd = clientfd;memset(connlist[clientfd].rbuffer, 0, BUFFER_LENGTH);connlist[clientfd].rlen = 0;memset(connlist[clientfd].wbuffer, 0, BUFFER_LENGTH);connlist[clientfd].wlen = 0;connlist[clientfd].recv_t.recv_callback = recv_cb;connlist[clientfd].send_callback = send_cb;if ((clientfd % 1000) == 999) {struct timeval tv_cur;gettimeofday(&tv_cur, NULL);int time_used = TIME_SUB_MS(tv_cur, zvoice_king);memcpy(&zvoice_king, &tv_cur, sizeof(struct timeval));printf("clientfd : %d, time_used: %d\n", clientfd, time_used);}return clientfd;
}int recv_cb(int fd) { // fd --> EPOLLINchar *buffer = connlist[fd].rbuffer;int idx = connlist[fd].rlen;int count = recv(fd, buffer, BUFFER_LENGTH, 0);if (count == 0) {printf("disconnect\n");epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);		close(fd);return -1;}connlist[fd].rlen = count;kvstore_request(&connlist[fd]); connlist[fd].wlen = strlen(connlist[fd].wbuffer);set_event(fd, EPOLLOUT, 0);   return count;
}int send_cb(int fd) {char *buffer = connlist[fd].wbuffer;int idx = connlist[fd].wlen;int count = send(fd, buffer, idx, 0);set_event(fd, EPOLLIN, 0);return count;
}int init_server(unsigned short port) {int sockfd = socket(AF_INET, SOCK_STREAM, 0);struct sockaddr_in serveraddr;memset(&serveraddr, 0, sizeof(struct sockaddr_in));serveraddr.sin_family = AF_INET;serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);serveraddr.sin_port = htons(port);if (-1 == bind(sockfd, (struct sockaddr*)&serveraddr, sizeof(struct sockaddr))) {perror("bind");return -1;}listen(sockfd, 10);return sockfd;
}int epoll_entry(void) {int port_count = 20;unsigned short port = 2048;int i = 0;epfd = epoll_create(1); // int sizefor (i = 0;i < port_count;i ++) {int sockfd = init_server(port + i);  // 2048, 2049, 2050, 2051 ... 2057connlist[sockfd].fd = sockfd;connlist[sockfd].recv_t.accept_callback = accept_cb;set_event(sockfd, EPOLLIN, 1);}gettimeofday(&zvoice_king, NULL);struct epoll_event events[1024] = {0};while (1) { // mainloop();int nready = epoll_wait(epfd, events, 1024, -1); // int i = 0;for (i = 0;i < nready;i ++) {int connfd = events[i].data.fd;if (events[i].events & EPOLLIN) { //int count = connlist[connfd].recv_t.recv_callback(connfd);//printf("recv count: %d <-- buffer: %s\n", count, connlist[connfd].rbuffer);} else if (events[i].events & EPOLLOUT) { // printf("send --> buffer: %s\n",  connlist[connfd].wbuffer);int count = connlist[connfd].send_callback(connfd);}}}//getchar();//close(clientfd);}
函数epoll_entry实现了与客户端之间的通信,并通过kvstore_request(&connlist[fd])这个函数实现了处理客户端请求,并将处理结果发送给客户端

2.kvstore存储引擎的实现

概要:由于服务器要将客户端请求存储的内容存储起来,有两种方式,一是存储到数据库,二是存储到服务端本地

为了简单实现业务,本文使用存储到本地进行讲解,采用的数据结构是哈希表

先介绍哈希表的实现以及为kvstore封装的接口:

/** 单线程版本,没有做线程安全!**/#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <pthread.h>#include "kvstore.h"#define MAX_KEY_LEN	128
#define MAX_VALUE_LEN	512#define MAX_TABLE_SIZE	102400#define ENABLE_POINTER_KEY	1 typedef struct hashnode_s { // hash node#if ENABLE_POINTER_KEYchar *key;char *value;
#elsechar key[MAX_KEY_LEN];char value[MAX_VALUE_LEN];
#endifstruct hashnode_s *next;} hashnode_t;typedef struct hashtable_s { // hash tablehashnode_t **nodes; // hashnode_t * 类型的 *nodes,也就是存放着hashnode_t类型的指针的数组nodesint max_slots;int count;} hashtable_t;hashtable_t Hash;static int _hash(char *key, int size) { // hash函数,使用key确定hash值if (!key) return -1;int sum = 0;int i = 0;while (key[i] != 0) { // 使用ASCII计算hash值,由于key是字符数组,该方法通用sum += key[i];i ++;}return sum % size; // 返回hash值
}hashnode_t *_create_node(char *key, char *value) {hashnode_t *node = (hashnode_t *)kvstore_malloc(sizeof(hashnode_t));if (!node) return NULL; // malloc filed#if ENABLE_POINTER_KEY// 为节点的成员分配空间node->key = kvstore_malloc(strlen(key) + 1);if (!node->key) {kvstore_free(node); // node分配成功但key失败return NULL;}strcpy(node->key, key);node->value = kvstore_malloc(strlen(value) + 1);if (!node->value) {kvstore_free(node->key); // node和key分配成功但value失败kvstore_free(node);return NULL;}strcpy(node->value, value);#elsestrncpy(node->key, key, MAX_KEY_LEN);strncpy(node->value, value, MAX_VALUE_LEN);#endif// 初始化 nextnode->next = NULL;return node;
}int init_hashtable(hashtable_t *hash) {if (!hash) return -1;hash->nodes = (hashnode_t**)kvstore_malloc(sizeof(hashnode_t *) * MAX_TABLE_SIZE);if (!hash->nodes) return -1;hash->max_slots = MAX_TABLE_SIZE;hash->count = 0;return 0;
}void dest_hashtable(hashtable_t *hash) { // 销毁哈希表if (!hash) return;// 遍历释放数组中所有链表int i = 0;for (i = 0; i < hash->max_slots; i++) {hashnode_t *node = hash->nodes[i];while (node != NULL) {hashnode_t *tmp = node; // 保存当前节点node = node->next; // 移动到下一个节点hash->nodes[i] = node; // 更新头指针,在这段代码中没有作用kvstore_free(tmp); // 释放当前节点 }}kvstore_free(hash->nodes); // 释放哈希表的数组成员
}int put_kv_hashtable(hashtable_t *hash, char *key, char *value) {if (!hash || !key || !value) return -1;int idx = _hash(key, MAX_TABLE_SIZE); // 哈希值作为数组下标hashnode_t *node = hash->nodes[idx]; // 获取正确数组位置的头指针
#if 1while (node != NULL) {    // 如果已经存在,直接退出,不重复插入if (strcmp(node->key, key) == 0) {return 1;}node = node->next;}
#endifhashnode_t *new_node = _create_node(key, value);// 头插法new_node->next = hash->nodes[idx];hash->nodes[idx] = new_node; // 更新头节点指针hash->count ++;return 0;
}char *get_kv_hashtable(hashtable_t *hash, char *key) { // searchif (!hash || !key) return NULL;int idx = _hash(key, MAX_TABLE_SIZE);hashnode_t *node = hash->nodes[idx]; // 确定数组索引while (node != NULL) { // 遍历查找if (strcmp(node->key, key) == 0) {return node->value;}node = node->next;}return NULL;
}int count_kv_hashtable(hashtable_t *hash) {return hash->count;
}int delete_kv_hashtable(hashtable_t *hash, char *key) { // 根据key删除节点if (!hash || !key) return -1;int idx = _hash(key, MAX_TABLE_SIZE); // 哈希值作为索引// 先判断头指针hashnode_t *head = hash->nodes[idx];if (head == NULL) return -1;// 遍历链表hashnode_t *cur = hash->nodes;hashnode_t *prev = NULL;while (cur != NULL) {if (strcmp(cur->key, key) == 0) break;prev = cur;cur = cur->next;}if (cur == NULL) return -1; // 没找到if (prev == NULL) { // 如果要删除的是头节点hash->nodes[idx] = cur->next; // 删除cur} else { // 不是头节点prev->next = cur->next; // 删除cur}// 释放cur节点的空间
#if ENABLE_POINTER_KEYif (cur->key) {kvstore_free(cur->key);}if (cur->value) {kvstore_free(cur->value);}kvstore_free(cur);
#elsefree(cur);
#endifhash->count --; // 更新countreturn 0;
}int exit_kv_hashtable(hashtable_t *hash, char *key) { // 判断是否存在该key的映射valuechar *value = get_kv_hashtable(hash, key);if (value) return 1;else return 0;
}int kvs_hash_modify(hashtable_t *hash, char *key, char *value) { // 先查找key再修改valueif (!hash || !key || !value) return -1;int idx = _hash(key, MAX_TABLE_SIZE);hashnode_t *node = hash->nodes[idx];while (node != NULL) {if (strcmp(node->key, key) == 0) {// 先释放原空间,避免内存泄漏kvstore_free(node->value); // 释放原value指向的空间node->value = NULL; // 避免使用悬空指针// 新分配空间node->value = kvstore_malloc(strlen(value) + 1); if (node->value) { // 分配成功strcpy(node->value, value);return 0;} else assert(0);}node = node->next;}return -1;
}int kvs_hash_count(hashtable_t *hash) {return hash->count;
}// 再封装一层接口:使用第三方库时,对库函数进行一层封装,适配自己的代码,
// 排查问题或更新迭代时只需要修改这一层接口的内容就行,不需要在源代码主体上修改,相当于做了一层隔离int kvstore_hash_craete(hashtable_t *hash) {return init_hashtable(hash);
}void kvstore_hash_destory(hashtable_t *hash) {return dest_hashtable(hash);}int kvs_hash_set(hashtable_t *hash, char *key, char *value) {return put_kv_hashtable(hash, key, value);}char *kvs_hash_get(hashtable_t *hash, char *key) {return get_kv_hashtable(hash, key);}int kvs_hash_delete(hashtable_t *hash, char *key) {return delete_kv_hashtable(hash, key);}

对于哈希表的设计与实现,注释说的很清楚了,最后封装的接口是用在接下来的kvstore主程序中的

3.kvstore主体

概要:这份代码集成了前面的tcp服务epoll_entry、存储组件哈希表以及最后要介绍的:对客户端请求进行解析处理的组件

先介绍kvstore主程序:

int init_kvengine(void) {kvstore_hash_create(&Hash);
}int exit_kvengine(void) {kvstore_hash_destory(&Hash);}int main() {init_kvengine(); // 创建存储引擎,这里是哈希表epoll_entry();  // 启动tcp服务器,处理并回复客户端请求exit_kvengine(); // 销毁哈希表}

而这里调用的init_kvengine();实际上就是前面的哈希表代码中的:

int init_hashtable(hashtable_t *hash) {if (!hash) return -1;hash->nodes = (hashnode_t**)kvstore_malloc(sizeof(hashnode_t *) * MAX_TABLE_SIZE);if (!hash->nodes) return -1;hash->max_slots = MAX_TABLE_SIZE;hash->count = 0;return 0;
}

4.请求解析

我们对于kvstore主程序中的存储引擎、tcp服务都介绍完了,接下来介绍最核心的请求解析函数:

这两个函数位于epoll_entry的kvstore_request(&connlist[fd])函数中:

int kvstore_request(struct conn_item *item) {char *msg = item->rbuffer;char *tokens[KVSTORE_MAX_TOKENS];int count = kvstore_split_token(msg, tokens); // 解析请求kvstore_parser_protocol(item, tokens, count); // 生成回复内容return 0;
}

这个函数做到了对用户请求的解析以及回复,而依赖的是以下两个函数:

解析请求:

int kvstore_split_token(char *msg, char **tokens) { // 将msg字符串进行分割,结果保存在tokens字符串数字里if (msg == NULL || tokens == NULL) return -1; // 参数检查int idx = 0;char *token = strtok(msg, " "); // 对msg按空格“ ”进行分割,返回第一个子字符串while (token != NULL) { // 获取剩余的子字符串tokens[idx++] = token; // 将子字符串保存在字符串数组里token = strtok(NULL, " "); // 固定写法,依次获取除第一个外,剩余的子字符串}return idx; // 返回子字符串的个数
}

我们能对用户请求按空格进行分割的原因是,kvstore规定了应用层协议,只有按协议规定发送的请求才能被正确处理,就像linux shell 中的命令的名称以及使用方法一样

处理并回复:

int kvstore_parser_protocol(struct conn_item *item, char **tokens, int count) {if (item == NULL || tokens[0] == NULL || count == 0) return -1; // 检查参数char *msg = item->wbuffer; // 获取写缓冲区memset(msg, 0, BUFFER_LENGTH);// 对用户的命令的解析结果, 例如 SET name wjq 解析结果如下:char *command = tokens[0];  // SETchar *key = tokens[1];      // namechar *value = tokens[2];    // wjqint cmd = KVS_CMD_START;for (cmd = KVS_CMD_START; cmd < KVS_CMD_SIZE; cmd++) { // 查找比对tokens里的命令if (strcmp(commands[cmd], command) == 0) {break; // 找到了或者不存在}}// 匹配命令并回复结果switch (cmd) {case KVS_CMD_HSET: {    // SET :添加int res = kvstore_hash_set(key, value); // 调用哈希表的函数if (!res) {snprintf(msg, BUFFER_LENGTH, "SUCCESS");} else {snprintf(msg, BUFFER_LENGTH, "FAILED");}break;}case KVS_CMD_HGET: {   // GET :查询char *val = kvstore_hash_get(key); // 调用哈希表提供的接口if (val) {snprintf(msg, BUFFER_LENGTH, "%s", val);} else {snprintf(msg, BUFFER_LENGTH, "NO EXIST");}break;}case KVS_CMD_HDEL: { // DEL : 删除int res = kvstore_hash_delete(key);if (res < 0) {  // serversnprintf(msg, BUFFER_LENGTH, "%s", "ERROR");} else if (res == 0) {snprintf(msg, BUFFER_LENGTH, "%s", "SUCCESS");} else {snprintf(msg, BUFFER_LENGTH, "NO EXIST");}break;}case KVS_CMD_HMOD: { // MOD : 修改int res = kvstore_hash_modify(key, value);if (res < 0) {  // serversnprintf(msg, BUFFER_LENGTH, "%s", "ERROR");} else if (res == 0) {snprintf(msg, BUFFER_LENGTH, "%s", "SUCCESS");} else {snprintf(msg, BUFFER_LENGTH, "NO EXIST");}break;}case KVS_CMD_HCOUNT: { // COUNT : 查询数量int count = kvstore_hash_count();if (count < 0) {  // serversnprintf(msg, BUFFER_LENGTH, "%s", "ERROR");} else {snprintf(msg, BUFFER_LENGTH, "%d", count);}break;}default: {printf("cmd: %s\n", commands[cmd]);assert(0);}}}

可以看到解析查询的过程就是将用户按我们指定协议输入的请求,分成几段,为每一条请求进行一次解析、处理

增删改查用到了哈希表这个数据结构提供的函数,而只有按空格将字符串分割这个函数是我们自行设计的,难度并不大

至此,kvstore的设计实现已经全部完成

推荐学习https://xxetb.xetslk.com/s/p5Ibb


http://www.ppmy.cn/devtools/32992.html

相关文章

批量抓取某电影网站的下载链接

思路&#xff1a; 进入电影天堂首页&#xff0c;提取到主页面中的每一个电影的背后的那个urL地址 a. 拿到“2024必看热片”那一块的HTML代码 b. 从刚才拿到的HTML代码中提取到href的值访问子页面&#xff0c;提取到电影的名称以及下载地址 a. 拿到子页面的页面源代码 b. 数据提…

【阿里云服务器】ubuntu 22.04.1安装docker以及部署java环境

我的服务器配置是2GB CPU 2GB 内存 Ubuntu22.04 目录 一、阿里云 ubuntu 22.04.1安装docker 二、docker基础命令 三、Windows电脑访问云服务器 四、安装java环境 安装OpenJDK 8&#xff08;可以根据需要安装其他版本的JDK&#xff09; 安装java的依赖管理工具maven 一、…

IP 寻址与地址解析

目录 前言 1.IP 分类地址 2.IP 子网与超网 3.组成 IP 超网 4.无类地址与 CIDR 5.配置管理 6.地址解析 总结 前言 在互联网协议 (IP) 的世界中&#xff0c;寻址和地址解析是关键概念。它们使设备能够在互联网上唯一地标识和相互通信。让我们深入了解 IP 寻址、子网、超网…

消息队列 RabbitMQ python实战

企业面临着信息爆炸、实时数据处理、高效通信等诸多挑战。如何确保系统稳定运行、信息快速传递、应用程序高效通信?答案在于消息队列!异步通信: 不必等待!系统解耦: 消息队列将应用程序的通信解耦,降低彼此之间的依赖,让开发、部署、扩展更加灵活。数据持久化: 确保重要…

怎么让电脑耳机和音响都有声音

电脑耳机音响不能同时用没声音怎么办 一般来说&#xff0c;重新开机后问题能够得到解决。右击“我的电脑”---“属性”---“硬件”---“设备管理器”&#xff0c;打开“声音、视频和游戏控制器”有无问题&#xff0c;即看前面有没有出现黄色的“”。 如果您的 电脑 耳机能正常…

【LeetCode刷题记录】98. 验证二叉搜索树

98 验证二叉搜索树 给你一个二叉树的根节点 root &#xff0c;判断其是否是一个有效的二叉搜索树。 有效 二叉搜索树定义如下&#xff1a; 节点的左子树只包含 小于 当前节点的数。 节点的右子树只包含 大于 当前节点的数。 所有左子树和右子树自身必须也是二叉搜索树。 示例…

JVM笔记2--垃圾收集算法

1、如何确认哪些对象“已死” 在上一篇文章中介绍到Java内存运行时的各个区域。其中程序计数器、虚拟机栈、本地方法栈3个区域随着线程而生&#xff0c;随线程而灭&#xff0c;栈中的栈帧随着方法的进入和退出而有条不紊的执行着入栈和出栈操作。每个栈帧中分配多少内存基本上…

【数据库】Elasticsearch的操作

在关系数据库和Elasticsearch之间&#xff0c;对基本概念和数据结构的理解对于使用两者进行有效的数据操作非常关键。下面是关系数据库和Elasticsearch之间的基本概念比较&#xff0c;包括实际的应用例子&#xff1a; 对比数据库的概念 数据库与索引 关系数据库 在关系数据…