Python异步Redis客户端与通用缓存装饰器

news/2024/10/9 13:33:14/

前言

这里我将通过 redis-py 简易封装一个异步的Redis客户端,然后主要讲解设计一个支持各种缓存代理(本地内存、Redis等)的缓存装饰器,用于在减少一些不必要的计算、存储层的查询、网络IO等。

具体代码都封装在 HuiDBK/py-tools: 打造 Python 开发常用的工具,让Coding变得更简单 (github.com) 中,以大家便捷使用。

redis_10">异步redis客户端

首先安装 redis-py 库

python">pip install redis

Redis 之前是不支持异步的,后面为了统一异步redis操作与python常用的redis.py 的api接口一致,aioredis的作者已经将 aioredis 加入了redis中维护,安装的版本大于 4.2.0rc1 就行。

BaseRedisManager 封装

python">#!/usr/bin/python3
# -*- coding: utf-8 -*-
# @Author: Hui
# @Desc: { redis连接处理模块 }
# @Date: 2023/05/03 21:13
from datetime import timedelta
from typing import Optional, Unionfrom redis import Redis
from redis import asyncio as aioredisfrom py_tools import constants
from py_tools.decorators.cache import CacheMeta, cache_json, RedisCacheProxy, AsyncRedisCacheProxyclass BaseRedisManager:"""Redis客户端管理器"""client: Union[Redis, aioredis.Redis] = Nonecache_key_prefix = constants.CACHE_KEY_PREFIX@classmethoddef init_redis_client(cls,async_client: bool = False,host: str = "localhost",port: int = 6379,db: int = 0,password: Optional[str] = None,max_connections: Optional[int] = None,**kwargs):"""初始化 Redis 客户端。Args:async_client (bool): 是否使用异步客户端,默认为 False(同步客户端)host (str): Redis 服务器的主机名,默认为 'localhost'port (int): Redis 服务器的端口,默认为 6379db (int): 要连接的数据库编号,默认为 0password (Optional[str]): 密码可选max_connections (Optional[int]): 最大连接数。默认为 None(不限制连接数)**kwargs: 传递给 Redis 客户端的其他参数Returns:None"""if cls.client is None:redis_client_cls = Redisif async_client:redis_client_cls = aioredis.Rediscls.client = redis_client_cls(host=host, port=port, db=db, password=password, max_connections=max_connections, **kwargs)return cls.client@classmethoddef cache_json(cls,ttl: Union[int, timedelta] = 60,key_prefix: str = None,):"""缓存装饰器(仅支持缓存能够json序列化的数据)缓存函数整体结果Args:ttl: 过期时间 默认60skey_prefix: 默认的key前缀, 再未指定key时使用Returns:"""key_prefix = key_prefix or cls.cache_key_prefixif isinstance(ttl, timedelta):ttl = int(ttl.total_seconds())cache_proxy = RedisCacheProxy(cls.client)if isinstance(cls.client, aioredis.Redis):cache_proxy = AsyncRedisCacheProxy(cls.client)return cache_json(cache_proxy=cache_proxy, key_prefix=key_prefix, ttl=ttl)

还是跟之前封装客户端一样的简易封装,由类属性 client 维护真正操作的redis的客户端,通过 init_redis_client 方法进行初始化。这样封装的目的就是在系统中只初始化一份 redis 客户端,操作时可以直接使用类方法。BaseRedisManager 只实现一些通用的 redis 操作(有待挖掘),具体还是需要业务Manager来继承封装业务中操作redis的方法。目前只实现了一个redis缓存装饰器,其实内部就是组织参数设置redis代理,然后调用另外一个通用的缓存装饰器,这样使用的时候不需要制定缓存代理了。

缓存装饰器

