【Hystrix技术指南】(7)故障切换的运作流程原理分析(含源码)

news/2024/11/16 18:45:45/

背景介绍

目前对于一些非核心操作,如增减库存后保存操作日志发送异步消息时(具体业务流程),一旦出现MQ服务异常时,会导致接口响应超时,因此可以考虑对非核心操作引入服务降级、服务隔离。

Hystrix说明

官方文档

Hystrix是Netflix开源的一个容灾框架,解决当外部依赖故障时拖垮业务系统、甚至引起雪崩的问题。

为什么需要Hystrix?

  • 在大中型分布式系统中,通常系统很多依赖(HTTP,hession,Netty,Dubbo等),在高并发访问下,这些依赖的稳定性与否对系统的影响非常大,但是依赖有很多不可控问题:如网络连接缓慢,资源繁忙,暂时不可用,服务脱机等。
  • *当依赖阻塞时,大多数服务器的线程池就出现阻塞(BLOCK),影响整个线上服务的稳定性,在复杂的分布式架构的应用程序有很多的依赖,都会不可避免地在某些时候失败。高并发的依赖失败时如果没有隔离措施,当前应用服务就有被拖垮的风险。
例如:一个依赖30SOA服务的系统,每个服务99.9999.990.3换算成时间大约每月有2个小时服务不稳定.随着服务依赖数量的变多,服务不稳定的概率会成指数性提高.解决问题方案:对依赖做隔离。
复制代码

Hystrix设计理念

想要知道如何使用,必须先明白其核心设计理念,Hystrix基于命令模式,通过UML图先直观的认识一下这一设计模式

  • 可见,Command是在 ReceiverInvoker之间添加的中间层, Command实现了对Receiver的封装
  • API既可以是Invoker又可以是reciever,通过继承Hystrix核心类HystrixCommand来封装这些API(例如,远程接口调用,数据库查询之类可能会产生延时的操作)
  • *就可以为API提供弹性保护了。

Hystrix如何解决依赖隔离

  1. Hystrix使用命令模式HystrixCommand(Command)包装依赖调用逻辑,每个命令在单独线程中/信号授权下执行。
  2. 可配置依赖调用超时时间,超时时间一般设为比99.5%平均时间略高即可。当调用超时时,直接返回或执行fallback逻辑。
  3. 为每个依赖提供一个小的线程池(或信号),如果线程池已满调用将被立即拒绝,默认不采用排队,加速失败判定时间。
  4. 依赖调用结果分,成功,失败(抛出异常),超时,线程拒绝,短路。 请求失败(异常,拒绝,超时,短路)时执行fallback(降级)逻辑。
  5. 提供熔断器组件,可以自动运行或手动调用,停止当前依赖一段时间(10秒),熔断器默认错误率阈值为50%,超过将自动运行
  6. 提供近实时依赖的统计和监控

Hystrix流程结构解析

流程说明:

  1. 每次调用构建HystrixCommand或者HystrixObservableCommand对象,把依赖调用封装在run()方法中.
  2. 结果是否有缓存如果没有执行execute()/queue做sync或async调用,对应真正的run()/construct()
  3. 判断熔断器(circuit-breaker)是否打开,如果打开跳到步骤8,进行降级策略,如果关闭进入步骤.
  4. 判断线程池/队列/信号量是否跑满,如果跑满进入降级步骤8,否则继续后续步骤.
  5. 使用HystrixObservableCommand.construct()还是HystrixCommand.run(),运行依赖逻辑
  6. 依赖逻辑调用超时,进入步骤8
  7. 判断逻辑是否调用成功
  • 6a 返回成功调用结果
  • 6b 调用出错,进入步骤8.
  1. 计算熔断器状态,所有的运行状态(成功, 失败, 拒绝,超时)上报给熔断器,用于统计从而判断熔断器状态.
  2. getFallback()降级逻辑. a. 没有实现getFallback的Command将直接抛出异常 b. fallback降级逻辑调用成功直接返回 c. 降级逻辑调用失败抛出异常
  3. 返回执行成功结果

