note-Redis实战5 核心-构建应用程序组件1

news/2024/9/28 17:35:17/
助记提要
  1. 使用Redis列表实现自动补全
  2. 列表实现自动补全的限制
  3. 使用Redis有序集合实现自动补全
  4. 锁的异常状态 4种
  5. 使用SETNX命令实现简单的锁
  6. 细粒度锁
  7. 给锁加上超时限制
  8. 三种信号量的实现和适用条件(基本、公平、加锁)

6章 构建应用程序组件1

构建常用组件:自动补全、分布式锁、并发控制

6.1 自动补全

自动补全能让用户在不进行搜索的情况下,快速找到所需东西的技术。

自动补全最近联系人

需求:记录用户最近联系过的100个人,并能按用户已经输入的文字列出以这些文字开头的人。

使用这个功能的用户会很多,需要尽可能减少服务端占用的内存。所以使用列表来存储这些联系人信息。列表的移除操作耗时与列表长度相关,限制在较短的长度时速度也够快。
Redis列表可以满足存储和更新的功能,过滤操作由Python执行。

# 添加或更新最近联系人
def add_update_contact(conn, user, contact):ac_list = 'recent:' + userpipeline = conn.pipeline(True)# 如果当前联系人已存在,就先删掉pipeline.lrem(ac_list, contact)# 把该联系人添加到最前面pipeline.lpush(ac_list, contact)# 添加完成,仅保留前100个联系人pipeline.ltrim(ac_list, 0, 99)pipeline.execute()# 用户移除指定的联系人
def remove_contact(conn, user, contact):conn.lrem('recent:' + user, contact)# 获取自动补全列表并查找匹配的用户
def fetch_autocomplete_list(conn, user, prefix):# 获取自动列表的全部联系人candidates = conn.lrange('recent:' + user, 0, -1)matches = []for candidate in candidates:if candidate.lower().startswith(prefix):matches.append(candidate)return matches
通讯录自动补全

