MIT6824——lab4(实现一个分片kv存储)的一些实现,问题,和思考

news/2024/11/30 0:42:18/

Part A 分片控制器

1. 整体思路

  • 和lab3A一样,shardctler也是一个服务,由客户端调用。这个服务建立在raft集群上,保证容错。

  • shardctler也应该保证线性一致性和重复请求的问题,因此也需要记录clientid和messageid。

  • shardctler保存了当前的分片信息,称为配置

    • Num:当前配置号
    • Shards:每一个分片对应的副本组id—gid
    • Groups:每个组(gid)对应哪些服务器
    type Config struct {Num    int              // config numberShards [NShards]int     // shard -> gid [gid1, gid2, gid3,...]Groups map[int][]string // gid -> servers[] {gid1:[x1,y1,z1], gid2:[x2,y2,z2], ...}
    }
    
  • shardcltr包含4个操作,Join、Leave、Move、Query

    • Join:添加组 { gid : servers, …},新配置应尽可能将分片均匀地分配到整组组中,并应移动尽可能少的分片以实现该目标
    • Leave:移除组,并将该组上的分片均匀分配给其他组
    • Move:转移分片**,**一些组可能比其他组负载更多,因此需要移动分片来平衡负载
    • Query:查询当前的Config
  • 一些需要注意的地方

    • Join和Leave都涉及到对Groups 的修改,因此需要重新调整Shards(这里涉及到负载均衡),因为有可能gid已经没有对应的服务器了,因此分片也不应该由该gid管理
    • 除了Qeury,其他三种操作都涉及到修改config,因此需要添加新的config

2 流程

  • 客户端对shardctler发出请求
  • shardctler通知raft Start()
  • raft执行AppendEntries,通知其他raft节点日志复制
  • raft收到过半确认后,通知shardctler执行请求 applyCh管道
  • shardctler执行 收到信号(实际上shardctler会在applyRoutine这个协程上一直等applyCh),根据clientid,messageid判断是否执行过该条指令,没有执行过就执行
  • shardctler执行完后,通知 可以 返回结果给客户端 broadcastCh管道

3 一些细节

  • 通常来说,分片比组多,相当于每个组都会负责多个分片。
  • Join操作
    • 向当前Config中新增一些组
    • 新增后,需要调整shard,保证负载均衡
  • Leave操作
    • 移除一些组
    • 移除后,需要调整shard,保证负载均衡
  • 如何实现负载均衡?
    • 每当Join、Leave后,Groups都会发生变化,因此需要重新调整Shard。由于分片比组多,因此每个组都会负责多个分片。为了让服务器实现均衡,通过shard%len(gids)找到,shard对应的gid,这样,每个组负责的shard就是尽可能平衡的

      sort.Ints(gids)                    // 假设:gids = [1,2,3,4]
      for shard := range config.Shards { // 0-9config.Shards[shard] = gids[shard%len(gids)] // 负载均衡算法,shard_0映射到gids[0], shard_1映射到gids[1], ..., shard_5映射到gids[1], ...,
      }
      

4 问题

  • 如何实现负载均衡?
    • 方案一:取模映射 O(nlogn),但是移动的并不是最少

      • 通过shard%len(gids)找到,shard对应的gid,这样,每个组负责的shard就是尽可能平衡的

        sort.Ints(gids)                   // 排序后,才能保证移动最少
        for shard := range config.Shards { // 0-9config.Shards[shard] = gids[shard%len(gids)] // 负载均衡算法,shard_0映射到gids[0], shard_1映射到gids[1], ..., shard_5映射到gids[1], ...,
        }
        

        请添加图片描述

    • 方案二:直接移动 O(n^2),移动的次数最少

      • 需要一个map,记录当前的gid有多少个shard

        gid_num_shard_map{1:52:5
        }minNum = 5;
        maxNum = 5;
        
      • 循环让拥有最多shard的Group分一个给拥有最少的Group,直到maxNum - minNm=1
        请添加图片描述

Part B 多集群KV存储—Multi-Raft

请添加图片描述

