Python FastApi 实现签名验证

news/2024/10/4 15:24:12/

大家在写后台接口时,都想要设计一个安全的,稳定的架构来支持各种业务,此文章介绍的Token的机制,和签名的验证。Token作为鉴权,签名作防篡改

目录

1.Token

2.签名

3.接口中的实现


1.Token

此处介绍的实现方式较简单,用的缓存来记录用户对应的Token,想要更完善一些的机制,可以使用数据库做记录,运行时做恢复读取处理。看实际项目规模来定

此处缓存的实现参考我的另一篇文章,有详细的介绍:Python Cache 实现缓存管理类_python缓存之importlib.cache的使用详解-CSDN博客

python">    # 创建数据库连接对象dao = UserDAO()# 根据登录名查询用户信息use = dao.GetUserByLogin(loginName=Account)if (use != None):# 验证密码是否一致if (use.Password == MD5(PassWord)):# 创建Tokentoken = __createToken(dao, use)# 将生成的Token 保存到缓存中PyCache.set(use.UserCode, use)res = MsgModel(True, MsgErrorType.Success, "", token)else:# 返回错误信息res = MsgModel(False, MsgErrorType.Login_NameOrPwdError)else:# 返回错误信息res = MsgModel(False, MsgErrorType.Login_NameOrPwdError)

当用户登录成功后,会有相应的Token返回,前端就自己做登录状态记录。

请求头参数 和 Token的数据实例 参考如下:

python">class HeaderModel(BaseModelPY):UserCode :str #用户编码Timestamp :str #时间戳Nonce:str #随机数Signature:str #签名Permit:int #权限#Token令牌
class TicketAuth(BaseModelPY):UserCode:str #用户编码Token:str #Token令牌CreateTime:datetime=None  #创建时间ExpireTime:datetime=None #过期时间RefreshToken:str #刷新Token令牌RefreshTokenExpireTime:datetime=None #刷新Token令牌 过期时间

Token的机制:

Token 不当做请求参数,但会参与签名的生成(下面会介绍)

时间戳用来判断请求时间,超过范围的视作无效的请求,也会参与签名的生成

随机数:作为参数,也会参与签名的生成

权限:一般作为权限阈值的判断,要不要使用就看项目定义了

Token有过期时效   后台判断过期后,则视为Token无效

RefreshToken的过期时效比Token要长,当Token过期后,可以使用RefreshToken进行刷新Token,前端可以自行做逻辑刷新(可以返回特定的Code作判断),从而让客户无感

当Token和RefreshToken都过期后,则需要重新登录了

2.签名

先定一个验证签名的方法  verify_signature

从请求头里边获取相应的信息,如用户编码、时间戳、随机数和签名信息

继而做一些逻辑判断,数据为空,时间戳超出范围等

用 用户编码 从缓存对像中获取后台保存的Token,判断Token的时效性

最后 后台通过缓存的Token 去生成一次签名,跟请求中的签名进行判断是否一致

签名的组装逻辑可以自行定义,比如下面的举例:

# 签名 UserCode+Timestamp+Nonce+Token 进行MD5加密 大写

将用户编码、时间戳、随机数和Token进行字符拼装,再进行MD5加密,随后转成大写

前端也是这么操作,后台也是如此生成 ;   前端通过登录时获取的Token去进行拼装,后台用缓存对象中的Token进行拼装;

这里我检讨一下:生成签名的数据应该还要包含 请求参数 才能防篡改,我这里是因为项目不对外开放,就写简单了。

实现方式就大概这么做,可以自行扩展

