Go 控制协程(goroutine)的并发数量

ops/2024/10/18 14:20:50/

在使用协程并发处理某些任务时, 其并发数量往往因为各种因素的限制不能无限的增大. 例如网络请求、数据库查询等等。

从运行效率角度考虑,在相关服务可以负载的前提下(限制最大并发数),尽可能高的并发。

在Go语言中,可以使用一些方法来控制协程(goroutine)的并发数量,以防止并发过多导致资源耗尽或性能下降

1、使用信号量(Semaphore)

可以使用 Go 语言中的 channel 来实现简单的信号量,限制并发数量

package mainimport ("fmt""sync"
)func worker(id int, sem chan struct{}) {sem <- struct{}{} // 占用一个信号量defer func() {<-sem // 方法运行结束释放信号量}()// 执行工作任务fmt.Printf("Worker %d: Working...\n", id)
}func main() {concurrency := 3sem := make(chan struct{}, concurrency)var wg sync.WaitGroupfor i := 0; i < 10; i++ {wg.Add(1)go func(id int) {defer wg.Done()worker(id, sem)}(i)}wg.Wait()close(sem)
}

sem 是一个有缓冲的 channel,通过控制 channel 中元素的数量,实现了一个简单的信号量机制 

 2、使用协程池

可以创建一个固定数量的协程池,将任务分发给这些协程执行。 

package mainimport ("fmt""sync"
)func worker(id int, jobs <-chan int, results chan<- int) {//jobs等待主要协程往jobs放数据for j := range jobs {fmt.Printf("协程池 %d: 协程池正在工作 %d\n", id, j)results <- j}
}func main() {const numJobs = 5    //协程要做的工作数量const numWorkers = 3 //协程池数量jobs := make(chan int, numJobs)results := make(chan int, numJobs)var wg sync.WaitGroup// 启动协程池for i := 1; i <= numWorkers; i++ {wg.Add(1)go func(id int) {defer wg.Done()worker(id, jobs, results)}(i)}// 提交任务for j := 1; j <= numJobs; j++ {jobs <- j}close(jobs)// 等待所有工作完成go func() {wg.Wait()close(results)}()// 处理结果for result := range results {fmt.Println("Result:", result)}
}

jobs 通道用于存储任务,results 通道用于存储处理结果。通过创建固定数量的工作协程,可以有效地控制并发数量。

3、使用其他包

Go 1.16 引入了 golang.org/x/sync/semaphore 包,它提供了一个更为灵活的信号量实现。

案例一

限制对外部API的并发请求 

假设我们有一个外部API,它对并发请求有限制,我们希望确保不超过这个限制。我们可以使用semaphore.Weighted来控制对API的并发访问

package mainimport ("context""fmt""golang.org/x/sync/semaphore""sync""time"
)func main() {/*1、在并发量一定的情况下,通过改变允许并发请求数可以更快处理请求任务(在CPU够用的前提下)2、sem := semaphore.NewWeighted(n),参数n就是权重量3、当一个协程需要获取的单位的权重越多,运行就会慢(比如权重总量n=5个,一个协程分配了2个,跟一个协程分配1个效率是不一样的)4、信号量没有足够的可用权重的情况发生在所有已分配的权重单位都已经被占用,即信号量的当前权重计数达到了它的总容量。在这种情况下,任何尝试通过Acquire方法获取更多权重的调用都将无法立即完成,从而导致调用者(通常是goroutine)阻塞,直到其他调用者释放一些权重单位。*//*1、权权重较大的任务在资源竞争时有更高的优先级,更容易获得执行的机会2、如果当前资源足够满足高权重任务的需求,这些任务将立即执行;若资源不足,则按照权重高低顺序排队等待3、一旦任务开始执行,其完成的速度主要取决于任务自身的逻辑复杂度、所需资源以及系统的当前负载等因素,与任务在信号量中的权重无关3、高权重的任务并不会中断已经在执行的低权重任务,而是等待这些任务自行释放资源。一旦资源释放,等待队列中的高权重任务将优先被唤醒4、Acquire 方法会检查当前信号量的可用资源量是否满足请求的权重,如果满足,则立即减少信号量的资源计数并返回,允许任务继续执行。如果不满足,任务将阻塞等待,直到有足够的资源被释放*/// 记录开始时间startTime := time.Now()// 假设外部API允许的最大并发请求为5(信号量的总容量是5个权重单位)const (maxConcurrentRequests = 5)sem := semaphore.NewWeighted(maxConcurrentRequests)var wg sync.WaitGroup// 模拟对外部API的10个并发请求for i := 0; i < 10; i++ {wg.Add(1)go func(requestId int) {defer wg.Done()// 假设我们想要获取2个单位的权重if err := sem.Acquire(context.Background(), 2); err != nil {fmt.Printf("请求 %d 无法获取信号量: %v\n", requestId, err)return}defer sem.Release(2) // 请求完成后释放信号量// 模拟对API的请求处理fmt.Printf("请求 %d 开始...\n", requestId)time.Sleep(2 * time.Second) // 模拟网络延迟fmt.Printf("请求 %d 完成。\n", requestId)}(i)}wg.Wait()// 记录结束时间endTime := time.Now()// 计算并打印总耗时fmt.Printf("程序总耗时: %v\n", endTime.Sub(startTime))
}

