Golang笔记——优秀的并发实现

server/2025/1/26 6:21:42/

大家好,这里是Good Note,关注 公主号:Goodnote,专栏文章私信限时Free。本文介绍Go并发,同步,顺序执行,设计的一些常见的场景,顺序执行主要用channel实现。在这种同步信号的使用场景中,使用无缓冲通道,可以选择不关闭通道。

在这里插入图片描述

文章目录

    • 协程同步背景介绍
      • 无缓冲通道的作用
      • 为什么不需要关闭通道
      • 何时需要关闭通道
      • 总结
    • 常见的同步场景
      • 顺序执行10个Goroutine
      • 两个 Goroutines 的交替执行,交替打印偶数和奇数
      • 两个 server 的任意一个执行完成,就执行第三个
      • 两个 server 必须全部执行完成,再执行第三个。
      • 三个 server 的必须按 1 2 3 顺序执行
      • 一个生产者Goroutine,多个消费者Goroutine(每条消息,一次消费)
      • 一个生产者Goroutine,多个消费者Goroutine(每条消息每个消费者都会消费)
    • 历史文章

协程同步背景介绍

无缓冲通道(unbuffered channel)通常用于同步 Goroutine 的执行顺序。它的作用是让 Goroutines 在指定的时刻按顺序执行,而不是用来传递数据。因此,在这种同步信号传递的场景中,不需要关闭通道,程序依然能够正常运行。

无缓冲通道的作用

无缓冲通道的特性是:

  • 发送方必须等待接收方接收数据,才能继续发送下一个数据。
  • 它本质上是一个同步机制,用于协调不同 Goroutine 的执行顺序

这种同步机制不涉及数据传输,只是信号传递。

为什么不需要关闭通道

  • 同步控制:在你的代码中,通道仅仅用于同步 Goroutine 的执行,而不用于传输实际的数据。在同步场景下,通道充当的是“信号”的角色。接收方只关心信号的到来,处理完后就继续执行,通道是否关闭不会影响这一行为。

  • 没有资源泄漏:由于通道没有存储任何数据,而是仅用于发送和接收信号,通道的关闭不会带来资源泄漏。即使不关闭通道,程序也会正确地运行,因为没有对通道进行进一步的发送操作。

  • 避免复杂性:关闭通道通常用于“通知接收方数据已经完成传输”,而在这种情况下,你只是在等待信号,不需要关心通道的关闭状态。因此,不关闭通道反而使得程序逻辑更简洁,避免了复杂的资源管理。

  • 垃圾回收机制close():Go 的垃圾回收机制会自动处理那些不再使用的对象和数据结构,包括通道。所以即使没有显式关闭通道,程序结束时,未关闭的通道也会被垃圾回收。

何时需要关闭通道

在其他情况下,关闭通道是必要的,尤其是在以下几种场景:

  • 多接收方模式:如果你有多个接收方从同一个通道接收数据,关闭通道可以通知所有接收方没有更多的数据可以接收。

  • 通知没有更多数据:如果通道用于传输数据,关闭通道可以标识发送方不再有数据发送,这对于避免接收方阻塞和避免无限等待是非常重要的。

总结

无缓冲通道 的同步信号传递场景下:

  • 不关闭通道是合理的,因为通道的作用仅仅是控制同步,而不是用于传输数据。
  • 通道的关闭通常是在传输数据时,通知接收方“没有更多数据了”,而在同步信号的场景中,这种关闭操作并不必要。

所以,在 顺序执行的同步场景 中,如果你只是通过无缓冲通道传递信号,不关闭通道完全没有问题。

常见的同步场景

顺序执行10个Goroutine

这里选择了关闭channel,其实可以删掉close(),后续的例子不会再关闭channel。

package mainimport ("fmt""time"
)func main() {// 创建多个无缓冲的 Channel,用来控制 Goroutine 的顺序steps := make([]chan struct{}, 10)for i := 0; i < 10; i++ {steps[i] = make(chan struct{})}// 定义 10 个 Goroutine,按照顺序执行for i := 0; i < 10; i++ {go func(i int) {if i == 0 {// 第一个 Goroutine 不需要等待fmt.Println("Goroutine 1: Start")time.Sleep(1 * time.Second) // 模拟工作fmt.Println("Goroutine 1: Done")// 通知 Goroutine 2 可以开始steps[0] <- struct{}{}close(steps[0])} else {// 等待上一个 Goroutine 完成<-steps[i-1]fmt.Printf("Goroutine %d: Start\n", i+1)time.Sleep(1 * time.Second) // 模拟工作fmt.Printf("Goroutine %d: Done\n", i+1)// 通知下一个 Goroutine 可以开始if i < 9 {steps[i] <- struct{}{}close(steps[i])} else {// 最后一个 Goroutine,通知主线程完成steps[9] <- struct{}{}close(steps[9])}}}(i)}// 等待最后一个 Goroutine 完成<-steps[9]fmt.Println("All Goroutines Finished!")
}

