0. 想喝点东西,但不知道喝什么
先来段源码,看一下我们在 dashboard 录入的降级规则,都映射到了哪些字段上
package com.alibaba.csp.sentinel.slots.block.degrade;

/**
 * Degrade (circuit breaking) rule bound to one resource.
 *
 * <p>Only the fields are shown here; the circuit breaker classes read them through
 * accessors such as {@code getTimeWindow()} and {@code getStatIntervalMs()}, which are
 * not part of this excerpt.
 */
public class DegradeRule extends AbstractRule {

    /**
     * Circuit breaking strategy (0: average RT, 1: exception ratio, 2: exception count).
     * Defaults to the RT strategy.
     */
    private int grade = RuleConstant.DEGRADE_GRADE_RT;

    /**
     * Threshold count.
     */
    private double count;

    /**
     * Recovery timeout (in seconds) when circuit breaker opens. After the timeout, the
     * circuit breaker will transform to half-open state for trying a few requests.
     */
    private int timeWindow;

    /**
     * Minimum number of requests (in an active statistic time span) that can trigger
     * circuit breaking.
     *
     * @since 1.7.0
     */
    private int minRequestAmount = RuleConstant.DEGRADE_DEFAULT_MIN_REQUEST_AMOUNT;

    /**
     * The threshold of slow request ratio in RT mode. Defaults to {@code 1.0}.
     */
    private double slowRatioThreshold = 1.0d;

    /**
     * Statistic interval in milliseconds; defaults to {@code 1000}.
     */
    private int statIntervalMs = 1000;