以下四种情况将触发getFallback调用:

  1. run()方法抛出非HystrixBadRequestException异常
  2. run()方法调用超时
  3. 熔断器开启短路调用
  4. 线程池/队列/信号量是否跑满

熔断器:Circuit Breaker

每个熔断器默认维护10个bucket,每秒一个bucket,每个bucket记录成功,失败,超时,拒绝的状态,默认错误超过50%且10秒内超过20个请求进行中断短路。

Hystrix隔离分析

Hystrix隔离方式采用线程/信号的方式,通过隔离限制依赖的并发量和阻塞扩散.

线程隔离

  • 执行依赖代码的线程与请求线程(如:jetty线程)分离,请求线程可以自由控制离开的时间(异步过程)。
  • 通过线程池大小可以控制并发量,当线程池饱和时可以提前拒绝服务,防止依赖问题扩散。
  • *线上建议线程池不要设置过大,否则大量堵塞线程有可能会拖慢服务器。
实际案例:

Netflix公司内部认为线程隔离开销足够小,不会造成重大的成本或性能的影响。Netflix 内部API 每天100亿的HystrixCommand依赖请求使用线程隔,每个应用大约40多个线程池,每个线程池大约5-20个线程。

信号隔离

信号隔离也可以用于限制并发访问,防止阻塞扩散, 与线程隔离最大不同在于执行依赖代码的线程依然是请求线程(该线程需要通过信号申请),如果客户端是可信的且可以快速返回,可以使用信号隔离替换线程隔离,降低开销。

信号量的大小可以动态调整, 线程池大小不可以。

线程隔离与信号隔离区别如下图:

fallback故障切换降级机制

有兴趣的小伙伴可以看看:官方参考文档

源码分析

hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/AbstractCommand.java

executeCommandAndObserve
private Observable executeCommandAndObserve(final AbstractCommand _cmd) {final Func1> handleFallback = new Func1>() {public Observable call(Throwable t) {circuitBreaker.markNonSuccess();Exception e = getExceptionFromThrowable(t);executionResult = executionResult.setExecutionException(e);if (e instanceof RejectedExecutionException) {return handleThreadPoolRejectionViaFallback(e);} else if (t instanceof HystrixTimeoutException) {return handleTimeoutViaFallback();} else if (t instanceof HystrixBadRequestException) {return handleBadRequestByEmittingError(e);} else {if (e instanceof HystrixBadRequestException) {eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);return Observable.error(e);}return handleFailureViaFallback(e);}}};Observable execution;if (properties.executionTimeoutEnabled().get()) {execution = executeCommandWithSpecifiedIsolation(_cmd).lift(new HystrixObservableTimeoutOperator(_cmd));} else {execution = executeCommandWithSpecifiedIsolation(_cmd);}return execution.doOnNext(markEmits).doOnCompleted(markOnCompleted).onErrorResumeNext(handleFallback).doOnEach(setRequestContext);}
复制代码

使用Observable的onErrorResumeNext,里头调用了handleFallback,handleFallback中区分不同的异常来调用不同的fallback。

  • RejectedExecutionException调用handleThreadPoolRejectionViaFallback
  • HystrixTimeoutException调用handleTimeoutViaFallback
  • *非HystrixBadRequestException的调用handleFailureViaFallback
applyHystrixSemantics
    private Observable applyHystrixSemantics(final AbstractCommand _cmd) {executionHook.onStart(_cmd);if (circuitBreaker.attemptExecution()) {final TryableSemaphore executionSemaphore = getExecutionSemaphore();final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);final Action0 singleSemaphoreRelease = new Action0() {public void call() {if (semaphoreHasBeenReleased.compareAndSet(false, true)) {executionSemaphore.release();}}};final Action1 markExceptionThrown = new Action1() {public void call(Throwable t) {eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);}};if (executionSemaphore.tryAcquire()) {try {executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());return executeCommandAndObserve(_cmd).doOnError(markExceptionThrown).doOnTerminate(singleSemaphoreRelease).doOnUnsubscribe(singleSemaphoreRelease);} catch (RuntimeException e) {return Observable.error(e);}} else {return handleSemaphoreRejectionViaFallback();}} else {return handleShortCircuitViaFallback();}}
