归档
- GitHub: Ali-Sentinel-链路控制
链结构
- 参考:入口控制-处理链
具体实现
NodeSelectorSlot
-
给上下文设置统计节点
-
com.alibaba.csp.sentinel.slots.nodeselector.NodeSelectorSlot
java">@Spi(isSingleton = false, order = Constants.ORDER_NODE_SELECTOR_SLOT)
public class NodeSelectorSlot extends AbstractLinkedProcessorSlot<Object> {private volatile Map<String, DefaultNode> map = new HashMap<String, DefaultNode>(10);@Overridepublic void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args) throws Throwable {DefaultNode node = map.get(context.getName());if (node == null) {synchronized (this) { // DCLnode = map.get(context.getName());if (node == null) {node = new DefaultNode(resourceWrapper, null); // 创建默认节点HashMap<String, DefaultNode> cacheMap = new HashMap<String, DefaultNode>(map.size());cacheMap.putAll(map);cacheMap.put(context.getName(), node);map = cacheMap; // COW((DefaultNode) context.getLastNode()).addChild(node); // 构建调用树 ref: sign_m_010 | sign_m_020}}}context.setCurNode(node); // 保存到上下文 ref: sign_m_011fireEntry(context, resourceWrapper, node, count, prioritized, args); // (将统计节点) 流转给下游}
}
com.alibaba.csp.sentinel.context.Context
entranceNode
在 上下文-创建 sign_m_020 中赋值
java"> // sign_m_010 获取尾节点public Node getLastNode() {if (curEntry != null && curEntry.getLastNode() != null) {return curEntry.getLastNode(); // sign_m_040} else {return entranceNode; // 一般返回此}}// sign_m_011public Context setCurNode(Node node) {this.curEntry.setCurNode(node); // sign_m_030return this;}
com.alibaba.csp.sentinel.node.DefaultNode
java"> // sign_m_020 添加子节点public void addChild(Node node) {... // 省略 node 空判断if (!childList.contains(node)) {synchronized (this) { // DCLif (!childList.contains(node)) {Set<Node> newSet = new HashSet<>(childList.size() + 1);newSet.addAll(childList);newSet.add(node);childList = newSet; // COW}}}}
com.alibaba.csp.sentinel.Entry
java">public abstract Node getLastNode();// sign_m_030public void setCurNode(Node node) {this.curNode = node;}// sign_m_031public Node getCurNode() {return curNode;}
com.alibaba.csp.sentinel.CtEntry
java"> // sign_m_040@Overridepublic Node getLastNode() {return parent == null ? null : parent.getCurNode(); // sign_m_031}
ClusterBuilderSlot
-
给统计节点设置集群(统计)节点
- 只是用于统计
-
com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot
java">@Spi(isSingleton = false, order = Constants.ORDER_CLUSTER_BUILDER_SLOT)
public class ClusterBuilderSlot extends AbstractLinkedProcessorSlot<DefaultNode> {private static volatile Map<ResourceWrapper, ClusterNode> clusterNodeMap = new HashMap<>(); // 记录所有的节点private static final Object lock = new Object();private volatile ClusterNode clusterNode = null;@Overridepublic void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,boolean prioritized, Object... args) throws Throwable {if (clusterNode == null) {synchronized (lock) { // DCLif (clusterNode == null) {clusterNode = new ClusterNode(resourceWrapper.getName(), resourceWrapper.getResourceType());HashMap<ResourceWrapper, ClusterNode> newMap = new HashMap<>(Math.max(clusterNodeMap.size(), 16));newMap.putAll(clusterNodeMap);newMap.put(node.getId(), clusterNode);clusterNodeMap = newMap; // COW}}}node.setClusterNode(clusterNode); // 设置集群节点... // 省略设置源节点处理fireEntry(context, resourceWrapper, node, count, prioritized, args); // 传递给下游节点}}
LogSlot
-
日志异常记录
-
com.alibaba.csp.sentinel.slots.logger.LogSlot
java">@Spi(order = Constants.ORDER_LOG_SLOT)
public class LogSlot extends AbstractLinkedProcessorSlot<DefaultNode> {@Overridepublic void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode obj, int count, boolean prioritized, Object... args)throws Throwable {try {fireEntry(context, resourceWrapper, obj, count, prioritized, args); // 先传给下游} catch (BlockException e) {EagleEyeLogUtil.log(resourceWrapper.getName(), e.getClass().getSimpleName(), e.getRuleLimitApp(),context.getOrigin(), e.getRule().getId(), count);throw e; // 继续往上抛} catch (Throwable e) {// 下游处理出错,则记录异常日志RecordLog.warn("Unexpected entry exception", e);}}
}
StatisticSlot
-
统计各种数据
-
com.alibaba.csp.sentinel.slots.statistic.StatisticSlot
java">@Spi(order = Constants.ORDER_STATISTIC_SLOT)
public class StatisticSlot extends AbstractLinkedProcessorSlot<DefaultNode> {@Override // sign_m_400 记录通过数public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,boolean prioritized, Object... args) throws Throwable {try {/*** 先传给下游处理,让下游先处理,* 后面自己才做统计,* 这样,下游要处理 QPS 等,要等下次才有数据*/fireEntry(context, resourceWrapper, node, count, prioritized, args);// 添加线程计数和通过计数node.increaseThreadNum();node.addPassRequest(count); // ref: sign_m_401... // 省略入口源节点添加计数// 全局入站节点添加计数 (其用于系统流控)if (resourceWrapper.getEntryType() == EntryType.IN) {Constants.ENTRY_NODE.increaseThreadNum();Constants.ENTRY_NODE.addPassRequest(count); // ref: sign_m_401}... // sign_call_100 省略注册的回调器处理} catch (PriorityWaitException ex) {... // 省略此异常处理 (相当于下游抛出此异常,上面的统计 (除 pass 外) 再走一次)} catch (BlockException e) {context.getCurEntry().setBlockError(e); // 记录异常node.increaseBlockQps(count); // Add block count.... // 省略入口源节点添加计数// 全局入站节点添加计数if (resourceWrapper.getEntryType() == EntryType.IN) {Constants.ENTRY_NODE.increaseBlockQps(count);}... // 省略注册的回调器处理throw e;} catch (Throwable e) {context.getCurEntry().setError(e); // 记录异常throw e;}}@Override // 记录完成数public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {Node node = context.getCurNode();if (context.getCurEntry().getBlockError() == null) {// 计算响应时间 (当前时间 - 入口创建时间)long completeStatTime = TimeUtil.currentTimeMillis();context.getCurEntry().setCompleteTimestamp(completeStatTime);long rt = completeStatTime - context.getCurEntry().getCreateTimestamp();Throwable error = context.getCurEntry().getError();// 记录响应时间和成功次数recordCompleteFor(node, count, rt, error);recordCompleteFor(context.getCurEntry().getOriginNode(), count, rt, error);if (resourceWrapper.getEntryType() == EntryType.IN) {recordCompleteFor(Constants.ENTRY_NODE, count, rt, error);}}... // sign_call_200 省略注册的回调器处理fireExit(context, resourceWrapper, count, args); // 传给下游}private void recordCompleteFor(Node node, int batchCount, long rt, Throwable error) {... // 省略 node 为空返回处理node.addRtAndSuccess(rt, batchCount); // 添加 RT 和完成数node.decreaseThreadNum(); // 减线程数if (error != null && !(error instanceof BlockException)) {node.increaseExceptionQps(batchCount); // 添加异常计数}}
}
com.alibaba.csp.sentinel.node.DefaultNode
java"> // sign_m_401@Overridepublic void addPassRequest(int count) {super.addPassRequest(count); // ref: sign_m_410this.clusterNode.addPassRequest(count); // ref: sign_m_410}
com.alibaba.csp.sentinel.node.StatisticNode
Metric.addPass
方法参考:节点与度量-addpass
java"> // sign_m_410@Overridepublic void addPassRequest(int count) {rollingCounterInSecond.addPass(count); // 参考: 节点与度量-addpassrollingCounterInMinute.addPass(count);}
AuthoritySlot
-
黑、白名单权限校验
-
com.alibaba.csp.sentinel.slots.block.authority.AuthoritySlot
java">@Spi(order = Constants.ORDER_AUTHORITY_SLOT)
public class AuthoritySlot extends AbstractLinkedProcessorSlot<DefaultNode> {@Overridepublic void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args)throws Throwable {checkBlackWhiteAuthority(resourceWrapper, context); // 先校验 ref: sign_m_501fireEntry(context, resourceWrapper, node, count, prioritized, args); // 再传给下游}// sign_m_501void checkBlackWhiteAuthority(ResourceWrapper resource, Context context) throws AuthorityException {Map<String, Set<AuthorityRule>> authorityRules = AuthorityRuleManager.getAuthorityRules(); // 获取总规则... // 省略 authorityRules 为 null 返回Set<AuthorityRule> rules = authorityRules.get(resource.getName()); // 获取当前资源的规则... // 省略 rules 为 null 返回// 一般一个资源只有一条规则 ref: AuthorityRuleManager.RulePropertyListener #loadAuthorityConffor (AuthorityRule rule : rules) {if (!AuthorityRuleChecker.passCheck(rule, context)) { // 依次校验 ref: sign_m_510throw new AuthorityException(context.getOrigin(), rule); // 校验不通过则抛异常}}}
}
com.alibaba.csp.sentinel.slots.block.authority.AuthorityRuleChecker
java"> // sign_m_510 校验权限规则static boolean passCheck(AuthorityRule rule, Context context) {String requester = context.getOrigin();... // 省略 requester 和 rule.getLimitApp() 空判断int pos = rule.getLimitApp().indexOf(requester);boolean contain = pos > -1; // 包含// 加此判断可省略不包含时的多余处理// 下面的处理相当于:逗号分隔再依次精确匹配if (contain) {boolean exactlyMatch = false;String[] appArray = rule.getLimitApp().split(","); // 英文逗号分隔for (String app : appArray) {if (requester.equals(app)) { // 精确匹配exactlyMatch = true; // 匹配上,才算包含break;}}contain = exactlyMatch;}int strategy = rule.getStrategy();if (strategy == RuleConstant.AUTHORITY_BLACK && contain) { // 黑名单,包含:则不通过return false;}if (strategy == RuleConstant.AUTHORITY_WHITE && !contain) { // 白名单,不包含:则不通过return false;}return true; // 通过}
SystemSlot
com.alibaba.csp.sentinel.slots.system.SystemSlot
java">@Spi(order = Constants.ORDER_SYSTEM_SLOT)
public class SystemSlot extends AbstractLinkedProcessorSlot<DefaultNode> {@Overridepublic void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,boolean prioritized, Object... args) throws Throwable {SystemRuleManager.checkSystem(resourceWrapper, count); // 先校验 ref: sign_m_610fireEntry(context, resourceWrapper, node, count, prioritized, args); // 再传给下游}
}
com.alibaba.csp.sentinel.slots.system.SystemRuleManager
java"> // sign_m_610public static void checkSystem(ResourceWrapper resourceWrapper, int count) throws BlockException {... // 省略 resourceWrapper 为 null 返回... // 省略 checkSystemStatus 为 false 返回... // 省略 资源类型 不为 入站 返回// total qpsdouble currentQps = Constants.ENTRY_NODE.passQps(); // 在 StatisticSlot 里记录,ref: sign_m_400if (currentQps + count > qps) {throw new SystemBlockException(resourceWrapper.getName(), "qps");}... // 省略 线程数 校验... // 省略 平均RT 校验// load. BBR algorithm.if (highestSystemLoadIsSet && getCurrentSystemAvgLoad() > highestSystemLoad) {if (!checkBbr(currentThread)) { // ref: sign_m_611throw new SystemBlockException(resourceWrapper.getName(), "load");}}... // 省略 CPU 校验 (CPU 数据每秒读取一次)}// sign_m_611private static boolean checkBbr(int currentThread) {if (currentThread > 1 &¤tThread > Constants.ENTRY_NODE.maxSuccessQps() * Constants.ENTRY_NODE.minRt() / 1000) {return false;}return true;}
FlowSlot
-
对当前资源进行流控
-
com.alibaba.csp.sentinel.slots.block.flow.FlowSlot
- 流控规则设置参考:入口控制-设置规则 sign_demo_020
java">@Spi(order = Constants.ORDER_FLOW_SLOT)
public class FlowSlot extends AbstractLinkedProcessorSlot<DefaultNode> {private final FlowRuleChecker checker;@Overridepublic void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,boolean prioritized, Object... args) throws Throwable {checkFlow(resourceWrapper, context, node, count, prioritized); // 先校验 sign_m_710fireEntry(context, resourceWrapper, node, count, prioritized, args); // 再传给下游}// sign_m_710void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized)throws BlockException {checker.checkFlow(ruleProvider, resource, context, node, count, prioritized); // sign_m_720}// 返回资源对应的流控规则集合private final Function<String, Collection<FlowRule>> ruleProvider = new Function<String, Collection<FlowRule>>() {@Overridepublic Collection<FlowRule> apply(String resource) {Map<String, List<FlowRule>> flowRules = FlowRuleManager.getFlowRuleMap(); // ref: sign_demo_020return flowRules.get(resource);}};
}
com.alibaba.csp.sentinel.slots.block.flow.FlowRuleChecker
java"> // sign_m_720public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {... // 省略 ruleProvider 和 resource 为 null 返回Collection<FlowRule> rules = ruleProvider.apply(resource.getName());if (rules != null) {for (FlowRule rule : rules) {if (!canPassCheck(rule, context, node, count, prioritized)) { // sign_m_721throw new FlowException(rule.getLimitApp(), rule);}}}}// sign_m_721public boolean canPassCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,boolean prioritized) {... // 省略 rule.limitApp 为 null 返回 trueif (rule.isClusterMode()) {// ref: 集群流控-流控原理-sign_m_411return passClusterCheck(rule, context, node, acquireCount, prioritized);}return passLocalCheck(rule, context, node, acquireCount, prioritized); // sign_m_722}// sign_m_722private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,boolean prioritized) {Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node); // 查找统计节点if (selectedNode == null) {return true;}return rule.getRater().canPass(selectedNode, acquireCount, prioritized); // 使用规则控制器进行判断,ref: sign_m_731}
com.alibaba.csp.sentinel.slots.block.flow.controller.DefaultController
java"> // sign_m_731 判断是否可以通过@Overridepublic boolean canPass(Node node, int acquireCount, boolean prioritized) {int curCount = avgUsedTokens(node); // sign_m_732if (curCount + acquireCount > count) { // 判断是否超过自身设置的限制数if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {long currentTime = TimeUtil.currentTimeMillis();long waitInMs = node.tryOccupyNext(currentTime, acquireCount, count);if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) {node.addWaitingRequest(currentTime + waitInMs, acquireCount);node.addOccupiedPass(acquireCount);sleep(waitInMs);throw new PriorityWaitException(waitInMs); // 报此异常可通过}}return false; // 超过限制则返回 false (表示不通过)}return true;}// sign_m_732 返回当前计数private int avgUsedTokens(Node node) {if (node == null) {return DEFAULT_AVG_USED_TOKENS;}return grade == RuleConstant.FLOW_GRADE_THREAD ? node.curThreadNum() : (int)(node.passQps());}
DegradeSlot
-
熔断处理
-
com.alibaba.csp.sentinel.slots.block.degrade.DegradeSlot
java">@Spi(order = Constants.ORDER_DEGRADE_SLOT)
public class DegradeSlot extends AbstractLinkedProcessorSlot<DefaultNode> {@Overridepublic void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,boolean prioritized, Object... args) throws Throwable {performChecking(context, resourceWrapper); // 先校验 sign_m_801fireEntry(context, resourceWrapper, node, count, prioritized, args); // 再传给下游}// sign_m_801 熔断校验void performChecking(Context context, ResourceWrapper r) throws BlockException {List<CircuitBreaker> circuitBreakers = DegradeRuleManager.getCircuitBreakers(r.getName());if (circuitBreakers == null || circuitBreakers.isEmpty()) {return;}for (CircuitBreaker cb : circuitBreakers) {if (!cb.tryPass(context)) { // 熔断判断 sign_m_810// 不通过则进行熔断报错throw new DegradeException(cb.getRule().getLimitApp(), cb.getRule());}}}@Override // 退出时进行计数和状态处理public void exit(Context context, ResourceWrapper r, int count, Object... args) {Entry curEntry = context.getCurEntry();... // 出现熔断 (有熔断异常) 则传给下游处理并返回List<CircuitBreaker> circuitBreakers = DegradeRuleManager.getCircuitBreakers(r.getName());... // 无断路器则传给下游处理并返回if (curEntry.getBlockError() == null) {for (CircuitBreaker circuitBreaker : circuitBreakers) {circuitBreaker.onRequestComplete(context); // 断路器计数与状态变更处理,ref: sign_m_830}}fireExit(context, r, count, args);}
}
com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker.AbstractCircuitBreaker
java"> // sign_m_810 熔断判断@Overridepublic boolean tryPass(Context context) {if (currentState.get() == State.CLOSED) {return true; // 断路器关闭:直接通过}if (currentState.get() == State.OPEN) {// 断路器已打开:超过指定熔断时间,尝试半打开处理return retryTimeoutArrived() && fromOpenToHalfOpen(context); // sign_m_811 | sign_m_812}return false; // 断路器半打开:不通过}// sign_m_811 判断是否超过熔断时长protected boolean retryTimeoutArrived() {return TimeUtil.currentTimeMillis() >= nextRetryTimestamp;}// sign_m_812 尝试半打开处理protected boolean fromOpenToHalfOpen(Context context) {if (currentState.compareAndSet(State.OPEN, State.HALF_OPEN)) {notifyObservers(State.OPEN, State.HALF_OPEN, null);Entry entry = context.getCurEntry();entry.whenTerminate(new BiConsumer<Context, Entry>() { // 添加 entry.exit() 回调@Overridepublic void accept(Context context, Entry entry) { // 在 entry.exit() 被调用if (entry.getBlockError() != null) { // 尝试请求时出错currentState.compareAndSet(State.HALF_OPEN, State.OPEN); // 重新打开notifyObservers(State.HALF_OPEN, State.OPEN, 1.0d);}}});return true; // 只让一个线程 (且只进行一次) 处理}return false; // 被其他线程抢占 (不通过)}
com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker.ExceptionCircuitBreaker
java"> // sign_m_830 断路器计数与状态变更处理@Overridepublic void onRequestComplete(Context context) {Entry entry = context.getCurEntry();if (entry == null) {return;}Throwable error = entry.getError();SimpleErrorCounter counter = stat.currentWindow().value();if (error != null) {counter.getErrorCount().add(1); // 有异常,添加异常计数}counter.getTotalCount().add(1); // 添加请求 (总) 计数handleStateChangeWhenThresholdExceeded(error); // sign_m_831}// sign_m_831 状态变更处理private void handleStateChangeWhenThresholdExceeded(Throwable error) {... // 当前状态为打开,则返回if (currentState.get() == State.HALF_OPEN) {// 当前为半打开状态if (error == null) {fromHalfOpenToClose(); // 无异常,则关闭} else {fromHalfOpenToOpen(1.0d); // 有异常,则继续打开}return;}List<SimpleErrorCounter> counters = stat.values();long errCount = 0;long totalCount = 0;for (SimpleErrorCounter counter : counters) {errCount += counter.errorCount.sum();totalCount += counter.totalCount.sum();}if (totalCount < minRequestAmount) {return; // 小于最小请求数,不处理}double curCount = errCount;if (strategy == DEGRADE_GRADE_EXCEPTION_RATIO) {curCount = errCount * 1.0d / totalCount; // 使用比率计算}if (curCount > threshold) {transformToOpen(curCount); // 超过阈值,开启熔断 (并设置开始重试的时间戳)}}
总结
- 权限 (内置) 2 种规则 (但只支持设置 1 种,黑白名单互斥)
- 流控 (内置) 有 4 (也可说 5) 种规则 (QPS 或线程数、速率、慢热 QPS、慢热速率)
- 熔断 (内置) 有 2 (也可说 3) 种规则 (异常数或比率、响应时长)