【sentinel】热点规则详解及源码分析

news/2024/10/20 11:43:36/

何为热点?热点即经常访问的数据。很多时候我们希望统计某些热点数据中访问频次最高的Top K数据,并对其访问进行限制。

比如:

  • 商品ID为参数,统计一段时间内最常购买的商品ID并进行限制
  • 用户ID为参数,针对一段时间内频繁访问的用户ID进行限制

热点参数限流会统计传入参数中的热点参数,并根据配置的限流阈值与模式,对包含热点参数的资源调用进行限流。热点参数限流可以看做是一种特殊的流量控制,仅对包含热点参数的资源调用生效。

注意:

  • 热点规则需要使用@SentinelResource(“resourceName”)注解,否则不生效
  • 参数必须是7种基本数据类型才会生效

Sentinel利用LRU策略统计最近最常访问的热点参数,结合令牌桶算法来进行参数级别的流控。

热点参数规则

热点参数规则(ParamFlowRule)类似于流量控制规则(FlowRule):

属性说明默认值
resource资源名,必填
count限流阈值,必填
grade限流模式QPS 模式
durationInSec统计窗口时间长度(单位为秒),1.6.0 版本开始支持1s
controlBehavior流控效果(支持快速失败和匀速排队模式),1.6.0 版本开始支持快速失败
maxQueueingTimeMs最大排队等待时长(仅在匀速排队模式生效),1.6.0 版本开始支持0ms
paramIdx热点参数的索引,必填,对应 SphU.entry(xxx, args) 中的参数索引位置
paramFlowItemList参数例外项,可以针对指定的参数值单独设置限流阈值,不受前面 count 阈值的限制。仅支持基本类型和字符串类型
clusterMode是否是集群参数流控规则false
clusterConfig集群流控相关配置

我们可以通过ParamFlowRuleManager的loadRules方法更新热点参数规则,下面是一个示例:

ParamFlowRule rule = new ParamFlowRule(resourceName).setParamIdx(0).setCount(5);
// 针对 int 类型的参数 PARAM_B,单独设置限流 QPS 阈值为 10,而不是全局的阈值 5.
ParamFlowItem item = new ParamFlowItem().setObject(String.valueOf(PARAM_B)).setClassType(int.class.getName()).setCount(10);
rule.setParamFlowItemList(Collections.singletonList(item));ParamFlowRuleManager.loadRules(Collections.singletonList(rule));

热点规则的使用

要使用热点参数限流功能,需要引入以下依赖:

<dependency><groupId>com.alibaba.csp</groupId><artifactId>sentinel-parameter-flow-control</artifactId><version>x.y.z</version>
</dependency>

然后为对应的资源配置热点参数限流规则,并在entry的时候传入相应的参数,即可使热点参数限流生效。

注:若自行扩展并注册了自己实现的SlotChainBuilder,并希望使用热点参数限流功能,则可以在chain里面合适的地方插入 ParamFlowSlot。

那么如何传入对应的参数以便Sentinel统计呢?我们可以通过SphU类里面几个entry重载方法来传入:

public static Entry entry(String name, EntryType type, int count, Object... args) throws BlockExceptionpublic static Entry entry(Method method, EntryType type, int count, Object... args) throws BlockException

其中最后的一串args就是要传入的参数,有多个就按照次序依次传入。比如要传入两个参数paramA和paramB,则可以:

// paramA in index 0, paramB in index 1.
// 若需要配置例外项或者使用集群维度流控,则传入的参数只支持基本类型。
SphU.entry(resourceName, EntryType.IN, 1, paramA, paramB);

注意:若entry的时候传入了热点参数,那么exit的时候也一定要带上对应的参数(exit(count, args)),否则可能会有统计错误。

正确的示例:

Entry entry = null;
try {entry = SphU.entry(resourceName, EntryType.IN, 1, paramA, paramB);// Your logic here.
} catch (BlockException ex) {// Handle request rejection.
} finally {if (entry != null) {entry.exit(1, paramA, paramB);}
}

注意在Sentinel Dashboard中的簇点链路中根据链接直接配置热点规则是无效的,因为将链接标记为资源是在拦截器AbstractSentinelInterceptor的preHandle()方法中完成的,这个方法里并没有将方法的参数传入entry中。

com.alibaba.csp.sentinel.adapter.spring.webmvc.AbstractSentinelInterceptor#preHandle