python">#!/usr/bin/python3
# -*- coding: utf-8 -*-
# @Author: Hui
# @Desc: { 缓存装饰器模块 }
# @Date: 2023/05/03 19:23
import asyncio
import functools
import hashlib
import json
from datetime import timedeltaimport cacheout
from redis import Redis
from redis import asyncio as aioredis
from pydantic import BaseModel, Field
from typing import Union
from py_tools import constantsMEMORY_PROXY = MemoryCacheProxy(cache_client=cacheout.Cache(maxsize=1024))def cache_json(cache_proxy: BaseCacheProxy = MEMORY_PROXY,key_prefix: str = constants.CACHE_KEY_PREFIX,ttl: Union[int, timedelta] = 60,
):"""缓存装饰器(仅支持缓存能够json序列化的数据)Args:cache_proxy: 缓存代理客户端, 默认系统内存ttl: 过期时间 默认60skey_prefix: 默认的key前缀Returns:"""key_prefix = f"{key_prefix}:cache_json"if isinstance(ttl, timedelta):ttl = int(ttl.total_seconds())def _cache(func):def _gen_key(*args, **kwargs):"""生成缓存的key"""# 根据函数信息与参数生成# key => 函数所在模块:函数名:函数位置参数:函数关键字参数 进行hashparam_args_str = ",".join([str(arg) for arg in args])param_kwargs_str = ",".join(sorted([f"{k}:{v}" for k, v in kwargs.items()]))hash_str = f"{func.__module__}:{func.__name__}:{param_args_str}:{param_kwargs_str}"hash_ret = hashlib.sha256(hash_str.encode()).hexdigest()# 根据哈希结果生成key 默认前缀:函数所在模块:函数名:hashhash_key = f"{key_prefix}:{func.__module__}:{func.__name__}:{hash_ret}"return hash_key@functools.wraps(func)def sync_wrapper(*args, **kwargs):"""同步处理"""# 生成缓存的keyhash_key = _gen_key(*args, **kwargs)# 先从缓存获取数据cache_data = cache_proxy.get(hash_key)if cache_data:# 有直接返回print(f"命中缓存: {hash_key}")return json.loads(cache_data)# 没有,执行函数获取结果ret = func(*args, **kwargs)# 缓存结果cache_proxy.set(key=hash_key, value=json.dumps(ret), ttl=ttl)return ret@functools.wraps(func)async def async_wrapper(*args, **kwargs):"""异步处理"""# 生成缓存的keyhash_key = _gen_key(*args, **kwargs)# 先从缓存获取数据cache_data = await cache_proxy.get(hash_key)if cache_data:# 有直接返回return json.loads(cache_data)# 没有,执行函数获取结果ret = await func(*args, **kwargs)# 缓存结果await cache_proxy.set(key=hash_key, value=json.dumps(ret), ttl=ttl)return retreturn async_wrapper if asyncio.iscoroutinefunction(func) else sync_wrapperreturn _cache

cache_json 是一个带单参数的缓存装饰器,可以指定一些缓存的代理、缓存key前缀、缓存ttl等。

内部实现了同步、异步函数的缓存处理,关键点其实就是如何构造唯一的缓存key,这里就是根据key前缀与函数的一些签名信息来构造的。