复制代码
  • applyHystrixSemantics方法针对executionSemaphore.tryAcquire()没通过的调用
  • handleSemaphoreRejectionViaFallback
  • applyHystrixSemantics方法针对circuitBreaker.attemptExecution()没通过的调用handleShortCircuitViaFallback()
ViaFallback方法
    private Observable handleSemaphoreRejectionViaFallback() {Exception semaphoreRejectionException = new RuntimeException("could not acquire a semaphore for execution");executionResult = executionResult.setExecutionException(semaphoreRejectionException);eventNotifier.markEvent(HystrixEventType.SEMAPHORE_REJECTED, commandKey);logger.debug("HystrixCommand Execution Rejection by Semaphore.");return getFallbackOrThrowException(this, HystrixEventType.SEMAPHORE_REJECTED, FailureType.REJECTED_SEMAPHORE_EXECUTION,"could not acquire a semaphore for execution", semaphoreRejectionException);}private Observable handleShortCircuitViaFallback() {eventNotifier.markEvent(HystrixEventType.SHORT_CIRCUITED, commandKey);Exception shortCircuitException = new RuntimeException("Hystrix circuit short-circuited and is OPEN");executionResult = executionResult.setExecutionException(shortCircuitException);try {return getFallbackOrThrowException(this, HystrixEventType.SHORT_CIRCUITED, FailureType.SHORTCIRCUIT,"short-circuited", shortCircuitException);} catch (Exception e) {return Observable.error(e);}}private Observable handleThreadPoolRejectionViaFallback(Exception underlying) {eventNotifier.markEvent(HystrixEventType.THREAD_POOL_REJECTED, commandKey);threadPool.markThreadRejection();return getFallbackOrThrowException(this, HystrixEventType.THREAD_POOL_REJECTED, FailureType.REJECTED_THREAD_EXECUTION, "could not be queued for execution", underlying);}private Observable handleTimeoutViaFallback() {return getFallbackOrThrowException(this, HystrixEventType.TIMEOUT, FailureType.TIMEOUT, "timed-out", new TimeoutException());}private Observable handleFailureViaFallback(Exception underlying) {logger.debug("Error executing HystrixCommand.run(). Proceeding to fallback logic ...", underlying);eventNotifier.markEvent(HystrixEventType.FAILURE, commandKey);executionResult = executionResult.setException(underlying);return getFallbackOrThrowException(this, HystrixEventType.FAILURE, FailureType.COMMAND_EXCEPTION, "failed", underlying);}
复制代码
  • handleSemaphoreRejectionViaFallback、handleShortCircuitViaFallback、handleThreadPoolRejectionViaFallback、handleTimeoutViaFallback、handleFailureViaFallback这几个方法调用了getFallbackOrThrowException
  • 其eventType分别是SEMAPHORE_REJECTED、SHORT_CIRCUITED、THREAD_POOL_REJECTED、TIMEOUT、FAILURE
  • AbstractCommand.getFallbackOrThrowException

hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/AbstractCommand.java

