2.3.1(项目)kv存储——框架梳理(待定)

ops/2025/1/20 3:00:01/

一、过一遍代码路线:

(1)底层网络框架;(2)业务层的协议设计(发什么数据,以及收什么数据);(3)业务层的引擎设计;

1. 底层网络框架

目的:通过定义函数建立网络层和业务层的联系,并从原有已以网络框架为主改造后以业务框架为main函数,确保此后网络层隔离和透明(以reactor.c为例)。

要求reactor.c(手写)、ntyco协程(看懂)、io_uring(看懂)。

做法:用做reactor的做法来做kvstore的网络框架

核心:是设置自定义的结构体,明确什么事件执行什么回调函数。while(1){}循环里,(a)EPOLLIN事件执行recv_callback,EPOLLOUT事件执行send_callback。(b)EPOLLIN事件包含sockfd和clientfd,分别执行accept函数和recv/send函数,可以通过recv_callback的index加以区分。

要点:

知识点(1):通过宏定义开启或关闭不同的功能,由此方便新功能编写。

#define ENABLE_KVSTORE        1
#if    ENABLE_KVSTOREkvs_request(&connect_list[fd]);
#endif#if    ENABLE_KVSTOREkvs_response(&connect_list[fd]);
#endif

知识点(2):(1) 协议处理 → 放到kvstore.c;(2)main入口函数 → 放到网络框架(reactor.c)里面?还是放到kvstore.c里面?

答:如果把main入口函数放到网络框架(reactor.c)就表明主要程序由网络框架实现,按照这个逻辑kvstore.c只是其中的一个业务,而流程不应该这样。所以main要放到kvstore.c里面,以保证网络层被完全的隔离。对于一个服务而言,我们要最终封装成:网络层仅需要传输端口(port)协议的接口(kvs_protocol)这两个端口过去,

知识点(3)网络框架(reactor.c)里如何与kvstore.c连接:

(a)定义kvs_request和kvs_response;

(b)send_cb和recv_cb分别设置kvs_request和kvs_response;

(c)send_cb里的kvs_request使用kvs_handler处理数据

kvs_handler通过“static msg_handler kvs_handler” 定义类型,在网络框架(reactor.c)主函数里被handler赋值 

handler作为网络框架(reactor.c)主函数里其中一个输入参数,被msg_handler结构体赋值   

结构体msg_handler在网络框架(reactor.c)被定义,包含*msg、length、*response,同时也在kvstore.h文件里。

作为在网络框架(reactor.c)主函数输入参数的handler,在kvstore.c里被定义为kvs_protocal。网络框架(reactor.c)的命令行 “static msg_handler kvs_handler” 为了 “kvs_handler = handler”。

网络层的编译命令

此处的代码:

server.h(基本没变化)

#ifndef __SERVER_H__
#define __SERVER_H__#define BUFFER_LENGTH		128#define ENABLE_KVSTORE              1typedef int  (*RCALLBACK)(int fd);struct conn {int       fd;char    rbuffer[BUFFER_LENGTH];int       rlength;char    wbuffer[BUFFER_LENGTH];int       wlength;RCALLBACK  send_callback;RCALLBACK  recv_callback;
};#if ENABLE_KVSTORE
int kvs_request(struct conn *c);
int kvs_response(struct conn *c);
#endif#endif

kvs_reactor.c

