本文属于sentinel学习笔记系列。网上看到吴就业老师的专栏,原文地址如下:
https://blog.csdn.net/baidu_28523317/category_10400605.html
上一篇梳理了概念与核心类:Sentinel 学习笔记2- 概念与核心类介绍-CSDN博客
补一个点:
Sph: 定义了获取资源访问权的接口, 代表性接口: Entry entry = sph.entry(String name);
CtSph: Sph接口的默认实现, 负责校验流控规则, 当校验不通过时抛出BlockException异常
SphU: 工具类, 封装了CtSph的调用, 未做特殊处理, 当校验不通过时抛出BlockException异常
SphO: 工具类, 封装了CtSph的调用, 与SphU不同的是, 校验不通过时不会抛出异常, 而是return false;
责任链
当执行到 SphU.entry
接口时,就到了 Sentinel 的核心骨架--ProcessorSlotChain,将不同的 Slot 按照顺序串在一起(责任链模式),从而将不同的功能(限流、降级、系统保护)组合在一起。slot chain 其实可以分为两部分:统计数据构建部分(statistic)和判断部分(rule checking)。
其中:辅助资源指标数据统计的 ProcessorSlot
NodeSelectorSlot、ClusterBuilderSlot、StatisticSlot
判断部分:
- AuthoritySlot:实现黑白名单降级
- SystemSlot:实现系统自适应降级
- FlowSlot:实现限流降级
- DegradeSlot:实现熔断降级
核心结构如下图所示
- NodeSelectorSlot:给当前请求(context)绑定一个DefaultNode,如果不存在则根据 context 创建 DefaultNode,然后维护整个调用树的父子关系,然后将相关数据保存在 Context 中。
- ClusterBuilderSlot:给当前请求(context)绑定一个ClusterNode, 如果不存在首先根据 resourceName 创建 ClusterNode,并将其引用保存在 defaultNode中,然后再根据 origin 创建来源节点(类型为 StatisticNode),并将源节点保存在当前的调用点 Entry 中。一个资源(resource)的所有请求共享同一个ClusterNode。
- StatisticSlot:
StatisticSlot
是 Sentinel 最为重要的类之一,用于根据规则判断结果进行相应的统计操作,先调用后续 Slot 的检查过程,如果检查通过增加各个维度的计数器。- entry 过程:依次执行后面的判断 slot。每个 slot 触发流控的话会抛出异常(BlockException 的子类)。若有 BlockException 抛出,则记录 block 数据;若无异常抛出则算作可通过(pass),记录 pass 数据。
- exit 过程:若无 error(无论是业务异常还是流控异常),记录 complete(success)以及 RT,线程数-1。
Sentinel 的整体工具流程就是使用责任链模式将所有的 ProcessorSlot 按照一定的顺序串成一个单向链表。比如辅助完成资源指标数据统计的 ProcessorSlot 的排序顺序为:
NodeSelectorSlot->ClusterBuilderSlot->StatisticSlot
责任链构建
实现将 ProcessorSlot 串成一个单向链表的是 ProcessorSlotChain,这个 ProcessorSlotChain 是由 SlotChainBuilder 构造的,代码如下:
@Spi(isDefault = true)
public class DefaultSlotChainBuilder implements SlotChainBuilder {@Overridepublic ProcessorSlotChain build() {ProcessorSlotChain chain = new DefaultProcessorSlotChain();//sortedSlotList获取所有的处理器对象List<ProcessorSlot> sortedSlotList = SpiLoader.of(ProcessorSlot.class).loadInstanceListSorted();for (ProcessorSlot slot : sortedSlotList) {if (!(slot instanceof AbstractLinkedProcessorSlot)) {RecordLog.warn("The ProcessorSlot(" + slot.getClass().getCanonicalName() + ") is not an instance of AbstractLinkedProcessorSlot, can't be added into ProcessorSlotChain");continue;}//通过尾添法将职责slot添加到DefaultProcessorSlotChain当中chain.addLast((AbstractLinkedProcessorSlot<?>) slot);}return chain;}
}
- ProcessorSlotChain作为Slot的责任链,负责责任链的构建和执行。
- 责任链上的处理器对象 AbstractLinkedProcessorSlot 通过保存指向下一个处理器的对象的进行关联,整体以链表的形式进行串联。
处理器
ProcessorSlot 比较抽象,前面概念说了Entry表示一个资源的访问权, 通过Sph接口获取,这里的
fireEntry是获取访问权成功后的处理,具体的接口的定义如下:
public interface ProcessorSlot<T> {/*** Entrance of this slot.* 入口方法* @param context current {@link Context}* @param resourceWrapper current resource* @param param generics parameter, usually is a {@link com.alibaba.csp.sentinel.node.Node}* @param count tokens needed* @param prioritized whether the entry is prioritized* @param args parameters of the original call* @throws Throwable blocked exception or unexpected error*/void entry(Context context, ResourceWrapper resourceWrapper, T param, int count, boolean prioritized,Object... args) throws Throwable;/*** Means finish of {@link #entry(Context, ResourceWrapper, Object, int, boolean, Object...)}.* 调用下一个 ProcessorSlot#entry 方法* @param context current {@link Context}* @param resourceWrapper current resource* @param obj relevant object (e.g. Node)* @param count tokens needed* @param prioritized whether the entry is prioritized* @param args parameters of the original call* @throws Throwable blocked exception or unexpected error*/void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized,Object... args) throws Throwable;/*** Exit of this slot.* 出口方法* @param context current {@link Context}* @param resourceWrapper current resource* @param count tokens needed* @param args parameters of the original call*/void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args);/*** Means finish of {@link #exit(Context, ResourceWrapper, int, Object...)}.* 调用下一个 ProcessorSlot#exit 方法* @param context current {@link Context}* @param resourceWrapper current resource* @param count tokens needed* @param args parameters of the original call*/void fireExit(Context context, ResourceWrapper resourceWrapper, int count, Object... args);
}
方法参数解析:
- context:当前调用链路上下文。
- resourceWrapper:资源 ID。
- param:泛型参数,一般用于传递 DefaultNode。
- count:表示申请占用共享资源的数量,只有申请到足够的共享资源才能继续执行。
- prioritized:表示是否对请求进行优先级排序,SphU#entry 传递过来的值是 false。
- args:调用方法传递的参数,用于实现热点参数限流。
之所以能够将所有的 ProcessorSlot 构造成一个 ProcessorSlotChain,还是依赖这些 ProcessorSlot 继承了 AbstractLinkedProcessorSlot 类。
public abstract class AbstractLinkedProcessorSlot<T> implements ProcessorSlot<T> {//当前节点的下一个节点private AbstractLinkedProcessorSlot<?> next = null;@Overridepublic void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)throws Throwable {if (next != null) { // 触发下一个处理器对象的处理next.transformEntry(context, resourceWrapper, obj, count, prioritized, args);}}@SuppressWarnings("unchecked")void transformEntry(Context context, ResourceWrapper resourceWrapper, Object o, int count, boolean prioritized, Object... args)throws Throwable {T t = (T)o;// 执行具体处理器的逻辑,由具体的处理器自行实现entry(context, resourceWrapper, t, count, prioritized, args);}@Overridepublic void fireExit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {if (next != null) {//调用下一个 ProcessorSlot 的 exit 方法next.exit(context, resourceWrapper, count, args);}}public AbstractLinkedProcessorSlot<?> getNext() {return next;}public void setNext(AbstractLinkedProcessorSlot<?> next) {this.next = next; // 绑定下一个处理器的逻辑}}
ProcessorSlotChain 也继承 AbstractLinkedProcessorSlot,多了2个方法
public abstract class ProcessorSlotChain extends AbstractLinkedProcessorSlot<Object> {/*** Add a processor to the head of this slot chain.* 添加到链表的头节点* @param protocolProcessor processor to be added.*/public abstract void addFirst(AbstractLinkedProcessorSlot<?> protocolProcessor);/*** Add a processor to the tail of this slot chain.* 添加到链表末尾* @param protocolProcessor processor to be added.*/public abstract void addLast(AbstractLinkedProcessorSlot<?> protocolProcessor);
}
ProcessorSlotChain 的默认实现类是 DefaultProcessorSlotChain,DefaultProcessorSlotChain 有一个指向链表头节点的 first 字段和一个指向链表尾节点的 end 字段。
public class DefaultProcessorSlotChain extends ProcessorSlotChain {// first,指向链表头节点AbstractLinkedProcessorSlot<?> first = new AbstractLinkedProcessorSlot<Object>() {@Overridepublic void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args)throws Throwable {super.fireEntry(context, resourceWrapper, t, count, prioritized, args);}@Overridepublic void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {super.fireExit(context, resourceWrapper, count, args);}};//尾节点AbstractLinkedProcessorSlot<?> end = first;@Overridepublic void addFirst(AbstractLinkedProcessorSlot<?> protocolProcessor) {protocolProcessor.setNext(first.getNext());first.setNext(protocolProcessor);if (end == first) {end = protocolProcessor;}}@Overridepublic void addLast(AbstractLinkedProcessorSlot<?> protocolProcessor) {end.setNext(protocolProcessor);end = protocolProcessor;}
说明:
- Sentinel 中的 Slot 需要实现 com.alibaba.csp.sentinel.slotchain.ProcessorSlot 的通用接口。
- 自定义 Slot 一般继承抽象类 AbstractLinkedProcessorSlot 且只要改写 entry/exit 方法实现自定义逻辑。
- Slot 通过 next 变量保存下一个处理器Slot对象。
- 在自定义实现的 entry 方法中需要通过 fireEntry 触发下一个处理器的执行,在 exit 方法中通过fireExit 触发下一个处理器的执行。
责任链执行
com.alibaba.csp.sentinel.CtSph#entryWithPriority()
private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)throws BlockException {Context context = ContextUtil.getContext();if (context instanceof NullContext) {// The {@link NullContext} indicates that the amount of context has exceeded the threshold,// so here init the entry only. No rule checking will be done.return new CtEntry(resourceWrapper, null, context);}if (context == null) {// Using default context.context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);}// Global switch is close, no rule checking will do.if (!Constants.ON) {return new CtEntry(resourceWrapper, null, context);}//开始构造chainProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);/** Means amount of resources (slot chain) exceeds {@link Constants.MAX_SLOT_CHAIN_SIZE},* so no rule checking will be done.*/if (chain == null) {return new CtEntry(resourceWrapper, null, context);}Entry e = new CtEntry(resourceWrapper, chain, context);try {// 驱动责任链上的第一个处理器,进而由处理器自驱动执行下一个处理器chain.entry(context, resourceWrapper, null, count, prioritized, args);} catch (BlockException e1) {e.exit(count, args);throw e1;} catch (Throwable e1) {// This should not happen, unless there are errors existing in Sentinel internal.RecordLog.info("Sentinel unexpected exception", e1);}return e;}
说明:
- 整个责任链上处理器的执行通过Invoker对象的驱动,而非责任链对象的驱动。
- DefaultProcessorSlotChain的entry首先头部对象first,进而触发处理器的自驱实现处理器的执行。
- 整体按照entry → fireEntry → transformEntry → entry 的循环顺序依次触发处理器的自驱。