python">def _gen_key(*args, **kwargs):"""生成缓存的key"""# 没有传递key信息,根据函数信息与参数生成# key => 函数所在模块:函数名:函数位置参数:函数关键字参数 进行hashparam_args_str = ",".join([str(arg) for arg in args])param_kwargs_str = ",".join(sorted([f"{k}:{v}" for k, v in kwargs.items()]))hash_str = f"{func.__module__}:{func.__name__}:{param_args_str}:{param_kwargs_str}"hash_ret = hashlib.sha256(hash_str.encode()).hexdigest()# 根据哈希结果生成key 默认前缀:函数所在模块:函数名:hashhash_key = f"{key_prefix}:{func.__module__}:{func.__name__}:{hash_ret}"return hash_key

函数所在模块:函数名:函数位置参数:函数关键字参数 进行hash,在处理关键字参数的需要排个序,来保证相同的参数,顺序不同但缓存key一致。后面的逻辑就是常见的设置缓存操作。

image.png

缓存代理类

python">#!/usr/bin/python3
# -*- coding: utf-8 -*-
# @Author: Hui
# @Desc: { 缓存装饰器模块 }
# @Date: 2023/05/03 19:23
import asyncio
import functools
import hashlib
import json
from datetime import timedeltaimport cacheout
from redis import Redis
from redis import asyncio as aioredis
from pydantic import BaseModel, Field
from typing import Union
from py_tools import constantsclass CacheMeta(BaseModel):"""缓存元信息"""key: str = Field(description="缓存的key")ttl: Union[int, timedelta] = Field(description="缓存有效期")cache_client: str = Field(description="缓存的客户端(Redis、Memcached等)")data_type: str = Field(description="缓存的数据类型(str、list、hash、set)")class BaseCacheProxy(object):"""缓存代理基类"""def __init__(self, cache_client):self.cache_client = cache_client  # 具体的缓存客户端,例如Redis、Memcached等def set(self, key: str, value: str, ttl: int):raise NotImplementeddef get(self, key):cache_data = self.cache_client.get(key)return cache_dataclass RedisCacheProxy(BaseCacheProxy):"""同步redis缓存代理"""def __init__(self, cache_client: Redis):super().__init__(cache_client)def set(self, key, value, ttl):self.cache_client.setex(name=key, value=value, time=ttl)class AsyncRedisCacheProxy(BaseCacheProxy):"""异步Redis缓存代理"""def __init__(self, cache_client: aioredis.Redis):super().__init__(cache_client)async def set(self, key, value, ttl):await self.cache_client.setex(name=key, value=value, time=ttl)async def get(self, key):cache_data = await self.cache_client.get(key)return cache_dataclass MemoryCacheProxy(BaseCacheProxy):"""系统内存缓存代理"""def __init__(self, cache_client: cacheout.Cache):super().__init__(cache_client)def set(self, key, value, ttl):self.cache_client.set(key=key, value=value, ttl=ttl)MEMORY_PROXY = MemoryCacheProxy(cache_client=cacheout.Cache(maxsize=1024))

这里设置一个缓存代理抽象类是用于封装屏蔽不同缓存客户端的操作不一致性。统一成如下入口

python">def set(self, key: str, value: str, ttl: int):raise NotImplementeddef get(self, key):cache_data = self.cache_client.get(key)return cache_data

让具体的缓存客户端重写(实现)这两个方法,以达到缓存装饰器的通用性。目前只实现了同步、异步redis缓存代理以及通过 cacheout 库实现的本地内存缓存代理,后面接入其他的缓存代理(例如Memcached等)就不用动cache_json函数了,只要继承 BaseCacheProxy,实现具体的 set、get 操作即可。

pip install python-memcached
python">import memcacheclass MemcacheCacheProxy(BaseCacheProxy):def __init__(self, cache_client: memcache.Client):super().__init__(cache_client)def set(self, key, value, ttl):self.cache_client.set(key, value, time=ttl)

由于获取缓存的方法逻辑一致,故而直接复用就行。

测试Demo

python">#!/usr/bin/python3
# -*- coding: utf-8 -*-
# @Author: Hui
# @File: cache.py
# @Desc: { cache demo 模块 }
# @Date: 2024/04/23 11:11
import asyncio
import time
from datetime import timedeltaimport cacheoutfrom py_tools.connections.db.redis_client import BaseRedisManager
from py_tools.decorators.cache import cache_json, MemoryCacheProxy, RedisCacheProxy, AsyncRedisCacheProxyclass RedisManager(BaseRedisManager):client = Noneclass AsyncRedisManager(BaseRedisManager):client = NoneRedisManager.init_redis_client(async_client=False)
AsyncRedisManager.init_redis_client(async_client=True)memory_proxy = MemoryCacheProxy(cache_client=cacheout.Cache())
redis_proxy = RedisCacheProxy(cache_client=RedisManager.client)
aredis_proxy = AsyncRedisCacheProxy(cache_client=AsyncRedisManager.client)@cache_json(key_prefix="demo", ttl=3)
def memory_cache_demo_func(name: str, age: int):return {"test_memory_cache": "hui-test", "name": name, "age": age}@cache_json(cache_proxy=redis_proxy, ttl=10)
def redis_cache_demo_func(name: str, age: int):return {"test_redis_cache": "hui-test", "name": name, "age": age}@cache_json(cache_proxy=aredis_proxy, ttl=timedelta(minutes=1))
async def aredis_cache_demo_func(name: str, age: int):return {"test_async_redis_cache": "hui-test", "name": name, "age": age}@AsyncRedisManager.cache_json(ttl=30)
async def aredis_manager_cache_demo_func(name: str, age: int):return {"test_async_redis_manager_cache": "hui-test", "name": name, "age": age}def memory_cache_demo():print("memory_cache_demo")ret1 = memory_cache_demo_func(name="hui", age=18)print("ret1", ret1)print()ret2 = memory_cache_demo_func(name="hui", age=18)print("ret2", ret2)print()time.sleep(3)ret3 = memory_cache_demo_func(age=18, name="hui")print("ret3", ret3)print()assert ret1 == ret2 == ret3def redis_cache_demo():print("redis_cache_demo")ret1 = redis_cache_demo_func(name="hui", age=18)print("ret1", ret1)print()ret2 = redis_cache_demo_func(name="hui", age=18)print("ret2", ret2)assert ret1 == ret2async def aredis_cache_demo():print("aredis_cache_demo")ret1 = await aredis_cache_demo_func(name="hui", age=18)print("ret1", ret1)print()ret2 = await aredis_cache_demo_func(name="hui", age=18)print("ret2", ret2)assert ret1 == ret2async def aredis_manager_cache_demo():print("aredis_manager_cache_demo")ret1 = await aredis_manager_cache_demo_func(name="hui", age=18)print("ret1", ret1)print()ret2 = await aredis_manager_cache_demo_func(name="hui", age=18)print("ret2", ret2)assert ret1 == ret2async def main():memory_cache_demo()redis_cache_demo()await aredis_cache_demo()await aredis_manager_cache_demo()if __name__ == '__main__':asyncio.run(main())

输出结果

python">memory_cache_demo
ret1 {'test_memory_cache': 'hui-test', 'name': 'hui', 'age': 18}命中缓存: demo:cache_json:__main__:memory_cache_demo_func:46c6a618a88eb5067a00915c10c97c6c72d5073ecf9b04060433de75b2d21f51
ret2 {'test_memory_cache': 'hui-test', 'name': 'hui', 'age': 18}ret3 {'test_memory_cache': 'hui-test', 'name': 'hui', 'age': 18}redis_cache_demo
ret1 {'test_redis_cache': 'hui-test', 'name': 'hui', 'age': 18}命中缓存: py-tools:cache_json:__main__:redis_cache_demo_func:a00b13aa2e1e56ad328d1956bc3c3fb8e89b7007453a780e866cc3ccafb51d73
ret2 {'test_redis_cache': 'hui-test', 'name': 'hui', 'age': 18}
aredis_cache_demo
ret1 {'test_async_redis_cache': 'hui-test', 'name': 'hui', 'age': 18}ret2 {'test_async_redis_cache': 'hui-test', 'name': 'hui', 'age': 18}
aredis_manager_cache_demo
ret1 {'test_async_redis_manager_cache': 'hui-test', 'name': 'hui', 'age': 18}ret2 {'test_async_redis_manager_cache': 'hui-test', 'name': 'hui', 'age': 18}

Redis 缓存情况

缓存信息还是挺清晰的就是有点长。由于是从主入口调用的函数,所以 func.__module__ 是 __main__。

缓存装饰器一般适用于一些参数相同, 结果经常不变的情况下,以及允许短时间内出现数据不一致的场景。如下是一些典型的应用场景

  1. API Token缓存:对于需要使用API Token进行身份验证的API请求,通常API Token具有一定的有效期。在这种情况下,你可以使用缓存装饰器缓存API Token,以避免在每次请求时重新生成或从后端服务获取。这样可以降低对后端服务的负载,并提高系统的响应速度。
  2. OSS Sign URL缓存:当需要生成签名URL来访问对象存储服务(如AWS S3、阿里云OSS)中的资源时,通常需要对URL进行签名以确保安全性。在这种情况下,你可以使用缓存装饰器缓存已签名的URL,在一定时间内重复使用相同的签名URL,而不必重新计算签名。这样可以降低对签名计算资源的消耗,并减少重复的签名请求。
  3. 频繁查询的数据缓存:对于一些数据不经常变化但是频繁被查询的情况,比如一些静态配置信息、全局参数等,可以使用缓存装饰器将查询结果缓存起来,减少数据库查询次数,提高系统的响应速度。
  4. 外部API响应结果缓存:当你调用外部API获取数据时,有时这些数据在一段时间内不会发生变化。在这种情况下,你可以使用缓存装饰器缓存外部API的响应结果,以避免频繁地向外部API发出请求。这不仅可以提高系统的性能,还可以降低对外部服务的依赖性。

总的来说,缓存装饰器可以应用于许多场景,特别是在需要提高性能、减少资源消耗和避免重复请求数据的情况下。通过合理地设置缓存时间,可以权衡数据的新鲜度和系统性能,从而实现更好的用户体验。

源代码

源代码已上传到了Github,里面也有具体的使用Demo,欢迎大家一起体验、贡献。

HuiDBK/py-tools: 打造 Python 开发常用的工具,让Coding变得更简单 (github.com)


http://www.ppmy.cn/news/1449069.html

相关文章

爬虫学习:基本网络请求库的使用

目录 一、urllib网络库 1.urlopen()方法 2.request方法 二、requests网络请求库 1.主要方法 2.requests.get()和requests.post() 一、urllib网络库 1.urlopen()方法 语法格式: urlopen(url,data,timeout,cafile,capath,context) # url:地址 # data:要提交的数据…

如何避免 sql 注入?

SQL注入是一种常见的安全漏洞,攻击者通过在应用程序的输入字段中插入或“注入”恶意的SQL代码,来影响后端数据库的正常查询。为了避免SQL注入,可以遵循以下最佳实践: 使用参数化查询或预处理语句: 这是防止SQL注入的最…

06 - 步骤 add constants

简介 Add Constants 步骤是用于在数据流中添加常量字段的步骤。它允许用户在数据流中插入一个或多个常量字段,并为这些字段指定固定的数值、字符串或其他类型的常量值。 使用 场景 我需要在数据清后,这个JSON 字符串有一个固定的行流数据。 1、拖拽…

【实时数仓架构】方法论

笔者不是专业的实时数仓架构,这是笔者从其他人经验和网上资料整理而来,仅供参考。写此文章意义,加深对实时数仓理解。 一、实时数仓架构技术演进 1.1 四种架构演进 1)离线大数据架构 一种批处理离线数据分析架构,…

AI图书推荐:ChatGPT写论文的流程与策略

论文一直是任何学术学位的顶峰。它展示了学生在研究领域的兴趣和专业知识。撰写论文也是一个学习经验,为学术工作以及专业研究角色做好准备。但是,论文工作总是艰苦的,通常是充满乐趣和创造性的,但有时也是乏味和无聊的。生成式人…

Cloudflare高级防御规则 看看我的网站如何用防御的

网站已趋于稳定,并且经过nginx调优。我想先分享一下Cloudflare的WAF规则,因为这是最有效的防御之一,可以抵御大量恶意攻击流量,我已经验证了数月。 对于海外独立站电商网站,Cloudflare的CDN服务是首选,它强…

CMake学习详解

目录转到 -> [[…/目录|目录]] 基础 编译源文件:CMakeLists.txt CMake里面变量默认都是字符串 宏 CMAKE_CURRENT_SOURCE_DIR:当前CMakeLists.txt文件所在路径CMAKE_CXX_STANDARD:C++编译标准EXECUTABLE_OUTPUT_PATH:可执行程序输出路径LIBRARY_OUTPUT_PATH:库文件生…

Type-C接口取电IC6500:优势与应用场景的深度解析

Type-C接口PD芯片取电IC的优势 随着科技的不断进步和移动设备在日常生活中的广泛应用,充电技术的革新变得愈发重要。Type-C接口PD芯片取电IC作为现代充电技术的关键组件,其优势日益凸显,为移动设备充电带来了革命性的改变。本文将深入探讨Ty…