python">def verify_signature(request: Request) -> MsgModel:    try:# 从请求头中获取认证信息headerModel = HeaderModel()token: TicketAuthmsg = MsgModel.Success()IsValid: bool = True# 通过request获取Header里边的数据模型headerModel.UserCode = request.headers.get("UserCode")  # 用户编码headerModel.Timestamp = request.headers.get("Timestamp")  # 时间戳headerModel.Nonce = request.headers.get("Nonce")  # 随机数headerModel.Signature = request.headers.get("Signature")  # 签名# Header数据不能为空if (IsValid and headerModel.UserCode == None or not headerModel.UserCode):IsValid = Falsemsg = MsgModel(False, MsgErrorType.Header_Empty, "Header is empty")if (IsValid):# 验证时间戳# 将时间戳转换为日期时间对象ft = float(headerModel.Timestamp)/1000ts = datetime.fromtimestamp(timestamp=ft)# 时间计算差time_difference = datetime.now() - ts# 时间戳一分钟内有效if (time_difference.seconds > 300):IsValid = Falsemsg = MsgModel(False, MsgErrorType.Request_TimeOut,f"Request Time Out[{time_difference.seconds}]")if (IsValid):# 判断Token是否有效# 从缓存中取数据cache_key = headerModel.UserCode+"_Token"token = PyCache.get(cache_key)if (token is not None):# 判断Token是否过期if (token.ExpireTime < datetime.now()):IsValid = Falsemsg = MsgModel(False, MsgErrorType.Token_Expired,"Token Have Expired")else:IsValid = Falsemsg = MsgModel(False, MsgErrorType.Token_Expired,"Token Have Expired")if (IsValid):# 验证用户请求是否合法# 签名 UserCode+Timestamp+Nonce+Token 进行MD5加密 大写strRaw = headerModel.UserCode+headerModel.Timestamp+headerModel.Nonce+token.Tokensign = MD5(strRaw).upper()  # 变为大写# 验证参数中的签名是否一致if (headerModel.Signature != sign):IsValid = Falsemsg = MsgModel(False, MsgErrorType.Sign_Invalid,"The signature is invalid")if (IsValid):try:#从缓存数据获取用户数据 得到用户类型#获取用户类型相应的权限#再接口中判断是否足够的权限来调用user = PyCache.get(headerModel.UserCode)if (user is not None):#获取用户类型的权限dictList = PyCache.get("DictData")dict = next((x for x in dictList if x.DictCode == user.UserType), None)if (dict is not None):headerModel.Permit = int(dict.DictValue)else:headerModel.Permit = 0else:headerModel.Permit = 0except Exception as ex:headerModel.Permit = 0LogOperate.error("获取用户权限发生异常", ex)return MsgModel.Internal_Error()# headerModel.Permit=100msg.MsgContent = headerModelreturn msgexcept Exception as ex:LogOperate.error("签名验证发生异常", ex)return MsgModel.Internal_Error()

3.接口中的实现

最后再说下在每个接口中如何实现:

--- 无需签名验证的接口 : 比如登录接口

只需要定义入参 即可

python">@userLoginController.post(apiPrefix+'Login', name="登录-用户登录", response_model=None, summary="登录-用户登录")
async def user_login(Account: str = Form(...), PassWord: str = Form(...)):res = MsgModel.Failure()return res

--- 需要签名验证的接口:

一是定义入参,二是增加调用签名验证方法 verify_signature

下面的例子:

除了分页必要的参数 page、pageSize 等等,最后有个IsValid=Depends(verify_signature) 即是签名验证方法的调用

第一行就是作为判断签名验证是否通过了

如果签名验证失败了,记得要返回IsValid对象,否则接口会返回null

    if (IsValid.MsgSuccessed): 

    else:

        return IsValid

        

python">from fastapi import APIRouter, Depends
from datetime import datetime
from Modules.Models.MsgModel import MsgModel, MsgErrorType, MsgPageModel
from Modules.SysFrame.String import getApiPrefix, getVersion, IsNullOrEmpty, Guid, MD5
from Modules.SysFrame.GlobalData import verify_signature@orderController.get(apiPrefix+'GetOrderPageList', name="Order-分页获取订单列表数据")
async def GetOrderPageList(page: int,  pageSize: int,  begin: datetime, end: datetime, orderId: str, customer: str, IsValid=Depends(verify_signature)):if (IsValid.MsgSuccessed):try:orderDao = OrderDAO()res = orderDao.GetOrderPageList(page, pageSize, begin, end, orderId, customer)if (res.IsSucessed):mList = res.Tagmodels = []for m in mList:model = _Order_Entity_To_Model(m)models.append(model.tojson())return MsgPageModel.Success(models, total=res.Total)else:return MsgPageModel.Failure(msgDes=res.Message)except Exception as ex:LogOperate.error("GetOrderPageList", ex)return MsgPageModel.Internal_Error()else:return IsValid


