开源限流组件分析(一):juju/ratelimit

embedded/2024/10/19 12:23:40/

文章目录

  • 前言
  • 数据结构
  • 对外提供接口
    • 初始化令牌桶
    • 获取令牌
  • 核心方法
    • adjustavailableTokens
    • currentTick
    • take
    • TakeAvailable
    • Wait系列

前言

这篇文章分析下go开源限流组件juju-ratelimit的使用方式和源码实现细节

源码地址:https://github.com/juju/ratelimit 版本:v1.0.2

其提供了一种高效的令牌桶限流实现

令牌桶相比于其他限流算法(如漏桶算法)的一个显著优势,就是在突发流量到来时,可以短时间内提供更多的处理能力,以应对这些额外的请求

直观上来说,令牌桶算法可以实现为:桶用channel实现

  • 在后台每隔一段固定的时间向桶中发放令牌
  • 要获取令牌时,从channel取数据

后台定时填充令牌:

 func Start(bucket chan struct{}, interval time.Duration) {go func() {ticker := time.NewTicker(interval)defer ticker.Stop()// 每隔interval时间往channel中塞一个令牌for range ticker.C {select {// 放令牌case bucket <- struct{}{} :// 桶满了,丢弃default:}}}()}

业务请求要获取获取令牌时(非阻塞式):

func GetToken(bucket chan struct{}) bool {select {case <- bucket:return truedefault:return false}
}

但这样有多少容量就要开多少空间,对内存不友好

更好的方式是只用一个int变量availableTokens维护桶中有多少token:

  1. 每次有请求要获取令牌时,先根据当前时间now减去上次获取令牌的时间last,计算因为这段时间流逝,应该给桶中补充多少令牌,并加到availableTokens
  2. 如果availableTokens < 本次请求的令牌数 request,说明令牌不够。否则令牌够,放行请求,并根据本次需求的令牌数在桶中扣减 :availableTokens -= request

数据结构

type Bucket struct {clock Clock// bucket创建时间,仅初始化一次,用于计算时间相对偏移量startTime time.Time// 令牌桶最大容量capacity int64// 每次放入多少个令牌quantum int64// 每次放入令牌桶的间隔fillInterval time.Durationmu sync.Mutex// 当前桶中有多少token,注意:可能为负值availableTokens int64// 上一次访问令牌桶的ticklatestTick int64
}

注意:桶中剩余令牌availableTokens可能为负值,至于为啥在下文分析扣减令牌流程时说明

对外提供接口

初始化令牌桶

其提供了几种根据不同需求初始化令牌桶的方法:

fillInterval时间放1个令牌,桶容量为capacity

func NewBucket(fillInterval time.Duration, capacity int64) *Bucket 

fillInterval时间放入quantum个令牌

func NewBucketWithQuantum(fillInterval time.Duration, capacity, quantum int64) *Bucket 

这两个方法都是简单设置Bucket的字段,比较简单

每秒放入rate个令牌:

func NewBucketWithRate(rate float64, capacity int64) *Bucket 

其构造方法如下:主要是需要根据参数rate推算出tb.fillInterval以及tb.quantum

func NewBucketWithRateAndClock(rate float64, capacity int64, clock Clock) *Bucket {tb := NewBucketWithQuantumAndClock(1, capacity, 1, clock)// 每次放入令牌数quantum从1开始,每轮 * 1.1for quantum := int64(1); quantum < 1<<50; quantum = nextQuantum(quantum) {// 假设quantum=1, rate=10,即每次放1个,每秒放10个 => 则放令牌的间隔是0.1sfillInterval := time.Duration(1e9 * float64(quantum) / rate)if fillInterval <= 0 {continue}tb.fillInterval = fillIntervaltb.quantum = quantum// 误差小于0.01就返回tbif diff := math.Abs(tb.Rate() - rate); diff/rate <= rateMargin {return tb}}}

虽然这里用了for循环寻找最合适的fillInterval和quantum,但只要rate小于10亿,都会在执行第一轮循环后退出

所以可以近似看做quantum=1fillInterval=1e9 / rate

例如:quantum=1,当要求 rate=10,即每次放1个,每秒放10个时,计算出来放令牌的时间间隔时是 0.1s

nextQuantum方法:

// 每次增大1.1倍
func nextQuantum(q int64) int64 {q1 := q * 11 / 10if q1 == q {q1++}return q1
}

Rate方法:根据quantumfillInterval,计算每秒放入多少个令牌

func (tb *Bucket) Rate() float64 {// 举个例子:假设每次放入2个,每500ms放一个 => 每秒就放4个return 1e9 * float64(tb.quantum) / float64(tb.fillInterval)
}

获取令牌

非阻塞获取count个令牌,返回的Duration表示调用者需要等待的时间才能获得令牌

func (tb *Bucket) Take(count int64) time.Duration 

如果在maxWait时间内能获取到count个令牌就获取,否则不获取

返回获取成功时需要等待的时间,是否获取成功

func (tb *Bucket) TakeMaxDuration(count int64, maxWait time.Duration) (time.Duration, bool) 

非阻塞获取最多count个令牌,桶中还有多少获取多少,返回实际获得的令牌数(可能小于count)

func (tb *Bucket) TakeAvailable(count int64) int64

获取count个令牌,并在方法内部等待直到获取到令牌

func (tb *Bucket) Wait(count int64) 

只有当最多阻塞等待maxWait时间,能获取到count个令牌时才等待,并在内部等待。如果不能返回false

func (tb *Bucket) WaitMaxDuration(count int64, maxWait time.Duration) bool

这5个方法中,业务上实用的是WaitMaxDuration,毕竟被限流的请求不能无限等下去

核心方法

adjustavailableTokens

将桶中的令牌数调整为tick时间的令牌数,相当于给桶补充token

  • 计算在tick与上一次lastTick之间能够生产多少个令牌

    • 一般是计算从上次请求到本次请求中产生的令牌数
  • 追加令牌到桶中

func (tb *Bucket) adjustavailableTokens(tick int64) {lastTick := tb.latestTicktb.latestTick = tickif tb.availableTokens >= tb.capacity {return}// 补充令牌tb.availableTokens += (tick - lastTick) * tb.quantumif tb.availableTokens > tb.capacity {tb.availableTokens = tb.capacity}return
}

其参数tick如何计算的?调currentTick方法

currentTick

计算当前时间与开始时间(桶的初始化时间)之间,需要放入多少次令牌

func (tb *Bucket) currentTick(now time.Time) int64 {return int64(now.Sub(tb.startTime) / tb.fillInterval)
}

用于计算当前到哪个tick了

take

Take系列的方法,底层会调到take方法:

Take:

func (tb *Bucket) Take(count int64) time.Duration {tb.mu.Lock()defer tb.mu.Unlock()// 可以等待无限大的时间d, _ := tb.take(tb.clock.Now(), count, infinityDuration)return d
}

TakeMaxDuration:

func (tb *Bucket) TakeMaxDuration(count int64, maxWait time.Duration) (time.Duration, bool) {tb.mu.Lock()defer tb.mu.Unlock()return tb.take(tb.clock.Now(), count, maxWait)
}

差别在于Take会等待无限大的时间,直到拿到token。TakeMaxDuration最多只会等待maxWait时间


take是获取令牌的核心方法,其流程如下:

  1. 计算出当前时刻可用的令牌数,并补充到令牌桶中
  2. 如果当前令牌桶存量够,在桶中扣减令牌
  3. 如果当前令牌桶存量不够,在桶中预扣减令牌,并返回需要等待的时间waitTime
/** 
当前时间是now
要获取count个令牌
最多等待maxWait时间
*/
func (tb *Bucket) take(now time.Time, count int64, maxWait time.Duration) (time.Duration, bool) {if count <= 0 {return 0, true}tick := tb.currentTick(now)// 给桶补充tokentb.adjustavailableTokens(tick)avail := tb.availableTokens - count// 当前桶中的令牌就能满足需求,直接返回if avail >= 0 {tb.availableTokens = availreturn 0, true}// 到这avail是负数,-avail就是还需要产生多少个令牌,// 要产生avail个令牌,还需要多少个tick(向上取整)// 再加上当前tick,就是能满足需求的tickendTick := tick + (-avail+tb.quantum-1)/tb.quantum// 需要等到这个时间才有令牌endTime := tb.startTime.Add(time.Duration(endTick) * tb.fillInterval)// 需要等待的时间waitTime := endTime.Sub(now)// 如果需要等待的时间超过了最大等待时间,则不等待,也不扣减,直接返回if waitTime > maxWait {return 0, false}// 这里将availableTokens更新为负值tb.availableTokens = avail// 返回要等待的时间,且已经在桶里预扣减了令牌return waitTime, true
}

怎么理解桶中的availableTokens变为负值?表示有请求已经提前预支了令牌,相当于欠账

  • 之后有请求要获取令牌时,需要先等时间流逝,把这些账还了,才能获取成功

    • TakeAvailable方法
  • 当然也可以在之前已欠账的基础上继续欠账,这样要等待更久的时间才能获取令牌成功

    • Take,TakeMaxDuration方法

例如:令牌桶每秒产生1个令牌,假设在**t1时刻**桶中已经没有令牌了
  1. 请求A在t1时刻调Take()获取5个令牌,将availableTokens更新为-5,并返回5s,表示需要等待5s才能让请求放行
  2. 时间过去1s,此时时刻t2 = t1 + 1s
  3. 请求B在t2时刻调Take()获取5个令牌,首先因为时间流逝,将availableTokens更新为-4。再将availableTokens更新为-4 - 5 = -9,也就是继续欠账。返回9s,表示要等待9s才能让请求放行
    在这里插入图片描述

TakeAvailable

非阻塞获取最多count个令牌,桶中还有多少获取多少,返回实际获得的令牌数(可能小于count)

func (tb *Bucket) TakeAvailable(count int64) int64 {tb.mu.Lock()defer tb.mu.Unlock()return tb.takeAvailable(tb.clock.Now(), count)
}// 返回可用的令牌数(可能小于count),如果没可用的令牌,将返回0,不会阻塞
func (tb *Bucket) takeAvailable(now time.Time, count int64) int64 {if count <= 0 {return 0}// 基于当前时间,给桶补充令牌tb.adjustavailableTokens(tb.currentTick(now))if tb.availableTokens <= 0 {return 0}// 获取max(count, availableTokens)个令牌,也就是有多少就获取多少if count > tb.availableTokens {count = tb.availableTokens}tb.availableTokens -= countreturn count
}

Wait系列

Wait系列的两个方法Wait和WaitMaxDuration就是对Take的封装,也就是如果需要等待,在Wait方法内部等待

func (tb *Bucket) Wait(count int64) {if d := tb.Take(count); d > 0 {// 在内部等待tb.clock.Sleep(d)}
}func (tb *Bucket) WaitMaxDuration(count int64, maxWait time.Duration) bool {d, ok := tb.TakeMaxDuration(count, maxWait)if d > 0 {tb.clock.Sleep(d)}return ok
}

http://www.ppmy.cn/embedded/128737.html

相关文章

精心整理85道Java微服务面试题(含答案)

微服务 面试题 1、您对微服务有何了解&#xff1f; 2、微服务架构有哪些优势&#xff1f; 3。微服务有哪些特点&#xff1f; 4、设计微服务的最佳实践是什么&#xff1f; 5、微服务架构如何运作&#xff1f; 6、微服务架构的优缺点是什么&#xff1f; 7、单片&#xff0…

iPad备份软件哪个好?好用的苹果备份软件推荐

苹果手机在将数据备份到电脑时&#xff0c;需要通过第三方的管理软件&#xff0c;才可以将手机连接到电脑进行备份。苹果手机备份软件有很多&#xff0c;常用的有&#xff1a;爱思助手、iMazing、iTuns等。那么这三款常用的备份软件究竟哪款更好呢&#xff1f;下面就给大家盘点…

Diffusion Mechanism in Residual Neural Network: Theory and Applications

残差神经网络中的扩散机制&#xff1a;理论与应用 作者&#xff1a;Tangjun Wang; Zehao Dou; Chenglong Bao; Zuoqiang Shi 源码链接&#xff1a;https://github.com/shwangtangjun/Diff-ResNet 摘要 扩散是一种在许多物理过程中出现的基本内部机制&#xff0c;描述了不同…

【服务器部署】Docker部署小程序

一、下载Docker 安装之前&#xff0c;一定查看是否安装docker&#xff0c;如果有&#xff0c;卸载老版本 我是虚拟机装的Centos7&#xff0c;linux 3.10 内核&#xff0c;docker官方说至少3.8以上&#xff0c;建议3.10以上&#xff08;ubuntu下要linux内核3.8以上&#xff0c…

C++算法练习-day7——707.设计链表

题目来源&#xff1a;. - 力扣&#xff08;LeetCode&#xff09; 题目思路分析 在编程中&#xff0c;链表是一种常见的数据结构&#xff0c;用于存储一系列元素&#xff0c;但与数组不同&#xff0c;链表中的元素在内存中不必连续存储。每个元素&#xff08;称为节点&#xf…

Linux权限管理

Linux权限管理是Linux系统中保证系统安全性和管理效率的重要手段。Linux权限主要涉及对文件和目录的操作权限,包括读(r)、写(w)和执行(x)三种基本权限。 一、Linux权限的基本概念 权限类型:Linux中的权限分为读(r)、写(w)和执行(x)三种。这些权限分别对应于文件…

【AIGC】优化长提示词Prompt:提升ChatGPT输出内容的准确性与实用性

博客主页&#xff1a; [小ᶻZ࿆] 本文专栏: AIGC | ChatGPT 文章目录 &#x1f4af;前言&#x1f4af;长提示词的挑战&#x1f4af;谷歌的优化长提示词技术关键因素分析 &#x1f4af;长提示词的设计原则&#x1f4af;优化长提示词的新框架方法&#x1f4af;实验结果分析不…

生产力工具|vscode for mac的安装python库和使用虚拟环境(一)

一、在vscode中运行python代码&#xff08;mac或windows&#xff09; &#xff08;一&#xff09;在vscode中安装Python插件 若想在vscode中高效率的编辑Python代码&#xff0c;需要安装Python插件&#xff0c;点击下图中红框内的按钮&#xff1a; 然后在左上角的搜索框中输入…