public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler)throws Exception {try {// 拦截所有的web请求String resourceName = getResourceName(request);if (StringUtil.isEmpty(resourceName)) {return true;}if (increaseReferece(request, this.baseWebMvcConfig.getRequestRefName(), 1) != 1) {return true;}// Parse the request origin using registered origin parser.String origin = parseOrigin(request);String contextName = getContextName(request);ContextUtil.enter(contextName, origin);// sentinel的入口,注意没有传入方法的参数,无法实现热点规则限流Entry entry = SphU.entry(resourceName, ResourceTypeConstants.COMMON_WEB, EntryType.IN);request.setAttribute(baseWebMvcConfig.getRequestAttributeName(), entry);return true;} catch (BlockException e) {try {handleBlockException(request, response, e);} finally {ContextUtil.exit();}return false;}
}

对于@SentinelResource注解方式定义的资源,若注解作用的方法上有参数,Sentinel会将它们作为参数传入SphU.entry(res, args)

比如以下的方法里面p1和p2会分别作为第一个和第二个参数传入Sentinel API,从而可以用于热点规则判断:

@RequestMapping("sentinel")
@RestController
public class ParamFlowController {@RequestMapping("paramFlow")@SentinelResource("paramFlow")public String paramFlow(@RequestParam(value = "p1", required = false) String p1,@RequestParam(value = "p2", required = false) String p2) {return "paramFlow p1=" + p1 + ", p2=" + p2;}
}

例如对上面的资源paramFlow进行热点规则配置:

限流模式只支持QPS模式,也只有QPS模式下才叫热点。

配置的参数索引是@SentinelResource注解的方法参数索引,0代表第一个参数,1代表第二个参数,以此类推;单机阀值以及统计窗口时长表示在此窗口时间超过阀值就限流。

上例中,我们将参数索引指定为0,所以当访问路径带上第一个参数p1时,在一秒(统计窗口时长)内访问超过一次(单机阈值)就可能发生限流,如果不带参数p1不会触发限流。

参数例外项的演示:


在前面的例子基础上,我们增加参数例外项,参数值为1,限流阈值为10,这样当访问路径上第一个参数p1的值为1时,在一秒(统计窗口时长)内访问超过10次(单机阈值)才会发生限流,如果第一个参数p1的值不是1时,限流的阈值还是1,如果不带参数p1不会触发限流,注意指定的参数类型要与方法的参数类型保持一致。

源码分析

ParamFlowSlot插槽

处理热点参数的Slot是ParamFlowSlot。

com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowSlot#entry

public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,boolean prioritized, Object... args) throws Throwable {// 处理热点参数规则// 判断资源名是否配置了规则if (!ParamFlowRuleManager.hasRules(resourceWrapper.getName())) {fireEntry(context, resourceWrapper, node, count, prioritized, args);return;}// 校验热点规则checkFlow(resourceWrapper, count, args);fireEntry(context, resourceWrapper, node, count, prioritized, args);
}

com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowSlot#checkFlow

void checkFlow(ResourceWrapper resourceWrapper, int count, Object... args) throws BlockException {if (args == null) {return;}// 判断资源名是否配置了规则if (!ParamFlowRuleManager.hasRules(resourceWrapper.getName())) {return;}// 根据资源名查询规则List<ParamFlowRule> rules = ParamFlowRuleManager.getRulesOfResource(resourceWrapper.getName());for (ParamFlowRule rule : rules) {// 对规则中参数的index进行处理,index可以为负数applyRealParamIdx(rule, args.length);// Initialize the parameter metrics.// 初始化统计参数ParameterMetricStorage.initParamMetricsFor(resourceWrapper, rule);// 校验规则if (!ParamFlowChecker.passCheck(resourceWrapper, rule, count, args)) {String triggeredParam = "";if (args.length > rule.getParamIdx()) {Object value = args[rule.getParamIdx()];triggeredParam = String.valueOf(value);}// 校验不通过,抛出ParamFlowExceptionthrow new ParamFlowException(resourceWrapper.getName(), triggeredParam, rule);}}
}

ParamFlowChecker的数据结构

热点参数限流使用的算法为令牌桶算法,首先来看一下数据结构是如何存储的:

// timeRecorder
// 记录令牌桶的最后添加时间,用于QPS限流
private final Map<ParamFlowRule, CacheMap<Object, AtomicLong>> ruleTimeCounters = new HashMap<>();
// tokenCounter
// 记录令牌桶的令牌数量,用于QPS限流
private final Map<ParamFlowRule, CacheMap<Object, AtomicLong>> ruleTokenCounter = new HashMap<>();

每个Resource对应一个ParameterMetric对象,上述CacheMap<Object, AtomicLong>的Key代表热点参数的值,Value则是对应的计数器。

所以这里数据结构的关系是这样的:

  • 一个Resource有一个ParameterMetric
  • 一个ParameterMetric统计了多个Rule所需要的限流指标数据
  • 每个Rule又可以配置多个热点参数

CacheMap的默认实现,包装了com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap,使用该类的主要原因是为了实现热点参数的LRU。