http://www.ppmy.cn/news/1533784.html

相关文章

docker pull 超时Timeout失败的解决办法

当国内开发者docker pull遇到如下提示时&#xff0c;不要惊讶 [rootvm /]# docker pull postgres Using default tag: latest Error response from daemon: Get "https://registry-1.docker.io/v2/": dial tcp 128.121.146.235:443: i/o timeout [rootvm /]# 自2024…

关于对比学习(简单整理

什么是对比学习&#xff1f; ​ 对比学习是一种学习方法&#xff0c;侧重于通过对比正反两方面的实例来提取有意义的表征。它利用的假设是&#xff0c;在学习到的嵌入空间中&#xff0c;相似的实例应靠得更近&#xff0c;而不相似的实例应离得更远。通过将学习作为一项辨别任务…

在Git中操作失误,如何撤回

在 Git 中&#xff0c;撤回操作可以通过多种方式实现&#xff0c;具体取决于你想要撤回的操作类型&#xff08;如提交、暂存、修改等&#xff09;。以下是几种常见场景及其相应的撤回方法&#xff1a; 1. 撤回未暂存的修改 如果你在工作区中进行了更改&#xff0c;但还没有将…

适配器模式【对象适配器模式和类适配器模式,以及具体使用场景】

2.1-适配器模式 ​ 类的适配器模式是把适配者类的API转换成为目标类的API&#xff0c;适配器模式使得原来由于接口不兼容而不能一起工作的那些类可以一起工作&#xff0c;其实在具体的开发中&#xff0c;对于自己系统一开始的设计不会优先考虑适配器模式&#xff0c;通常会将接…

滚雪球学MySQL[8.3讲]:数据库中的JSON与全文检索详解:从数据存储到全文索引的高效使用

全文目录&#xff1a; 前言8.3 JSON与全文检索1. JSON数据类型的使用1.1 JSON 数据类型概述1.2 JSON 数据的插入与查询1.3 JSON 常用函数与操作1.4 JSON使用的优缺点与性能考虑 2. 全文索引与全文检索2.1 全文索引概述2.2 全文检索的使用2.3 全文检索模式2.4 全文索引优化与性能…

Electron 安装以及搭建一个工程

安装Node.js 在使用Electron进行开发之前&#xff0c;需要安装 Node.js。 官方建议使用最新的LTS版本。 检查 Node.js 是否正确安装&#xff1a; # 查看node版本 node -v # 查看npm版本 npm -v注意 开发者需要在开发环境安装 Node.js 才能编写 Electron 项目&#xff0c;但是…

QT系统学习篇(3)- Qt开发常用算法及控件原理

一、Qt中 Qt框架中和模块提供一些算法和常用函数 比如 double d1-59.6,d292.5;double d3qAbs(d1);double d4qMax(d1,d2);qDebug()<<"d3"<<d3<<"d4"<<d4<<endl;二、Qt窗口及控件原理设计 理解Qt窗口的生成、属性 1、Qt窗口…

Windows 环境搭建 CUDA 和 cuDNN 详细教程

CUDA CUDA&#xff08;Compute Unified Device Architecture&#xff09;是由NVIDIA公司推出的一个并行计算平台和编程模型&#xff0c;它允许开发者使用NVIDIA GPU进行通用计算&#xff08;即GPGPU&#xff09;&#xff0c;从而加速各种计算密集型任务。CUDA提供了一套基于C/C…