信号量没有足够的可用权重的情况发生在所有已分配的权重单位都已经被占用,即信号量的当前权重计数达到了它的总容量。在这种情况下,任何尝试通过Acquire方法获取更多权重的调用都将无法立即完成,从而导致调用者(通常是goroutine)阻塞,直到其他调用者释放一些权重单位。

以下是一些导致信号量没有足够可用权重的具体情况:

  1. 信号量初始化容量较小:如果信号量的总容量设置得较小,而并发请求的数量较大,那么很快就会出现权重不足的情况。

  2. 长时间占用权重:如果某些goroutine长时间占用权重单位而不释放,这会导致其他goroutine无法获取到权重,即使这些goroutine只是少数。

  3. 权重分配不均:在某些情况下,可能存在一些goroutine占用了不成比例的权重单位,导致其他goroutine无法获取足够的权重。

  4. 权重释放不及时:如果goroutine因为错误或异常情况提前退出,而没有正确释放它们所占用的权重,那么这些权重单位将不会被回收到信号量中。

  5. 高频率的请求:在短时间内有大量goroutine请求权重,即使它们请求的权重不大,累积起来也可能超过信号量的总容量。

  6. 信号量权重未正确管理:如果信号量的权重管理逻辑存在缺陷,例如错误地释放了过多的权重,或者在错误的时间点释放权重,也可能导致可用权重不足。

为了避免信号量没有足够的可用权重,可以采取以下措施:

  • 合理设置信号量容量:根据资源限制和并发需求合理设置信号量的总容量。
  • 及时释放权重:确保在goroutine完成工作后及时释放权重。
  • 使用超时:在Acquire调用中使用超时,避免无限期地等待权重。
  • 监控和日志记录:监控信号量的使用情况,并记录关键信息,以便及时发现和解决问题。
  • 权重分配策略:设计合理的权重分配策略,确保权重的公平和高效分配。

通过这些措施,可以更好地管理信号量的使用,避免因权重不足导致的并发问题。

案例二

假设有一个在线视频平台,它需要处理不同分辨率的视频转码任务。由于高清视频转码比标清视频更消耗计算资源,因此平台希望设计一个系统,能够优先处理更多标清视频转码请求,同时又不完全阻塞高清视频的转码,以保持整体服务质量和资源的有效利用。

