Ali-Sentinel-链路控制

server/2024/12/22 20:27:29/

归档

  • GitHub: Ali-Sentinel-链路控制

链结构

  • 参考:入口控制-处理链

具体实现

NodeSelectorSlot

  • 给上下文设置统计节点

  • com.alibaba.csp.sentinel.slots.nodeselector.NodeSelectorSlot

java">@Spi(isSingleton = false, order = Constants.ORDER_NODE_SELECTOR_SLOT)
public class NodeSelectorSlot extends AbstractLinkedProcessorSlot<Object> {private volatile Map<String, DefaultNode> map = new HashMap<String, DefaultNode>(10);@Overridepublic void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args) throws Throwable {DefaultNode node = map.get(context.getName());if (node == null) {synchronized (this) {   // DCLnode = map.get(context.getName());if (node == null) {node = new DefaultNode(resourceWrapper, null);              // 创建默认节点HashMap<String, DefaultNode> cacheMap = new HashMap<String, DefaultNode>(map.size());cacheMap.putAll(map);cacheMap.put(context.getName(), node);map = cacheMap;                                             // COW((DefaultNode) context.getLastNode()).addChild(node);       // 构建调用树  ref: sign_m_010 | sign_m_020}}}context.setCurNode(node);                                               // 保存到上下文  ref: sign_m_011fireEntry(context, resourceWrapper, node, count, prioritized, args);    // (将统计节点) 流转给下游}
}
  • com.alibaba.csp.sentinel.context.Context
    • entranceNode 在 上下文-创建 sign_m_020 中赋值
java">    // sign_m_010 获取尾节点public Node getLastNode() {if (curEntry != null && curEntry.getLastNode() != null) {return curEntry.getLastNode();  // sign_m_040} else {return entranceNode;    // 一般返回此}}// sign_m_011public Context setCurNode(Node node) {this.curEntry.setCurNode(node); //  sign_m_030return this;}
  • com.alibaba.csp.sentinel.node.DefaultNode
java">    // sign_m_020 添加子节点public void addChild(Node node) {... // 省略 node 空判断if (!childList.contains(node)) {synchronized (this) {       // DCLif (!childList.contains(node)) {Set<Node> newSet = new HashSet<>(childList.size() + 1);newSet.addAll(childList);newSet.add(node);childList = newSet; // COW}}}}
java">public abstract Node getLastNode();// sign_m_030public void setCurNode(Node node) {this.curNode = node;}// sign_m_031public Node getCurNode() {return curNode;}
java">    // sign_m_040@Overridepublic Node getLastNode() {return parent == null ? null : parent.getCurNode(); // sign_m_031}

ClusterBuilderSlot

  • 给统计节点设置集群(统计)节点

    • 只是用于统计
  • com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot

java">@Spi(isSingleton = false, order = Constants.ORDER_CLUSTER_BUILDER_SLOT)
public class ClusterBuilderSlot extends AbstractLinkedProcessorSlot<DefaultNode> {private static volatile Map<ResourceWrapper, ClusterNode> clusterNodeMap = new HashMap<>(); // 记录所有的节点private static final Object lock = new Object();private volatile ClusterNode clusterNode = null;@Overridepublic void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,boolean prioritized, Object... args) throws Throwable {if (clusterNode == null) {synchronized (lock) {   // DCLif (clusterNode == null) {clusterNode = new ClusterNode(resourceWrapper.getName(), resourceWrapper.getResourceType());HashMap<ResourceWrapper, ClusterNode> newMap = new HashMap<>(Math.max(clusterNodeMap.size(), 16));newMap.putAll(clusterNodeMap);newMap.put(node.getId(), clusterNode);clusterNodeMap = newMap;    // COW}}}node.setClusterNode(clusterNode);       // 设置集群节点... // 省略设置源节点处理fireEntry(context, resourceWrapper, node, count, prioritized, args);    // 传递给下游节点}}

LogSlot

  • 日志异常记录

