大家好,这里是Good Note,关注 公主号:Goodnote,专栏文章私信限时Free。本文介绍Go并发,同步,顺序执行,设计的一些常见的场景,顺序执行主要用channel实现。在这种同步信号的使用场景中,使用无缓冲通道,可以选择不关闭通道。
文章目录
- 协程同步背景介绍
- 无缓冲通道的作用
- 为什么不需要关闭通道
- 何时需要关闭通道
- 总结
- 常见的同步场景
- 顺序执行10个Goroutine
- 两个 Goroutines 的交替执行,交替打印偶数和奇数
- 两个 server 的任意一个执行完成,就执行第三个
- 两个 server 必须全部执行完成,再执行第三个。
- 三个 server 的必须按 1 2 3 顺序执行
- 一个生产者Goroutine,多个消费者Goroutine(每条消息,一次消费)
- 一个生产者Goroutine,多个消费者Goroutine(每条消息每个消费者都会消费)
- 历史文章
- MySQL数据库
- Redis
- Golang
协程同步背景介绍
无缓冲通道(unbuffered channel)通常用于同步 Goroutine 的执行顺序。它的作用是让 Goroutines 在指定的时刻按顺序执行,而不是用来传递数据。因此,在这种同步信号传递的场景中,不需要关闭通道,程序依然能够正常运行。
无缓冲通道的作用
无缓冲通道的特性是:
- 发送方必须等待接收方接收数据,才能继续发送下一个数据。
- 它本质上是一个同步机制,用于协调不同 Goroutine 的执行顺序。
这种同步机制不涉及数据传输,只是信号传递。
为什么不需要关闭通道
-
同步控制:在你的代码中,通道仅仅用于同步 Goroutine 的执行,而不用于传输实际的数据。在同步场景下,通道充当的是“信号”的角色。接收方只关心信号的到来,处理完后就继续执行,通道是否关闭不会影响这一行为。
-
没有资源泄漏:由于通道没有存储任何数据,而是仅用于发送和接收信号,通道的关闭不会带来资源泄漏。即使不关闭通道,程序也会正确地运行,因为没有对通道进行进一步的发送操作。
-
避免复杂性:关闭通道通常用于“通知接收方数据已经完成传输”,而在这种情况下,你只是在等待信号,不需要关心通道的关闭状态。因此,不关闭通道反而使得程序逻辑更简洁,避免了复杂的资源管理。
-
垃圾回收机制close():Go 的垃圾回收机制会自动处理那些不再使用的对象和数据结构,包括通道。所以即使没有显式关闭通道,程序结束时,未关闭的通道也会被垃圾回收。
何时需要关闭通道
在其他情况下,关闭通道是必要的,尤其是在以下几种场景:
-
多接收方模式:如果你有多个接收方从同一个通道接收数据,关闭通道可以通知所有接收方没有更多的数据可以接收。
-
通知没有更多数据:如果通道用于传输数据,关闭通道可以标识发送方不再有数据发送,这对于避免接收方阻塞和避免无限等待是非常重要的。
总结
在 无缓冲通道 的同步信号传递场景下:
- 不关闭通道是合理的,因为通道的作用仅仅是控制同步,而不是用于传输数据。
- 通道的关闭通常是在传输数据时,通知接收方“没有更多数据了”,而在同步信号的场景中,这种关闭操作并不必要。
所以,在 顺序执行的同步场景 中,如果你只是通过无缓冲通道传递信号,不关闭通道完全没有问题。
常见的同步场景
顺序执行10个Goroutine
这里选择了关闭channel,其实可以删掉close(),后续的例子不会再关闭channel。
package mainimport ("fmt""time"
)func main() {// 创建多个无缓冲的 Channel,用来控制 Goroutine 的顺序steps := make([]chan struct{}, 10)for i := 0; i < 10; i++ {steps[i] = make(chan struct{})}// 定义 10 个 Goroutine,按照顺序执行for i := 0; i < 10; i++ {go func(i int) {if i == 0 {// 第一个 Goroutine 不需要等待fmt.Println("Goroutine 1: Start")time.Sleep(1 * time.Second) // 模拟工作fmt.Println("Goroutine 1: Done")// 通知 Goroutine 2 可以开始steps[0] <- struct{}{}close(steps[0])} else {// 等待上一个 Goroutine 完成<-steps[i-1]fmt.Printf("Goroutine %d: Start\n", i+1)time.Sleep(1 * time.Second) // 模拟工作fmt.Printf("Goroutine %d: Done\n", i+1)// 通知下一个 Goroutine 可以开始if i < 9 {steps[i] <- struct{}{}close(steps[i])} else {// 最后一个 Goroutine,通知主线程完成steps[9] <- struct{}{}close(steps[9])}}}(i)}// 等待最后一个 Goroutine 完成<-steps[9]fmt.Println("All Goroutines Finished!")
}
输出:
Goroutine 1: Start
Goroutine 1: Done
Goroutine 2: Start
Goroutine 2: Done
Goroutine 3: Start
Goroutine 3: Done
Goroutine 4: Start
Goroutine 4: Done
Goroutine 5: Start
Goroutine 5: Done
Goroutine 6: Start
Goroutine 6: Done
Goroutine 7: Start
Goroutine 7: Done
Goroutine 8: Start
Goroutine 8: Done
Goroutine 9: Start
Goroutine 9: Done
Goroutine 10: Start
Goroutine 10: Done
All Goroutines Finished!
两个 Goroutines 的交替执行,交替打印偶数和奇数
package mainimport ("fmt"
)func main() {// 定义两个channel,分别用于控制打印顺序ch1 := make(chan struct{})ch2 := make(chan struct{})done := make(chan struct{}) // 用于通知主协程完成// 启动第一个goroutine,负责打印偶数go func() {for i := 0; i <= 9; i += 2 {<-ch1 // 等待信号fmt.Println(i) // 打印当前偶数ch2 <- struct{}{} // 通知另一个goroutine}}()// 启动第二个goroutine,负责打印奇数go func() {for i := 1; i <= 9; i += 2 {<-ch2 // 等待信号fmt.Println(i) // 打印当前奇数if i == 9 {done <- struct{}{} // 打印完成通知主协程} else {ch1 <- struct{}{} // 通知另一个goroutine}}}()// 主协程启动打印过程ch1 <- struct{}{} // 先给ch1发送信号,开始打印偶数// 主协程等待所有任务完成<-done
}
输出:
0
1
2
3
4
5
6
7
8
9
两个 server 的任意一个执行完成,就执行第三个
这里执行了5次,以验证随机性,time.Sleep代表server的一些执行的逻辑。
package mainimport ("fmt""time"
)func server1(ch chan string) {time.Sleep(1 * time.Second)ch <- "server1"
}
func server2(ch chan string) {time.Sleep(1 * time.Second)ch <- "server2"
}
func main() {for i := 0; i < 5; i++ {output1 := make(chan string)output2 := make(chan string)go server1(output1)go server2(output2)select {case s1 := <-output1:fmt.Println(s1, "server3")case s2 := <-output2:fmt.Println(s2, "server3")}fmt.Println("--------------")}fmt.Println("5组执行完成")
}
输出:
下面输出5组,前面的server顺序不一定(哪个先完成都行)。
server1 server3
--------------
server2 server3
--------------
server1 server3
--------------
server1 server3
--------------
server1 server3
--------------
5组执行完成
说明:
select如果加上default会直接命中default,不会等待两个通道。
两个 server 必须全部执行完成,再执行第三个。
这里执行了5次,以验证随机性,time.Sleep代表server的一些执行的逻辑。
package mainimport ("fmt""time"
)func server1(ch chan string) {time.Sleep(1 * time.Second)ch <- "server1"
}
func server2(ch chan string) {time.Sleep(1 * time.Second)ch <- "server2"
}
func main() {for i := 0; i < 5; i++ {output1 := make(chan string)output2 := make(chan string)go server1(output1)go server2(output2)for i := 0; i < 2; i++ {select {case s1 := <-output1:fmt.Println(s1)case s2 := <-output2:fmt.Println(s2)}}fmt.Println("server3")fmt.Println("--------------")}fmt.Println("5组执行完成")
}
上面使用了for+select控制2个server完成,也可以使用WaitGroup,如下:
package mainimport ("fmt""sync""time"
)func server1(wg *sync.WaitGroup) {defer wg.Done() // 在函数退出时通知 WaitGrouptime.Sleep(1 * time.Second)fmt.Println("server1")
}
func server2(wg *sync.WaitGroup) {defer wg.Done() // 在函数退出时通知 WaitGrouptime.Sleep(1 * time.Second)fmt.Println("server2")
}
func main() {for i := 0; i < 5; i++ {wg := sync.WaitGroup{}wg.Add(2)go server1(&wg)go server2(&wg)wg.Wait()fmt.Println("server3")fmt.Println("--------------")}fmt.Println("5组执行完成")
}
输出:
下面输出5组,前面的server顺序不一定(哪个先完成都行)。
server1
server2
server3
--------------
server1
server2
server3
--------------
server2
server1
server3
--------------
server1
server2
server3
--------------
server1
server2
server3
--------------
5组执行完成
三个 server 的必须按 1 2 3 顺序执行
这里执行了5次,以验证随机性,time.Sleep代表server的一些执行的逻辑。
package mainimport ("fmt""sync""time"
)func server1(ch chan string, wg *sync.WaitGroup) {defer wg.Done() // 在函数退出时通知 WaitGrouptime.Sleep(1 * time.Second) // 模拟工作ch <- "server1"
}func server2(ch chan string, wg *sync.WaitGroup) {defer wg.Done() // 在函数退出时通知 WaitGrouptime.Sleep(1 * time.Second) // 模拟工作ch <- "server2"
}func main() {for i := 0; i < 5; i++ {output1 := make(chan string)output2 := make(chan string)var wg sync.WaitGroup// 启动 server1 和 server2wg.Add(2) // 等待两个 Goroutine 完成go server1(output1, &wg)go server2(output2, &wg)// 确保按顺序输出 server1, server2, server3s1 := <-output1s2 := <-output2fmt.Println(s1)fmt.Println(s2)fmt.Println("server3")fmt.Println("--------------")}// 完成5组任务fmt.Println("5组执行完成")
}
输出:
server1
server2
server3
--------------
server1
server2
server3
--------------
server1
server2
server3
--------------
server1
server2
server3
--------------
server1
server2
server3
--------------
5组执行完成
一个生产者Goroutine,多个消费者Goroutine(每条消息,一次消费)
此处生产者生产的每个消息,只会有一个消费者消费,并发消费。
package mainimport ("fmt""sync""time"
)func producer(ch chan int, wg *sync.WaitGroup) {defer wg.Done()for i := 1; i <= 5; i++ {fmt.Printf("Produced: %d\n", i)ch <- i // 向通道发送数据time.Sleep(time.Second)}close(ch) // 生产者完成后关闭通道
}func consumer(id int, ch chan int, wg *sync.WaitGroup) {defer wg.Done()for task := range ch { // 从通道读取任务直到通道关闭fmt.Printf("Consumer %d processing task: %d\n", id, task)time.Sleep(2 * time.Second) // 模拟消费任务的延时}
}func main() {var wg sync.WaitGrouptasks := make(chan int, 10) // 定义缓冲区大小为10的任务通道// 启动多个消费者for i := 1; i <= 3; i++ {wg.Add(1)go consumer(i, tasks, &wg)}// 启动生产者wg.Add(1)go producer(tasks, &wg)// 等待所有消费者完成工作wg.Wait()fmt.Println("All tasks completed.")
}
输出:
Produced: 1
Consumer 2 processing task: 1
Produced: 2
Consumer 1 processing task: 2
Produced: 3
Consumer 3 processing task: 3
Produced: 4
Consumer 2 processing task: 4
Produced: 5
Consumer 1 processing task: 5
All tasks completed.
一个生产者Goroutine,多个消费者Goroutine(每条消息每个消费者都会消费)
此处生产者生产的每个消息,每个消费者都会消费。
方案:
-
我们应该设计一个 多播机制(广播模式),即每个任务都会被多个消费者消费。最简单的方式是使用 复制通道(通过多个 goroutine 消费同一个任务通道)。
-
可以通过 使用多个通道,每个消费者都从这些通道中接收任务,或者使用
sync.WaitGroup
等方式来确保每个消费者都能够完成任务处理。
package mainimport ("fmt""sync""time"
)func producer(channels []chan int, wg *sync.WaitGroup) {defer wg.Done() // 确保 producer 完成时通知 WaitGroup// 生产 5 个任务for i := 1; i <= 5; i++ {fmt.Printf("Produced: %d\n", i)// 向每个通道发送任务for _, ch := range channels {ch <- i}time.Sleep(time.Second)}// 所有任务发送完毕后,关闭每个通道for _, ch := range channels {close(ch)}
}func consumer(id int, ch chan int, wg *sync.WaitGroup) {defer wg.Done() // 确保 consumer 完成时通知 WaitGroup// 从通道中接收任务并处理for task := range ch {fmt.Printf("Consumer %d processing task: %d\n", id, task)time.Sleep(2 * time.Second) // 模拟任务处理延迟}
}func main() {var wg sync.WaitGroupnumConsumers := 3channels := make([]chan int, numConsumers)// 创建多个消费者通道for i := 0; i < numConsumers; i++ {channels[i] = make(chan int, 10)}// 启动多个消费者for i := 1; i <= numConsumers; i++ {wg.Add(1)go consumer(i, channels[i-1], &wg)}// 启动生产者wg.Add(1)go producer(channels, &wg)// 等待所有消费者和生产者完成工作wg.Wait()fmt.Println("All tasks completed.")
}
输出:
Produced: 1
Consumer 1 processing task: 1
Consumer 2 processing task: 1
Consumer 3 processing task: 1
Produced: 2
Produced: 3
Consumer 1 processing task: 2
Consumer 2 processing task: 2
Consumer 3 processing task: 2
Produced: 4
Produced: 5
Consumer 2 processing task: 3
Consumer 1 processing task: 3
Consumer 3 processing task: 3
Consumer 1 processing task: 4
Consumer 2 processing task: 4
Consumer 3 processing task: 4
Consumer 3 processing task: 5
Consumer 2 processing task: 5
Consumer 1 processing task: 5
All tasks completed.
历史文章
MySQL数据库
MySQL数据库
Redis
Redis数据库笔记合集
Golang
- Golang笔记——语言基础知识
- Golang笔记——切片与数组
- Golang笔记——hashmap
- Golang笔记——rune和byte
- Golang笔记——channel
- Golang笔记——Interface类型
- Golang笔记——数组、Slice、Map、Channel的并发安全性
- Golang笔记——协程同步