文章目录
- 为什么需要熔断
- Google sre弹性熔断算法
- kratos Breaker源码分析
- 公共接口
- sre实现
- 上报请求结果
- 判定是否熔断
为什么需要熔断
一般来说,当服务器过载(overload)时,需要给client返回服务过载的报错
但是拒接请求也有成本,可能响应错误码本身没啥成本,但处理请求协议栈,构建响应header等也有一笔开销
如果被拒绝的请求数量很大,后端任然会过载,因为其绝大多数CPU都花在拒绝请求上
因此最好的办法是客户端不要将请求发到服务端:当客户端检测到其最近的请求中有很大一部分因“服务过载”错误而被拒绝时,直接在本地失败,不会经过网络IO发给服务端
熔断也可以称为客户端限流
Google sre弹性熔断算法
google sre提供了一种自适应的客户端熔断算法,其维护了过去一段时间内的两个信息:
- requests:往下游发起请求的总数
- accepts:成功的请求数
- 正常情况下,这两个值是相等的
- 但当下游出现异常时,
accepts
会逐渐小于requests
- 一旦requests达到了accepts的
K
倍,客户端就要启动自适应限流,新产生的请求以一定概率被拒绝- 拒绝请求的概率计算公式为: m a x ( 0 , r e q u e s t s − K ∗ a c c e p t s r e q u e s t s + 1 ) max(0, \frac{requests - K * accepts}{requests + 1}) max(0,requests+1requests−K∗accepts)
- 当下游逐渐恢复时,accetps会增加,使得上述公式中分子变为负数,拒绝的概率降为0
可以调整K值,使算法产生不同的效果:
- 减少K值会使得行为更激进,也就是更容易发生熔断
- 增大K值会使得自适应熔断不那么激进
kratos Breaker源码分析
接下来分析kratos中熔断器,其采用了google sre的自适应客户端限流算法
公共接口
熔断器对外暴露3个方法:
Allow
:每次调下游之前判断熔断器状态,根据返回结果决定是否往下游发送请求MarkSuccess
:每次调下游如果成功,上报SuccMarkFailed
:每次调下游如果失败,上报Failed
type CircuitBreaker interface { Allow() error MarkSuccess() MarkFailed()
}
业务中用起来大概是这样:
// 初始化breaker
b := sre.NewBreaker()// 请求下游前判断熔断器状态
if err = breaker.Allow(); err != nil { return
}// 请求下游
err := fn()// 执行成功或失败将结果告知 breaker
if(err != nil){ breaker.MarkFailed()
}else{ breaker.MarkSuccess()
}
sre实现
- stat:维护请求总数和成功数的滑动窗口
- k:熔断算法的K值
- request:开始熔断的请求数阈值,滑动窗口中请求数量达到
request
才开始熔断 - state:熔断器状态,该字段实际没啥用
type Breaker struct { // 滑动窗口stat window.RollingCounter // 随机数产生器,同于根据概率熔断请求r *rand.Rand randLock sync.Mutex // 熔断算法的K值k float64// 开始熔断的请求数阈值request int64 state int32
}
上报请求结果
func (b *Breaker) MarkSuccess() { b.stat.Add(1)
} func (b *Breaker) MarkFailed() { b.stat.Add(0)
}
- MarkSuccess:内部会将总数+1,成功数+1
- MarkFailed:内部只会将总数+1
本文的重点不是滑动窗口,这里知道其干了啥就好
判定是否熔断
- 调
summary()
拿到滑动窗口中的请求总数和成功数 - 如果没达到熔断条件,返回err=nil。两个判定条件:
- 条件一:滑动窗口中请求总数没达到阈值(
total < b.request
) - 条件二:近期失败的数量不够多(
k * accepts > total
)
- 条件一:滑动窗口中请求总数没达到阈值(
- 否则就需要熔断,根据公式计算熔断概率
dr
- 判定是否命中概率:生成一个0~1之间的随机数,如果小于
dr
说明命中
func (b *Breaker) Allow() error { // 拿到滑动窗口中的请求总数和成功数accepts, total := b.summary() requests := b.k * float64(accepts) // 没达到熔断条件 if total < b.request || float64(total) < requests { atomic.CompareAndSwapInt32(&b.state, StateOpen, StateClosed) return nil } // 下面就是需要熔断atomic.CompareAndSwapInt32(&b.state, StateClosed, StateOpen)// 计算熔断概率 dr := math.Max(0, (float64(total)-requests)/float64(total+1)) drop := b.trueOnProba(dr) // 需要熔断if drop { return circuitbreaker.ErrNotAllowed } return nil
}
从滑动窗口中获得请求总数total和成功请求数success:
func (b *Breaker) summary() (success int64, total int64) { b.stat.Reduce(func(iterator window.Iterator) float64 { for iterator.Next() { bucket := iterator.Bucket() // 统计总数 total += bucket.Count for _, p := range bucket.Points { // 统计成功的数量 success += int64(p) } } return 0 }) return
}
trueOnProba就是生成一个0~1之间的随机数,看是否小于概率proba
func (b *Breaker) trueOnProba(proba float64) (truth bool) { b.randLock.Lock() truth = b.r.Float64() < proba b.randLock.Unlock() return
}