  • com.alibaba.csp.sentinel.slots.logger.LogSlot

java">@Spi(order = Constants.ORDER_LOG_SLOT)
public class LogSlot extends AbstractLinkedProcessorSlot<DefaultNode> {@Overridepublic void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode obj, int count, boolean prioritized, Object... args)throws Throwable {try {fireEntry(context, resourceWrapper, obj, count, prioritized, args); // 先传给下游} catch (BlockException e) {EagleEyeLogUtil.log(resourceWrapper.getName(), e.getClass().getSimpleName(), e.getRuleLimitApp(),context.getOrigin(), e.getRule().getId(), count);throw e; // 继续往上抛} catch (Throwable e) {// 下游处理出错,则记录异常日志RecordLog.warn("Unexpected entry exception", e);}}
}

StatisticSlot

  • 统计各种数据

  • com.alibaba.csp.sentinel.slots.statistic.StatisticSlot

java">@Spi(order = Constants.ORDER_STATISTIC_SLOT)
public class StatisticSlot extends AbstractLinkedProcessorSlot<DefaultNode> {@Override // sign_m_400 记录通过数public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,boolean prioritized, Object... args) throws Throwable {try {/*** 先传给下游处理,让下游先处理,* 后面自己才做统计,* 这样,下游要处理 QPS 等,要等下次才有数据*/fireEntry(context, resourceWrapper, node, count, prioritized, args);// 添加线程计数和通过计数node.increaseThreadNum();node.addPassRequest(count);                     // ref: sign_m_401... // 省略入口源节点添加计数// 全局入站节点添加计数 (其用于系统流控)if (resourceWrapper.getEntryType() == EntryType.IN) {Constants.ENTRY_NODE.increaseThreadNum();Constants.ENTRY_NODE.addPassRequest(count); // ref: sign_m_401}... // sign_call_100 省略注册的回调器处理} catch (PriorityWaitException ex) {... // 省略此异常处理 (相当于下游抛出此异常,上面的统计 (除 pass 外) 再走一次)} catch (BlockException e) {context.getCurEntry().setBlockError(e); // 记录异常node.increaseBlockQps(count);           // Add block count.... // 省略入口源节点添加计数// 全局入站节点添加计数if (resourceWrapper.getEntryType() == EntryType.IN) {Constants.ENTRY_NODE.increaseBlockQps(count);}... // 省略注册的回调器处理throw e;} catch (Throwable e) {context.getCurEntry().setError(e);      // 记录异常throw e;}}@Override // 记录完成数public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {Node node = context.getCurNode();if (context.getCurEntry().getBlockError() == null) {// 计算响应时间 (当前时间 - 入口创建时间)long completeStatTime = TimeUtil.currentTimeMillis();context.getCurEntry().setCompleteTimestamp(completeStatTime);long rt = completeStatTime - context.getCurEntry().getCreateTimestamp();Throwable error = context.getCurEntry().getError();// 记录响应时间和成功次数recordCompleteFor(node, count, rt, error);recordCompleteFor(context.getCurEntry().getOriginNode(), count, rt, error);if (resourceWrapper.getEntryType() == EntryType.IN) {recordCompleteFor(Constants.ENTRY_NODE, count, rt, error);}}... // sign_call_200 省略注册的回调器处理fireExit(context, resourceWrapper, count, args); // 传给下游}private void recordCompleteFor(Node node, int batchCount, long rt, Throwable error) {... // 省略 node 为空返回处理node.addRtAndSuccess(rt, batchCount);       // 添加 RT 和完成数node.decreaseThreadNum();                   // 减线程数if (error != null && !(error instanceof BlockException)) {node.increaseExceptionQps(batchCount);  // 添加异常计数}}
}
  • com.alibaba.csp.sentinel.node.DefaultNode
java">    // sign_m_401@Overridepublic void addPassRequest(int count) {super.addPassRequest(count);            // ref: sign_m_410this.clusterNode.addPassRequest(count); // ref: sign_m_410}
  • com.alibaba.csp.sentinel.node.StatisticNode
    • Metric.addPass 方法参考:节点与度量-addpass
java">    // sign_m_410@Overridepublic void addPassRequest(int count) {rollingCounterInSecond.addPass(count);  // 参考: 节点与度量-addpassrollingCounterInMinute.addPass(count);}

AuthoritySlot

