Ribbon 负载均衡策略 —— 图解、源码级解析

news/2024/10/18 12:27:58/

文章目录

  • 负载均衡策略
    • RandomRule
    • RoundRobinRule
    • RetryRule
    • WeightedResponseTimeRule
    • BestAvailableRule
    • AvailabilityFilteringRule
    • ZoneAvoidanceRule
  • Ribbon 负载均衡策略源码
    • RandomRule源码
    • RoundRobinRule源码
    • BestAvailableRule源码
    • RetryRule源码

通过本文你可以学习到:

  1. 常见的7种负载均衡策略思想
  2. 自旋锁的使用方式
  3. 防御性编程

负载均衡策略

RandomRule

该策略会从当前可用的服务节点中,随机挑选一个节点访问,使用了yield+自旋的方式做重试,还采用了严格的防御性编程。


RoundRobinRule

该策略会从一个节点一步一步地向后选取节点,如下图所示:
在这里插入图片描述
在多线程环境下,两个请求同时访问这个Rule也不会读取到相同节点:这靠的是RandomRobinRule底层的自旋锁+CAS的同步操作。

CAS+自旋锁这套组合技是高并发下最廉价的线程安全手段,因为这套操作不需要锁定系统资源。但缺点是,自旋锁如果迟迟不能释放,将会带来CPU资源的浪费,因为自旋本身并不会执行任何业务逻辑,而是单纯的使CPU空转。所以通常情况下会对自旋锁的旋转次数做一个限制,比如JDK中synchronize底层的锁升级策略,就对自旋次数做了动态调整。

while (true) {// cas操作if (cas(expected, update)) {// 业务逻辑代码// break或退出return}
}

Eureka为了防止服务下线被重复调用,就使用AtomicBoolean的CAS方法做同步控制;

奈飞提供的SpringCloud组件有特别多用到CAS的地方,感兴趣的小伙伴们可以发现一下


RetryRule

RetryRule是一个类似装饰器模式的规则,装饰器相当于一层套一层的套娃,每一层都会加上一层独特的功能。

经典的装饰器模式示意图:
在这里插入图片描述
借助上面的思路,RetryRule就是给其他负载均衡策略加上重试功能。在RetryRule里还藏着一个subRule,这才是真正被执行的负载均衡策略,RetryRule正是要为它添加重试功能(如果初始化时没指定subRule,将默认使用RoundRibinRule)。


WeightedResponseTimeRule

这个规则继承自RoundRibbonRule,他会根据服务节点的响应时间计算权重,响应时间越长权重就越低,响应越快则权重越高,权重的高低决定了机器被选中概率的高低。也就是说,响应时间越小的机器,被选中的概率越大。

服务器刚启动的时候,对各个服务节点采样不足,因此会采用轮询策略,当积累到一定的样本时候,才会切换到WeightedResponseTimeRule模式。


BestAvailableRule

过滤掉故障服务以后,它会基于过去30分钟的统计结果选取当前并发量最小的服务节点作为目标地址。如果统计结果尚未生成,则采用轮询的方式选定节点。


AvailabilityFilteringRule

这个规则底层依赖RandomRobinRule来选取节点,但必须要满足它的最低要求的节点才会被选中。如果节点满足了要求,无论其响应时间或者当前并发量是什么,都会被选中。

每次AvailabilityFilteringRule都会请求RobinRule挑选一个节点,然后对这个节点做以下两步检查:

  1. 是否处于熔断状态
  2. 节点当前的请求连接数超过阈值,超过了则表示节点目前太忙

如果被选中的server挂了,那么AFR会自动重试(最多10次),让RobinRule重新选择一个服务节点


ZoneAvoidanceRule

这个过滤器包含了组合过滤条件,分别是Zone级别和可用性级别。
在这里插入图片描述

  • Zone Filter: Zone可以理解为机房所属的大区域,这里会对这个Zone下面所有的服务节点进行健康情况过滤。

  • 可用性过滤: 这里和AvailabilityFilteringRule的验证过程很像,会过滤掉当前并发量较大,或者处于熔断状态的服务节点。


Ribbon 负载均衡策略源码

RandomRule源码

先从RandomRule看起,核心的方法是:
请添加图片描述