private Observable getFallbackOrThrowException(final AbstractCommand _cmd, final HystrixEventType eventType, final FailureType failureType, final String message, final Exception originalException) {final HystrixRequestContext requestContext = HystrixRequestContext.getContextForCurrentThread();long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();executionResult = executionResult.addEvent((int) latency, eventType);if (isUnrecoverable(originalException)) {logger.error("Unrecoverable Error for HystrixCommand so will throw HystrixRuntimeException and not apply fallback. ", originalException);Exception e = wrapWithOnErrorHook(failureType, originalException);return Observable.error(new HystrixRuntimeException(failureType, this.getClass(), getLogMessagePrefix() + " " + message + " and encountered unrecoverable error.", e, null));} else {if (isRecoverableError(originalException)) {logger.warn("Recovered from java.lang.Error by serving Hystrix fallback", originalException);}if (properties.fallbackEnabled().get()) {final Action1super R>> setRequestContext = new Action1super R>>() {public void call(Notificationsuper R> rNotification) {setRequestContextIfNeeded(requestContext);}};final Action1 markFallbackEmit = new Action1() {public void call(R r) {if (shouldOutputOnNextEvents()) {executionResult = executionResult.addEvent(HystrixEventType.FALLBACK_EMIT);eventNotifier.markEvent(HystrixEventType.FALLBACK_EMIT, commandKey);}}};final Action0 markFallbackCompleted = new Action0() {public void call() {long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();eventNotifier.markEvent(HystrixEventType.FALLBACK_SUCCESS, commandKey);executionResult = executionResult.addEvent((int) latency,HystrixEventType.FALLBACK_SUCCESS);}};final Func1> handleFallbackError = new Func1>() {public Observable call(Throwable t) {Exception e = wrapWithOnErrorHook(failureType, originalException);Exception fe = getExceptionFromThrowable(t);long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();Exception toEmit;if (fe instanceof UnsupportedOperationException) {logger.debug("No fallback for HystrixCommand. ", fe);eventNotifier.markEvent(HystrixEventType.FALLBACK_MISSING, commandKey);executionResult = executionResult.addEvent((int) latency, HystrixEventType.FALLBACK_MISSING);toEmit = new HystrixRuntimeException(failureType, _cmd.getClass(), getLogMessagePrefix() + " " + message + " and no fallback available.", e, fe);} else {logger.debug("HystrixCommand execution " + failureType.name() + " and fallback failed.", fe);eventNotifier.markEvent(HystrixEventType.FALLBACK_FAILURE, commandKey);executionResult = executionResult.addEvent((int) latency, HystrixEventType.FALLBACK_FAILURE);toEmit = new HystrixRuntimeException(failureType, _cmd.getClass(), getLogMessagePrefix() + " " + message + " and fallback failed.", e, fe);}if (shouldNotBeWrapped(originalException)) {return Observable.error(e);}return Observable.error(toEmit);}};final TryableSemaphore fallbackSemaphore = getFallbackSemaphore();final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);final Action0 singleSemaphoreRelease = new Action0() {public void call() {if (semaphoreHasBeenReleased.compareAndSet(false, true)) {fallbackSemaphore.release();}}};Observable fallbackExecutionChain;if (fallbackSemaphore.tryAcquire()) {try {if (isFallbackUserDefined()) {executionHook.onFallbackStart(this);fallbackExecutionChain = getFallbackObservable();} else {fallbackExecutionChain = getFallbackObservable();}} catch (Throwable ex) {fallbackExecutionChain = Observable.error(ex);}return fallbackExecutionChain.doOnEach(setRequestContext).lift(new FallbackHookApplication(_cmd)).lift(new DeprecatedOnFallbackHookApplication(_cmd)).doOnNext(markFallbackEmit).doOnCompleted(markFallbackCompleted).onErrorResumeNext(handleFallbackError).doOnTerminate(singleSemaphoreRelease).doOnUnsubscribe(singleSemaphoreRelease);} else {return handleFallbackRejectionByEmittingError();}} else {return handleFallbackDisabledByEmittingError(originalException, failureType, message);}}}
复制代码
  • fallbackExecutionChain的onErrorResumeNext,调用了handleFallbackError
  • fallbackExecutionChain的doOnCompleted,调用了markFallbackCompleted
  • AbstractCommand.getFallbackSemaphore

hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/AbstractCommand.java