  • 黑、白名单权限校验

  • com.alibaba.csp.sentinel.slots.block.authority.AuthoritySlot

java">@Spi(order = Constants.ORDER_AUTHORITY_SLOT)
public class AuthoritySlot extends AbstractLinkedProcessorSlot<DefaultNode> {@Overridepublic void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args)throws Throwable {checkBlackWhiteAuthority(resourceWrapper, context);                     // 先校验  ref: sign_m_501fireEntry(context, resourceWrapper, node, count, prioritized, args);    // 再传给下游}// sign_m_501void checkBlackWhiteAuthority(ResourceWrapper resource, Context context) throws AuthorityException {Map<String, Set<AuthorityRule>> authorityRules = AuthorityRuleManager.getAuthorityRules();  // 获取总规则... // 省略 authorityRules 为 null 返回Set<AuthorityRule> rules = authorityRules.get(resource.getName());  // 获取当前资源的规则... // 省略 rules 为 null 返回// 一般一个资源只有一条规则  ref: AuthorityRuleManager.RulePropertyListener #loadAuthorityConffor (AuthorityRule rule : rules) {if (!AuthorityRuleChecker.passCheck(rule, context)) {           // 依次校验  ref: sign_m_510throw new AuthorityException(context.getOrigin(), rule);    // 校验不通过则抛异常}}}
}
  • com.alibaba.csp.sentinel.slots.block.authority.AuthorityRuleChecker
java">    // sign_m_510 校验权限规则static boolean passCheck(AuthorityRule rule, Context context) {String requester = context.getOrigin();... // 省略 requester 和 rule.getLimitApp() 空判断int pos = rule.getLimitApp().indexOf(requester);boolean contain = pos > -1; // 包含// 加此判断可省略不包含时的多余处理// 下面的处理相当于:逗号分隔再依次精确匹配if (contain) {boolean exactlyMatch = false;String[] appArray = rule.getLimitApp().split(",");  // 英文逗号分隔for (String app : appArray) {if (requester.equals(app)) {    // 精确匹配exactlyMatch = true;        // 匹配上,才算包含break;}}contain = exactlyMatch;}int strategy = rule.getStrategy();if (strategy == RuleConstant.AUTHORITY_BLACK && contain) {  // 黑名单,包含:则不通过return false;}if (strategy == RuleConstant.AUTHORITY_WHITE && !contain) { // 白名单,不包含:则不通过return false;}return true;    // 通过}

SystemSlot

  • com.alibaba.csp.sentinel.slots.system.SystemSlot
java">@Spi(order = Constants.ORDER_SYSTEM_SLOT)
public class SystemSlot extends AbstractLinkedProcessorSlot<DefaultNode> {@Overridepublic void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,boolean prioritized, Object... args) throws Throwable {SystemRuleManager.checkSystem(resourceWrapper, count);                  // 先校验  ref: sign_m_610fireEntry(context, resourceWrapper, node, count, prioritized, args);    // 再传给下游}
}
  • com.alibaba.csp.sentinel.slots.system.SystemRuleManager
java">    // sign_m_610public static void checkSystem(ResourceWrapper resourceWrapper, int count) throws BlockException {... // 省略 resourceWrapper 为 null 返回... // 省略 checkSystemStatus 为 false 返回... // 省略 资源类型 不为 入站 返回// total qpsdouble currentQps = Constants.ENTRY_NODE.passQps(); // 在 StatisticSlot 里记录,ref: sign_m_400if (currentQps + count > qps) {throw new SystemBlockException(resourceWrapper.getName(), "qps");}... // 省略 线程数 校验... // 省略 平均RT 校验// load. BBR algorithm.if (highestSystemLoadIsSet && getCurrentSystemAvgLoad() > highestSystemLoad) {if (!checkBbr(currentThread)) { // ref: sign_m_611throw new SystemBlockException(resourceWrapper.getName(), "load");}}... // 省略 CPU 校验 (CPU 数据每秒读取一次)}// sign_m_611private static boolean checkBbr(int currentThread) {if (currentThread > 1 &&currentThread > Constants.ENTRY_NODE.maxSuccessQps() * Constants.ENTRY_NODE.minRt() / 1000) {return false;}return true;}

FlowSlot