package mainimport ("fmt""golang.org/x/net/context""golang.org/x/sync/semaphore""runtime""sync""time"
)// VideoTranscodeJob 视频转码任务
type VideoTranscodeJob struct {resolution stringweight     int64
}func main() {cpuCount := runtime.NumCPU()fmt.Printf("当前CPU个数%v\n", cpuCount)/*1、权权重较大的任务在资源竞争时有更高的优先级,更容易获得执行的机会2、如果当前资源足够满足高权重任务的需求,这些任务将立即执行;若资源不足,则按照权重高低顺序排队等待3、一旦任务开始执行,其完成的速度主要取决于任务自身的逻辑复杂度、所需资源以及系统的当前负载等因素,与任务在信号量中的权重无关3、高权重的任务并不会中断已经在执行的低权重任务,而是等待这些任务自行释放资源。一旦资源释放,等待队列中的高权重任务将优先被唤醒4、Acquire 方法会检查当前信号量的可用资源量是否满足请求的权重,如果满足,则立即减少信号量的资源计数并返回,允许任务继续执行。如果不满足,任务将阻塞等待,直到有足够的资源被释放*/// 初始化两个信号量,一个用于标清,一个用于高清,假设总共有8个CPU核心可用normalSem := semaphore.NewWeighted(6)  // 标清任务,分配6个单位权重,因为它们消耗资源较少highDefSem := semaphore.NewWeighted(2) // 高清任务,分配2个单位权重,因为它们更消耗资源var wg sync.WaitGroup//假设有20个需要转码的视频videoJobs := []VideoTranscodeJob{{"HD", 2}, {"HD", 2}, {"SD", 1}, {"SD", 1}, {"HD", 2},{"SD", 1}, {"SD", 1}, {"HD", 2}, {"SD", 1}, {"HD", 2},{"SD", 4}, {"SD", 4}, {"HD", 2}, {"SD", 1}, {"HD", 2},{"SD", 1}, {"SD", 4}, {"HD", 2}, {"SD", 6}, {"HD", 2},}for _, job := range videoJobs {wg.Add(1)go func(job VideoTranscodeJob) {defer wg.Done()var sem *semaphore.Weightedswitch job.resolution {case "SD":sem = normalSem //分配权重大,当前为6,任务在获取执行机会上有优势,但并不直接意味着执行速度快case "HD":sem = highDefSemdefault:panic("无效的分辨率")}if err := sem.Acquire(context.Background(), job.weight); err != nil {fmt.Printf("名为 %s 视频无法获取信号量: %v\n", job.resolution, err)return}defer sem.Release(job.weight) //释放权重对应的信号量// 模拟转码任务执行fmt.Printf("转码 %s 视频 (权重: %d)...\n", job.resolution, job.weight)//通过利用VideoTranscodeJob的weight值来模拟转码时间的长短,HD用时长则设置2比SD的1大,*时间就自然长,运行就时间长time.Sleep(time.Duration(job.weight*100) * time.Millisecond) // 模拟不同分辨率视频转码所需时间fmt.Printf("------------------------%s 视频转码完成。。。\n", job.resolution)}(job)}wg.Wait()
}

标清(SD)和高清(HD),分别分配了不同的权重(1和2)。通过创建两个不同权重的信号量,我们可以控制不同类型任务的同时执行数量,从而优先保证标清视频的快速处理,同时也确保高清视频能够在不影响系统稳定性的情况下进行转码。这展示了带权重的并发控制如何帮助在资源有限的情况下优化任务调度和执行效率。

 注意:对协程分配的权重单位数不能大于对应上下文semaphore.NewWeighted(n)中参数n的单位数

案例三

