【ETCD】【源码阅读】深入分析 applierV3backend.Apply`方法源码

server/2024/12/23 1:26:22/

applierV3backend的Apply主要负责将 Raft 请求 (pb.InternalRaftRequest) 应用到 Etcd 的后端存储中。它处理各种不同类型的请求,并且根据请求的具体内容调用相应的处理逻辑。
版本【release

文章目录

      • 一、完整源码
      • 二、方法详解
        • 1. 定义和初始化
        • 2. 记录操作开始时间并设置延迟处理
        • 3. 处理不同的 Raft 请求
          • 3.1 处理 `ClusterVersionSet` 请求
          • 3.2 处理 `ClusterMemberAttrSet` 请求
          • 3.3 处理 `DowngradeInfoSet` 请求
          • 3.4 处理常规操作
        • 4. 处理具体的操作请求(Range, Put, DeleteRange, Txn, etc.)
          • 4.5 处理与 Lease 相关的请求
          • 4.6 处理与身份验证相关的请求
          • **4.7 处理用户和角色相关的请求**
        • 5. 未实现的操作
        • 6. 返回结果
      • 三、 总结

一、完整源码

func (a *applierV3backend) Apply(r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3) *applyResult {op := "unknown"ar := &applyResult{}defer func(start time.Time) {success := ar.err == nil || ar.err == mvcc.ErrCompactedapplySec.WithLabelValues(v3Version, op, strconv.FormatBool(success)).Observe(time.Since(start).Seconds())warnOfExpensiveRequest(a.s.Logger(), a.s.Cfg.WarningApplyDuration, start, &pb.InternalRaftStringer{Request: r}, ar.resp, ar.err)if !success {warnOfFailedRequest(a.s.Logger(), start, &pb.InternalRaftStringer{Request: r}, ar.resp, ar.err)}}(time.Now())switch {case r.ClusterVersionSet != nil: // Implemented in 3.5.xop = "ClusterVersionSet"a.s.applyV3Internal.ClusterVersionSet(r.ClusterVersionSet, shouldApplyV3)return nilcase r.ClusterMemberAttrSet != nil:op = "ClusterMemberAttrSet" // Implemented in 3.5.xa.s.applyV3Internal.ClusterMemberAttrSet(r.ClusterMemberAttrSet, shouldApplyV3)return nilcase r.DowngradeInfoSet != nil:op = "DowngradeInfoSet" // Implemented in 3.5.xa.s.applyV3Internal.DowngradeInfoSet(r.DowngradeInfoSet, shouldApplyV3)return nil}if !shouldApplyV3 {return nil}// call into a.s.applyV3.F instead of a.F so upper appliers can check individual callsswitch {case r.Range != nil:op = "Range"ar.resp, ar.err = a.s.applyV3.Range(context.TODO(), nil, r.Range)case r.Put != nil:op = "Put"ar.resp, ar.trace, ar.err = a.s.applyV3.Put(context.TODO(), nil, r.Put)case r.DeleteRange != nil:op = "DeleteRange"ar.resp, ar.err = a.s.applyV3.DeleteRange(nil, r.DeleteRange)case r.Txn != nil:op = "Txn"ar.resp, ar.trace, ar.err = a.s.applyV3.Txn(context.TODO(), r.Txn)case r.Compaction != nil:op = "Compaction"ar.resp, ar.physc, ar.trace, ar.err = a.s.applyV3.Compaction(r.Compaction)case r.LeaseGrant != nil:op = "LeaseGrant"ar.resp, ar.err = a.s.applyV3.LeaseGrant(r.LeaseGrant)case r.LeaseRevoke != nil:op = "LeaseRevoke"ar.resp, ar.err = a.s.applyV3.LeaseRevoke(r.LeaseRevoke)case r.LeaseCheckpoint != nil:op = "LeaseCheckpoint"ar.resp, ar.err = a.s.applyV3.LeaseCheckpoint(r.LeaseCheckpoint)case r.Alarm != nil:op = "Alarm"ar.resp, ar.err = a.s.applyV3.Alarm(r.Alarm)case r.Authenticate != nil:op = "Authenticate"ar.resp, ar.err = a.s.applyV3.Authenticate(r.Authenticate)case r.AuthEnable != nil:op = "AuthEnable"ar.resp, ar.err = a.s.applyV3.AuthEnable()case r.AuthDisable != nil:op = "AuthDisable"ar.resp, ar.err = a.s.applyV3.AuthDisable()case r.AuthStatus != nil:ar.resp, ar.err = a.s.applyV3.AuthStatus()case r.AuthUserAdd != nil:op = "AuthUserAdd"ar.resp, ar.err = a.s.applyV3.UserAdd(r.AuthUserAdd)case r.AuthUserDelete != nil:op = "AuthUserDelete"ar.resp, ar.err = a.s.applyV3.UserDelete(r.AuthUserDelete)case r.AuthUserChangePassword != nil:op = "AuthUserChangePassword"ar.resp, ar.err = a.s.applyV3.UserChangePassword(r.AuthUserChangePassword)case r.AuthUserGrantRole != nil:op = "AuthUserGrantRole"ar.resp, ar.err = a.s.applyV3.UserGrantRole(r.AuthUserGrantRole)case r.AuthUserGet != nil:op = "AuthUserGet"ar.resp, ar.err = a.s.applyV3.UserGet(r.AuthUserGet)case r.AuthUserRevokeRole != nil:op = "AuthUserRevokeRole"ar.resp, ar.err = a.s.applyV3.UserRevokeRole(r.AuthUserRevokeRole)case r.AuthRoleAdd != nil:op = "AuthRoleAdd"ar.resp, ar.err = a.s.applyV3.RoleAdd(r.AuthRoleAdd)case r.AuthRoleGrantPermission != nil:op = "AuthRoleGrantPermission"ar.resp, ar.err = a.s.applyV3.RoleGrantPermission(r.AuthRoleGrantPermission)case r.AuthRoleGet != nil:op = "AuthRoleGet"ar.resp, ar.err = a.s.applyV3.RoleGet(r.AuthRoleGet)case r.AuthRoleRevokePermission != nil:op = "AuthRoleRevokePermission"ar.resp, ar.err = a.s.applyV3.RoleRevokePermission(r.AuthRoleRevokePermission)case r.AuthRoleDelete != nil:op = "AuthRoleDelete"ar.resp, ar.err = a.s.applyV3.RoleDelete(r.AuthRoleDelete)case r.AuthUserList != nil:op = "AuthUserList"ar.resp, ar.err = a.s.applyV3.UserList(r.AuthUserList)case r.AuthRoleList != nil:op = "AuthRoleList"ar.resp, ar.err = a.s.applyV3.RoleList(r.AuthRoleList)default:a.s.lg.Panic("not implemented apply", zap.Stringer("raft-request", r))}return ar
}

二、方法详解

1. 定义和初始化
op := "unknown"
ar := &applyResult{}
  • op 用于标识当前操作的类型,初始值为 "unknown",在后续处理中会根据具体操作类型更新。
  • ar 是一个 applyResult 结构体的实例,用于存储操作结果,包括响应 (resp)、错误 (err) 等信息。
2. 记录操作开始时间并设置延迟处理
defer func(start time.Time) {success := ar.err == nil || ar.err == mvcc.ErrCompactedapplySec.WithLabelValues(v3Version, op, strconv.FormatBool(success)).Observe(time.Since(start).Seconds())warnOfExpensiveRequest(a.s.Logger(), a.s.Cfg.WarningApplyDuration, start, &pb.InternalRaftStringer{Request: r}, ar.resp, ar.err)if !success {warnOfFailedRequest(a.s.Logger(), start, &pb.InternalRaftStringer{Request: r}, ar.resp, ar.err)}
}(time.Now())
  • 延迟执行的回调函数:它在 Apply 方法结束时执行,记录应用操作的成功与否,并且基于操作的时长监控是否出现了性能问题或失败。
  • applySec.WithLabelValues 用来记录操作的耗时并标记成功与否。
  • 如果操作耗时过长,会触发警告 (warnOfExpensiveRequest)。
  • 如果操作失败,会记录失败的警告 (warnOfFailedRequest)。
3. 处理不同的 Raft 请求

接下来,代码根据 pb.InternalRaftRequest 中不同字段的值来判断请求类型并进行相应处理。以下是一些具体的分支和操作:

3.1 处理 ClusterVersionSet 请求
case r.ClusterVersionSet != nil: op = "ClusterVersionSet"a.s.applyV3Internal.ClusterVersionSet(r.ClusterVersionSet, shouldApplyV3)return nil
  • 如果 ClusterVersionSet 字段不为空,表示请求是关于设置集群版本的。
  • 调用 ClusterVersionSet 方法处理该请求。
3.2 处理 ClusterMemberAttrSet 请求
case r.ClusterMemberAttrSet != nil:op = "ClusterMemberAttrSet"a.s.applyV3Internal.ClusterMemberAttrSet(r.ClusterMemberAttrSet, shouldApplyV3)return nil
  • 如果 ClusterMemberAttrSet 字段不为空,表示请求是关于设置集群成员属性的。
  • 调用 ClusterMemberAttrSet 方法处理该请求。
3.3 处理 DowngradeInfoSet 请求
case r.DowngradeInfoSet != nil:op = "DowngradeInfoSet"a.s.applyV3Internal.DowngradeInfoSet(r.DowngradeInfoSet, shouldApplyV3)return nil
  • 如果 DowngradeInfoSet 字段不为空,表示请求是关于设置降级信息的。
  • 调用 DowngradeInfoSet 方法处理该请求。
3.4 处理常规操作
if !shouldApplyV3 {return nil
}
  • 如果 shouldApplyV3false,则直接返回 nil,跳过后续的请求处理。
4. 处理具体的操作请求(Range, Put, DeleteRange, Txn, etc.)

接下来的部分是针对 Raft 请求中的不同字段进行处理。这些字段通常是 Etcd 后端存储操作的具体请求,如 Range, Put, DeleteRange, Txn, Compaction 等。

switch {
case r.Range != nil:op = "Range"ar.resp, ar.err = a.s.applyV3.Range(context.TODO(), nil, r.Range)
case r.Put != nil:op = "Put"ar.resp, ar.trace, ar.err = a.s.applyV3.Put(context.TODO(), nil, r.Put)
case r.DeleteRange != nil:op = "DeleteRange"ar.resp, ar.err = a.s.applyV3.DeleteRange(nil, r.DeleteRange)
case r.Txn != nil:op = "Txn"ar.resp, ar.trace, ar.err = a.s.applyV3.Txn(context.TODO(), r.Txn)
case r.Compaction != nil:op = "Compaction"ar.resp, ar.physc, ar.trace, ar.err = a.s.applyV3.Compaction(r.Compaction)
  • 每个 case 分支对应不同的请求类型:
    • Range:执行范围查询操作。
    • Put:执行数据写入操作。
    • DeleteRange:执行范围删除操作。
    • Txn:执行事务操作。
    • Compaction:执行存储压缩操作。

对于每种操作,都会调用相应的处理方法,并把结果存储到 ar.respar.err 中。

4.5 处理与 Lease 相关的请求
case r.LeaseGrant != nil:op = "LeaseGrant"ar.resp, ar.err = a.s.applyV3.LeaseGrant(r.LeaseGrant)
case r.LeaseRevoke != nil:op = "LeaseRevoke"ar.resp, ar.err = a.s.applyV3.LeaseRevoke(r.LeaseRevoke)
case r.LeaseCheckpoint != nil:op = "LeaseCheckpoint"ar.resp, ar.err = a.s.applyV3.LeaseCheckpoint(r.LeaseCheckpoint)
  • Lease 操作:这些请求处理与 Etcd 租约(lease)相关的操作,包括租约的授予、撤销和检查点等。
4.6 处理与身份验证相关的请求
case r.Authenticate != nil:op = "Authenticate"ar.resp, ar.err = a.s.applyV3.Authenticate(r.Authenticate)
  • 身份验证操作:处理身份验证请求,通常用于授权等功能。
4.7 处理用户和角色相关的请求
case r.AuthUserAdd != nil:op = "AuthUserAdd"ar.resp, ar.err = a.s.applyV3.UserAdd(r.AuthUserAdd)
  • 这些请求处理与 Etcd 中的用户和角色相关的操作,包括添加用户、删除用户、修改密码、赋予角色等。
5. 未实现的操作
default:a.s.lg.Panic("not implemented apply", zap.Stringer("raft-request", r))
  • 如果 Raft 请求的类型未在上述处理分支中出现,程序会记录 Panic 日志,标明该操作尚未实现。
6. 返回结果
return ar
  • 最后,返回 applyResult 结构体实例 ar,其中包含操作的结果。

三、 总结

  • 操作类型判断:根据 Raft 请求的不同类型,选择合适的处理方法。
  • 性能监控:对每个操作的处理时间进行监控,记录成功与否,并在超时或失败时进行警告。
  • 支持多种操作:包括数据存取、事务、压缩、租约管理、身份验证和权限管理等。
  • 错误处理:对不同类型的错误进行处理并返回响应结果。

http://www.ppmy.cn/server/152371.html

相关文章

Docker搭建kafka环境

系统:MacOS Sonoma 14.1 Docker版本:Docker version 27.3.1, build ce12230 Docker desktop版本:Docker Desktop 4.36.0 (175267) 1.拉取镜像 先打开Docker Desktop,然后在终端执行命令 docker pull lensesio/fast-data-dev …

ios的safari下载文件 文件名乱码

当使用nginx代理文件并下载文件时,返回的协议头Content-Disposition中filename%E9%9B%AA%E5%B1%B1.jpg中文内容会是URL编码的形式,当客户端在safari浏览器下载下载文件时,文件名不会转换(URL解码)为正常的中文。 应该…

全国青少年信息学奥林匹克竞赛(信奥赛)备考实战之分支结构(关系运算符和逻辑运算符)

在C编程的旅途中,经常会遇到需要根据不同条件执行不同代码的情况。这时,分支结构就显得尤为重要。它就像程序中的“决策点”,让程序能够根据输入、状态或其他条件智能地选择执行路径。这时就需要学习关系表达式与逻辑表达式,有了判…

Altair: 轻松创建交互式数据可视化

Altair: 轻松创建交互式数据可视化 Altair 是一个基于 Vega-Lite 的 Python 数据可视化库,它旨在简化数据可视化的创建过程,尤其适用于统计图表的生成。Altair 强调声明式编码方式,通过简单的语法,用户能够快速创建复杂的交互式图…

使用JavaScript获取商品详情接口:一个实用的指南

引言 在现代电子商务中,获取商品详情是提供个性化购物体验的关键。JavaScript,作为一种广泛使用的编程语言,可以轻松地与各种API进行交互,从而获取商品信息。本文将指导您如何使用JavaScript调用商品详情接口,并处理返…

基于Spring Boot的个人财务系统

一、系统背景与目的 随着全球经济的发展和人们生活水平的提高,个人财务管理变得越来越重要。传统的个人财务软件存在操作复杂、用户体验差、数据不安全等问题,无法满足用户的个性化需求。因此,开发一种基于Spring Boot的个人财务系统&#x…

探秘数据库索引:功能、意义与实例

探秘数据库索引:功能、意义与实例 ​数据库索引就像是一个 “路标” 或者 “地图”。当进行查询操作时,数据库软件首先会查找索引表。索引表中的数据是经过排序的,并且和原表中的数据存在关联(这种关联可以是基于数据行的物理位置…

在uniapp Vue3版本中如何解决webH5网页浏览器跨域的问题

问题复现 uniapp项目在浏览器运行,有可能调用某些接口会出现跨域问题,报错如下图所示: 什么是跨域? 存在跨域问题的原因是因为浏览器的同源策略,也就是说前端无法直接发起跨域请求。同源策略是一个基础的安全策略&a…