  • 对当前资源进行流控

  • com.alibaba.csp.sentinel.slots.block.flow.FlowSlot

    • 流控规则设置参考:入口控制-设置规则 sign_demo_020
java">@Spi(order = Constants.ORDER_FLOW_SLOT)
public class FlowSlot extends AbstractLinkedProcessorSlot<DefaultNode> {private final FlowRuleChecker checker;@Overridepublic void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,boolean prioritized, Object... args) throws Throwable {checkFlow(resourceWrapper, context, node, count, prioritized);          // 先校验 sign_m_710fireEntry(context, resourceWrapper, node, count, prioritized, args);    // 再传给下游}// sign_m_710void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized)throws BlockException {checker.checkFlow(ruleProvider, resource, context, node, count, prioritized);   // sign_m_720}// 返回资源对应的流控规则集合private final Function<String, Collection<FlowRule>> ruleProvider = new Function<String, Collection<FlowRule>>() {@Overridepublic Collection<FlowRule> apply(String resource) {Map<String, List<FlowRule>> flowRules = FlowRuleManager.getFlowRuleMap();   // ref: sign_demo_020return flowRules.get(resource);}};
}
  • com.alibaba.csp.sentinel.slots.block.flow.FlowRuleChecker
java">    // sign_m_720public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {... // 省略 ruleProvider 和 resource 为 null 返回Collection<FlowRule> rules = ruleProvider.apply(resource.getName());if (rules != null) {for (FlowRule rule : rules) {if (!canPassCheck(rule, context, node, count, prioritized)) {   // sign_m_721throw new FlowException(rule.getLimitApp(), rule);}}}}// sign_m_721public boolean canPassCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,boolean prioritized) {... // 省略 rule.limitApp 为 null 返回 trueif (rule.isClusterMode()) {// ref: 集群流控-流控原理-sign_m_411return passClusterCheck(rule, context, node, acquireCount, prioritized);}return passLocalCheck(rule, context, node, acquireCount, prioritized);  // sign_m_722}// sign_m_722private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,boolean prioritized) {Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);  // 查找统计节点if (selectedNode == null) {return true;}return rule.getRater().canPass(selectedNode, acquireCount, prioritized);    // 使用规则控制器进行判断,ref: sign_m_731}
  • com.alibaba.csp.sentinel.slots.block.flow.controller.DefaultController
java">    // sign_m_731 判断是否可以通过@Overridepublic boolean canPass(Node node, int acquireCount, boolean prioritized) {int curCount = avgUsedTokens(node);     // sign_m_732if (curCount + acquireCount > count) {  // 判断是否超过自身设置的限制数if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {long currentTime = TimeUtil.currentTimeMillis();long waitInMs = node.tryOccupyNext(currentTime, acquireCount, count);if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) {node.addWaitingRequest(currentTime + waitInMs, acquireCount);node.addOccupiedPass(acquireCount);sleep(waitInMs);throw new PriorityWaitException(waitInMs);  // 报此异常可通过}}return false;   // 超过限制则返回 false (表示不通过)}return true;}// sign_m_732 返回当前计数private int avgUsedTokens(Node node) {if (node == null) {return DEFAULT_AVG_USED_TOKENS;}return grade == RuleConstant.FLOW_GRADE_THREAD ? node.curThreadNum() : (int)(node.passQps());}

DegradeSlot

  • 熔断处理

