1.令牌桶限流算法
算法思想:系统以一定速率生成令牌,存放于桶中,在达到容量的最大值后停止生成令牌。用户生成请求后从令牌桶中消费令牌才能执行。否则延迟执行或被限制。
使用场景:平滑流量控制;在一定程度上可以处理突发流量,但维护了一个延时队列存放token,同时需要一个定时器定期生成token在性能上有损耗。
限流流程如下:
2.time/rate中的limiter核心函数
a.NewLimiter(v,cap)
初始化系统生成令牌的速度,令牌桶的容量。
func NewLimiter(r Limit, b int) *Limiter {return &Limiter{limit: r,burst: b,}
}
burst表示最大并发量
limit表示每秒能够生产token的数量
b.WaitN(ctx,n)
func (lim *Limite) wait(ctx context.Context, n int, t time.Time, newTimer func(d time.Duration) (<-chan time.Time, func() bool, func())) error {lim.mu.Lock()burst := lim.burstlimit := lim.limitlim.mu.Unlock()if n > burst && limit != Inf {
// 大于最大并发量,等待时间不大return fmt.Errorf("rate: Wait(n=%d) exceeds limiter's burst %d", n, burst)}// 检查ctx是否已经结束select {case <-ctx.Done():return ctx.Err()default:}// 计算ctx结束前,可以等待的时间waitLimit := InfDurationif deadline, ok := ctx.Deadline(); ok {waitLimit = deadline.Sub(t)}// 预留令牌r := lim.reserveN(t, n, waitLimit)if !r.ok {return fmt.Errorf("rate: Wait(n=%d) would exceed context deadline", n)}// 看是否需要等待生产令牌delay := r.DelayFrom(t)if delay == 0 {return nil}ch, stop, advance := newTimer(delay)//等待生产令牌,期间如果ctx结束会产生预留错误defer stop()advance()select {case <-ch:return nilcase <-ctx.Done():r.Cancel()return ctx.Err()}
}
c.reserveN(t, n, waitLimit)
func (lim *Limite) reserveN(t time.Time, n int, maxFutureReserve time.Duration) Reservation {lim.mu.Lock()defer lim.mu.Unlock()//等待无限if lim.limit == Inf {return Reservation{ok: true,lim: lim,tokens: n,timeToAct: t,}} else if lim.limit == 0 {var ok bool//不等待if lim.burst >= n {//桶里容量>ok = truelim.burst -= n}return Reservation{ok: ok,lim: lim,tokens: lim.burst,timeToAct: t,}}
//懒加载,两次请求之间生成的token数量t, tokens := lim.advance(t)
//token不够用,还需要再生成tokens -= float64(n)var waitDuration time.Durationif tokens < 0 {//生成足够token等待时间waitDuration = lim.limit.durationFromTokens(-tokens)}
n的数量<最大并发量 且 等待时间可接受ok := n <= lim.burst && waitDuration <= maxFutureReserver := Reservation{ok: ok,lim: lim,limit: lim.limit,}if ok {r.tokens = nr.timeToAct = t.Add(waitDuration)// 执行等待的限流器,更新数据lim.last = tlim.tokens = tokenslim.lastEvent = r.timeToAct}return r
}
d、advance(t time.Time)
懒加载两次请求中间生成的token数量
func (lim *Limite) advance(t time.Time) (newT time.Time, newTokens float64) {last := lim.last//上次请求时间if t.Before(last) {last = t}elapsed := t.Sub(last)//懒加载两次请求中间生成的token数量//token_n = 所有token数量/capdelta := lim.limit.tokensFromDuration(elapsed)tokens := lim.tokens + deltaif burst := float64(lim.burst); tokens > burst {tokens = burst}//返回桶里的tokenreturn t, tokens
}
e.ctx结束,请求失败使用cancel归还token
预存器:
type Reservation struct {ok boollim *Limitetokens inttimeToAct time.Timelimit Limit
}
预存器记录了当前的预存token,上一次取token操作时间,预取结果
func (r *Reservation) CancelAt(t time.Time) {if !r.ok {return}r.lim.mu.Lock()defer r.lim.mu.Unlock()if r.lim.limit == Inf || r.tokens == 0 || r.timeToAct.Before(t) {return}//获得过的tokenrestoreTokens := float64(r.tokens) - r.limit.tokensFromDuration(r.lim.lastEvent.Sub(r.timeToAct))if restoreTokens <= 0 {return}//获得桶里的tokent, tokens := r.lim.advance(t)// calculate new number of tokenstokens += restoreTokensif burst := float64(r.lim.burst); tokens > burst {tokens = burst}// update stater.lim.last = tr.lim.tokens = tokensif r.timeToAct == r.lim.lastEvent {prevEvent := r.timeToAct.Add(r.limit.durationFromTokens(float64(-r.tokens)))if !prevEvent.Before(t) {r.lim.lastEvent = prevEvent}}
}
3.使用限流器生成中间件
func Limiter(limit, cap int) MiddlewareFunc {li := rate.NewLimiter(rate.Limit(limit), cap)return func(next HandlerFunc) HandlerFunc {return func(ctx *Context) {//实现限流con, cancel := context.WithTimeout(context.Background(), time.Duration(1)*time.Second)defer cancel()err := li.WaitN(con, 1)if err != nil {//没有拿到令牌的操作li.Reserve().Cancel()ctx.String(http.StatusForbidden, "限流了")return}next(ctx)}}
}