自动补全列表较小时,使用Redis列表存储,实际补全操作交给Python执行。
但是对于很长的列表时,每次都获取全部元素会浪费很多资源。为了向客户端传输更少的数据,需要在服务器完成自动补全的前缀计算工作。

  • Redis如何实现自动补全功能
    使用有序集合,成员为联系人,把分值都设为0。这样联系人会按照成员名字排序。
    自动补全列表排好序以后,按前缀查找实际上找到的就是列表中的一整段字符串序列。如查找abc前缀的单词,实际上就是找列表中abbz…到abd之间的字符串。
    如果知道abbz之后的第一个元素的排名,和abd之前的第一个元素的排名,就能通过ZRANGE获取它们之间的元素。
    字符串的排序都是按ASCII字符顺序排的,把给定前缀的最后一个字符替换为第一个排在该字符前面的字符,可以得到前缀的前驱;在前缀的末尾加上左花括号{(排在z之后的字符),就能得到前缀的后继。

根据前缀生成查找范围:

# 已知字符组成的序列
valid_characters = '\`abcdefghijklmnopqrstuvwxyz{'def find_prefix_range(prefix):# 在字符序列中查找前缀末尾字符所处的位置posn = bisect.bisect_left(valid_characters, prefix[-1])# 找到末尾字符的前驱字符suffix = valid_characters[(posn or 1) - 1]# 返回前驱和后继return prefix[:-1] + suffix + '{', prefix + '{'

对于ASCII码以外的字符,需要先把字符转为字节。然后找到想要支持的字符范围,确保所选范围的前后都留有字符,并使用这个范围前面和后面的字符替换上述例子中的反引号和左花括号。

完整的自动补全程序:

def autocomplete_on_prefix(conn, guild, prefix):# guild是示例场景中的玩家公会# 根据前缀计算查找范围的起点和终点start, end = find_prefix_range(prefix)identifier = str(uuid.uuid4())start += identifierend += identifier# 将起点和终点添加到有序集合zset_name = 'members:' + guildconn.zadd(zset_name, start, 0, end, 0)pipeline = conn.pipeline()while 1:try:pipeline.watch(zset_name)# 找被插入元素的排名sindex = pipeline.zrank(zset_name, start)eindex = pipeline.zrank(zset_name, end)erange = min(sindex + 9, eindex - 2)pipeline.multi()# 获取范围内的值,然后删除插入的元素pipeline.zrange(zset_name, sindex, erange)pipeline.zrem(zset_name, start, end)items = pipeline.execute()[-1]breakexcept redis.exceptions.WatchError:# 如果自动补全集合被其他客户端修改过了,就重试continue# return [item for item in items if '{' not in item]

玩家加入和退出公会

# 加入公会
def join_guild(conn, guild, user):conn.zadd('members:' + guild, user, 0)# 退出公会
def leave_guild(conn, guild, user):conn.zrem('members:' + guild, user)

简要概括上述操作,向有序集合添加元素来创建查找范围,并在取得范围内的元素后移除之前添加的元素。这种操作能应用到任何已排序索引上。

6.2 分布式锁

分布式锁的使用操作和普通的锁一样,但是它可以由不同机器上的不同客户端使用。

  • 使用Redis手动构建分布式锁的原因:
    所有能访问Redis数据的客户端也都能访问这个锁,来对Redis数据进行排他性访问;
    Redis的SETNX命令只有基本的加锁功能,不具备的分布式锁的高级特性。
    使用WATCH、MULTI和EXEC等命令的事务,在负载高的情况下,系统重试次数会很高,这种方式扩展性很差。
锁的异常状态
  1. 持有锁的进程因为操作时间过长而导致锁被自动释放,但是进程本身不知晓这一点,甚至还释放掉其他进程的锁;
  2. 一个持有锁并打算执行长时间操作的进程崩溃,其它想要获取锁的进程不知道哪个进程持有锁,也无法检测持有锁的进程已崩溃,只能浪费时间等待;
  3. 一个进程的锁过期以后,其他多个进程同时尝试获取锁,并且都取到了;
  4. 状况1和3同时出现,导致多个进程获得锁,并认为自己是唯一获取锁的进程;

刚开始构建锁的时候,不会立即处理可能导致锁无法正常运行的问题,而是先做出可以运行的锁获取和释放过程,证明锁有利于性能提升后,才会回头解决引发锁故障的问题。

使用Redis构建锁

把之前的商品交易场景加锁实现。相关数据结构简写如下:

数据结构说明内容
有序集合市场market:成员是“商品名.用户ID”,分值是商品价格
哈希用户信息users:用户ID用户名、用户存款
集合用户的商品inventory:用户ID商品名

获取锁

def acquire_lock(conn, lockname, acquire_timeout=10):# uuid创建随机标识符,用于防止被其他进程取到锁identifier = str(uuid.uuid4())end = time.time() + acquire_timeoutwhile time.time() < end:# SETNX只会在键不存在的情况下为键设置值if conn.setnx('lock:' + lockname, identifier)return identifier# 一直重试,直到获取锁或超过时限time.sleep(.001)return False

交易操作

def purchase_item_with_lock(conn, buyerid, itemid, sellerid):buyer = 'users:%s' % buyeridseller = 'users:%s' % selleriditem = '%s.%s' % (itemid, sellerid)inventory = 'inventory:%s' % buyeridlocked = acquire_lock(conn, market)if not locked:return Falsepipe = conn.pipeline(True)try:# 检查商品是否在售,买家钱是否够pipe.zscore('market:', item)pipe.hget(buyer, 'funds')price, funds = pipe.execute()if price is None or price > funds:return None# 付钱、交货pipe.hincrby(seller, 'funds', int(price))pipe.hincrby(buyer, 'funds', int(-price))pipe.sadd(inventory, itemid)pipe.zrem('market:', item)pipe.execute()return Truefinally:release_lock(conn, market, locked)

释放锁

def release_lock(conn, lockname, identifier):pipe = conn.pipeline(True)lockname = 'lock:' + locknamewhile True:try:pipe.watch(lockname)if pipe.get(lockname) == identifier:pipe.multi()pipe.delete(lockname)pipe.execute()return Truepipe.unwatch()break# 其他客户端修改了锁,重试except redis.exceptions.WatchError:pass# 进程失去了锁return False

和使用WATCH相比,使用锁进行交易在买入商品时不需要进行重试,每次购买的平均耗时大大降低。
这时多个上架或多个买入进程之间存在的竞争是限制交易操作性能进一步提升的关键。

细粒度锁

每次交易中,关心的是某件商品,而不是整个市场。所以加锁的粒度可以细一些,只锁住被买卖的商品而不是市场。这样能减少锁竞争出现的几率并提升程序的性能。

在需要锁住的数据有不止一份或需要锁住结构的多个部分时,对锁粒度的判断会变得困难。
使用多个细粒度锁还有引发死锁的危险。

锁的超时限制

给锁加上超时限制特性,可以确保锁总会被释放,而不会被某个客户端一直持有。

def acquire_lock_with_timeout(conn, lockname, acquire_timeout=10, lock_timeout=10):identifier = str(uuid.uuid4())lockname = 'lock:' + lockname# expire命令只能接受整数lock_timeout = int(math.ceil(lock_timeout))end = time.time() + acquire_timeoutwhile time.time() < end:if conn.setnx(lockname, identifier):# 为获取的锁设置过期时间conn.expire(lockname, lock_timeout)return identifierelif not conn.ttl(lockname):# 检查到未设置超时时间就进行设置conn.expire(lockname, lock_timeout)time.sleep(.001)return False

6.3 计数信号量

计数信号量是一种锁,可以限制一项资源最多能同时被多少个进程访问。

普通的锁在获取失败时,客户端一般会等待;计数信号量获取失败时,客户端会立即返回失败结果(系统繁忙之类)。

基本的计数信号量

使用有序集合构建计数信号量,成员为每个进程的唯一标识符,分值为进程尝试获取信号量的时间戳。
进程尝试获取信号量时,会把自身标识符添加到有序集合,如果排名低于可获取的信号量总数,就可以成功获取信号量。否则就无法获取,且需要从有序集合中移除自身的标识符。
标识符添加到有序集合之前,会先把有序集合里时间戳大于超市数值的标识符清理掉。

获取信号量

def acquire_semaphore(conn, semname, limit, timeout=10):identifier = str(uuid.uuid4())now = time.time()pipeline = conn.pipeline(True)# 清理过期的信号量pipeline.zremrangebyscore(semname, '-inf', now - timeout)pipeline.zadd(semname, identifier, now)# 是否可以获取信号量pipeline.zrank(semname, identifier)if pipeline.execute()[-1] < limit:return identifier# 获取失败,删除添加的标识符conn.zrem(semname, identifier)return None

释放信号量

def release_semaphore(conn, semname, identifier):# 释放成功返回True,已经过期被删则返回Falsereturn conn.zrem(semname, identifier)

程序会按照系统的当前时间作为获取信号量的时间插入有序集合内。如果两个系统的时间有差异,如A系统比B系统快10毫秒。当A获取最后一个信号量后,B如果在10毫秒内去取信号量,就会取走A获取的信号量。

公平信号量

当锁或者信号量会因为系统时钟的细微不同而导致获取结果产生变化时,这个锁或信号量就是不公平的。
系统时钟较慢的系统上运行的客户端,会取走系统时钟较快的系统上运行的客户端已获得的信号量。

新增数据结构,通过自增的计数值来排名,而不是通过客户端的系统时间排名。客户端的系统时间记录,仅用于判断是否超时。

数据结构说明内容注意
有序集合信号量时间semaphore:remote成员是客户端标识符,分值是客户端获取信号量的时间用于控制信号量超时时间
字符串信号量计数器semaphore:remote:counter整型值最先对计数器执行自增操作的客户端能获得信号量
有序集合信号量拥有者semaphore:remote:owner成员是客户端标识符,分值是计数器生成的值计数值排名靠前的客户端可获取信号量

获取公平信号量

def acquire_fair_semaphore(conn, semname, limit, timeout=10):identifier = str(uuid.uuid4())czset = semname + ':owner'ctr = semname + ':counter'now = time.time()pipeline = conn.pipeline(True)# 删除超时信号量pipeline.zremrangebyscore(semname, '-inf', now - timeout)# 把不存在于超时有序集合中的标识符,在拥有者有序集合也删掉pipeline.zinterstore(czset, {czset: 1, semname: 0})# 获取计数器自增后的值pipeline.incr(ctr)counter = pipeline.execute()[-1]# 尝试获取信号量pipeline.zadd(semname, identifier, now)pipeline.zadd(czset, identifier, counter)# 检查排名,判断是否取到信号量pipeline.zrank(czset, identifier)if pipeline.execute()[-1] < limit:return identifier# 信号量获取失败,清理无用数据pipeline.zrem(semname, identifier)pipeline.zrem(czset, identifier)pipeline.execute()return None

释放信号量

def release_fair_semaphore(conn, semname, identifier):pipeline = conn.pipeline(True)pipeline.zrem(semname, identifier)pipeline.zrem(semname + ':owner', identifier)# True表示成功删除,False表示信号量因超时被删除return pipeline.execute()[0]

用计数器判断排名的信号量,不需要各个主机拥有相同的系统时间。但主机之间的时间差需要在1-2秒内,避免信号量过早或太晚释放。

  • 释放信号量时必须同时清理两个有序集合
    获取信号量的时候,更新拥有者有序集合的方式是删除掉超时有序集合中不存在的标识符。
    释放的时候只清理超时有序集合,理论上也不会有问题。
    这样做有可能的问题是,如果一个客户端在获取信号量,刚更新过超时有序集合,正准备把标识符加到两个有序集合,另一个客户端执行信号量释放函数把自己的标识符从超时有序集合移除(拥有者有序集合还有数据),前一个客户端本该成功的获取操作就会失败。
刷新信号量

如果应用程序使用信号量的时间要求大于10秒,就需要对信号量刷新,防止它过期。

def refresh_fair_semaphore(conn, semname, identifier):# 更新客户端的信号量if conn.zadd(semname, identifier, time.time()):# 客户端已失去信号量release_fair_semaphore(conn, semname, identifier)return False# 客户端仍持有信号量return True
消除竞争条件

计数器信号量消除了系统时间带来的不确定性,但是获取信号量的程序执行过程中,需要客户端和服务器进行多次通信。
当进程A和B同时获取剩余的一个信号量时,A先对计数器做了自增操作,但是由于A网络延迟等原因,B抢先把自己的标识符加到了有序集合中,并检查了排名,那么B就会成功获取信号量。之后恢复网络的A添加标识符后检查排名,就会偷走B已取得的信号量。B只有在释放或刷新时才会发现。

一个正确的计数器信号量,需要使用分布式锁。

def acquire_semaphore_with_lock(conn, semname, limit, timeout=10):identifier = acquire_lock(conn, semname, acquire_timeout=.01)if identifier:try:return acquire_fair_semaphore(conn, semname, limit, timeout)finally:release_lock(conn, semname, identifier)
三种信号量的适用
  • 简单信号量
    系统时钟不重要、不需要刷新、允许信号量数目偶尔超过限制;
  • 公平信号量
    差距在1-2秒的系统时钟是可接受的,允许信号量数量偶尔超过限制;
  • 正确的公平信号量(加锁)
    信号量行为一直都是正确的;

http://www.ppmy.cn/news/1530735.html

相关文章

气压高度加误差的两种方法(直接添加 vs 换算到气压误差),附MATLAB程序

在已知高度真实值时,如果需要计算此高度下的气压计误差,可考虑本文所述的两种方法 气压高度 气压与高度之间的关系可以用大气压的垂直变化来描述。随着高度的增加,气压通常会下降。这是因为空气的密度在高度增加时减少,导致上方空气柱对下方空气施加的压力减小。 主要关系…

LLM - 使用 XTuner 指令微调 多模态大语言模型(InternVL2) 教程

欢迎关注我的CSDN&#xff1a;https://spike.blog.csdn.net/ 本文地址&#xff1a;https://spike.blog.csdn.net/article/details/142528967 免责声明&#xff1a;本文来源于个人知识与公开资料&#xff0c;仅用于学术交流&#xff0c;欢迎讨论&#xff0c;不支持转载。 XTuner…

【C语言】const char*强制类型转换 (type cast)的告警问题

void run_upload(const char *ftp_url) {CircularQueue queue;// 初始化环形队列for (int i = 0; i < QUEUE_SIZE; i++) {queue.items[i].data = malloc(BUFFER_SIZE);if (queue.items[i].data == NULL) {fprintf(stderr, "Failed to allocate memory for queue item %…

在nuxt中集成mars3d

创建一个nuxt项目 创建一个项目&#xff0c;安装依赖 安装mars3d ,安装mars3d-cesium 替换app.vue <template><div id"mars3dContainer" class"mars3d-container"></div><!-- <div>123</div> --> </template&…

初识C#(二)- 流程控制

我希望能把自己的命运掌握在自己的手里《流程控制》 文章目录 前言一、分支语句1.1 如此如此这般这般的if语句1.2 一次满足多个愿望的switch语句 二、循环语句2.1 漫无目的没因没果的while循环2.2 会灵活设定目标的for循环2.3 先动起来再想其他事情的do while循环 总结 前言 本…

JVM 类加载机制

类加载 在 JVM 虚拟机实现规范中,通过 ClassLoader 类加载器把 *. class 字节码文件(文件流)加载到内存,并对字节码文件内容进行验证、准备、解析和初始化,最终形成可以被虚拟机直接使用的 java.lang.class 对象,这个过程被称作类加载。 类是在运行期间第一次使用时,被类加载器…

解决Mac 默认设置 wps不能双面打印的问题

目录 问题描述&#xff1a; 问题解决&#xff1a; 问题描述&#xff1a; 使用mac电脑的时候&#xff0c;发现wps找不到双面打印的按钮&#xff0c;导致使用wps打开的所有文件都不能自动双面打印 问题解决&#xff1a; mac的wps也是有双面打印的选项&#xff0c;只是默认被关…

基于MioIO的图片工具Thumbor

参考文章&#xff1a;基于MinIO和Thumbortc_awsnginx&#xff08;docker版&#xff09;搭建图像缩略图服务_minio上传图片生成缩略图-CSDN博客