  • com.alibaba.csp.sentinel.slots.block.degrade.DegradeSlot

java">@Spi(order = Constants.ORDER_DEGRADE_SLOT)
public class DegradeSlot extends AbstractLinkedProcessorSlot<DefaultNode> {@Overridepublic void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,boolean prioritized, Object... args) throws Throwable {performChecking(context, resourceWrapper);                              // 先校验 sign_m_801fireEntry(context, resourceWrapper, node, count, prioritized, args);    // 再传给下游}// sign_m_801 熔断校验void performChecking(Context context, ResourceWrapper r) throws BlockException {List<CircuitBreaker> circuitBreakers = DegradeRuleManager.getCircuitBreakers(r.getName());if (circuitBreakers == null || circuitBreakers.isEmpty()) {return;}for (CircuitBreaker cb : circuitBreakers) {if (!cb.tryPass(context)) { // 熔断判断 sign_m_810// 不通过则进行熔断报错throw new DegradeException(cb.getRule().getLimitApp(), cb.getRule());}}}@Override // 退出时进行计数和状态处理public void exit(Context context, ResourceWrapper r, int count, Object... args) {Entry curEntry = context.getCurEntry();... // 出现熔断 (有熔断异常) 则传给下游处理并返回List<CircuitBreaker> circuitBreakers = DegradeRuleManager.getCircuitBreakers(r.getName());... // 无断路器则传给下游处理并返回if (curEntry.getBlockError() == null) {for (CircuitBreaker circuitBreaker : circuitBreakers) {circuitBreaker.onRequestComplete(context);  // 断路器计数与状态变更处理,ref: sign_m_830}}fireExit(context, r, count, args);}
}
  • com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker.AbstractCircuitBreaker
java">    // sign_m_810 熔断判断@Overridepublic boolean tryPass(Context context) {if (currentState.get() == State.CLOSED) {return true;    // 断路器关闭:直接通过}if (currentState.get() == State.OPEN) {// 断路器已打开:超过指定熔断时间,尝试半打开处理return retryTimeoutArrived() && fromOpenToHalfOpen(context);    // sign_m_811 | sign_m_812}return false;       // 断路器半打开:不通过}// sign_m_811 判断是否超过熔断时长protected boolean retryTimeoutArrived() {return TimeUtil.currentTimeMillis() >= nextRetryTimestamp;}// sign_m_812 尝试半打开处理protected boolean fromOpenToHalfOpen(Context context) {if (currentState.compareAndSet(State.OPEN, State.HALF_OPEN)) {notifyObservers(State.OPEN, State.HALF_OPEN, null);Entry entry = context.getCurEntry();entry.whenTerminate(new BiConsumer<Context, Entry>() {  // 添加 entry.exit() 回调@Overridepublic void accept(Context context, Entry entry) {  // 在 entry.exit() 被调用if (entry.getBlockError() != null) {                            // 尝试请求时出错currentState.compareAndSet(State.HALF_OPEN, State.OPEN);    // 重新打开notifyObservers(State.HALF_OPEN, State.OPEN, 1.0d);}}});return true;    // 只让一个线程 (且只进行一次) 处理}return false;       // 被其他线程抢占 (不通过)}
  • com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker.ExceptionCircuitBreaker
java">    // sign_m_830 断路器计数与状态变更处理@Overridepublic void onRequestComplete(Context context) {Entry entry = context.getCurEntry();if (entry == null) {return;}Throwable error = entry.getError();SimpleErrorCounter counter = stat.currentWindow().value();if (error != null) {counter.getErrorCount().add(1); // 有异常,添加异常计数}counter.getTotalCount().add(1);     // 添加请求 (总) 计数handleStateChangeWhenThresholdExceeded(error);  // sign_m_831}// sign_m_831 状态变更处理private void handleStateChangeWhenThresholdExceeded(Throwable error) {... // 当前状态为打开,则返回if (currentState.get() == State.HALF_OPEN) {// 当前为半打开状态if (error == null) {fromHalfOpenToClose();              // 无异常,则关闭} else {fromHalfOpenToOpen(1.0d);           // 有异常,则继续打开}return;}List<SimpleErrorCounter> counters = stat.values();long errCount = 0;long totalCount = 0;for (SimpleErrorCounter counter : counters) {errCount += counter.errorCount.sum();totalCount += counter.totalCount.sum();}if (totalCount < minRequestAmount) {return; // 小于最小请求数,不处理}double curCount = errCount;if (strategy == DEGRADE_GRADE_EXCEPTION_RATIO) {curCount = errCount * 1.0d / totalCount;    // 使用比率计算}if (curCount > threshold) {transformToOpen(curCount);                  // 超过阈值,开启熔断 (并设置开始重试的时间戳)}}

总结