输出:

Goroutine 1: Start
Goroutine 1: Done
Goroutine 2: Start
Goroutine 2: Done
Goroutine 3: Start
Goroutine 3: Done
Goroutine 4: Start
Goroutine 4: Done
Goroutine 5: Start
Goroutine 5: Done
Goroutine 6: Start
Goroutine 6: Done
Goroutine 7: Start
Goroutine 7: Done
Goroutine 8: Start
Goroutine 8: Done
Goroutine 9: Start
Goroutine 9: Done
Goroutine 10: Start
Goroutine 10: Done
All Goroutines Finished!

两个 Goroutines 的交替执行,交替打印偶数和奇数

package mainimport ("fmt"
)func main() {// 定义两个channel,分别用于控制打印顺序ch1 := make(chan struct{})ch2 := make(chan struct{})done := make(chan struct{}) // 用于通知主协程完成// 启动第一个goroutine,负责打印偶数go func() {for i := 0; i <= 9; i += 2 {<-ch1             // 等待信号fmt.Println(i)    // 打印当前偶数ch2 <- struct{}{} // 通知另一个goroutine}}()// 启动第二个goroutine,负责打印奇数go func() {for i := 1; i <= 9; i += 2 {<-ch2          // 等待信号fmt.Println(i) // 打印当前奇数if i == 9 {done <- struct{}{} // 打印完成通知主协程} else {ch1 <- struct{}{} // 通知另一个goroutine}}}()// 主协程启动打印过程ch1 <- struct{}{} // 先给ch1发送信号,开始打印偶数// 主协程等待所有任务完成<-done
}

输出:

0
1
2
3
4
5
6
7
8
9

两个 server 的任意一个执行完成,就执行第三个

这里执行了5次,以验证随机性,time.Sleep代表server的一些执行的逻辑。

package mainimport ("fmt""time"
)func server1(ch chan string) {time.Sleep(1 * time.Second)ch <- "server1"
}
func server2(ch chan string) {time.Sleep(1 * time.Second)ch <- "server2"
}
func main() {for i := 0; i < 5; i++ {output1 := make(chan string)output2 := make(chan string)go server1(output1)go server2(output2)select {case s1 := <-output1:fmt.Println(s1, "server3")case s2 := <-output2:fmt.Println(s2, "server3")}fmt.Println("--------------")}fmt.Println("5组执行完成")
}

输出:
下面输出5组,前面的server顺序不一定(哪个先完成都行)。

server1 server3
--------------
server2 server3
--------------
server1 server3
--------------
server1 server3
--------------
server1 server3
--------------
5组执行完成

说明:
select如果加上default会直接命中default,不会等待两个通道。

两个 server 必须全部执行完成,再执行第三个。

这里执行了5次,以验证随机性,time.Sleep代表server的一些执行的逻辑。

package mainimport ("fmt""time"
)func server1(ch chan string) {time.Sleep(1 * time.Second)ch <- "server1"
}
func server2(ch chan string) {time.Sleep(1 * time.Second)ch <- "server2"
}
func main() {for i := 0; i < 5; i++ {output1 := make(chan string)output2 := make(chan string)go server1(output1)go server2(output2)for i := 0; i < 2; i++ {select {case s1 := <-output1:fmt.Println(s1)case s2 := <-output2:fmt.Println(s2)}}fmt.Println("server3")fmt.Println("--------------")}fmt.Println("5组执行完成")
}

上面使用了for+select控制2个server完成,也可以使用WaitGroup,如下:

package mainimport ("fmt""sync""time"
)func server1(wg *sync.WaitGroup) {defer wg.Done() // 在函数退出时通知 WaitGrouptime.Sleep(1 * time.Second)fmt.Println("server1")
}
func server2(wg *sync.WaitGroup) {defer wg.Done() // 在函数退出时通知 WaitGrouptime.Sleep(1 * time.Second)fmt.Println("server2")
}
func main() {for i := 0; i < 5; i++ {wg := sync.WaitGroup{}wg.Add(2)go server1(&wg)go server2(&wg)wg.Wait()fmt.Println("server3")fmt.Println("--------------")}fmt.Println("5组执行完成")
}