public Server choose(ILoadBalancer lb, Object key) {if (lb == null) {return null;}Server server = null;while (server == null) {if (Thread.interrupted()) {return null;}List<Server> upList = lb.getReachableServers();List<Server> allList = lb.getAllServers();int serverCount = allList.size();if (serverCount == 0) {/** No servers. End regardless of pass, because subsequent passes* only get more restrictive.*/return null;}int index = chooseRandomInt(serverCount);server = upList.get(index);if (server == null) {Thread.yield();continue;}if (server.isAlive()) {return (server);}server = null;Thread.yield();}return server;}

RandomRule里方法的入参key没有用到,所以可以先暂时忽略


while循环逻辑是如果server为空,则找到一个可用的server

if (Thread.interrupted()) {return null;
}

如果线程暂停了,则直接返回空(防御性编程)


List<Server> upList = lb.getReachableServers();
List<Server> allList = lb.getAllServers();

allList存储的是所有的服务,upList存储的是可运行状态的服务


int serverCount = allList.size();
if (serverCount == 0) {return null;
}

服务中心上没有server注册,则返回空


int index = chooseRandomInt(serverCount);
server = upList.get(index);

随机选择一个server


其中,chooseRandomInt的逻辑如下:

protected int chooseRandomInt(int serverCount) {return ThreadLocalRandom.current().nextInt(serverCount);
}

返回0到serverCount中间的任意一个值

java中的随机是可以预测到结果的,真随机数一般会掺杂一些不可预测的数据,比如当前cpu的温度


回到RandomRulechoose方法:

如果发现随机选择的server为空表示此时serverList正在被修正,此时让出线程资源,进行下一次循环,对应最开始的防御性编程

if (server == null) {Thread.yield();continue;
}
if (server.isAlive()) {return (server);
}

如果server可用直接return


server = null;
Thread.yield();

如果不可用则server置为空,下一次循环会选一个新的,最后让出资源。

所以该方法每次进入下一次循环时都会让出线程。


RoundRobinRule源码

接下来看RoundRobinRule

public Server choose(ILoadBalancer lb, Object key) {if (lb == null) {log.warn("no load balancer");return null;}Server server = null;int count = 0;while (server == null && count++ < 10) {List<Server> reachableServers = lb.getReachableServers();List<Server> allServers = lb.getAllServers();int upCount = reachableServers.size();int serverCount = allServers.size();if ((upCount == 0) || (serverCount == 0)) {log.warn("No up servers available from load balancer: " + lb);return null;}int nextServerIndex = incrementAndGetModulo(serverCount);server = allServers.get(nextServerIndex);if (server == null) {Thread.yield();continue;}if (server.isAlive() && (server.isReadyToServe())) {return (server);}server = null;}if (count >= 10) {log.warn("No available alive servers after 10 tries from load balancer: "+ lb);}return server;
}

while循环里面有一个计数器,如果重试10次依然没有结果返回就不重试了。


List<Server> reachableServers = lb.getReachableServers();
List<Server> allServers = lb.getAllServers();
int upCount = reachableServers.size();
int serverCount = allServers.size();

reachableServers就是up状态的server


if ((upCount == 0) || (serverCount == 0)) {log.warn("No up servers available from load balancer: " + lb);return null;
}

没有可用服务器则返回空


int nextServerIndex = incrementAndGetModulo(serverCount);
server = allServers.get(nextServerIndex);

选择哪个下标的server,进入incrementAndGetModulo方法


private int incrementAndGetModulo(int modulo) {for (;;) {int current = nextServerCyclicCounter.get();int next = (current + 1) % modulo;if (nextServerCyclicCounter.compareAndSet(current, next))return next;}
}

使用了自旋锁,nextServerCyclicCounter是一个线程安全的数字。


if (server == null) {Thread.yield();continue;
}

如果获取到的server为空则让出资源,继续下一次循环


if (server.isAlive() && (server.isReadyToServe())) {return (server);
}

server是正常的则返回


server = null;

最后没有让出线程资源,因为重试10次后就退出循环了


BestAvailableRule源码

接下来看BestAvailableRule