package mainimport ("context""fmt""sync""time""golang.org/x/sync/semaphore"
)type weightedTask struct {id     intweight int64
}func main() {const (maxTotalWeight = 20 // 最大总权重)sem := semaphore.NewWeighted(maxTotalWeight)var wg sync.WaitGrouptasksCh := make(chan weightedTask, 10)// 发送任务for i := 1; i <= 10; i++ {tasksCh <- weightedTask{id: i, weight: int64(i)} // 假设任务ID即为其权重}close(tasksCh)// 启动任务处理器for task := range tasksCh {wg.Add(1)go func(task weightedTask) {defer wg.Done()if err := sem.Acquire(context.Background(), int64(task.id)); err != nil {fmt.Printf("任务 %d 无法获取信号量: %v\n", task.id, err)return}defer sem.Release(int64(task.id)) //释放// 模拟任务执行fmt.Printf("任务 %d (权重: %d) 正在运行...\n", task.id, task.weight)time.Sleep(time.Duration(task.weight*100) * time.Millisecond) // 示例中简单用时间模拟权重影响fmt.Printf("任务 %d 完成.\n", task.id)}(task)}wg.Wait()
}

 

总结

选择哪种方法取决于具体的应用场景和需求。使用信号量是一种简单而灵活的方法,而协程池则更适用于需要批量处理任务的情况。golang.org/x/sync/semaphore 包提供了一个标准库外的更灵活的信号量实现         


http://www.ppmy.cn/ops/45041.html

相关文章

搭建YOLOv10环境 训练+推理+模型评估

文章目录 前言一、环境搭建必要环境1. 创建yolov10虚拟环境2. 下载pytorch (pytorch版本>1.8)3. 下载YOLOv10源码4. 安装所需要的依赖包 二、推理测试1. 将如下代码复制到ultralytics文件夹同级目录下并运行 即可得到推理结果2. 关键参数 三、训练及评估1. 数据结构介绍2. 配…

Java_基础加强——日志、类加载器、单元测试、注解

1.日志&#xff1a; 日志的优势&#xff1a; 体系结构&#xff1a; 日志规范&#xff1a; 日志规范大多是一些接口&#xff0c;提供给实现框架去设计的 常见的规范是Commons Logging&#xff08;JCL&#xff09;、Simple Logging Facade for Java&#xff08;slf4j&#xff09…

第十三节:带你梳理Vue2 : watch侦听器

官方解释:> 观察 Vue 实例变化的一个表达式或计算属性函数。回调函数得到的参数为新值和旧值。表达式只接受监督的键路径。对于更复杂的表达式&#xff0c;用一个函数取代<br/>## 1. 侦听器的基本使用侦听器可以监听data对象属性或者计算属性的变化watch是观察属性的…

电脑同时配置两个版本mysql数据库常见问题

1.配置时&#xff0c;要把bin中的mysql.exe和mysqld.exe 改个名字&#xff0c;不然两个版本会重复&#xff0c;当然&#xff0c;在初始化数据库的时候&#xff0c;如果时57版本的&#xff0c;就用mysql57(已经改名的)和mysqld57 代替 mysql 和 mysqld 例如 mysql -u root -p …

Unix环境高级编程--8-进程控制---8.1-8.2进程标识-8.3fork函数-8.4 vfork函数

1、进程控制几个过程 创建进程--》执行进程---》终止进程 2、进程标识 &#xff08;1&#xff09;专用进程&#xff1a;ID为0的进程是调度进程&#xff0c;常常被称为交换进程&#xff0c;也称为系统进程&#xff1b; ID为1通常是init进程&#xff0c;在自举结束时由内核调用…

轧钢测径仪分析软件,四大图表带来产线新视角!

轧钢测径仪是智能化检测设备&#xff0c;除了测径仪主体外&#xff0c;还配有测控软件系统&#xff0c;从这里可对测径仪进行各种设置&#xff0c;亦可从此观测到测径仪获得的各种信息&#xff0c;如检测信息、分析图表、计算尺寸、历史数据等。而从测径仪获得的图表信息主要有…

leetcode-顺时针旋转矩阵-111

题目要求 思路 1.假设现在有一个矩阵 123 456 789 2.我们可以根据19这个对角线将数据进行交换&#xff0c;得到矩阵 147 258 369 3.然后将矩阵每一行的数据再翻转&#xff0c;得到矩阵 741 852 963 代码实现 class Solution { public:vector<vector<int> > rot…

Web基础与HTTP协议

域名概念 1、网络是基于 TCP/IP 协议进行通讯和连接的&#xff0c;每一台主机都有一个唯一的标识&#xff08;固定的IP地址&#xff09;&#xff0c;用以区别在网络上成千上万个用户和计算机。网络在区分所有与之相连的主机和网络时&#xff0c;均采用一种唯一、通用的地址格式…