输出:
下面输出5组,前面的server顺序不一定(哪个先完成都行)。

server1
server2
server3
--------------
server1
server2
server3
--------------
server2
server1
server3
--------------
server1
server2
server3
--------------
server1
server2
server3
--------------
5组执行完成

三个 server 的必须按 1 2 3 顺序执行

这里执行了5次,以验证随机性,time.Sleep代表server的一些执行的逻辑。

package mainimport ("fmt""sync""time"
)func server1(ch chan string, wg *sync.WaitGroup) {defer wg.Done()             // 在函数退出时通知 WaitGrouptime.Sleep(1 * time.Second) // 模拟工作ch <- "server1"
}func server2(ch chan string, wg *sync.WaitGroup) {defer wg.Done()             // 在函数退出时通知 WaitGrouptime.Sleep(1 * time.Second) // 模拟工作ch <- "server2"
}func main() {for i := 0; i < 5; i++ {output1 := make(chan string)output2 := make(chan string)var wg sync.WaitGroup// 启动 server1 和 server2wg.Add(2) // 等待两个 Goroutine 完成go server1(output1, &wg)go server2(output2, &wg)// 确保按顺序输出 server1, server2, server3s1 := <-output1s2 := <-output2fmt.Println(s1)fmt.Println(s2)fmt.Println("server3")fmt.Println("--------------")}// 完成5组任务fmt.Println("5组执行完成")
}

输出:

server1
server2
server3
--------------
server1
server2
server3
--------------
server1
server2
server3
--------------
server1
server2
server3
--------------
server1
server2
server3
--------------
5组执行完成

一个生产者Goroutine,多个消费者Goroutine(每条消息,一次消费)

此处生产者生产的每个消息,只会有一个消费者消费,并发消费。

package mainimport ("fmt""sync""time"
)func producer(ch chan int, wg *sync.WaitGroup) {defer wg.Done()for i := 1; i <= 5; i++ {fmt.Printf("Produced: %d\n", i)ch <- i // 向通道发送数据time.Sleep(time.Second)}close(ch) // 生产者完成后关闭通道
}func consumer(id int, ch chan int, wg *sync.WaitGroup) {defer wg.Done()for task := range ch { // 从通道读取任务直到通道关闭fmt.Printf("Consumer %d processing task: %d\n", id, task)time.Sleep(2 * time.Second) // 模拟消费任务的延时}
}func main() {var wg sync.WaitGrouptasks := make(chan int, 10) // 定义缓冲区大小为10的任务通道// 启动多个消费者for i := 1; i <= 3; i++ {wg.Add(1)go consumer(i, tasks, &wg)}// 启动生产者wg.Add(1)go producer(tasks, &wg)// 等待所有消费者完成工作wg.Wait()fmt.Println("All tasks completed.")
}

输出:

Produced: 1
Consumer 2 processing task: 1
Produced: 2
Consumer 1 processing task: 2
Produced: 3
Consumer 3 processing task: 3
Produced: 4
Consumer 2 processing task: 4
Produced: 5
Consumer 1 processing task: 5
All tasks completed.

一个生产者Goroutine,多个消费者Goroutine(每条消息每个消费者都会消费)

此处生产者生产的每个消息,每个消费者都会消费。
方案:

  1. 我们应该设计一个 多播机制(广播模式),即每个任务都会被多个消费者消费。最简单的方式是使用 复制通道(通过多个 goroutine 消费同一个任务通道)。

  2. 可以通过 使用多个通道,每个消费者都从这些通道中接收任务,或者使用 sync.WaitGroup 等方式来确保每个消费者都能够完成任务处理。

package mainimport ("fmt""sync""time"
)func producer(channels []chan int, wg *sync.WaitGroup) {defer wg.Done() // 确保 producer 完成时通知 WaitGroup// 生产 5 个任务for i := 1; i <= 5; i++ {fmt.Printf("Produced: %d\n", i)// 向每个通道发送任务for _, ch := range channels {ch <- i}time.Sleep(time.Second)}// 所有任务发送完毕后,关闭每个通道for _, ch := range channels {close(ch)}
}func consumer(id int, ch chan int, wg *sync.WaitGroup) {defer wg.Done() // 确保 consumer 完成时通知 WaitGroup// 从通道中接收任务并处理for task := range ch {fmt.Printf("Consumer %d processing task: %d\n", id, task)time.Sleep(2 * time.Second) // 模拟任务处理延迟}
}func main() {var wg sync.WaitGroupnumConsumers := 3channels := make([]chan int, numConsumers)// 创建多个消费者通道for i := 0; i < numConsumers; i++ {channels[i] = make(chan int, 10)}// 启动多个消费者for i := 1; i <= numConsumers; i++ {wg.Add(1)go consumer(i, channels[i-1], &wg)}// 启动生产者wg.Add(1)go producer(channels, &wg)// 等待所有消费者和生产者完成工作wg.Wait()fmt.Println("All tasks completed.")
}

输出:

Produced: 1
Consumer 1 processing task: 1
Consumer 2 processing task: 1
Consumer 3 processing task: 1
Produced: 2
Produced: 3
Consumer 1 processing task: 2
Consumer 2 processing task: 2
Consumer 3 processing task: 2
Produced: 4
Produced: 5
Consumer 2 processing task: 3
Consumer 1 processing task: 3
Consumer 3 processing task: 3
Consumer 1 processing task: 4
Consumer 2 processing task: 4
Consumer 3 processing task: 4
Consumer 3 processing task: 5
Consumer 2 processing task: 5
Consumer 1 processing task: 5
All tasks completed.

历史文章

MySQL数据库

MySQL数据库

Redis

Redis数据库笔记合集

Golang

  1. Golang笔记——语言基础知识
  2. Golang笔记——切片与数组
  3. Golang笔记——hashmap
  4. Golang笔记——rune和byte
  5. Golang笔记——channel
  6. Golang笔记——Interface类型
  7. Golang笔记——数组、Slice、Map、Channel的并发安全性
  8. Golang笔记——协程同步

http://www.ppmy.cn/server/161192.html

相关文章

125个Docker的常用命令

125个Docker的常用命令 基本命令 镜像管理指令 容器管理指令 数据管理与卷指令 网络配置指令

Jetson nano 安装 PCL 指南

本指南帮助 ARM64 架构的 Jetson Nano 安装 PCL&#xff08;点云库&#xff09;。 安装步骤 第一步&#xff1a;安装依赖 在终端中运行以下命令&#xff0c;安装 PCL 所需的依赖&#xff1a; sudo apt-get update sudo apt-get install git build-essential linux-libc-dev s…

利用现有模型处理面部视频获取特征向量(4)

于是载入完整版视频 conda activate video_features cd video_features python main.py \feature_typer21d \device"cuda:0" \video_paths"[/home/ubuntu/low/0.mp4,/home/ubuntu/low/1.mp4,/home/ubuntu/low/2.mp4,/home/ubuntu/low/3.mp4,/home/ubuntu/low/…

将IDLE里面python环境pyqt5配置的vscode

首先安装pyqt5全套&#xff1a;pip install pyqt5-tools 打开Vscode&#xff1a; 安装第三方扩展&#xff1a;PYQT Integration 成功配置designer.exe的路径【个人安装pyqt5的执行路径】&#xff0c;便可直接打开UI文件&#xff0c;进行编辑。 配置pyuic,如果下图填写方法使用…

CSS align-content 属性

定义和用法 align-content 属性修改 flex-wrap 属性的行为。它与 align-items 相似&#xff0c;但是它不对齐弹性项目&#xff0c;而是对齐弹性线。 注意&#xff1a;必须有多行项目&#xff0c;此属性才能生效&#xff01; 提示&#xff1a;使用 justify-content 属性可将主…

0基础跟德姆(dom)一起学AI 自然语言处理19-输出部分实现

1 输出部分介绍 输出部分包含: 线性层softmax层 2 线性层的作用 通过对上一步的线性变化得到指定维度的输出, 也就是转换维度的作用. 3 softmax层的作用 使最后一维的向量中的数字缩放到0-1的概率值域内, 并满足他们的和为1. 3.1 线性层和softmax层的代码分析 # 解码器类…

【Uniapp-Vue3】图片lazy-load懒加载

图片懒加载是指当预览到某张图片的时候才发送请求获取该图片。 在没有开启懒加载的时候&#xff0c;所有的图片都会一次性请求完毕。 下面我们开启懒加载&#xff1a; <image lazy-load></image> 一次请求图片的数量减少了很多&#xff0c;只有当我们浏览到新的图…

grafana+prometheus监控linux指标

先查看linux架构 [rootnode-0006 node_exporter-1.6.1.linux-amd64]# uname -m aarch64 我服务器是ARM 架构 所以是下载适用于 ARM64 的 Node Exporter&#xff1a; 新建一个文件夹 进入 wget https://github.com/prometheus/node_exporter/releases/download/v1.6.1/node…