限流,流量整形算法

ops/2024/9/23 6:23:51/

写在前面

源码 。
本文看下流量整形相关算法

目前流量整形算法主要有三种,计数器,漏桶,令牌桶。分别看下咯!

1:计数器

1.1:描述

单位时间内只允许指定数量的请求,如果是时间区间内超过指定数量,则直接拒绝,如果时间区间结束,则重置计数器,开始下一个时间区间。
在这里插入图片描述

1.2:程序

package com.dahuyou.algrithm.triffic.shaper.counter;import org.junit.Test;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;// 计速器 限速
public class CounterLimiter {// 起始时间private static long startTime = System.currentTimeMillis();// 时间区间的时间间隔 msprivate static long interval = 1000;// 每interval时间内限制数量private static long maxCount = 2;//累加器private static AtomicLong accumulator = new AtomicLong();// 计数判断, 是否超出限制private static long tryAcquire(long taskId, int turn) {long nowTime = System.currentTimeMillis();//在时间区间之内if (nowTime < startTime + interval) {long count = accumulator.incrementAndGet();if (count <= maxCount) {System.out.println("taskId: " + taskId + " 正常执行!");return count;} else {// 返回-1说明时间区间内被限制了
//                return -count;System.out.println("时区内达到次数咯!");return -1;}} else {//在时间区间之外synchronized (CounterLimiter.class) {System.out.println("新时间区到了,taskId:" + taskId + ", turn {}.." + turn);// 再一次判断,防止重复初始化if (nowTime > startTime + interval) {accumulator.set(0);startTime = nowTime;}}return 0;}}final int threads = 1;//线程池,用于多线程模拟测试
//    private ExecutorService pool = Executors.newFixedThreadPool(10);private ExecutorService pool = Executors.newFixedThreadPool(threads);@Testpublic void testLimit() {// 被限制的次数AtomicInteger limited = new AtomicInteger(0);// 线程数
//        final int threads = 2;// 每条线程的执行轮数final int turns = 20;// 同步器CountDownLatch countDownLatch = new CountDownLatch(threads);long start = System.currentTimeMillis();for (int i = 0; i < threads; i++) {pool.submit(() ->{try {for (int j = 0; j < turns; j++) {long taskId = Thread.currentThread().getId();long index = tryAcquire(taskId, j);
//                        if (index <= 0) {if (index == -1) {// 被限制的次数累积limited.getAndIncrement();}Thread.sleep(200);}} catch (Exception e) {e.printStackTrace();}//等待所有线程结束countDownLatch.countDown();});}try {countDownLatch.await();} catch (InterruptedException e) {e.printStackTrace();}float time = (System.currentTimeMillis() - start) / 1000F;//输出统计结果System.out.println("限制的次数为:" + limited.get() +",通过的次数为:" + (threads * turns - limited.get()));System.out.println("限制的比例为:" + (float) limited.get() / (float) (threads * turns));System.out.println("运行的时长为:" + time);}}

输出:
在这里插入图片描述

1.3:优缺点

