[Source Code Analysis] An In-Depth Look at the Source Code of the Flow Control Framework Sentinel

2024/11/8 5:58:13

Preface

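I previously wrote a source code analysis of Sentinel, "Flow Control Framework Sentinel Source Code Analysis", which focused on how Sentinel's overall processing flow works. This article takes a deeper look at the details inside that flow.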

Statistics

StatisticSlot collects access statistics for each node.

    @SpiOrder(-7000)
    public class StatisticSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

        @Override
        public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                          boolean prioritized, Object... args) throws Throwable {
            try {
                // Do some checking.
                fireEntry(context, resourceWrapper, node, count, prioritized, args);

                // Request passed, add thread count and pass count.
                node.increaseThreadNum();
                node.addPassRequest(count);
                //...
            } catch (Throwable e) {
                // Unexpected internal error, set error to current entry.
                context.getCurEntry().setError(e);
                throw e;
            }
        }
        // ...
    }

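StatisticNode#addPassRequest adds the pass count to both the second-level (rollingCounterInSecond) and the minute-level (rollingCounterInMinute) sliding-window metrics.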

    private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT,
        IntervalProperty.INTERVAL);

    @Override
    public void addPassRequest(int count) {
        rollingCounterInSecond.addPass(count);
        rollingCounterInMinute.addPass(count);
    }

ArrayMetric#addPass: the OccupiableBucketLeapArray returns the current window, and that window's pass count is increased.

    public ArrayMetric(int sampleCount, int intervalInMs) {
        this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
    }

    @Override
    public void addPass(int count) {
        WindowWrap<MetricBucket> wrap = data.currentWindow();
        wrap.value().addPass(count);
    }

MetricBucket#addPass increases the counter of the corresponding event (MetricEvent.PASS) by n.

    public void addPass(int n) {
        add(MetricEvent.PASS, n);
    }

    public MetricBucket add(MetricEvent event, long n) {
        counters[event.ordinal()].add(n);
        return this;
    }
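To make the `counters[event.ordinal()]` idea concrete, here is a minimal sketch of a bucket that keeps one counter per event type, indexed by the enum's ordinal. It is not Sentinel code: `BucketEvent` and `SimpleMetricBucket` are hypothetical names, the enum lists only a few events for illustration, and `java.util.concurrent.atomic.LongAdder` stands in for whatever counter type Sentinel uses internally.

```java
import java.util.concurrent.atomic.LongAdder;

// Hypothetical subset of event types; Sentinel's MetricEvent has more members.
enum BucketEvent { PASS, BLOCK, EXCEPTION, SUCCESS }

// Hypothetical sketch of a per-window bucket: one LongAdder per event, indexed by ordinal.
class SimpleMetricBucket {
    private final LongAdder[] counters = new LongAdder[BucketEvent.values().length];

    SimpleMetricBucket() {
        for (BucketEvent e : BucketEvent.values()) {
            counters[e.ordinal()] = new LongAdder();
        }
    }

    SimpleMetricBucket add(BucketEvent event, long n) {
        counters[event.ordinal()].add(n);
        return this;
    }

    long get(BucketEvent event) {
        return counters[event.ordinal()].sum();
    }

    void addPass(int n) {
        add(BucketEvent.PASS, n);
    }

    // Clears every counter, e.g. when the slot is reused for a newer time window.
    SimpleMetricBucket reset() {
        for (LongAdder counter : counters) {
            counter.reset();
        }
        return this;
    }
}
```

Indexing an array by `ordinal()` avoids a map lookup on the hot path, and `LongAdder` spreads contention across internal cells when many threads increment the same counter.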

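LeapArray: sampleCount is the number of windows, i.e. the array size; intervalInMs is the total time span covered by the statistics; windowLengthInMs is the length of each window, intervalInMs / sampleCount.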

    public LeapArray(int sampleCount, int intervalInMs) {
        AssertUtil.isTrue(sampleCount > 0, "bucket count is invalid: " + sampleCount);
        AssertUtil.isTrue(intervalInMs > 0, "total time interval of the sliding window should be positive");
        AssertUtil.isTrue(intervalInMs % sampleCount == 0, "time span needs to be evenly divided");

        this.windowLengthInMs = intervalInMs / sampleCount;
        this.intervalInMs = intervalInMs;
        this.sampleCount = sampleCount;

        this.array = new AtomicReferenceArray<>(sampleCount);
    }

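LeapArray#currentWindow() computes the array index and the start time of the corresponding window. If there is no window at that index yet, a new one is created. If a window exists, it checks whether the window has expired: if so, its start time is updated and its counts are reset; if not, the existing window is returned.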

    public WindowWrap<T> currentWindow() {
        return currentWindow(TimeUtil.currentTimeMillis());
    }

    public WindowWrap<T> currentWindow(long timeMillis) {
        if (timeMillis < 0) {
            return null;
        }

        int idx = calculateTimeIdx(timeMillis);
        // Calculate current bucket start time.
        long windowStart = calculateWindowStart(timeMillis);

        while (true) {
            WindowWrap<T> old = array.get(idx);
            if (old == null) {
                WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
                if (array.compareAndSet(idx, null, window)) {
                    // Successfully updated, return the created bucket.
                    return window;
                } else {
                    // Contention failed, the thread will yield its time slice to wait for bucket available.
                    Thread.yield();
                }
            } else if (windowStart == old.windowStart()) {
                return old;
            } else if (windowStart > old.windowStart()) {
                if (updateLock.tryLock()) {
                    try {
                        // Successfully get the update lock, now we reset the bucket.
                        return resetWindowTo(old, windowStart);
                    } finally {
                        updateLock.unlock();
                    }
                } else {
                    // Contention failed, the thread will yield its time slice to wait for bucket available.
                    Thread.yield();
                }
            } else if (windowStart < old.windowStart()) {
                // Should not go through here, as the provided time is already behind.
                return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
            }
        }
    }

    private int calculateTimeIdx(/*@Valid*/ long timeMillis) {
        long timeId = timeMillis / windowLengthInMs;
        // Calculate current index so we can map the timestamp to the leap array.
        return (int)(timeId % array.length());
    }

    protected long calculateWindowStart(/*@Valid*/ long timeMillis) {
        return timeMillis - timeMillis % windowLengthInMs;
    }
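The window-length, index and start-time arithmetic from the constructor, `calculateTimeIdx` and `calculateWindowStart` can be checked with a small standalone program. `WindowMath` is a hypothetical helper that copies those formulas, and the parameters used below (sampleCount = 2, intervalInMs = 1000) are chosen for illustration only.

```java
// Hypothetical standalone copy of the LeapArray index/start-time formulas (not Sentinel code).
class WindowMath {
    final int sampleCount;
    final int intervalInMs;
    final int windowLengthInMs;

    WindowMath(int sampleCount, int intervalInMs) {
        if (sampleCount <= 0 || intervalInMs <= 0 || intervalInMs % sampleCount != 0) {
            throw new IllegalArgumentException("interval must be positive and divisible by sampleCount");
        }
        this.sampleCount = sampleCount;
        this.intervalInMs = intervalInMs;
        this.windowLengthInMs = intervalInMs / sampleCount;
    }

    // Array slot a timestamp maps to; slots are reused cyclically.
    int timeIdx(long timeMillis) {
        long timeId = timeMillis / windowLengthInMs;
        return (int) (timeId % sampleCount);
    }

    // Start of the window containing the timestamp, rounded down to a multiple of windowLengthInMs.
    long windowStart(long timeMillis) {
        return timeMillis - timeMillis % windowLengthInMs;
    }
}
```

With these parameters each window is 500 ms long. Timestamp 1250 falls in slot 0 with start 1000, and 1750 falls in slot 1 with start 1500. Timestamp 2100 maps back to slot 0 but has start 2000, so a window still holding start 1000 in that slot is stale, which is the `windowStart > old.windowStart()` branch where `currentWindow` resets it.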

StatisticNode#passQps computes the passed QPS: the number of passed requests divided by the statistics interval in seconds.

    @Override
    public double passQps() {
        return rollingCounterInSecond.pass() / rollingCounterInSecond.getWindowIntervalInSec();
    }

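ArrayMetric#pass first refreshes the current window, then sums the pass counts of the windows that are still valid; expired windows are filtered out.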

    @Override
    public long pass() {
        data.currentWindow();
        long pass = 0;
        List<MetricBucket> list = data.values();

        for (MetricBucket window : list) {
            pass += window.pass();
        }
        return pass;
    }

LeapArray#values() collects the data of all windows that are still valid; ArrayMetric#pass then sums them.

    public List<T> values() {
        return values(TimeUtil.currentTimeMillis());
    }

    public List<T> values(long timeMillis) {
        if (timeMillis < 0) {
            return new ArrayList<T>();
        }
        int size = array.length();
        List<T> result = new ArrayList<T>(size);

        for (int i = 0; i < size; i++) {
            WindowWrap<T> windowWrap = array.get(i);
            if (windowWrap == null || isWindowDeprecated(timeMillis, windowWrap)) {
                continue;
            }
            result.add(windowWrap.value());
        }
        return result;
    }
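Putting `currentWindow`, `values` and `passQps` together, the following is a simplified, single-threaded sketch of a sliding-window pass counter. It is not Sentinel's implementation: the class `SlidingWindowCounter` is hypothetical, the current time is passed in explicitly so the behaviour is deterministic, the CAS and `tryLock` handling are omitted, and the deprecation test `now - windowStart > intervalInMs` is an assumption about what `isWindowDeprecated` checks, since that method is not shown above.

```java
import java.util.Arrays;

// Hypothetical single-threaded sliding-window counter (no CAS, no locking, not Sentinel code).
class SlidingWindowCounter {
    private final int sampleCount;
    private final int intervalInMs;
    private final int windowLengthInMs;
    private final long[] windowStarts; // start time of each slot, -1 if the slot was never used
    private final long[] passCounts;   // pass count of each slot

    SlidingWindowCounter(int sampleCount, int intervalInMs) {
        this.sampleCount = sampleCount;
        this.intervalInMs = intervalInMs;
        this.windowLengthInMs = intervalInMs / sampleCount;
        this.windowStarts = new long[sampleCount];
        this.passCounts = new long[sampleCount];
        Arrays.fill(windowStarts, -1L);
    }

    // Rough equivalent of currentWindow(): find the slot and reuse it if it holds another window.
    // Simplification: a clock moving backwards is not treated specially.
    private int currentSlot(long now) {
        int idx = (int) ((now / windowLengthInMs) % sampleCount);
        long start = now - now % windowLengthInMs;
        if (windowStarts[idx] != start) {
            windowStarts[idx] = start;
            passCounts[idx] = 0;
        }
        return idx;
    }

    void addPass(long now, int count) {
        passCounts[currentSlot(now)] += count;
    }

    // Rough equivalent of pass(): refresh the current slot, then sum only non-deprecated windows.
    long pass(long now) {
        currentSlot(now);
        long sum = 0;
        for (int i = 0; i < sampleCount; i++) {
            boolean deprecated = windowStarts[i] < 0 || now - windowStarts[i] > intervalInMs;
            if (!deprecated) {
                sum += passCounts[i];
            }
        }
        return sum;
    }

    // Same shape as passQps(): pass count divided by the interval in seconds.
    double passQps(long now) {
        return pass(now) / (intervalInMs / 1000.0);
    }
}
```

For example, with two 500 ms windows, 3 passes at t = 100 and 2 at t = 600 give `pass(900) == 5`. At t = 1200 slot 0 is reused for the window starting at 1000, so only the 2 passes from the window starting at 500 remain.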

Flow Control

FlowSlot performs flow control.

    @SpiOrder(-2000)
    public class FlowSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

        private final FlowRuleChecker checker;

        public FlowSlot() {
            this(new FlowRuleChecker());
        }

        @Override
        public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                          boolean prioritized, Object... args) throws Throwable {
            checkFlow(resourceWrapper, context, node, count, prioritized);

            fireEntry(context, resourceWrapper, node, count, prioritized, args);
        }

        void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized)
            throws BlockException {
            checker.checkFlow(ruleProvider, resource, context, node, count, prioritized);
        }

        private final Function<String, Collection<FlowRule>> ruleProvider = new Function<String, Collection<FlowRule>>() {
            @Override
            public Collection<FlowRule> apply(String resource) {
                // Flow rule map should not be null.
                Map<String, List<FlowRule>> flowRules = FlowRuleManager.getFlowRuleMap();
                return flowRules.get(resource);
            }
        };
    }

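FlowRuleChecker#checkFlow: if the collection of FlowRules fetched for the resource is not null, each rule is checked in turn. For a local check, the rule's TrafficShapingController (rule.getRater()) makes the actual decision; that controller is initialized from the rule by FlowRuleUtil#generateRater, described next.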

    public class FlowRuleChecker {

        public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,
                              Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
            if (ruleProvider == null || resource == null) {
                return;
            }
            Collection<FlowRule> rules = ruleProvider.apply(resource.getName());
            if (rules != null) {
                for (FlowRule rule : rules) {
                    if (!canPassCheck(rule, context, node, count, prioritized)) {
                        throw new FlowException(rule.getLimitApp(), rule);
                    }
                }
            }
        }

        public boolean canPassCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node, int acquireCount,
                                    boolean prioritized) {
            String limitApp = rule.getLimitApp();
            if (limitApp == null) {
                return true;
            }

            if (rule.isClusterMode()) {
                return passClusterCheck(rule, context, node, acquireCount, prioritized);
            }

            return passLocalCheck(rule, context, node, acquireCount, prioritized);
        }

        private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
                                              boolean prioritized) {
            Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);
            if (selectedNode == null) {
                return true;
            }

            return rule.getRater().canPass(selectedNode, acquireCount, prioritized);
        }
    }
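The shape of `checkFlow` (look up the rules for a resource, ask each rule's controller, throw on the first rejection) can be sketched independently of Sentinel. Everything in this sketch is hypothetical: `Controller`, `SimpleRule`, `BlockedException` and `SimpleFlowChecker` stand in for `TrafficShapingController`, `FlowRule`, `FlowException` and `FlowRuleChecker`, and node selection, `limitApp` and cluster mode are left out.

```java
import java.util.Collection;
import java.util.function.Function;

// Hypothetical stand-in for TrafficShapingController.
interface Controller {
    boolean canPass(double currentQps, int acquireCount);
}

// Hypothetical stand-in for FlowRule: a resource name plus the controller built for it.
class SimpleRule {
    final String resource;
    final Controller rater;

    SimpleRule(String resource, Controller rater) {
        this.resource = resource;
        this.rater = rater;
    }
}

// Hypothetical stand-in for FlowException.
class BlockedException extends Exception {
    BlockedException(String message) {
        super(message);
    }
}

class SimpleFlowChecker {
    // Same control flow as checkFlow: no rules means pass; otherwise every rule must accept.
    static void checkFlow(Function<String, Collection<SimpleRule>> ruleProvider, String resource,
                          double currentQps, int acquireCount) throws BlockedException {
        if (ruleProvider == null || resource == null) {
            return;
        }
        Collection<SimpleRule> rules = ruleProvider.apply(resource);
        if (rules == null) {
            return;
        }
        for (SimpleRule rule : rules) {
            if (!rule.rater.canPass(currentQps, acquireCount)) {
                throw new BlockedException("blocked by a rule on " + resource);
            }
        }
    }
}
```

Because the loop stops at the first rejecting rule, a resource with several rules is limited by the strictest one that currently rejects.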

FlowRuleUtil#generateRater creates the TrafficShapingController that matches the rule.

    private static TrafficShapingController generateRater(/*@Valid*/ FlowRule rule) {
        if (rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) {
            switch (rule.getControlBehavior()) {
                case RuleConstant.CONTROL_BEHAVIOR_WARM_UP:
                    return new WarmUpController(rule.getCount(), rule.getWarmUpPeriodSec(),
                        ColdFactorProperty.coldFactor);
                case RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER:
                    return new RateLimiterController(rule.getMaxQueueingTimeMs(), rule.getCount());
                case RuleConstant.CONTROL_BEHAVIOR_WARM_UP_RATE_LIMITER:
                    return new WarmUpRateLimiterController(rule.getCount(), rule.getWarmUpPeriodSec(),
                        rule.getMaxQueueingTimeMs(), ColdFactorProperty.coldFactor);
                case RuleConstant.CONTROL_BEHAVIOR_DEFAULT:
                default:
                    // Default mode or unknown mode: default traffic shaping controller (fast-reject).
            }
        }
        return new DefaultController(rule.getCount(), rule.getGrade());
    }

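DefaultController#canPass(Node, int): if the current count plus the requested count exceeds the threshold configured in the rule, it returns false. The only exception is a prioritized QPS request that can borrow from a future window within the occupy timeout; it waits and then passes via PriorityWaitException.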

    @Override
    public boolean canPass(Node node, int acquireCount) {
        return canPass(node, acquireCount, false);
    }

    @Override
    public boolean canPass(Node node, int acquireCount, boolean prioritized) {
        int curCount = avgUsedTokens(node);
        if (curCount + acquireCount > count) {
            if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {
                long currentTime;
                long waitInMs;
                currentTime = TimeUtil.currentTimeMillis();
                waitInMs = node.tryOccupyNext(currentTime, acquireCount, count);
                if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) {
                    node.addWaitingRequest(currentTime + waitInMs, acquireCount);
                    node.addOccupiedPass(acquireCount);
                    sleep(waitInMs);

                    // PriorityWaitException indicates that the request will pass after waiting for {@link @waitInMs}.
                    throw new PriorityWaitException(waitInMs);
                }
            }
            return false;
        }
        return true;
    }

    private int avgUsedTokens(Node node) {
        if (node == null) {
            return DEFAULT_AVG_USED_TOKENS;
        }
        return grade == RuleConstant.FLOW_GRADE_THREAD ? node.curThreadNum() : (int)(node.passQps());
    }
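Leaving out the prioritized (occupy-next-window) branch, the fast-reject decision reduces to a single comparison. In this sketch `FlowGrade` and `FastRejectCheck` are hypothetical names; as in `avgUsedTokens`, the current value is the thread count for thread-grade rules and the passed QPS truncated to an `int` for QPS-grade rules.

```java
// Hypothetical grade enum mirroring FLOW_GRADE_THREAD / FLOW_GRADE_QPS (not Sentinel code).
enum FlowGrade { THREAD, QPS }

class FastRejectCheck {
    private final double count; // threshold configured in the rule
    private final FlowGrade grade;

    FastRejectCheck(double count, FlowGrade grade) {
        this.count = count;
        this.grade = grade;
    }

    // Same comparison as DefaultController#canPass, without the prioritized branch.
    boolean canPass(int curThreadNum, double passQps, int acquireCount) {
        int curCount = grade == FlowGrade.THREAD ? curThreadNum : (int) passQps;
        return curCount + acquireCount <= count;
    }
}
```

Because the QPS is truncated, a node currently at 9.8 QPS counts as 9 against a threshold of 10, so one more request still passes, while at 10.0 QPS it is rejected.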

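Warm-up is a form of rate limiting designed for sudden traffic bursts that could otherwise overwhelm the system; it lets the allowed traffic ramp up gradually. Imagine a system that has been running at a low water level for a long time: when traffic suddenly surges, the system is pushed straight to a high water level and may collapse instantly. With a cold start, the allowed traffic increases slowly and reaches the threshold only after a set period, giving the cold system time to warm up and keeping it from being overwhelmed.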

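WarmUpController: once the stored tokens reach warningToken, the controller is in the warm-up phase and the allowed QPS is reduced; outside the warm-up phase it simply checks passQps + acquireCount <= count. The design follows the token bucket algorithm: every request that passes takes one token out of the bucket. When the bucket holds its maximum number of tokens, the system is at its coldest, because the tokens in the bucket have stayed saturated (hardly anything has consumed them). (This is a bit convoluted; it may take a few reads.)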

    public WarmUpController(double count, int warmUpPeriodInSec, int coldFactor) {
        construct(count, warmUpPeriodInSec, coldFactor);
    }

    public WarmUpController(double count, int warmUpPeriodInSec) {
        construct(count, warmUpPeriodInSec, 3);
    }

    private void construct(double count, int warmUpPeriodInSec, int coldFactor) {
        if (coldFactor <= 1) {
            throw new IllegalArgumentException("Cold factor should be larger than 1");
        }

        this.count = count;
        this.coldFactor = coldFactor;

        // thresholdPermits = 0.5 * warmupPeriod / stableInterval.
        // warningToken = 100;
        warningToken = (int)(warmUpPeriodInSec * count) / (coldFactor - 1);
        // / maxPermits = thresholdPermits + 2 * warmupPeriod /
        // (stableInterval + coldInterval)
        // maxToken = 200
        maxToken = warningToken + (int)(2 * warmUpPeriodInSec * count / (1.0 + coldFactor));

        // slope
        // slope = (coldIntervalMicros - stableIntervalMicros) / (maxPermits
        // - thresholdPermits);
        slope = (coldFactor - 1.0) / count / (maxToken - warningToken);
    }

    @Override
    public boolean canPass(Node node, int acquireCount) {
        return canPass(node, acquireCount, false);
    }

    @Override
    public boolean canPass(Node node, int acquireCount, boolean prioritized) {
        long passQps = (long) node.passQps();

        long previousQps = (long) node.previousPassQps();
        syncToken(previousQps);

        // Start computing the slope.
        // Once the warning line is reached, start adjusting the allowed QPS.
        long restToken = storedTokens.get();
        if (restToken >= warningToken) {
            long aboveToken = restToken - warningToken;
            // Above the warning line the allowed QPS falls below count as more tokens are stored.
            // current interval = restToken*slope+1/count
            double warningQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count));
            if (passQps + acquireCount <= warningQps) {
                return true;
            }
        } else {
            if (passQps + acquireCount <= count) {
                return true;
            }
        }

        return false;
    }

    protected void syncToken(long passQps) {
        long currentTime = TimeUtil.currentTimeMillis();
        currentTime = currentTime - currentTime % 1000;
        long oldLastFillTime = lastFilledTime.get();
        if (currentTime <= oldLastFillTime) {
            return;
        }

        long oldValue = storedTokens.get();
        long newValue = coolDownTokens(currentTime, passQps);

        if (storedTokens.compareAndSet(oldValue, newValue)) {
            long currentValue = storedTokens.addAndGet(0 - passQps);
            if (currentValue < 0) {
                storedTokens.set(0L);
            }
            lastFilledTime.set(currentTime);
        }
    }

    private long coolDownTokens(long currentTime, long passQps) {
        long oldValue = storedTokens.get();
        long newValue = oldValue;

        // Precondition for adding tokens:
        // token consumption is far below the warning line
        if (oldValue < warningToken) {
            newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count / 1000);
        } else if (oldValue > warningToken) {
            if (passQps < (int)count / coldFactor) {
                newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count / 1000);
            }
        }
        return Math.min(newValue, maxToken);
    }
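To see what the constructor's formulas produce, the sketch below plugs in example values and evaluates the allowed QPS at a few stored-token levels. `WarmUpMath` is a hypothetical helper that copies the arithmetic of `construct` and of the threshold branch in `canPass`; count = 10 QPS and warmUpPeriodInSec = 10 are illustrative, and coldFactor = 3 is the value the two-argument constructor passes.

```java
// Hypothetical helper that re-evaluates the WarmUpController formulas (not Sentinel code).
class WarmUpMath {
    final double count;
    final int coldFactor;
    final int warningToken;
    final int maxToken;
    final double slope;

    WarmUpMath(double count, int warmUpPeriodInSec, int coldFactor) {
        this.count = count;
        this.coldFactor = coldFactor;
        this.warningToken = (int) (warmUpPeriodInSec * count) / (coldFactor - 1);
        this.maxToken = warningToken + (int) (2 * warmUpPeriodInSec * count / (1.0 + coldFactor));
        this.slope = (coldFactor - 1.0) / count / (maxToken - warningToken);
    }

    // QPS allowed when the bucket holds restToken tokens (same branches as canPass).
    double allowedQps(long restToken) {
        if (restToken >= warningToken) {
            long aboveToken = restToken - warningToken;
            return Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count));
        }
        return count;
    }
}
```

With these inputs warningToken = 100 / 2 = 50, maxToken = 50 + 200 / 4 = 100 and slope = 2 / 10 / 50 = 0.004. A completely cold bucket (100 tokens) allows about count / coldFactor ≈ 3.33 QPS, a bucket half-way through the warm-up zone (75 tokens) about 5 QPS, and a bucket at or below the warning line the full 10 QPS.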

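RateLimiterController computes the minimum time that must elapse between two passed requests (costTime). It takes the time at which the most recent request passed and checks whether the current time minus that time is at least this minimum interval. If it is, the request passes; otherwise the request sleeps until its turn, or is rejected if the wait would exceed maxQueueingTimeMs.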

    public class RateLimiterController implements TrafficShapingController {

        private final int maxQueueingTimeMs;
        private final double count;

        private final AtomicLong latestPassedTime = new AtomicLong(-1);

        public RateLimiterController(int timeOut, double count) {
            this.maxQueueingTimeMs = timeOut;
            this.count = count;
        }

        @Override
        public boolean canPass(Node node, int acquireCount) {
            return canPass(node, acquireCount, false);
        }

        @Override
        public boolean canPass(Node node, int acquireCount, boolean prioritized) {
            // Pass when acquire count is less or equal than 0.
            if (acquireCount <= 0) {
                return true;
            }
            // Reject when count is less or equal than 0.
            // Otherwise,the costTime will be max of long and waitTime will overflow in some cases.
            if (count <= 0) {
                return false;
            }

            long currentTime = TimeUtil.currentTimeMillis();
            // Calculate the interval between every two requests.
            long costTime = Math.round(1.0 * (acquireCount) / count * 1000);

            // Expected pass time of this request.
            long expectedTime = costTime + latestPassedTime.get();

            if (expectedTime <= currentTime) {
                // Contention may exist here, but it's okay.
                latestPassedTime.set(currentTime);
                return true;
            } else {
                // Calculate the time to wait.
                long waitTime = costTime + latestPassedTime.get() - TimeUtil.currentTimeMillis();
                if (waitTime > maxQueueingTimeMs) {
                    return false;
                } else {
                    long oldTime = latestPassedTime.addAndGet(costTime);
                    try {
                        waitTime = oldTime - TimeUtil.currentTimeMillis();
                        if (waitTime > maxQueueingTimeMs) {
                            latestPassedTime.addAndGet(-costTime);
                            return false;
                        }
                        // in race condition waitTime may <= 0
                        if (waitTime > 0) {
                            Thread.sleep(waitTime);
                        }
                        return true;
                    } catch (InterruptedException e) {
                    }
                }
            }
            return false;
        }
    }
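The queueing behaviour is easier to follow with a simulated clock. This single-threaded sketch (`PacingSimulator` is a hypothetical name, not Sentinel code) keeps the core arithmetic of RateLimiterController, costTime = acquireCount / count seconds and a wait capped by maxQueueingTimeMs, but returns the wait instead of sleeping and drops the CAS and rollback handling.

```java
// Hypothetical single-threaded sketch of uniform-rate pacing with a simulated clock.
class PacingSimulator {
    private final double count;            // allowed requests per second
    private final long maxQueueingTimeMs;  // longest acceptable wait
    private long latestPassedTime = -1;    // -1 means nothing has passed yet

    PacingSimulator(double count, long maxQueueingTimeMs) {
        this.count = count;
        this.maxQueueingTimeMs = maxQueueingTimeMs;
    }

    // Returns the wait in ms before the request may pass (0 = immediately), or -1 if rejected.
    long acquire(long nowMs, int acquireCount) {
        long costTime = Math.round(1.0 * acquireCount / count * 1000);
        if (latestPassedTime < 0 || latestPassedTime + costTime <= nowMs) {
            latestPassedTime = nowMs;
            return 0;
        }
        long expectedTime = latestPassedTime + costTime;
        long waitTime = expectedTime - nowMs;
        if (waitTime > maxQueueingTimeMs) {
            return -1;
        }
        latestPassedTime = expectedTime; // reserve the next slot for this request
        return waitTime;
    }
}
```

With count = 5 (one request every 200 ms) and maxQueueingTimeMs = 500, four requests arriving together at t = 0 get waits of 0, 200 and 400 ms, and the fourth, which would have to wait 600 ms, is rejected. A burst is therefore smoothed into evenly spaced passes rather than let through at once.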

Degradation

A circuit breaker can be in one of the following three states:

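State | Description
--- | ---
OPEN | Circuit open: all requests are rejected.
HALF_OPEN | Half-open: if the next request passes successfully, circuit breaking ends; otherwise the circuit stays open.
CLOSED | Circuit closed: requests pass normally.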

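DegradeSlot fetches the resource's list of CircuitBreakers and calls CircuitBreaker#tryPass on each one. On exit, if the request was not blocked, CircuitBreaker#onRequestComplete is called on each breaker.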

    @SpiOrder(-1000)
    public class DegradeSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

        @Override
        public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                          boolean prioritized, Object... args) throws Throwable {
            performChecking(context, resourceWrapper);

            fireEntry(context, resourceWrapper, node, count, prioritized, args);
        }

        void performChecking(Context context, ResourceWrapper r) throws BlockException {
            List<CircuitBreaker> circuitBreakers = DegradeRuleManager.getCircuitBreakers(r.getName());
            if (circuitBreakers == null || circuitBreakers.isEmpty()) {
                return;
            }
            for (CircuitBreaker cb : circuitBreakers) {
                if (!cb.tryPass(context)) {
                    throw new DegradeException(cb.getRule().getLimitApp(), cb.getRule());
                }
            }
        }

        @Override
        public void exit(Context context, ResourceWrapper r, int count, Object... args) {
            Entry curEntry = context.getCurEntry();
            if (curEntry.getBlockError() != null) {
                fireExit(context, r, count, args);
                return;
            }
            List<CircuitBreaker> circuitBreakers = DegradeRuleManager.getCircuitBreakers(r.getName());
            if (circuitBreakers == null || circuitBreakers.isEmpty()) {
                fireExit(context, r, count, args);
                return;
            }

            if (curEntry.getBlockError() == null) {
                // passed request
                for (CircuitBreaker circuitBreaker : circuitBreakers) {
                    circuitBreaker.onRequestComplete(context);
                }
            }

            fireExit(context, r, count, args);
        }
    }

DegradeRuleManager#newCircuitBreakerFrom creates the CircuitBreaker that matches a DegradeRule.

    private static CircuitBreaker newCircuitBreakerFrom(/*@Valid*/ DegradeRule rule) {
        switch (rule.getGrade()) {
            case RuleConstant.DEGRADE_GRADE_RT:
                return new ResponseTimeCircuitBreaker(rule);
            case RuleConstant.DEGRADE_GRADE_EXCEPTION_RATIO:
            case RuleConstant.DEGRADE_GRADE_EXCEPTION_COUNT:
                return new ExceptionCircuitBreaker(rule);
            default:
                return null;
        }
    }

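AbstractCircuitBreaker#tryPass: if the current state is CLOSED, the request passes; if it is HALF_OPEN, the request is rejected; if it is OPEN and the retry timeout has arrived, the state is switched to HALF_OPEN and this one request is let through as a probe. If that probe is later blocked by another rule, the whenTerminate hook moves the breaker back to OPEN.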

    @Override
    public boolean tryPass(Context context) {
        // Template implementation.
        if (currentState.get() == State.CLOSED) {
            return true;
        }
        if (currentState.get() == State.OPEN) {
            // For half-open state we allow a request for probing.
            return retryTimeoutArrived() && fromOpenToHalfOpen(context);
        }
        return false;
    }

    protected boolean retryTimeoutArrived() {
        return TimeUtil.currentTimeMillis() >= nextRetryTimestamp;
    }

    protected void updateNextRetryTimestamp() {
        this.nextRetryTimestamp = TimeUtil.currentTimeMillis() + recoveryTimeoutMs;
    }

    protected boolean fromCloseToOpen(double snapshotValue) {
        State prev = State.CLOSED;
        if (currentState.compareAndSet(prev, State.OPEN)) {
            updateNextRetryTimestamp();

            notifyObservers(prev, State.OPEN, snapshotValue);
            return true;
        }
        return false;
    }

    protected boolean fromOpenToHalfOpen(Context context) {
        if (currentState.compareAndSet(State.OPEN, State.HALF_OPEN)) {
            notifyObservers(State.OPEN, State.HALF_OPEN, null);
            Entry entry = context.getCurEntry();
            entry.whenTerminate(new BiConsumer<Context, Entry>() {
                @Override
                public void accept(Context context, Entry entry) {
                    // Note: This works as a temporary workaround for https://github.com/alibaba/Sentinel/issues/1638
                    // Without the hook, the circuit breaker won't recover from half-open state in some circumstances
                    // when the request is actually blocked by upcoming rules (not only degrade rules).
                    if (entry.getBlockError() != null) {
                        // Fallback to OPEN due to detecting request is blocked
                        currentState.compareAndSet(State.HALF_OPEN, State.OPEN);
                        notifyObservers(State.HALF_OPEN, State.OPEN, 1.0d);
                    }
                }
            });
            return true;
        }
        return false;
    }
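The three-state cycle driven by `tryPass` and the `from*To*` methods can be sketched as a tiny state machine. This is a simplified, hypothetical `MiniCircuitBreaker` (not Sentinel code): it takes the current time as a parameter, uses `AtomicReference.compareAndSet` for transitions as the source does, and leaves out observers, rules and the `whenTerminate` hook.

```java
import java.util.concurrent.atomic.AtomicReference;

enum BreakerState { CLOSED, OPEN, HALF_OPEN }

// Hypothetical simplified circuit breaker state machine.
class MiniCircuitBreaker {
    private final long recoveryTimeoutMs;
    private final AtomicReference<BreakerState> state = new AtomicReference<>(BreakerState.CLOSED);
    private volatile long nextRetryTimestamp;

    MiniCircuitBreaker(long recoveryTimeoutMs) {
        this.recoveryTimeoutMs = recoveryTimeoutMs;
    }

    BreakerState state() {
        return state.get();
    }

    // CLOSED: pass. OPEN: let one probe through once the retry time arrives. HALF_OPEN: reject.
    boolean tryPass(long now) {
        BreakerState s = state.get();
        if (s == BreakerState.CLOSED) {
            return true;
        }
        if (s == BreakerState.OPEN) {
            return now >= nextRetryTimestamp
                    && state.compareAndSet(BreakerState.OPEN, BreakerState.HALF_OPEN);
        }
        return false;
    }

    // Trip the breaker (from CLOSED, or from HALF_OPEN after a failed probe) and schedule the next probe.
    void open(long now) {
        BreakerState s = state.get();
        if (s != BreakerState.OPEN && state.compareAndSet(s, BreakerState.OPEN)) {
            nextRetryTimestamp = now + recoveryTimeoutMs;
        }
    }

    // A successful probe closes the breaker again.
    void probeSucceeded() {
        state.compareAndSet(BreakerState.HALF_OPEN, BreakerState.CLOSED);
    }
}
```

The CAS on OPEN to HALF_OPEN is what guarantees that only one caller wins the probe: every other request arriving while the probe is in flight sees HALF_OPEN and is rejected.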

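ResponseTimeCircuitBreaker#onRequestComplete first records whether the request was slow (rt > maxAllowedRt). Then: if the current state is OPEN, nothing is done; if it is HALF_OPEN, a slow request switches the breaker to OPEN and a fast one switches it to CLOSED; if it is CLOSED, the slow-request ratio over the sliding window is computed (only once at least minRequestAmount requests have been recorded), and the breaker switches to OPEN if the ratio exceeds maxSlowRequestRatio.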

    @Override
    public void onRequestComplete(Context context) {
        SlowRequestCounter counter = slidingCounter.currentWindow().value();
        Entry entry = context.getCurEntry();
        if (entry == null) {
            return;
        }
        long completeTime = entry.getCompleteTimestamp();
        if (completeTime <= 0) {
            completeTime = TimeUtil.currentTimeMillis();
        }
        long rt = completeTime - entry.getCreateTimestamp();
        if (rt > maxAllowedRt) {
            counter.slowCount.add(1);
        }
        counter.totalCount.add(1);

        handleStateChangeWhenThresholdExceeded(rt);
    }

    private void handleStateChangeWhenThresholdExceeded(long rt) {
        if (currentState.get() == State.OPEN) {
            return;
        }

        if (currentState.get() == State.HALF_OPEN) {
            // In detecting request
            // TODO: improve logic for half-open recovery
            if (rt > maxAllowedRt) {
                fromHalfOpenToOpen(1.0d);
            } else {
                fromHalfOpenToClose();
            }
            return;
        }

        List<SlowRequestCounter> counters = slidingCounter.values();
        long slowCount = 0;
        long totalCount = 0;
        for (SlowRequestCounter counter : counters) {
            slowCount += counter.slowCount.sum();
            totalCount += counter.totalCount.sum();
        }
        if (totalCount < minRequestAmount) {
            return;
        }
        double currentRatio = slowCount * 1.0d / totalCount;
        if (currentRatio > maxSlowRequestRatio) {
            transformToOpen(currentRatio);
        }
    }
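The CLOSED-state decision at the end of `handleStateChangeWhenThresholdExceeded` is plain arithmetic over the per-window counters. The helper below (a hypothetical `SlowRatioCheck`, not Sentinel code) reproduces it with plain arrays in place of `SlowRequestCounter`: nothing happens until at least minRequestAmount requests have been recorded, and the breaker opens only when the slow ratio is strictly greater than maxSlowRequestRatio.

```java
// Hypothetical re-implementation of the slow-ratio decision (not Sentinel code).
class SlowRatioCheck {
    // slowCounts[i] and totalCounts[i] are the counters of window i; both arrays have the same length.
    static boolean shouldOpen(long[] slowCounts, long[] totalCounts,
                              long minRequestAmount, double maxSlowRequestRatio) {
        long slowCount = 0;
        long totalCount = 0;
        for (int i = 0; i < slowCounts.length; i++) {
            slowCount += slowCounts[i];
            totalCount += totalCounts[i];
        }
        if (totalCount < minRequestAmount) {
            return false; // too few samples to judge
        }
        double currentRatio = slowCount * 1.0d / totalCount;
        return currentRatio > maxSlowRequestRatio;
    }
}
```

For example, with minRequestAmount = 5 and maxSlowRequestRatio = 0.5, two windows of 2 slow out of 3 give 4 / 6 ≈ 0.67 and trip the breaker; 3 slow out of 6 (exactly 0.5) does not; and 3 slow out of 4 is ignored because only 4 requests were seen.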



http://www.ppmy.cn/news/110034.html