1 整体思路

  1. 更新配置

    • 什么时候会更新配置??

      • 当此时分片没有分配对应组时(shard[i]=0),kv需要询问分配控制器,最新的配置
    • 代码:作为一个协程单独跑,定时检查是否要更新

      func (kv *ShardKV) configurationRoutine() {for !kv.killed() {kv.mu.Lock()defaultShard := true // 默认的切片,就是初始化的时候,此时切片还没分配组for _, shard := range kv.state {if shard.Status != Default {defaultShard = falsebreak}}num := kv.config.Numkv.mu.Unlock()_, isLeader := kv.rf.GetState()if isLeader && defaultShard {// 询问分配控制器,当前配置的下一个配置nextConfig := kv.clerk.Query(num + 1)nextNum := nextConfig.Num// 如果获取了新配置,需要通知kv,改变当前的配置,因为这是一个命令,需要raft进行日志复制,然后再执行if num+1 == nextNum {command := Configuration{Config: nextConfig,}index, _, isLeader := kv.rf.Start(command)if isLeader {// 等到kv执行了更新config请求,或者超时才会返回kv.consensus(index)}}}time.Sleep(100 * time.Millisecond)}
      }
      
    • 得到新配置后,需要处理一下两种情况

      • 设置Pull状态:新配置shard属于kv && 旧配置shard分片不属于kv,也就是说,此后,kv应该负责当前分片,但是数据库中(kv.state)中没有数据,此时需要拉取过来.
      • 设置WaitDelete状态:新配置shard不属于kv,旧配置shard属于kv,也就是说,此后kv不应该负责当前分片,但数据库中(kv.state)有数据,此时需要删除。但也不是马上删除,而是在删除完对方的数据后 **DeleteShardRPC()**返回,再删除自己的
  2. 拉取数据

    • 什么时候才会拉取成为拉取状态?

      • 新配置shard属于kv && 旧配置shard分片不属于kv,也就是说,此后,kv应该负责当前分片,但是数据库中(kv.state)中没有数据,此时需要拉取过来
    • 更新配置后,并不表示所属分片可以立刻对外提供服务,还需要等待在上一个版本的Config中不属于自身的shard从它之前所属的Group中迁移到本Group

    • 拉取数据的方法应该怎么进行,同步?异步?

      • 不能同步,因为如果在更新日志的时候同步阻塞整个协程,会影响其他的对外请求
      • 不能异步,leader 可能会在 新配置之后新数据被异步拉取到并提交日志之前宕机,而 follower 虽然会apply 配置但是不会去拉数据,这样这些数据将永远无法被更新。
      • 在更新配置中,我们只根据情况改变状态
      for shard, gid := range nextConfig.Shards {targetGid := kv.config.Shards[shard] // 当前配置shard分片的gid// 新配置shard属于kv && 旧配置shard分片不属于kv,也就是说,此后,kv应该负责当前分片,但是数据库中(kv.state)中没有数据,此时需要拉取过来if gid == kv.gid && targetGid != kv.gid && targetGid != 0 {kv.state[shard].Status = Pull}// 新配置shard不属于kv,旧配置shard属于kv,也就是说,此后kv不应该负责当前分片,但数据库中(kv.state)有数据,此时需要删除if gid != kv.gid && targetGid == kv.gid && gid != 0 {kv.state[shard].Status = Push}
      }
      
    • 在拉取数据,更新配置时,如何保证线性一致性,也就是如何让用户不会发现这个过程?

      • 在拉取数据时,将kv的此时已经接受的客户端请求信息也给对方

        // 将给当前kv发送过请求的client信息给对方,因为后续客户端可以再向对方请求数据,这样在更新配置时,也能保证线性一致性
        client := make(map[int64]int)
        for clientId, messageId := range kv.client {client[clientId] = messageId
        }
        
  3. 数据删除

    • 什么时候要删除数据?
      • 当拉取数据成功后,需要将通知数据来源方删除自己的数据,标记为Collection状态

        // 把数据给 对应的Pull状态的分片
        for shard := range state {if kv.state[shard].Status == Pull {for k, v := range state[shard].State {kv.state[shard].Put(k, v)}// 拉取数据后,成为Collection状态kv.state[shard].Status = Collection}
        }
        
      • 当对方删除完毕后,**DeleteShardRPC()**返回,此时还要删除自己WaitDelete状态的数据

2 流程

  • 初始下,共有10个分片,3个shardkv集群组,1个分片控制器集群
  • 客户端通过key映射得到分片,然后更具配置,找到对应的集群组
  • 集群组初始化下还没有配置,发出更新配置请求 configurationRoutine()
  • 更新配置后,如果新配置shard属于kv && 旧配置shard分片不属于kv,需要拉取数据 pullShardRoutine()
  • 拉取数据后,还需要让数据来源方删除shard对应的数据 deleteShardRoutine()
  • KV数据库执行请求,返回数据给客户端

4 问题

  • 为什么要使用多集群?

    • 之前,只有一个集群时,所有请求都由leader负责,当数据量大后,请求增多,leader面临的压力非常大,请求响应的时间也会延长,这种情况下,简单增加机器并不会由性能提升。因此可以将数据分开存储到不同集群,将不同请求引流到不同集群,降低单一集群的压力。
  • 如何保证一个集群组丢失了一个分片,需要立即停止向该分片中的键提供请求?

    • 每次kv在响应请求前,都会通过staleShard(shard int),判断当前shard是不是自己负责

      /*准备响应Get请求*/
      kv.mu.Lock()
      if kv.staleShard(shard) {response.Err = ErrWrongGroupkv.mu.Unlock()return
      }
      if method == "Get" {response.Value = kv.state[shard].Get(key)
      }
      kv.mu.Unlock()/*准备响应Put、Append请求*/
      if !kv.staleShard(shard) && kv.client[clientId] < messageId {switch method {case "Put":kv.state[shard].Put(key, value)case "Append":kv.state[shard].Append(key, value)}kv.client[clientId] = messageId
      }
      
  • 如何保证在分片移动后的线性一致性?

    • kv在拉取数据时,对端也会把client信息返回,这样当前kv就有了当前客户的信息,如果该客户端发送了一个已经执行过的请求,将不再执行。
  • 如何实现在配置更改时,对于那些没移动的分片,可以正常响应给客户端呢?

    • 只要是Collection和Default状态的切片,都认为是拥有此分片的
    • Collection只会在拉取到数据后被设置
    • Default就是拥有这些切片
    • staleShard会去判断是否拥有当前切片,只要拥有,就可以对外提供服务

http://www.ppmy.cn/news/94653.html

相关文章

Python数据分析案例28——西雅图交通事故预测(不平衡样本处理)

本次案例适合机器学习数据科学方向的同学。 引言(废话集) 交通事故是一个严重的公共安全问题&#xff0c;在全球范围内每年都有成千上万的人死于交通事故。随着交通运输的发展和城市化进程的加速&#xff0c;交通事故已成为制约城市发展和人民幸福的主要因素之一。因此&#x…

STM32F4_I2C(从机EEPROM/MPU-6050)协议详解

目录 1. I2C是什么 2. I2C物理层介绍 3. I2C协议层介绍 3.1 I2C基本读写过程 3.1.1 通讯复合格式 3.2 通讯的起始和停止信号 3.3 数据有效性 3.4 地址及数据方向 3.5 响应 4. STM32的I2C特性及架构 4.1 I2C架构剖析 5. I2C通讯过程 5.1 主发送器 5.2 主接收器 6…

中间件篇2:中间件交付云原生之Operator

为什么我们需要Operator? 编写Operator其目的是将部署从文档化转为代码化,从人工部署转为自动化部署,即“部署即代码”,但还不是纯粹的部署即代码,因为基础设施依然需要手动去申请。 例如部署网络接入器(我司基于百度开源BFE二次开发的七层流量代理网关),我们需要先部…

攫取 RGB图像 和 PCM音频 数据

一、获取源码 1. 下载地址 Github: https://github.com/Gaaagaa/MediaGrabber 2. 编译提醒 这个测试程序&#xff0c;是使用 QtCreator 写的 Qt 界面程序&#xff0c;调用我封装好的 vlc_mgrabber_t 类实现了一个简单的播放器。MFC的我也写过相应的测试程序&#xff0c;这里…

games103——作业4

实验四主要使用 Shallow Wave 模拟流体 完整项目已上传至github。 文章目录 Height Feild(高度场)更新高度场更新速度场 Shallow Wave EquationDiscretization(离散化)一阶导数二阶导数 Discretized Shallow Wave EquationSolution 1Solution 2Pressure(压强)Viscosity(粘滞) 算…

深度探索:使用FFmpeg实现视频Logo的添加与移除

深度探索&#xff1a;使用FFmpeg实现视频Logo的添加与移除 前言一、FFmpeg简介&#xff08;Introduction to FFmpeg&#xff09;1.1 FFmpeg的定义&#xff08;Definition of FFmpeg&#xff09;1.2 FFmpeg的功能&#xff08;Functions of FFmpeg&#xff09;1.3 FFmpeg的安装&a…

ESP8266基于Lua开发使用U8g2模块驱动 i2c ssd1306 OLED显示

ESP8266基于Lua开发使用U8g2模块驱动 i2c ssd1306 OLED显示 &#x1f4cd;相关篇《ESP8266基于Lua开发点灯示例》 &#x1f4d6;U8g2对应的API接口函数&#xff1a;https://nodemcu.readthedocs.io/en/release/modules/u8g2/ &#x1f4fa;驱动显示效果&#xff1a; &#…

从月薪8k到月薪30k,自动化测试究竟该怎样学...

绝大多数测试工程师都是从功能测试做起的&#xff0c;工作忙忙碌碌&#xff0c;每天在各种业务需求学习和点点中度过&#xff0c;过了好多年发现自己还只是一个功能测试工程师。 随着移动互联网的发展&#xff0c;从业人员能力的整体进步&#xff0c;软件测试需要具备的能力要…