ParamFlowChecker的校验逻辑

com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowChecker#passCheck

public static boolean passCheck(ResourceWrapper resourceWrapper, /*@Valid*/ ParamFlowRule rule, /*@Valid*/ int count,Object... args) {// 如果参数不存在直接返回if (args == null) {return true;}int paramIdx = rule.getParamIdx();//参数的个数小于规则的索引直接返回if (args.length <= paramIdx) {return true;}// Get parameter value.Object value = args[paramIdx];// Assign value with the result of paramFlowKey methodif (value instanceof ParamFlowArgument) {value = ((ParamFlowArgument) value).paramFlowKey();}// If value is null, then pass// 如果参数为空,直接返回if (value == null) {return true;}if (rule.isClusterMode() && rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) {return passClusterCheck(resourceWrapper, rule, count, value);}// 校验规则return passLocalCheck(resourceWrapper, rule, count, value);
}private static boolean passLocalCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int count,Object value) {try {// 根据参数的类型来校验if (Collection.class.isAssignableFrom(value.getClass())) {for (Object param : ((Collection)value)) {if (!passSingleValueCheck(resourceWrapper, rule, count, param)) {return false;}}} else if (value.getClass().isArray()) {int length = Array.getLength(value);for (int i = 0; i < length; i++) {Object param = Array.get(value, i);// 参数一般是数组if (!passSingleValueCheck(resourceWrapper, rule, count, param)) {return false;}}} else {return passSingleValueCheck(resourceWrapper, rule, count, value);}} catch (Throwable e) {RecordLog.warn("[ParamFlowChecker] Unexpected error", e);}return true;
}static boolean passSingleValueCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int acquireCount,Object value) {if (rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) {// 热点规则的阈值类型只能配置QPS类型if (rule.getControlBehavior() == RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER) {return passThrottleLocalCheck(resourceWrapper, rule, acquireCount, value);} else {// 流控效果只能是快速失败return passDefaultLocalCheck(resourceWrapper, rule, acquireCount, value);}} else if (rule.getGrade() == RuleConstant.FLOW_GRADE_THREAD) {Set<Object> exclusionItems = rule.getParsedHotItems().keySet();long threadCount = getParameterMetric(resourceWrapper).getThreadCount(rule.getParamIdx(), value);if (exclusionItems.contains(value)) {int itemThreshold = rule.getParsedHotItems().get(value);return ++threadCount <= itemThreshold;}long threshold = (long)rule.getCount();return ++threadCount <= threshold;}return true;
}

com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowChecker#passDefaultLocalCheck

static boolean passDefaultLocalCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int acquireCount,Object value) {/*** ParamFlowSlot#checkFlow中会调用ParameterMetricStorage.initParamMetricsFor()初始化统计数据* @see ParamFlowSlot#checkFlow(com.alibaba.csp.sentinel.slotchain.ResourceWrapper, int, java.lang.Object...)*/ParameterMetric metric = getParameterMetric(resourceWrapper);CacheMap<Object, AtomicLong> tokenCounters = metric == null ? null : metric.getRuleTokenCounter(rule);CacheMap<Object, AtomicLong> timeCounters = metric == null ? null : metric.getRuleTimeCounter(rule);if (tokenCounters == null || timeCounters == null) {return true;}// Calculate max token count (threshold)Set<Object> exclusionItems = rule.getParsedHotItems().keySet();// tokenCount表示的是QPS的阈值,默认使用热点参数中配置的单机阈值long tokenCount = (long)rule.getCount();if (exclusionItems.contains(value)) {// 如果参数例外项中配置了,则使用参数例外项中配置的阈值tokenCount = rule.getParsedHotItems().get(value);}// 阈值为0,直接返回false,不通过if (tokenCount == 0) {return false;}// burstCount表示应对突发流量额外允许的数量,默认为0long maxCount = tokenCount + rule.getBurstCount();// acquireCount表示请求需要的QPS数量,默认为1if (acquireCount > maxCount) {return false;}while (true) {long currentTime = TimeUtil.currentTimeMillis();// tokenCounters用来记录参数当前还能获取到的令牌数// timeCounters用来记录时间段的开始时间。// 获取当前统计的时间段的开始时间AtomicLong lastAddTokenTime = timeCounters.putIfAbsent(value, new AtomicLong(currentTime));if (lastAddTokenTime == null) {// 当前时间段还没有计算就初始化令牌数// Token never added, just replenish the tokens and consume {@code acquireCount} immediately.tokenCounters.putIfAbsent(value, new AtomicLong(maxCount - acquireCount));return true;}// Calculate the time duration since last token was added.// 计算已经过去了多长时间long passTime = currentTime - lastAddTokenTime.get();// A simplified token bucket algorithm that will replenish the tokens only when statistic window has passed.if (passTime > rule.getDurationInSec() * 1000) {// 当前时间不在这个窗口内AtomicLong oldQps = tokenCounters.putIfAbsent(value, new AtomicLong(maxCount - acquireCount));if (oldQps == null) {// Might not be accurate here.lastAddTokenTime.set(currentTime);return true;} else {// 重新计算QPSlong restQps = oldQps.get();// 每毫秒应该生成的 token = tokenCount / (rule.getDurationInSec() * 1000)// 再 * passTime 即等于应该补充的 tokenlong toAddCount = (passTime * tokenCount) / (rule.getDurationInSec() * 1000);long newQps = toAddCount + restQps > maxCount ? (maxCount - acquireCount): (restQps + toAddCount - acquireCount);if (newQps < 0) {return false;}if (oldQps.compareAndSet(restQps, newQps)) {// 更新当前时间lastAddTokenTime.set(currentTime);return true;}Thread.yield();}} else {// 当前时间还在还在这个窗口内AtomicLong oldQps = tokenCounters.get(value);if (oldQps != null) {long oldQpsValue = oldQps.get();if (oldQpsValue - acquireCount >= 0) {// CAS减少令牌数if (oldQps.compareAndSet(oldQpsValue, oldQpsValue - acquireCount)) {return true;}} else {// 令牌数不够直接返回false,限流return false;}}Thread.yield();}}
}