#include <stdio.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <string.h>
#include <errno.h>
#include <sys/epoll.h>
#include <unistd.h>
#include <sys/time.h>
#include "server.h"//#define BUFFER_LENGTH       1024
#define CONNECT_SIZE           1048576
#define PORT_SIZE                   1#define TIME_SUB_MS(tv1, tv2)  ((tv1.tv_sec - tv2.tv_sec) * 1000 + (tv1.tv_usec - tv2.tv_usec) / 1000)#if ENABLE_KVSTOREtypedef int (*msg_handler)(char *msg, int length, char *response);
static msg_handler kvs_handler;int kvs_request(struct conn *c){//	printf("recv %d: %s\n", c->rlength ,c->rbuffer);c->wlength = kvs_handler(c->rbuffer, c->rlength, c->wbuffer);}#endifint kvs_response(struct conn *c){}//typedef int  (*RCALLBACK)(int fd);int accept_cb(int fd);
int recv_cb(int fd);
int send_cb(int fd);int epfd = 0;struct timeval begin;/*
struct conn {int       fd;char    rbuffer[BUFFER_LENGTH];int       rlength;char    wbuffer[BUFFER_LENGTH];int       wlength;RCALLBACK  send_callback;RCALLBACK  recv_callback;
};
*/struct conn connect_list[CONNECT_SIZE] = {0};int set_event(int fd, int event, int flag){if (flag == 1){ // non-zero addstruct epoll_event ev;ev.data.fd = fd;ev.events = event;epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev);} else {           // zero MODstruct epoll_event ev;ev.data.fd = fd;ev.events = event;epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev);}}int event_register(int fd, int event){if (fd < 0){return -1;}connect_list[fd].fd = fd;connect_list[fd].recv_callback = recv_cb;connect_list[fd].send_callback = send_cb;memset(connect_list[fd].rbuffer, 0, BUFFER_LENGTH);connect_list[fd].rlength = 0;memset(connect_list[fd].wbuffer, 0, BUFFER_LENGTH);connect_list[fd].wlength = 0;set_event(fd, EPOLLIN, 1);}int accept_cb(int fd){struct sockaddr_in clientaddr;socklen_t len = sizeof(clientaddr);int clientfd = accept(fd, (struct sockaddr*)&clientaddr, &len);if (clientfd < 0){printf("accept error:  %d, %s\n", errno, strerror(errno));return -1;}event_register(clientfd, EPOLLIN);if((clientfd % 1000) == 0){struct timeval current;gettimeofday(&current, NULL);int time_used = TIME_SUB_MS(current, begin);memcpy(&begin, &current, sizeof(struct timeval));printf("Accept clientfd : %d, time_used: %d\n", clientfd, time_used);}return 0;
}int recv_cb(int fd){int count = recv(fd, connect_list[fd].rbuffer, BUFFER_LENGTH, 0);if (count ==0){printf("client disconnect: %d\n", fd);close(fd);epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);  // unfinishedreturn 0;}connect_list[fd].rlength = count;//	printf("rbuffer: %s, recv: %d\n", connect_list[fd].rbuffer, count);# if 0 //echoconnect_list[fd].wlength = connect_list[fd].rlength;memcpy(connect_list[fd].wbuffer, connect_list[fd].rbuffer, connect_list[fd].wlength);
#endif#if    ENABLE_KVSTOREkvs_request(&connect_list[fd]);
#endifset_event(fd, EPOLLOUT, 0);return count;
}int send_cb(int fd){#if    ENABLE_KVSTOREkvs_response(&connect_list[fd]);
#endifint count = 0;count = send(fd, connect_list[fd].wbuffer, connect_list[fd].wlength, 0);
//	printf("wbuffer: %s, send: %d\n", connect_list[fd].wbuffer, count);set_event(fd, EPOLLIN, 0);return count;}int r_init_server(unsigned short port){int sockfd = socket(AF_INET, SOCK_STREAM, 0);struct sockaddr_in servaddr;servaddr.sin_family = AF_INET;servaddr.sin_addr.s_addr= htonl(INADDR_ANY);  // uint32_tservaddr.sin_port = htons(port);if (-1 == bind(sockfd, (struct sockaddr*)&servaddr, sizeof(struct sockaddr))) {printf("bind failed: %s\n", strerror(errno));}listen(sockfd, 10);
//	printf("listen finished: %d\n", sockfd);return sockfd;
}int reactor_start(unsigned short port, msg_handler handler){epfd = epoll_create(1);//unsigned short port = 2000;kvs_handler =  handler;int i = 0;for (i = 0; i < PORT_SIZE; i ++){int sockfd = r_init_server(port + i);connect_list[sockfd].fd =  sockfd;connect_list[sockfd].recv_callback = accept_cb;printf("sockfd: %d\n", sockfd);set_event(sockfd, EPOLLIN, 1);}gettimeofday(&begin, NULL);while(1){struct epoll_event events[1024] = {0};int nready = epoll_wait(epfd, events, 1024, -1);
//		printf("epoll_wait is %d\n", nready);int i = 0;for (i = 0; i < nready; i ++){int connfd = events[i].data.fd;if (events[i].events & EPOLLIN){connect_list[connfd].recv_callback(connfd);}if (events[i].events & EPOLLOUT){connect_list[connfd].send_callback(connfd);}}}return 0;
}

kvstore.c

#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include "kvstore.h"/*
* msg: request message
* length: length of request message
* response:  need to seed
*@return: length of response
*/int kvs_protocal(char *msg, int length, char *response){printf("recv %d: %s\n", length, msg);memcpy(response, msg, length);return strlen(response);
}int main(int argc, char *argv[]){ //  input parametersif (argc != 2) return -1;// ./program Hello World: Number of arguments is 3int port = atoi(argv[1]);#if   (NETWORK_SELECT  ==  NETWORK_REACTOR)reactor_start(port, kvs_protocal);
#elif#endif}

kvstore.h

#ifndef   __KV_STORE_H__
#define  __KV_STORE_H__#define NETWORK_REACTOR     0#define NETWORK_SELECT        NETWORK_REACTORtypedef int (*msg_handler)(char *msg, int length, char *response);extern int reactor_start(unsigned short port, msg_handler handler);const char *command[] = {"SET", "GET", "DEL", "MOD", "EXIST"
};const char *response[] = {};#endif

2. 业务层的协议设计:

目的:在前面已经建立tcp公共协议连接的基础上,建立发什么数据和接收什么数据的私有协议 / 自定义协议 / 发数据的格式。

具体做法:对于网络部分已经实现了数据的接收和发送,所以接下来我们要在收数据的地方修改,在原有reactor.c的基础上,

(1)新建kvstore.c定义有关业务联系的函数(接收request和输出response);

(2)新建kvstore.h为函数(接收request和输出response)定义函数具体的协议、具体response

具体5个response格式:

command(命令)
SET         Key     ValueGET         KeyDEL         KeyMOD         Key     ValueEXIST       Key

(3)在上一部分网络已经封装基础上,以kvstore.c里的自定义kvs_protocol函数(也就是对应网络框架(reactor.c) 的主函数第2个输入参数)为入口函数,进行具体的协议解析功能(也就是定义接收和发送的数据规则含义,如下2种模式)

SET Key Value
Get Key

要点:

知识点(1):针对tcp的分包和粘包,问题1是什么是分包和粘包,问题2是怎么解决这个问题?

答1:由于send(fd, buffer, length, 0)的次数和recv(fd, buffer, length, 0)可能不一致,可能send()多次发送recv一次性都接收了(这就是粘包),所以要区分出每次的数据(分包解数据)。

答2:分包的时候需要和对方约定好格式,同时tcp连接满足假设(1) recv()先接收的数据就是对方先发送的数据,保证了接收数据的顺序;(2)  数据在传输过程中不会发生丢包情况,保证了数据必达性

方式1. send()在发包时就确定约定的长度,比如接收2次就可以接收完整的数据包,第一次接收2字节作为标志,第二次再接收数据包。

SEND:
[15]SET Key Value\r\n   // 15表示15个字符
第一次:接收开头的2个字符,同时通过这2个字符的值得知后面还要接收15字符
short length = 0;
recv(fd, &length, 2, 0);
第二次:接收剩余数据15个字符
recv(fd, buffer, length, 0);

方式2. 仿照Redis,先确定有多少(N)个token后就先发送字符N,具体发送格式如下:

Refis → SET Key Value
发送顺序如下
3\r\n     // 3表示“SET Key Value”一共是3个token(单词)
3\r\n     // 3表示第一个token(单词)“SET"是3个字符的长度
SET\r\n   // 发送一个token,“SET"
3\r\n     // 3表示第二个token(单词)“Key"也是3个字符的长度
Key\r\n   // 发送一个token,“Key"
5\r\n     // 5表示第三个token(单词)“Value"也是5个字符的长度
Value\r\n   // 发送一个token,“Value"

伪代码如下,最终要读(1+2*3)=7次

自行实现readline()函数,要求每一行读到"\r\n"就停止count = atoi(buffer);
for (i = 0; i < count; i ++){readline(fd, tokenlen);readline(fd, token);
}

3. 业务层的引擎设计

数据结构用红黑树、希哈、数组、链表(手写)

二、备注:

(1) “const char *command[] ={};” 什么意思?以及为什么是const以及为什么*?

答:const: 确保字符串内容不能被修改。*: 表示指针,每个数组元素都指向一个字符串。

(2) 定义函数不用分号";",而结构体需要,那么什么时候需要加“;”?

答:定义一个类型(例如 struct 或 typedef)告诉编译器这个类型存在但并不提供执行行为,需要以分号结束。

(a)需要分号的地方:变量声明、结构体定义、typedef 定义新类型

int a = 10; // 表示变量声明结束struct Point {int x;char y;
}; // 分号表示 `struct Point` 的定义结束typedef struct {int x;char y;
} PointType; // 分号表示 `typedef` 的结束