    /**
     * Creates a rule for the given resource.
     *
     * @param resourceName name of the resource, handed to {@code setResource}
     */
    public DegradeRule(String resourceName) {
        setResource(resourceName);
    }
}
1. sentinel 的断路器实现
- 效果跟 netflix.hystrix 差不离
package com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker;public abstract class AbstractCircuitBreaker implements CircuitBreaker {protected final DegradeRule rule;protected final int recoveryTimeoutMs;private final EventObserverRegistry observerRegistry;protected final AtomicReference<State> currentState = new AtomicReference<>(State.CLOSED);protected volatile long nextRetryTimestamp;public AbstractCircuitBreaker(DegradeRule rule) {this(rule, EventObserverRegistry.getInstance());}AbstractCircuitBreaker(DegradeRule rule, EventObserverRegistry observerRegistry) {AssertUtil.notNull(observerRegistry, "observerRegistry cannot be null");if (!DegradeRuleManager.isValidRule(rule)) {throw new IllegalArgumentException("Invalid DegradeRule: " + rule);}this.observerRegistry = observerRegistry;this.rule = rule;this.recoveryTimeoutMs = rule.getTimeWindow() * 1000;}// true, 成功获得令牌@Overridepublic boolean tryPass(Context context) {// 断路器关闭// Template implementation.if (currentState.get() == State.CLOSED) {return true;}// 断路器开启if (currentState.get() == State.OPEN) {// 半开状态,允许通过1个请求来尝试,可行的话,即 true// For half-open state we allow a request for probing.return retryTimeoutArrived() && fromOpenToHalfOpen(context);}return false;}// 当前系统时间 >= 下一次重试时间protected boolean retryTimeoutArrived() {return TimeUtil.currentTimeMillis() >= nextRetryTimestamp;}protected boolean fromOpenToHalfOpen(Context context) {if (currentState.compareAndSet(State.OPEN, State.HALF_OPEN)) {// 通知订阅者: 状态的变化 开 -> 半开notifyObservers(State.OPEN, State.HALF_OPEN, null);Entry entry = context.getCurEntry();// 过程中断时的回调,回滚状态entry.whenTerminate(new BiConsumer<Context, Entry>() {@Overridepublic void accept(Context context, Entry entry) {// Note: This works as a temporary workaround for https://github.com/alibaba/Sentinel/issues/1638// Without the hook, the circuit breaker won't recover from half-open state in some circumstances// when the request is actually blocked by upcoming rules (not only degrade rules).if 
(entry.getBlockError() != null) {// Fallback to OPEN due to detecting request is blockedcurrentState.compareAndSet(State.HALF_OPEN, State.OPEN);notifyObservers(State.HALF_OPEN, State.OPEN, 1.0d);}}});return true;}return false;}private void notifyObservers(CircuitBreaker.State prevState, CircuitBreaker.State newState, Double snapshotValue) {for (CircuitBreakerStateChangeObserver observer : observerRegistry.getStateChangeObservers()) {observer.onStateChange(prevState, newState, rule, snapshotValue);}}
}
从 DegradeRule.grade 可知道:
- 默认的降级策略是 RT,即按响应时长(慢调用比例)触发
- 除此之外,还支持按异常触发(异常比例、异常数)
下面分别借助两个 AbstractCircuitBreaker 的实现类来说明实现细节
1.1 ResponseTimeCircuitBreaker(RT)
package com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker;// 根据请求的响应时间(慢调用比例)
public class ResponseTimeCircuitBreaker extends AbstractCircuitBreaker {private static final double SLOW_REQUEST_RATIO_MAX_VALUE = 1.0d;private final long maxAllowedRt;private final double maxSlowRequestRatio;private final int minRequestAmount;private final LeapArray<SlowRequestCounter> slidingCounter;public ResponseTimeCircuitBreaker(DegradeRule rule) {this(rule, new SlowRequestLeapArray(1, rule.getStatIntervalMs()));}ResponseTimeCircuitBreaker(DegradeRule rule, LeapArray<SlowRequestCounter> stat) {super(rule);AssertUtil.isTrue(rule.getGrade() == RuleConstant.DEGRADE_GRADE_RT, "rule metric type should be RT");AssertUtil.notNull(stat, "stat cannot be null");this.maxAllowedRt = Math.round(rule.getCount());this.maxSlowRequestRatio = rule.getSlowRatioThreshold();this.minRequestAmount = rule.getMinRequestAmount();this.slidingCounter = stat;}@Overridepublic void onRequestComplete(Context context) {SlowRequestCounter counter = slidingCounter.currentWindow().value();Entry entry = context.getCurEntry();if (entry == null) {return;}long completeTime = entry.getCompleteTimestamp();if (completeTime <= 0) {completeTime = TimeUtil.currentTimeMillis();}long rt = completeTime - entry.getCreateTimestamp();if (rt > maxAllowedRt) {counter.slowCount.add(1);}counter.totalCount.add(1);handleStateChangeWhenThresholdExceeded(rt);}private void handleStateChangeWhenThresholdExceeded(long rt) {if (currentState.get() == State.OPEN) {return;}if (currentState.get() == State.HALF_OPEN) {// In detecting request// TODO: improve logic for half-open recoveryif (rt > maxAllowedRt) {fromHalfOpenToOpen(1.0d);} else {fromHalfOpenToClose();}return;}List<SlowRequestCounter> counters = slidingCounter.values();long slowCount = 0;long totalCount = 0;for (SlowRequestCounter counter : counters) {slowCount += counter.slowCount.sum();totalCount += counter.totalCount.sum();}if (totalCount < minRequestAmount) {return;}double currentRatio = slowCount * 1.0d / totalCount;if (currentRatio > maxSlowRequestRatio) 
{transformToOpen(currentRatio);}if (Double.compare(currentRatio, maxSlowRequestRatio) == 0 &&Double.compare(maxSlowRequestRatio, SLOW_REQUEST_RATIO_MAX_VALUE) == 0) {transformToOpen(currentRatio);}}static class SlowRequestCounter {private LongAdder slowCount;private LongAdder totalCount;}
}
1.2 ExceptionCircuitBreaker
package com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker;// 策略:异常比例、异常数
public class ExceptionCircuitBreaker extends AbstractCircuitBreaker {private final int strategy; // 策略的枚举值private final int minRequestAmount; // 最小请求数private final double threshold; // 设置的阈值private final LeapArray<SimpleErrorCounter> stat;public ExceptionCircuitBreaker(DegradeRule rule) {this(rule, new SimpleErrorCounterLeapArray(1, rule.getStatIntervalMs()));}ExceptionCircuitBreaker(DegradeRule rule, LeapArray<SimpleErrorCounter> stat) {super(rule);this.strategy = rule.getGrade();boolean modeOk = strategy == DEGRADE_GRADE_EXCEPTION_RATIO || strategy == DEGRADE_GRADE_EXCEPTION_COUNT;AssertUtil.isTrue(modeOk, "rule strategy should be error-ratio or error-count");AssertUtil.notNull(stat, "stat cannot be null");this.minRequestAmount = rule.getMinRequestAmount();this.threshold = rule.getCount();this.stat = stat;}@Overridepublic void onRequestComplete(Context context) {Entry entry = context.getCurEntry();if (entry == null) {return;}Throwable error = entry.getError();SimpleErrorCounter counter = stat.currentWindow().value();// 异常发生了,累加if (error != null) {counter.getErrorCount().add(1);}// 总数(异常+非异常),同样累加counter.getTotalCount().add(1);// step into ...handleStateChangeWhenThresholdExceeded(error);}private void handleStateChangeWhenThresholdExceeded(Throwable error) {// 断路器早已启动? 
好吧,后面不用看了if (currentState.get() == State.OPEN) {return;}// 半开状态,试探一下if (currentState.get() == State.HALF_OPEN) {// In detecting requestif (error == null) {fromHalfOpenToClose();} else {fromHalfOpenToOpen(1.0d);}return;}// 把这个异常计数传播到整个时间窗(LeapArray)的计数器中List<SimpleErrorCounter> counters = stat.values();long errCount = 0;long totalCount = 0;for (SimpleErrorCounter counter : counters) {errCount += counter.errorCount.sum();totalCount += counter.totalCount.sum();}// 虽然有异常,但是比配置的最小请求数还小,那不需要使用断路器if (totalCount < minRequestAmount) {return;}double curCount = errCount;// 如果策略是:按照异常率的话,计算概率if (strategy == DEGRADE_GRADE_EXCEPTION_RATIO) {// Use errorRatiocurCount = errCount * 1.0d / totalCount;}// 这里开启断路器if (curCount > threshold) {transformToOpen(curCount);}}static class SimpleErrorCounter {private LongAdder errorCount;private LongAdder totalCount;public SimpleErrorCounter() {this.errorCount = new LongAdder();this.totalCount = new LongAdder();}public LongAdder getErrorCount() {return errorCount;}public LongAdder getTotalCount() {return totalCount;}public SimpleErrorCounter reset() {errorCount.reset();totalCount.reset();return this;}@Overridepublic String toString() {return "SimpleErrorCounter{" +"errorCount=" + errorCount +", totalCount=" + totalCount +'}';}}
}