protected TryableSemaphore getFallbackSemaphore() {if (fallbackSemaphoreOverride == null) {TryableSemaphore _s = fallbackSemaphorePerCircuit.get(commandKey.name());if (_s == null) {fallbackSemaphorePerCircuit.putIfAbsent(commandKey.name(), new TryableSemaphoreActual(properties.fallbackIsolationSemaphoreMaxConcurrentRequests()));return fallbackSemaphorePerCircuit.get(commandKey.name());} else {return _s;}} else {return fallbackSemaphoreOverride;}}
复制代码

针对每个commandKey获取或创建TryableSemaphoreActual

fallback源码分析小结

hystrix的fallback主要分为5种类型:

  • SEMAPHORE_REJECTED对应handleSemaphoreRejectionViaFallback
  • SHORT_CIRCUITED对应handleShortCircuitViaFallback
  • THREAD_POOL_REJECTED对应handleThreadPoolRejectionViaFallback
  • TIMEOUT对应handleTimeoutViaFallback
  • FAILURE对应handleFailureViaFallback
  • 这几个方法最后都调用了getFallbackOrThrowException方法。

分享资源

资源分享
获取以上资源请访问开源项目 点击跳转


http://www.ppmy.cn/news/1022654.html

相关文章

TPS(C++)字符匹配

TPS(C++)字符匹配 C++ 中 char [] 和Char*相等的值却不相等 在C++中,char[] 和char* 都可以表示一个字符串,但是它们的类型不同,因此在比较时可能会出现不相等的情况。 char[] 是一个字符数组,它在内存中有自己的一块空间,存储字符串的每个字符,以及一个终止符’\0’。…

C语言——自定义类型详解[结构体][枚举][联合体]

自定义类型详解 前言:一、结构体1.1结构体的声明1.2结构体内存对齐1.3位段(位域) 二、枚举2.1枚举类型的定义2.2枚举类型的优点2.3枚举的使用 三、联合体3.1联合体类型的定义3.2联合体的特点3.3联合体大小的计算 前言: 我打算把结…

Flutter参考资料

Flutter 官网 : https://flutter.dev/ Flutter 插件下载地址 : https://pub.dev/packages Flutter 开发文档 : https://flutter.cn/docs ( 强烈推荐 ) 官方 GitHub 地址 : https://github.com/flutter Flutter 中文社区 : https://flutter.cn/ Flutter 实用教程 : https://flut…

实现可编辑下拉框

<div class"form-group mar_r10"><label for"user_name">部门名称&#xff1a;</label><input name"deptName" id"dept_name" type"text" style"position:absolute;width:120px;height:26px; ma…

飞凌嵌入式「国产」嵌入式核心板大盘点(三)——龙芯中科、赛昉科技

为了帮助各位工程师朋友详细了解飞凌嵌入式推出的“国产化”产品&#xff0c;小编专门开设了「国产平台大盘点专题」。上周&#xff0c;已经带大家盘点了飞凌嵌入式联合瑞芯微电子和全志科技两个国产处理器品牌打造的平台&#xff0c;今天&#xff0c;将继续为大家介绍龙芯和赛…

MySQL多表连接查询

目录 表结构 创建表 表数据插入 查询需求 1.找出销售部门中年纪最大的员工的姓名 2.求财务部门最低工资的员工姓名 3.列出每个部门收入总和高于9000的部门名称 4.求工资在7500到8500元之间&#xff0c;年龄最大的人的姓名及部门 5.找出销售部门收入最低的员工入职时间…

关于@JSONField的使用

1.此注解来自jar包com.alibaba.fastjson 今天分享一个有意思的事情。这个注解作用与类的属性上&#xff0c;如下&#xff1a; ApiModelProperty(value"开始时间,格式:yyyy-MM-dd",required true) JSONField(name"start_date",ordinal 1) private String…

OpenText 企业安全 调查 产品简介

关于OpenText OpenText是一家信息软件公司&#xff0c;使企业能够通过市场领先的信息管理解决方案&#xff08;内部或云中&#xff09;获得洞察力。 全球面临的数字风险 市场合力驱动的信息管理 处于风暴中心的信息 →安全漏洞和数据保护 • 防止威胁并将破坏影响降至最低 …