  • 优点
简单
  • 缺点
无法处理流量分配不均匀的情况,可能导致大量的请求被拒绝

1.4:适用场景

流量比较平稳业务场景。比如我司的机器人外呼业务,因为是程序在跑,所以流量很稳定,一旦业务配置导致流量增高,则可以使用该算法进行限流。

但对于突发流量场景,可能会因为很短时间内的突发流量就导致计数器达到最大值,从而时间区间内的剩余时间所有请求全部丢弃,这也存在着被攻击的风险。
在这里插入图片描述

2:漏桶

2.1:描述

水(对应请求)从进水口进入到漏桶里,漏桶以一定的速度出水(请求放行),当水流入速度过大,桶内的总水量大于桶容量会直接溢出,请求被拒绝,如图所示:
在这里插入图片描述

2.2:程序

package com.dahuyou.algrithm.triffic.shaper.counter;import org.junit.Test;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;// 漏桶 限流
public class LeakBucketLimiter {// 计算的起始时间private static long lastOutTime = System.currentTimeMillis();// 流出速率 每100毫秒漏2次private static int leakRate = 1;
//    private static int leakRate = 2000;// 桶的容量private static int capacity = 5;//剩余的水量private static AtomicInteger water = new AtomicInteger(0);//返回值说明:// false 没有被限制到// true 被限流public static synchronized boolean isLimit(long taskId, int turn) {// 如果是空桶,就当前时间作为漏出的时间if (water.get() == 0) {lastOutTime = System.currentTimeMillis();water.addAndGet(1);return false;}// 执行漏水
//        int waterLeaked = ((int) ((System.currentTimeMillis() - lastOutTime) / 1000)) * leakRate;int waterLeaked = ((int) ((System.currentTimeMillis() - lastOutTime) / 100)) * leakRate;// 计算剩余水量,当前的量减去漏出去的量就是剩余的量int waterLeft = water.get() - waterLeaked;// 要注意:剩余的量最小是0water.set(Math.max(0, waterLeft));// 重新更新leakTimeStamplastOutTime = System.currentTimeMillis();// 尝试加水,并且水还未满 ,放行if ((water.get()) < capacity) {System.out.println("水未满,成功加水");water.addAndGet(1);return false;} else {System.out.println("水已满,水溢出");// 水满,拒绝加水, 限流return true;}}final int threads = 1;//线程池,用于多线程模拟测试(负责加水)private ExecutorService pool = Executors.newFixedThreadPool(threads);private ExecutorService outWaterPool = Executors.newFixedThreadPool(threads);@Testpublic void testLimit() {//        new Thread(() -> {
//            for (int i = 0; i < 1000; i++) {
//                if (water.get() > 0) {
//                    System.out.println("出水了");
//                    water.decrementAndGet();
//                } else {
//                    System.out.println("无水可出了");
//                }
//                try {
//                    TimeUnit.MILLISECONDS.sleep(100);
//                } catch (InterruptedException e) {
//                    e.printStackTrace();
//                }
//            }
//        }).start();// 被限制的次数AtomicInteger limited = new AtomicInteger(0);// 线程数
//        final int threads = 2;// 每条线程的执行轮数final int turns = 20;// 线程同步器CountDownLatch countDownLatch = new CountDownLatch(threads);long start = System.currentTimeMillis();for (int i = 0; i < threads; i++) {pool.submit(() ->{try {for (int j = 0; j < turns; j++) {long taskId = Thread.currentThread().getId();boolean intercepted = isLimit(taskId, j);if (intercepted) {// 被限制的次数累积limited.getAndIncrement();}Thread.sleep(200);}} catch (Exception e) {e.printStackTrace();}//等待所有线程结束countDownLatch.countDown();});}try {countDownLatch.await();} catch (InterruptedException e) {e.printStackTrace();}float time = (System.currentTimeMillis() - start) / 1000F;//输出统计结果System.out.println("限制的次数为:" + limited.get() +",通过的次数为:" + (threads * turns - limited.get()));System.out.println("限制的比例为:" + (float) limited.get() / (float) (threads * turns));System.out.println("运行的时长为:" + time);}
}

运行:

水未满,成功加水
水未满,成功加水
水未满,成功加水
水未满,成功加水
水未满,成功加水
水未满,成功加水
水未满,成功加水
水未满,成功加水
水未满,成功加水
水未满,成功加水
水未满,成功加水
水未满,成功加水
水未满,成功加水
水未满,成功加水
水未满,成功加水
水未满,成功加水
水未满,成功加水
水未满,成功加水
水未满,成功加水
限制的次数为:0,通过的次数为:20
限制的比例为:0.0
运行的时长为:4.136Process finished with exit code 0

此时因为水流出的速度快于流入的速度,所以,一直可以成功加水,可以修改leakRate=0,再运行:

水未满,成功加水
水未满,成功加水
水未满,成功加水
水未满,成功加水
水已满,水溢出
水已满,水溢出
水已满,水溢出
水已满,水溢出
水已满,水溢出
水已满,水溢出
水已满,水溢出
水已满,水溢出
水已满,水溢出
水已满,水溢出
水已满,水溢出
水已满,水溢出
水已满,水溢出
水已满,水溢出
水已满,水溢出
限制的次数为:15,通过的次数为:5
限制的比例为:0.75
运行的时长为:4.176Process finished with exit code 0

就可以看到水满溢出的情况了。

2.3:优缺点