  • 权限 (内置) 2 种规则 (但只支持设置 1 种,黑白名单互斥)
  • 流控 (内置) 有 4 (也可说 5) 种规则 (QPS 或线程数、速率、慢热 QPS、慢热速率)
  • 熔断 (内置) 有 2 (也可说 3) 种规则 (异常数或比率、响应时长)

http://www.ppmy.cn/server/18977.html

相关文章

机器学习 - 监督学习 - KNN、线性回归与岭回归

机器学习学习笔记 - 监督学习 - KNN、线性回归与岭回归 一、K-近邻算法&#xff08;KNN&#xff09; K-近邻算法&#xff08;K-Nearest Neighbors&#xff0c;简称KNN&#xff09;是一种基础且直观的监督学习算法。它的工作原理是&#xff1a;对于一个新的未知类别的样本&…

ORACLE 中varchar2类型的日期数字,例如20230814,转为2023-08-14

ORACLE 中varchar2类型的日期数字&#xff0c;例如20230814&#xff0c;转为2023-08-14 引言场景一&#xff1a;简单格式转换场景二&#xff1a;更新字段为日期类型场景三&#xff1a;在WHERE子句中处理varchar日期场景四&#xff1a;联合其他日期操作总结 引言 在Oracle数据库…

案例-部门管理-新增

黑马程序员JavaWeb开发教程 文章目录 一、页面原型二、接口文档三开发1、controller2、service&#xff08;1&#xff09;service接口层&#xff08;2&#xff09;Service实现层 3、 mapper4、postman 优化 一、页面原型 二、接口文档 在这里插入图片描述 三开发 1、control…

OCP Java17 SE Developers 复习题15(完)

答案 B, F. The Driver and PreparedStatement interfaces are part of the JDK, making options A and E incorrect. Option C is incorrect because we made it up. The concrete DriverManager class is also part of the JDK, making option D incorrect. Options B and…

机器学习之sklearn基础教程

ChatGPT Scikit-learn (简称sklearn) 是一个非常受欢迎的Python机器学习库。它包含了从数据预处理到训练模型的各种工具。下面是一个关于如何使用sklearn进行机器学习的基础教程。 1. 安装和导入sklearn库 首先&#xff0c;你需要安装sklearn库&#xff08;如果你还没有安装的…

排序算法-快速排序

一、快速排序 快速排序也属于交换排序&#xff0c;通过元素之间的比较和交换位置来达到排序的目的。 冒泡排序在每一轮中只把1个元素冒泡到数列的一端。而快速排序则在每一轮挑选一个基准元素&#xff0c;并让其他比它大的元素移动到数列一边&#xff0c;比它小的元素移动到数列…

[C++ QT项目实战]----C++ QT系统登陆界面设计

前言 在C QT项目开发过程中&#xff0c;设计系统登录界面可以使用QT框架来实现。以下是一个简单的系统登录界面设计示例&#xff1a; 创建登录界面UI&#xff1a;可以使用QT Designer来设计登录界面的UI&#xff0c;包括用户名输入框、密码输入框、登录按钮等。在QT Designer中…

wow-slist文件说明

wow-slist文件说明 项目地址&#xff1a;https://gitee.com/wow-iot/wow-iot7本文件的的功能主要用于链表相关操作&#xff0c;主要涉及创建、销毁、插入、查找、移除、替换、获取、清空、遍历&#xff1b; 创建与销毁: Slist_T* wow_slist_create(void) {Slist_T *slist C…