@Override
public Server choose(Object key) {if (loadBalancerStats == null) {return super.choose(key);}List<Server> serverList = getLoadBalancer().getAllServers();int minimalConcurrentConnections = Integer.MAX_VALUE;long currentTime = System.currentTimeMillis();Server chosen = null;for (Server server: serverList) {ServerStats serverStats = loadBalancerStats.getSingleServerStat(server);if (!serverStats.isCircuitBreakerTripped(currentTime)) {int concurrentConnections = serverStats.getActiveRequestsCount(currentTime);if (concurrentConnections < minimalConcurrentConnections) {minimalConcurrentConnections = concurrentConnections;chosen = server;}}}if (chosen == null) {return super.choose(key);} else {return chosen;}
}

if (loadBalancerStats == null) {return super.choose(key);
}

如果loadBalancerStats为空则调用父类的choose方法,父类方法直接委托给RoundRobinRule来完成choose


for循环里先从loadBalancerStats中获取到当前服务的状态

ServerStats serverStats = loadBalancerStats.getSingleServerStat(server);
public ServerStats getSingleServerStat(Server server) {return getServerStats(server);
}
protected ServerStats getServerStats(Server server) {try {return serverStatsCache.get(server);} catch (ExecutionException e) {ServerStats stats = createServerStats(server);serverStatsCache.asMap().putIfAbsent(server, stats);return serverStatsCache.asMap().get(server);}
}

这里是从缓存中获取server的stats,如果获取失败则默认创建一个stats并添加到缓存中,然后从cache中再获取一次。


随后判断是否处于熔断状态

if (!serverStats.isCircuitBreakerTripped(currentTime)) {...}
public boolean isCircuitBreakerTripped(long currentTime) {long circuitBreakerTimeout = getCircuitBreakerTimeout();if (circuitBreakerTimeout <= 0) {return false;}return circuitBreakerTimeout > currentTime;
}

首先获得熔断的TimeOut(表示截止到未来某个时间熔断终止),如果大于当前时间说明处于熔断状态。


熔断的TimeOut由下面方法计算得到:

private long getCircuitBreakerTimeout() {long blackOutPeriod = getCircuitBreakerBlackoutPeriod();if (blackOutPeriod <= 0) {return 0;}return lastConnectionFailedTimestamp + blackOutPeriod;
}

返回上一次连接失败的时间戳 + blackOutPeriod

其中又调用了

private long getCircuitBreakerBlackoutPeriod() {int failureCount = successiveConnectionFailureCount.get();int threshold = connectionFailureThreshold.get();if (failureCount < threshold) {return 0;}int diff = (failureCount - threshold) > 16 ? 16 : (failureCount - threshold);int blackOutSeconds = (1 << diff) * circuitTrippedTimeoutFactor.get();if (blackOutSeconds > maxCircuitTrippedTimeout.get()) {blackOutSeconds = maxCircuitTrippedTimeout.get();}return blackOutSeconds * 1000L;
}

failureCount是失败的个数,从一个计数器里获得,阈值从一个缓存的属性中获得,之后计算两个的差值,再根据缓存中的一些属性计算最终的秒数,最后乘以1000返回。


回到BestAvailableRulechoose方法,只有不处于熔断状态才能继续走后面的流程

if (concurrentConnections < minimalConcurrentConnections) {minimalConcurrentConnections = concurrentConnections;chosen = server;
}

选出连接数最小的服务器


if (chosen == null) {return super.choose(key);
} else {return chosen;
}

最后返回


核心是找到一个最轻松的服务器。


RetryRule源码

查看RetryRule源码:

public Server choose(ILoadBalancer lb, Object key) {long requestTime = System.currentTimeMillis();long deadline = requestTime + maxRetryMillis;Server answer = null;answer = subRule.choose(key);if (((answer == null) || (!answer.isAlive()))&& (System.currentTimeMillis() < deadline)) {InterruptTask task = new InterruptTask(deadline- System.currentTimeMillis());while (!Thread.interrupted()) {answer = subRule.choose(key);if (((answer == null) || (!answer.isAlive()))&& (System.currentTimeMillis() < deadline)) {/* pause and retry hoping it's transient */Thread.yield();} else {break;}}task.cancel();}if ((answer == null) || (!answer.isAlive())) {return null;} else {return answer;}
}
long requestTime = System.currentTimeMillis();
long deadline = requestTime + maxRetryMillis;

先记录当前时间和deadline,在截止时间之前可以一直重试。


answer = subRule.choose(key);

方法里面是由subRule来实现具体的负载均衡逻辑,这里默认类型是RoundRobinRule