  • 优点
可应对突发流量,避免服务被冲垮,从而起到保护服务的作用
  • 缺点
因为出口速率固定,所以当服务能力提升时,无法自动匹配后端服务的能力提升

2.4:适用场景

3:令牌桶

3.1:描述

有一个固定容量的令牌桶,按照一定的速率(可以调节)向令牌桶中放入令牌,请求想要被执行,必须能够从令牌桶中获取到令牌,否则将会被抛弃,参考下图:
在这里插入图片描述

3.2:程序

package com.dahuyou.algrithm.triffic.shaper.counter;//import lombok.extern.slf4j.Slf4j;import org.junit.Test;import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;// 令牌桶 限速
//@Slf4j
public class TokenBucketLimiter {// 上一次令牌发放时间public long lastTime = System.currentTimeMillis();// 桶的容量public int capacity = 2;// 令牌生成速度 /s,如果是调大令牌的生成速度,则服务能力也会得到提高(在服务扛得住的前提下)public int rate = 2;// 当前令牌数量public AtomicInteger tokens = new AtomicInteger(0);//返回值说明:// false 没有被限制到// true 被限流public synchronized boolean isLimited(long taskId, int applyCount) {long now = System.currentTimeMillis();//时间间隔,单位为 mslong gap = now - lastTime;//计算时间段内的令牌数int reverse_permits = (int) (gap * rate / 1000);int all_permits = tokens.get() + reverse_permits;// 当前令牌数(固有的令牌加上时间段内新产生的令牌就是当前真实的令牌数啦),// 因为令牌桶也有固定的数量所以要取下最小值tokens.set(Math.min(capacity, all_permits));
//        log.info("tokens {} capacity {} gap {} ", tokens, capacity, gap);
//        System.out.println("tokens " + tokens + " capacity " + capacity + " gap  " + gap);/*** 如果申请的数量大于可用令牌数,则拒绝,否则发放令牌,执行请求*/if (tokens.get() < applyCount) {System.out.println("没有辣么多令牌啦!!!");// 若拿不到令牌,则拒绝// log.info("被限流了.." + taskId + ", applyCount: " + applyCount);return true;} else {System.out.println("令牌拿去撒!!!");// 还有令牌,领取令牌tokens.getAndAdd( - applyCount);lastTime = now;// log.info("剩余令牌.." + tokens);return false;}}//线程池,用于多线程模拟测试private ExecutorService pool = Executors.newFixedThreadPool(10);@Testpublic void testLimit() {// 被限制的次数AtomicInteger limited = new AtomicInteger(0);// 线程数final int threads = 2;// 每条线程的执行轮数final int turns = 20;// 同步器CountDownLatch countDownLatch = new CountDownLatch(threads);long start = System.currentTimeMillis();for (int i = 0; i < threads; i++) {pool.submit(() ->{try {for (int j = 0; j < turns; j++) {long taskId = Thread.currentThread().getId();boolean intercepted = isLimited(taskId, 1);if (intercepted) {// 被限制的次数累积limited.getAndIncrement();}Thread.sleep(200);}} catch (Exception e) {e.printStackTrace();}//等待所有线程结束countDownLatch.countDown();});}try {countDownLatch.await();} catch (InterruptedException e) {e.printStackTrace();}float time = (System.currentTimeMillis() - start) / 1000F;//输出统计结果System.out.println("限制的次数为:" + limited.get() +",通过的次数为:" + (threads * turns - limited.get()));System.out.println("限制的比例为:" + (float) limited.get() / (float) (threads * turns));System.out.println("运行的时长为:" + time);}
}

输出:
在这里插入图片描述
展示的是既有申请到令牌也有没有申请到令牌的场景,修改代码public int rate = 2000;给令牌发放一个非常大的速度,此时就会一直可以拿得到令牌:
在这里插入图片描述
修改程序public int rate = 0;直接不发放令牌,就可以看到令牌全部申请失败的场景:
在这里插入图片描述

3.3:优缺点