(b)不需要分号的地方:函数定义、#define 宏(宏是简单的文本替换)。

int add(int a, int b) {return a + b; // 函数体内语句的分号只是代码逻辑的一部分
}#define SQUARE(x) ((x) * (x)) // 宏定义

(3)为什么分为 .c 和 .h 文件?

答:每个 .c 文件负责一个模块的实现,多个模块通过 .h 文件互相调用。.h 文件只能声明接口,不能包含具体实现。自定义头文件使用双引号:#include "xxxx.h"。

(4)"int main(int argc, char *argv[]){}" 是一个标准的 C 语言程序入口函数。

答:argc表示参数个数 (Argument Count),argv表示参数向量 (Argument Vector)。用于获取命令行传递给程序的参数。

int main(int argc, char *argv[]) {printf("Number of arguments: %d\n", argc);for (int i = 0; i < argc; i++) {printf("Argument %d: %s\n", i, argv[i]);}return 0;
}
运行:
./program Hello World
输出:
Number of arguments: 3
Argument 0: ./program
Argument 1: Hello
Argument 2: World


http://www.ppmy.cn/ops/151544.html

相关文章

游戏引擎学习第81天

仓库:https://gitee.com/mrxiao_com/2d_game_2 或许我们应该尝试在地面上添加一些绘图 在这段时间的工作中&#xff0c;讨论了如何改进地面渲染的问题。虽然之前并没有专注于渲染部分&#xff0c;因为当时主要的工作重心不在这里&#xff0c;但在实现过程中&#xff0c;发现地…