http://www.ppmy.cn/news/66150.html

相关文章

DAY 57 MySQL数据库的事务

事务的概念 事务是一种机制、一个操作序列&#xff0c;包含了一组数据库操作命令&#xff0c;并且把所有的命令作为一个 整体一起向系统提交或撤销操作请求&#xff0c;即这一组数据库命令要么都执行&#xff0c;要么都不执行。事务是一个不可分割的工作逻辑单元&#xff0c;在…

MathGPT是什么,MathGPT与ChatGPT的区别是什么,MathGPT十大应用场景

MathGPT是一种基于自然语言处理技术的数学语言模型&#xff0c;其目的是通过自动化生成数学公式、证明和解题步骤等来辅助数学学习和研究。 与ChatGPT相比&#xff0c;MathGPT主要关注数学领域而非通用性的自然语言理解&#xff0c;因此其训练语料库和预测任务都与数学有关&…

[图神经网络]ViG(Vision GNN)网络代码实现

论文解读&#xff1a; [图神经网络]视觉图神经网络ViG(Vision GNN)--论文阅读https://blog.csdn.net/weixin_37878740/article/details/130124772?spm1001.2014.3001.5501代码地址&#xff1a; ViGhttps://github.com/huawei-noah/Efficient-AI-Backbones/tree/master/vig_p…

Springboot +Flowable,三种常见网关的使用(排他、并行、包容网关)(二)

一.简介 Flowable 中常用的网关主要有三种类型&#xff0c;分别是&#xff1a; 排他网关并行网关包容网关 下面来说下这三种的网关的概念和用法。 二.并行网关 并行网关&#xff0c;这种网关一般用在并行任务上&#xff0c;截图如下&#xff1a; 并行网关一般是成对出现的…

Overcoming catastrophic forgetting in neural networks

目录 预备知识&#xff1a; 论文笔记 1. Introduction 2. Elastic weight consolidation 2.1 EWC allows continual learning in a supervised learning context 2.2 EWC allows continual learning in a reinforcement learning context 3. Conclusion 文章链接&#x…

微信小程序从入门到精通

目录 前言一&#xff0c;初学小程序1.1 小程序概述1.2 基础配置1.2.1 注册开发账号1.2.2 获取AppID1.2.3 微信开发者工具1.2.4 修改代理模式 1.3 第一个小程序1.4 开发文档1.5 机型1.6 项目基本结构1.6.1 页面内部文件1.6.2 app.json1.6.3 project.config.json1.6.4 sitemap.js…

redis 数据类型简介

redis 数据类型 redis的五种数据类型是&#xff1a;1、string&#xff08;字符串&#xff09;&#xff1b;2、hash&#xff08;哈希&#xff09;&#xff1b;3、list&#xff08;列表&#xff09;&#xff1b;4、set&#xff08;集合&#xff09;&#xff1b;5、sort set &…

【MySql】数据库索引

数据库索引 索引索引的创建索引的查看索引的删除 聚簇索引 & 非聚簇索引聚簇索引非聚簇索引 索引创建原则 索引 可以简单理解为一本书的目录信息&#xff0c;是为了提升查找效率而建立的 索引的创建 1、在创建一个主键、唯一键、外键时候&#xff0c;数据库会自动地针对查…