  • 优点
1:因为令牌桶容量有限制,所以可以应对突发流量
2:服务QPS增加或者降低时只需要对应调整令牌的发放速度即可适配
  • 缺点

3.4:适用场景

写在后面

参考文章列表

限流:计数器、漏桶、令牌桶 三大算法的原理与实战(史上最全) 。


http://www.ppmy.cn/ops/110686.html

相关文章

LSTM处理时序数据:深入解析与实战

大家好&#xff0c;我是你们的深度学习老群群。今天&#xff0c;我们来聊一聊LSTM&#xff08;长短期记忆网络&#xff09;是如何处理时序数据并得到预测结果的。LSTM作为循环神经网络&#xff08;RNN&#xff09;的一种变体&#xff0c;因其能够有效捕捉长期依赖关系&#xff…

vmware esxi 6.5 开启 snmp 服务

学习目标&#xff1a; 如何开启 vmware esxi 6.xx 开启 snmp 服务 查看SNMP 是否开启状态&#xff1a; 如何开启SNMP&#xff1a; 1.用 MAC、Linux SSH 工具 (如 SecureCRT) 连接 esxi 2、修改 SNMP 配置文件 vi /etc/vmware/snmp.xml3 、将标签 false 改为 true 在 后加上…

20、网络数据安全管理条例

第一章 总则 第一条 为了规范网络数据处理活动, 保障数据安全, 保护个人、 组织在网络空间的合法权益, 维护国家安全、 公共利益, 根据《中华人民共和国网络安全法》《中华人民共和国数据安全法》《中华人民共和国个人信息保护法》等法律,制定本条例。 第二条 在中华人民…

JAVA相关知识

JAVA基础知识 说一下对象创建的过程&#xff1f; 类加载检查&#xff1a;当Java虚拟机&#xff08;JVM&#xff09;遇到一个类的new指令时&#xff0c;它首先检查这个类是否已经被加载、链接和初始化。如果没有&#xff0c;JVM会通过类加载器&#xff08;ClassLoader&#xff…

计算机网络八股总结

这里写目录标题 网络模型划分&#xff08;五层和七层&#xff09;及每一层的功能五层网络模型七层网络模型&#xff08;OSI模型&#xff09; 三次握手和四次挥手具体过程及原因三次握手四次挥手 TCP/IP协议组成UDP协议与TCP/IP协议的区别Http协议相关知识网络地址&#xff0c;子…

1T机械硬盘需要分区吗?你必须知道的分区知识

随着科技的不断发展&#xff0c;计算机存储设备的容量日益增大&#xff0c;1T(1TB&#xff0c;即1024GB)机械硬盘已成为许多电脑用户的标配。然而&#xff0c;在这样一个大容量硬盘面前&#xff0c;很多用户都会面临一个问题&#xff1a;是否需要对这块硬盘进行分区&#xff1f…

ctfshow-PHP反序列化

web254 源码 <?php/* # -*- coding: utf-8 -*- # Author: h1xa # Date: 2020-12-02 17:44:47 # Last Modified by: h1xa # Last Modified time: 2020-12-02 19:29:02 # email: h1xactfer.com # link: https://ctfer.com //mytime 2023-12-4 0:22 */ error_reporting(0)…

Python 常用模块(二):json模块

目录 1. json 模块介绍1.1 json 模块快用导航1.2 什么是JSON1.2.1 JSON的特点1.2.2 JSON的基本语法1.2.3 JSON数据类型1.2.4 JSON示例1.2.5 JSON使用场景1.2.6 JSON的优缺点1.2.7 JSON和XML的比较 1.3 json 模块 2. dump() 方法 --- 转换为 JSON 格式写入文件2.1 语法参考2.2 实…