Golang笔记——包的循环引用问题(import cycle not allowed)和匿名导入

大家好&#xff0c;这里是Good Note&#xff0c;关注 公主号&#xff1a;Goodnote&#xff0c;专栏文章私信限时Free。本文详细介绍Golang中包的循环引用问题(import cycle not allowed)和匿名导入问题。 文章目录 循环引用问题优势设计原因解决方法 明确导入的包必须使用匿名导…

“大数据+技校”:VR虚拟仿真实训室的发展前景

在技术教育的新时代&#xff0c;大数据与虚拟现实技术的融合正在重塑技校的教学模式。"大数据技校"模式下的VR虚拟仿真实训室&#xff0c;为技校学生提供了一个创新的学习平台&#xff0c;预示着教育方式的深刻变革。 一、大数据与技校教育的深度融合 大数据技术的应…

Unity中实现倒计时结束后干一些事情

问题描述&#xff1a;如果我们想实现在一个倒计时结束后可以执行某个方法&#xff0c;比如挑战成功或者挑战失败&#xff0c;或者其他什么的比如生成boss之类的功能&#xff0c;而且你又不想每次都把代码复制一遍&#xff0c;那么就可以用下面这种方法。 结构 实现步骤 创建一…

第十一章 图论

#include <iostream> #include <cstdio> #include <vector>using namespace std;const int MAXN 1000;vector<int> graph[MAXN]; //用向量存储邻接表中的每个点及其连接的的其他点int main(){return 0; } #include <iostream> #include &…

【ROS2 中间件RMW】基于FastDDS共享内存实现ROS2跨进程零拷贝通讯

前言 谈及ROS2的通讯机制&#xff0c;话题通讯作为一个最为常用的通讯手段&#xff0c;相信大家都不为陌生。但是即便话题通讯提供了一种跨进程的通讯方式&#xff0c;我们难免无法防止其在发布和订阅 的时候传递的消息被进行内存中的一次拷贝。因此诞生了零拷贝(zero_copy)这…

内网渗透测试工具及渗透测试安全审计方法总结

1. 内网安全检查/渗透介绍 1.1 攻击思路 有2种思路&#xff1a; 攻击外网服务器&#xff0c;获取外网服务器的权限&#xff0c;接着利用入侵成功的外网服务器作为跳板&#xff0c;攻击内网其他服务器&#xff0c;最后获得敏感数据&#xff0c;并将数据传递到攻击者&#xff0…

基于微信小程序的校园运动场地预约系统设计与实现

一.前言 选题背景&#xff1a; 随着社会的进步和人们生活水平的提高&#xff0c;健康意识逐渐增强&#xff0c;越来越多的人开始关注和参与体育运动。在校园中&#xff0c;学生们也积极参与各种体育活动&#xff0c;以提升身体素质和促进全面发展。然而&#xff0c;由于校园运动…