如果选到的是空或者选到的不是up的,且时间在ddl之前则进入重试逻辑:

while (!Thread.interrupted()) {answer = subRule.choose(key);if (((answer == null) || (!answer.isAlive()))&& (System.currentTimeMillis() < deadline)) {/* pause and retry hoping it's transient */Thread.yield();} else {break;}
}

如果线程中断了就中断重试。之后重新选择服务器,如果又没选到则把资源让出去,下一次while循环再选,在while循环之前会起一个任务

InterruptTask task = new InterruptTask(deadline - System.currentTimeMillis());

到了截止时间之后,程序会中断重试的流程

task.cancel();

最后返回

if ((answer == null) || (!answer.isAlive())) {return null;
} else {return answer;
}

http://www.ppmy.cn/news/193204.html

相关文章

MyBatis(多表查询,动态SQL的使用)

目录 多表查询 查询文章详情 查询一个用户底下的所有文章 动态SQL的使用 if 标签 trim 标签 where 标签 set 标签 foreach 标签 多表查询 现在有俩张表,一张是文章表,一张是用户表.如下: 查询文章详情 我们现在想查询得到一张表,表里面的内容和文章表大多一致,只是要在…

MMPose关键点检测实战

安装教程 https://github.com/TommyZihao/MMPose_Tutorials/blob/main/2023/0524/%E3%80%90A1%E3%80%91%E5%AE%89%E8%A3%85MMPose.ipynb git clone https://github.com/open-mmlab/mmpose.git -b tutorial2023 -b代表切换到某个分支&#xff0c;保证分支和作者的教程一致 ra…

显卡 1050Ti pytorch 安装

显卡 1050Ti 配置的是cuda 10.1 cuda 安装教程查看 https://blog.csdn.net/weixin_43735353/article/details/107412849 不能安装torch 官网最新 的版本 &#xff0c;需要安装适合 cuda 10的版本 pip install torch1.8.1cu101 torchvision0.9.1cu101 torchaudio0.8.1 -f http…

php主板主要是支持,GTX1050Ti配什么CPU和主板好?适合GTX1050Ti搭配的CPU与主板解答...

伴随着GTX1050Ti显卡正式发布和即将上市,这款显卡是面向电竞游戏市场的旗帜产品,定位千元中端市场。相比于更高端GTX1060显卡,这款新品同样继承新Pascal架构部分特性,使得GTX1050Ti显卡定位显得要更主流亲民不少,这样一来不少网友可能会问,GTX1050Ti配什么CPU和主板呢?如…

archlinux安装nvidia-1050ti闭源驱动教程,亲测

全程root用户运行, 本次的闭源驱动最新版是430 1、安装闭源驱动 $ pacman -S nvidia nvidia-utils nvidia-settings2、查看n卡的BusID $ lspci | egrep VGA|3D 出现如下格式&#xff1a; ---------------------------------------------------------------------- 00:02.0 V…

win10+1050ti安装配置tensorflow-gpu 1.6.0

** anaconda安装配置tensorflow-gpu 1.6.0 1.下载配置anaconda&#xff0c;默认安装就行 2.查看自己电脑查看能支持的最高版本的cuda版本 我自己GTX 1050ti采用的是cuda9.0cudnn7.0 官网中有对应显卡能使用的cuda和cudnn版本&#xff0c;可以自行查看 3.官网上下载cuda和cu…

1050Ti解决csgo打不开、电脑无缘无故蓝屏的终极方法

我大概经历了一年电脑无端蓝屏、csgo和NBA2KOL2等游戏打不开&#xff0c;提示dx11有问题等等问题。 我分析后感觉大概是显卡驱动的问题&#xff0c;所以下载安装过从461开始几乎所有的n卡驱动。 每次在更新完驱动的第一天电脑还是很正常的&#xff0c;但是之后就会经常突然蓝…

Win10+1050Ti配置Tensorflow教程

Win101050Ti配置Tensorflow教程 笔者使用的是联想Y7000笔记本&#xff0c;显卡是1050Ti&#xff0c;在安装TensorFlow时&#xff0c;发现自己的显卡型号并不在NVDIA官网上支持型号的名单中&#xff0c;于是网上看了很多教程&#xff0c;很多都有问题&#xff08;或者不适用于我…