DeepSeek 说 Goroutines 和 Channels
最近非常流行询问DeepSeek某些相关概念或热点的解释,因此在开始系统性地学习《Go语言圣经》之前,我首先向DeepSeek进行了提问。具体的Prompt如下:
有关Golang当中的Goroutines和Channels,我现在要对这两个知识点进行系统性地学习,请您深入浅出地为我介绍这些知识点,并重点强调我需要注意的地方。
Goroutines
基本概念
Goroutine 是 Golang 中的轻量级线程(协程),由 Go 运行时管理。与操作系统级别的线程相比,Goroutine 的创建和管理开销更小,且内存占用更低。可以通过go
关键字启动一个 Goroutine,go
关键字的操作对象是 Golang 的函数。
一个启动以匿名函数为对象的协程的示例如下:
go func() {fmt.Println("Hello from a goroutine")
}()
注意事项
- 并发 vs 并行:Goroutines 是并发执行的,但不一定是并行的。Golang 运行时会根据 CPU 核数来调度 Goroutines;
- Goroutine 生命周期:Goroutine 在函数返回时结束(这个函数值的是使用
go
关键字启动的函数)。如果主 Goroutine 结束(即 main 函数结束),那么所有其他 Goroutines 都将被终止。 - 资源管理:Goroutines 可能会泄露资源,尤其是在它们被阻塞或无限循环时。确保 Goroutines 能够正常退出,这样可以有效地对资源进行管理,避免资源泄露。
示例
package mainimport ("fmt""time"
)func printNumbers() {for i := 1; i <= 5; i++ {fmt.Println(i)time.Sleep(500 * time.Millisecond)}
}func main() {go printNumbers()go printNumbers()time.Sleep(3 * time.Second)
}
在 Goland 当中的运行结果如下:
1
1
2
2
3
3
4
4
5
5进程 已完成,退出代码为 0
(实际上,Golang 当中的 go 关键字我个人认为与 Python 当中的 Thread 非常的相似,Golang 当中 Goroutines 的优点可能是更加轻量和高效)
Channels
基本概念
Channel 是 Goroutines 之间通信的管道。可以通过 Channel 发送和接收数据,确保 Goroutines 之间的同步(我个人认为这是 Golang 当中最明显的优点,因为根据我非常有限的 Python 开发的经验,Python 至少不是在语言级别上支持线程之间通信的)。
可以通过如下方式创建一个 channel,使用关键字chan
:
ch := make(chan int)
注意事项
- Channel 类型:Channel 可以是带缓冲的和无缓冲的。无缓冲 Channel 是同步的,发送和接受操作都会阻塞,直到另一端准备好。带缓冲 Channel 允许在缓冲区未满时发送数据,或在缓冲区不为空时接收数据。
- 关闭 Channel:使用
close(ch)
来关闭 channel。关闭后,channel 不能再发送数据,但是仍然可以接收数据。 - 死锁:如果 Goroutines 都在等待对方发送或接收数据,那么可能会导致死锁。开发人员应确保 Channel 的使用不会导致这种情况。
示例
package mainimport ("fmt""time"
)func worker(id int, jobs <-chan int, results chan<- int) {for j := range jobs {fmt.Println("worker", id, "started job", j)time.Sleep(time.Second)fmt.Println("worker", id, "finished job", j)results <- j * 2// 根据 DeepSeek 给出的上述用法// 可以推测使用关键字 <-chan 声明的管道是发送数据的管道// 使用 chan<- 是接收数据的管道}
}func main() {jobs := make(chan int, 100)results := make(chan int, 100)for w := 1; w <= 3; w++ {go worker(w, jobs, results)}for j := 1; j <= 5; j++ {jobs <- j // 在主 goroutine 向 协程 发送数据, 通过管道 jobs 来完成}close(jobs)for a := 1; a <= 5; a++ {<-results}
}
在 Goland 中输出的结果如下:
worker 3 started job 1
worker 1 started job 2
worker 2 started job 3
worker 2 finished job 3
worker 2 started job 4
worker 1 finished job 2
worker 1 started job 5
worker 3 finished job 1
worker 1 finished job 5
worker 2 finished job 4进程 已完成,退出代码为 0
结合使用 Goroutines 和 Channels
基本概念
Goroutines 和 Channels 通常结合使用,以实现并发任务的协调和通信。
注意事项
- 同步:使用 Channel 来同步 Goroutines 的执行顺序;
- 数据竞争:确保多个 Goroutines 不会同时访问和修改共享数据,可以使用 Channel 或
sync
包中的同步原语(如Mutex
)来避免数据竞争。
示例
package mainimport ("fmt""time"
)func producer(ch chan<- int) {for i := 0; i < 5; i++ {ch <- ifmt.Println("Produced:", i)time.Sleep(500 * time.Millisecond)}close(ch)
}func consumer(ch <-chan int) {for i := range ch {fmt.Println("Consumed:", i)time.Sleep(1 * time.Second)}
}func main() {ch := make(chan int)go producer(ch)consumer(ch)
}
Goland 下的输出:
Produced: 0
Consumed: 0
Produced: 1
Consumed: 1
Consumed: 2
Produced: 2
Consumed: 3
Produced: 3
Consumed: 4
Produced: 4进程 已完成,退出代码为 0
总结
- Goroutines 是 Go 并发编程的核心,轻量且高效。
- Channels 是 Goroutines 之间通信的主要方式,确保数据的安全传递;
- 注意事项:理解并发和并行的区别,避免资源泄露和死锁,确保数据同步和避免数据竞争。
第八节:Goroutines 和 Channels
Golang 的并发程序可以通过两种手段来实现。本章讲解 goroutine 和 channel,其支持“顺序通信进程”(communicating sequential processes,CSP)。CSP 是一种现代的并发编程模型,在这种模型中,值会在不同的实例(goroutine,正如我们刚才所看到的)之间传递。第九章将会覆盖更为传统的并发模型:多线程共享内存(比如通过原语Mutex
来锁定共享资源)。
8.1 Goroutines
Golang 当中,每个并发执行的单元叫做一个 goroutine。
我们可以将 goroutine 理解为操作系统当中的一个线程,goroutine 与线程之间的关系将在 9.8 节提及。
当一个程序启动时,其 main 函数就是一个在 goroutine 上运行的函数,这个 goroutine 也被称为主 goroutine(类似于 daemon thread 的概念)。
当一个程序启动时,其主函数就在一个单独的 goroutine 中运行,称之为 main goroutine。新的 goroutine 应该使用 Golang 当中的 go
关键字来创建(正如我们刚才在 DeepSeek 的答复当中所看到的)。其用法如下:
f()
go f() // 注意在 f 后面需要再添加一对括号
下例在 main 当中计算 Fibonacci 的第四十五个元素的值,由于递归函数的计算速度较慢,我们使用 Goroutine 启动一个简单的协程,来在递归函数计算的过程中显示加载符号,表示程序仍在运行当中:
package mainimport ("fmt""time"
)func spinner(delay time.Duration) {str := []rune{'-', '\\', '|', '/'}for {for r := range str {fmt.Printf("\r%c", r)time.Sleep(delay)}}
}func fib(x int) int {if x < 2 {return x}return fib(x-1) + fib(x-2)
}func main() {go spinner(100 * time.Millisecond)const n = 45fibN := fib(n)fmt.Printf("\rFibonacci(%d) = %d\n", n, fibN)
}
8.2 示例:并发的 Clock 服务
网络编程是并发大显身手的领域,因为i服务器是最典型的需要同时处理很多连接的程序,这些链接一般来自于彼此独立的客户端。本小节将讲解 Golang 的 net 包,这个包提供编写一个网络客户端或服务器程序的基本组件,无论二者通信使用 TCP、UDP 还是 Unix domain sockets。
第一个执行的例子是一个顺序执行的时钟服务器,它会每隔一秒将当前时间写入客户端。
首先,我们按照如下方式对文件夹进行组织:
其中 server 下的 main 函数中写入 server 的逻辑,client 下的 main 函数写入 client 的逻辑。
首先来编写 server 的逻辑,它是一个时钟服务器,每隔一段时间它就要将当前时间发送给客户端,它的业务代码如下:
package mainimport ("io""log""net""time"
)func main() {lis, err := net.Listen("tcp", "localhost:8080") // 在 localhost:8080 对 tcp 连接进行监听if err != nil {log.Fatal(err)}for {conn, err := lis.Accept() // 监听是否有客户端连接, 使用 for {...} 来完成if err != nil {log.Print(err)continue}handleConn(conn) // 监听到连接之后, 通过 handleConn 向客户端发送数据}
}func handleConn(c net.Conn) {defer c.Close() // 使用 defer 在业务完成后关闭连接for {_, err := io.WriteString(c, time.Now().Format("15:04:05\n"))if err != nil {return}time.Sleep(1 * time.Second)}
}
net.Listen
函数创建一个net.Listener
对象,该对象会监听一个网络端口上到来的连接,此例中使用的是 TCP 的 localhost:8080
端口。listener
的Accept
方法将会直接阻塞,直到一个新的连接被闯进啊,然后返回一个net.Conn
对象来表示这个连接。
handleConn
函数会处理一个完整的客户端连接。在一个 for 的永真循环当中,用 time.Now()
获取当前时刻,然后写入到客户端。由于 net.Conn
实现了 io.Writer
接口,因此我们可以直接向其中写入内容。这个循环会一直执行下去,直到写入失败。写入失败的一个最可能的原因是客户端主动断开连接。该情况下 handleConn
函数会用 defer 来关闭服务端连接,返回主函数,继续等待下一个连接请求。
time.Time.Format
方法提供了一种格式化日期和时间信息的方式。
为了连接服务器,我们用 Golang 实现一个简单的 telnet 程序:
package mainimport ("io""log""net""os"
)func main() {conn, err := net.Dial("tcp", "localhost:8080")if err != nil {log.Fatal(err)}defer conn.Close()mustCopy(os.Stdout, conn)
}func mustCopy(dst io.Writer, src io.Reader) {if _, err := io.Copy(dst, src); err != nil {log.Fatal(err)}
}
这个程序会从连接中读取数据,并将读到的内容写到标准输出中,直到遇到end of file的条件或者发生错误。
只运行一个客户端时候的输出如下:
22:54:01
22:54:02
22:54:03
22:54:04
22:54:05
22:54:06
22:54:07
22:54:08
...
下面测试运行两个客户端,在两个终端中对 client 下的 main 使用go run main.go
,发现只有当第一个客户端终止时,第二个客户端才开始接收服务端的消息。此时最好的办法是将服务端的 handleConn 变为协程,使得服务端可以不断地处理连接,而不必阻塞:
func main() {lis, err := net.Listen("tcp", "localhost:8080") // 在 localhost:8080 对 tcp 连接进行监听if err != nil {log.Fatal(err)}for {conn, err := lis.Accept() // 监听是否有客户端连接, 使用 for {...} 来完成if err != nil {log.Print(err)continue}go handleConn(conn) // 启动协程}
}
8.3 示例:并发的 Echo 服务
Clock 服务器的每个连接都会起一个 goroutine。本节创建一个 Echo 服务器,这个服务在每个连接中会有多个 goroutine。大多数 echo 服务非常简单,仅返回它们读取到的内容:
func handleConn(c net.Conn) {io.Copy(c, c) // NOTE: ignoring errorsc.Close()
}
一个更有趣的 echo 服务模拟一个实际的 echo 回响:
func echo(c net.Conn, shout string, delay time.Duration) {fmt.Fprintln(c, "\t", strings.ToUpper(shout))time.Sleep(delay)fmt.Fprintln(c, "\t", shout)time.Sleep(delay)fmt.Fprintln(c, "\t", strings.ToLower(shout))
}func handleConn(c net.Conn) {input := bufio.NewScanner(c)for input.Scan() {echo(c, input.Text(), 1*time.Second)}// NOTE: ignoring potential errors from input.Err()c.Close()
}
此时我们需要升级我们的客户端程序,这样它就可以发送终端的输入给服务器,并把服务端的返回输出到终端上,这使得我们有了使用并发的另一个好机会:
func main() {conn, err := net.Dial("tcp", "localhost:8080")if err != nil {log.Fatal(err)}defer conn.Close()go mustCopy(os.Stdout, conn)mustCopy(conn, os.Stdin)
}
当 main goroutine 从标准输入流读取数据并将其发送给服务器时(mustCopy(conn, os.Stdin)
在做的事情),另一个 goroutine (go mustCopy(os.Stdout, conn)
在做的事情)会读取并打印服务端的响应。
下面我们先后启动服务端和客户端程序,并输入:
现存的一个问题是,如果在客户端连续输入,服务端仍然会顺序执行客户端的请求,我们希望服务端可以并行地处理客户端的请求,此时就需要我们再一次使用 go 关键字,来调用 echo:
func handleConn(c net.Conn) {input := bufio.NewScanner(c)for input.Scan() {go echo(c, input.Text(), 1*time.Second)}// NOTE: ignoring potential errors from input.Err()c.Close()
}
8.4 Channels
如果说 Goroutine 是 Golang 的并发体,那么 channels 则是它们之间的通信机制。
一个 goroutine 可以通过 channel 向另一个 goroutine 发送信息。每个 channel 都有一个特殊的类型,也就是 channels 可发送数据的类型。一个可以发送的 int 类型数据的 channel 一般类型写作 chan int
。
可以通过内置的 make 函数创建 channel:
ch := make(chan int)
和 map 类似,channel 也对应一个 make 创建的底层数据结构的引用。当我们复制一个 channel 或用于函数参数传递时,我们只是拷贝了一个 channel 的引用,因此调用者和被调用者将引用同一个 channel 对象。和其他的引用类型一样,channel 的零值也是 nil。
两个相同类型的 channel 可以使用 ==
来比较,如果两个 channel 引用的是相同的对象(比如,比较一个复制的 channel 和原先的 channel),那么比较结果为真。channel 也可以和 nil 比较。
channel 有发送和接收两个主要操作,都是通信行为。一个发送语句将一个值从一个 goroutine 通过 channel 发送到另一个执行接受操作的 goroutine。发送和接收两个操作都使用 <-
运算符。在发送语句中,<-
运算符分割 channel 和要发送的值。在接收语句中,<-
运算符写在 channel 对象之前。一个不使用接收结果的接收操作也是合法的。
ch <- x // a send statement
x = <- ch // a receive expression in an assignment statement
<- ch // a receive statement; result is discarded
Channel 支持 close 操作。close 了的 channel 不能够再发送消息,任何通过 close 掉的 channel 发送的消息都会导致 panic 异常。但是 close 了的 channel 仍然可以接收到之前已经成功发送了的数据:
close(ch)
可以使用内置的 make 函数创建最简单的 channel,不加额外参数地使用 make 创建的 channel 是不带缓存的,可以指定第二个整型参数,它代表指定 channel 的容量。如果 channel 的容量大于零,它就是带缓存的 channel:
ch = make(chan int) // unbuffered channel
ch = make(chan int, 0) // unbuffered channel
ch = make(chan int, 3) // buffered channel with capacity 3
8.4.1 不带缓存的 channels
(重要:)一个基于无缓存的 channel 的发送操作将导致发送者的 goroutine 阻塞,直到另一个 goroutine 在相同的 channel 上执行接收操作。当发送的值通过 channel 成功传输之后,两个 goroutine 才可以继续执行后面的语句。反之,如果接收操作先发生,那么接收者 goroutine 也将阻塞,直到另一个 goroutine 在相同的 channel 上执行发送操作。
基于无缓存的 channel 的发送和接收操作将导致两个 goroutine 做一次同步操作。因为这个原因,无缓存 channel 也被称为同步 channels。当在一个 goroutine A 当作使用无缓存 channel 发送数据时,我们假定另一个 goroutine B 接收 A 发送的数据,goroutine B 接收到数据发生在唤醒发送这条数据的 goroutine A 之前(译注:happens before,这是 Golang 并发内存模型的一个关键术语)。
在讨论并发编程时,当我们说事件 x 在 y 之前发生(happens before),并不是说 x 在时间上比 y 更早完成,而是保证在 y 执行之前,x 已经完成了。而当 x 既不是在 y 之前也不是在 y 之后发生时,称 x 和 y 是并发的。并发不意味着两件事情同时发生,而是表明了我们不能确定两件事情的执行顺序。
基于 channels 发送消息有两个重点。首先,每个消息都有一个值,但是有时候通讯的事实和发生的时刻也同样重要。当我们更希望强调通讯发生的时刻时,称之为消息事件。有些消息事件并不携带额外的信息,它仅仅用作两个 goroutine 之间的同步,此时可以使用空结构体struct{}
。
8.4.2 串联的 channels
channels 也可用于将多个 goroutine 连接在一起,一个 channel 的输出作为下一个 channel 的输入。串联的 channel 就是所谓的管道(pipeline)。下面的程序用两个 channels 将三个 goroutine 串联起来,如图 8.1 所示:
第一个 goroutine 是一个计数器,生成0, 1, 2, ...
形式的整数序列,通过 channel 将序列发送给第二个 goroutine;第二个 goroutine 是一个求平方的程序,对收到的每个整数求平方,然后将平方后的结果通过第二个 channel 发送给第三个 goroutine;第三个 goroutine 是一个打印程序,打印每个收到的整数:
package mainimport "fmt"func main() {naturals := make(chan int)squares := make(chan int)// Countergo func() {for x := 0; ; x++ {naturals <- x}}()// Squarergo func() {for {x := <-naturalssquares <- x * x}}()for {fmt.Println(<-squares)}
}
上述串联的 channel 管道(Pipelines)可以用在需要长时间运行的服务上【经过实测,上述程序将会一直处在永真循环当中不断输出】。每个长时间运行的 goroutine 可能会包含一个永真循环,在不同 goroutine 的永真循环内部可以使用串联的 channels 来通信。但如果我们希望通过 channels 只发送有限个数据,应该如何处理?【将 Counter 对应的 goroutine 当中 x 的上限改为 100,确实会在 Goland 当中报错】
可以通过内置的 close 来关闭 channel,通知接收者 goroutine 没有更多的数据从 channel 传输过来了,比如:
close(naturals)
当一个 channel 被关闭后,再向该 channel 发送数据将导致 panic 异常。当一个被关闭的 channel 中已经发送的数据都被成功接收后,后续的操作将不再被阻塞,它会立即返回一个零值。关闭 naturals 对应的 channel 不会终止循环,接收 naturals 发送数据的 Squarer goroutine 依然会收到无休止的零值,然后发送给 Printer 打印。
虽然我们没有办法直接测试一个 channel 是否关闭,但是接收操作有一个变体形式:第一个接收值是结果,而第二个接收值是一个 bool 类型,true 表示成功从 channel 中接收到值,false 表示 channel 已经被关闭。基于上述特性修改 Squarer 的逻辑:
// Squarer
go func() {for {x, ok := <- naturalsif !ok {break}squares <- x * x}close(squares)
}()
上述语法是笨拙的,由于我们经常需要判断接收者所使用的 channel 目前是否有效,因此 Golang 直接支持使用 range 来对 channel 的接收操作进行迭代。经过完善的 pipeline 程序如下:
package mainimport "fmt"func main() {naturals := make(chan int)squares := make(chan int)// Countergo func() {for x := 0; x < 100; x++ {naturals <- x}close(naturals)}()// Squarergo func() {for x := range naturals {squares <- x * x}close(squares)}()for x := range squares {fmt.Println(x)}
}
【确实没问题,此时程序不再报错】
8.4.3 单方向的 channel
当程序越来越大时,我们可以将程序当中的功能拆分为若干个函数。以刚才的 pipeline 程序为例,可以将三个功能拆分为三个函数:
func counter(out chan int)
func squarer(out, in chan int)
func printer(in chan int)
一个问题在于,以 squarer 函数为例,我们只希望名为 out 的 channel 发送数据,名为 in 的 channel 接收数据,为了明确这种意图并防止 channel 的滥用,Golang 的类型系统提供了单方向的 channel 类型,分别用于表示只发送和只接受的 channel。具体来说,chan<- int
表示一个只发送 int 的 channel,而<-chan int
表示一个只接受 int 的 channel。
综上,改进版的 pipeline 如下:
package mainimport "fmt"func main() {naturals := make(chan int)squares := make(chan int)go counter(naturals)go squarer(squares, naturals)printer(squares)
}func counter(out chan<- int) {for x := 0; x < 100; x++ {out <- x}close(out)
}func squarer(out chan<- int, in <-chan int) {for v := range in {out <- v * v}close(out)
}func printer(in <-chan int) {for v := range in {fmt.Println(v)}
}
8.4.4 带缓存的 channels
带缓存的 channel 内部持有一个元素队列。队列的最大容量是在调用 make 函数创建 channel 的第二个参数指定的。下述语句创建了一个可以持有三个字符串元素的带缓存的 channel:
ch = make(chan string, 3)
向缓存 channel 发送数据的操作就是向元素内部缓存队列的尾部插入元素,接收操作则是从队列的头部删除元素。如果内部缓存队列已满,那么发送操作将阻塞直到另一个 goroutine 执行接收操作而释放了新的队列空间。相反,如果 channel 是空的,接受操作将阻塞直到有另一个 goroutine 执行发送操作而向队列插入元素。
我们可以无阻塞地向新创建的 channel 发送三个值:
ch <- "A"
ch <- "B"
ch <- "C"
此时,channel 的内部缓存队列将是满的,如果在这个时候有第四个向该 channel 的发送操作,则这个操作将会被阻塞。
如果我们接收一个值:
fmt.Println(<-ch)
channel 缓存队列的队首将弹出,此时 channel 的缓存队列既不满也不空,因此对 channel 执行发送和接收都不会阻塞。通过这种方式,channel 的缓存队列解耦了接收和发送的 goroutine。
可以使用内置的 cap 函数获取 channel 缓存区的容量:
fmt.Println(cap(ch)) // "3"
而内置的 len 方法会返回 channel 当中有效数据的个数:
fmt.Println(len(ch)) // "2"
在上例中,发送和接收操作都发生在同一个 goroutine 当中。但在真实的程序中它们一般由不同的 goroutine 来执行。Golang 新手有时候会将一个带缓存的 channel 当作同一个 goroutine 中的队列使用,但实际上这是错误的。channel 和 goroutine 机制是紧密相连的,如果没有其它 goroutine 从 channel 接收,发送者(或是整个程序)将面临永远阻塞的风险【这也很好理解,当 channel 满了的时候,没有消息的接收者必然会引发阻塞】。如果想使用队列,请使用 slice。
下面的例子展示了一个使用了带缓存channel的应用。它并发地向三个镜像站点发出请求,三个镜像站点分散在不同的地理位置。它们分别将收到的响应发送到带缓存channel,最后接收者只接收第一个收到的响应,也就是最快的那个响应。因此mirroredQuery函数可能在另外两个响应慢的镜像站点响应之前就返回了结果。(顺便说一下,多个goroutines并发地向同一个channel发送数据,或从同一个channel接收数据都是常见的用法。)
func mirroredQuery() string {responses := make(chan string, 3)go func() { responses <- request("asia.gopl.io") }()go func() { responses <- request("europe.gopl.io") }()go func() { responses <- request("americas.gopl.io") }()return <-responses // return the quickest response
}func request(hostname string) (response string) { /* ... */ }
如果我们使用了无缓存的channel,那么两个慢的goroutines将会因为没有人接收而被永远卡住。这种情况,称为goroutines泄漏,这将是一个BUG。和垃圾变量不同,泄漏的goroutines并不会被自动回收,因此确保每个不再需要的goroutine能正常退出是重要的。
关于无缓存或带缓存channels之间的选择,或者是带缓存channels的容量大小的选择,都可能影响程序的正确性。无缓存channel更强地保证了每个发送操作与相应的同步接收操作;但是对于带缓存channel,这些操作是解耦的。同样,即使我们知道将要发送到一个channel的信息的数量上限,创建一个对应容量大小的带缓存channel也是不现实的,因为这要求在执行任何接收操作之前缓存所有已经发送的值。如果未能分配足够的缓存将导致程序死锁。
Channel的缓存也可能影响程序的性能。想象一家蛋糕店有三个厨师,一个烘焙,一个上糖衣,还有一个将每个蛋糕传递到它下一个厨师的生产线。在狭小的厨房空间环境,每个厨师在完成蛋糕后必须等待下一个厨师已经准备好接受它;这类似于在一个无缓存的channel上进行沟通。
如果在每个厨师之间有一个放置一个蛋糕的额外空间,那么每个厨师就可以将一个完成的蛋糕临时放在那里而马上进入下一个蛋糕的制作中;这类似于将channel的缓存队列的容量设置为1。只要每个厨师的平均工作效率相近,那么其中大部分的传输工作将是迅速的,个体之间细小的效率差异将在交接过程中弥补。如果厨师之间有更大的额外空间——也是就更大容量的缓存队列——将可以在不停止生产线的前提下消除更大的效率波动,例如一个厨师可以短暂地休息,然后再加快赶上进度而不影响其他人。
另一方面,如果生产线的前期阶段一直快于后续阶段,那么它们之间的缓存在大部分时间都将是满的。相反,如果后续阶段比前期阶段更快,那么它们之间的缓存在大部分时间都将是空的。对于这类场景,额外的缓存并没有带来任何好处。
生产线的隐喻对于理解channels和goroutines的工作机制是很有帮助的。例如,如果第二阶段是需要精心制作的复杂操作,一个厨师可能无法跟上第一个厨师的进度,或者是无法满足第三阶段厨师的需求。要解决这个问题,我们可以再雇佣另一个厨师来帮助完成第二阶段的工作,他执行相同的任务但是独立工作。这类似于基于相同的channels创建另一个独立的goroutine。
8.5 并发的循环
这一节的学习我不再原封不动地照搬 《Go 语言圣经》的原文,而是重点挑其中两个代码片段来说。
这一节引入一个名为thumbnail.ImageFile()
的函数,它的参数是一个文件,我们可以简单地认为这个函数可以对文件进行处理。
第一个例子当中实现了一个带 buffer 的 channel,返回生成的文件的名字(调用上述方法的第一个返回值),附带生成时的错误(第二个参数):
// makeThumbnails5 makes thumbnails for the specified files in parallel.
// It returns the generated file names in an arbitrary order,
// or an error if any step failed.
func makeThumbnails5(filenames []string) (thumbfiles []string, err error) {type item struct {thumbfile stringerr error}ch := make(chan item, len(filenames))for _, f := range filenames {go func(f string) {var it itemit.thumbfile, it.err = thumbnail.ImageFile(f)ch <- it}(f)}for range filenames {it := <-chif it.err != nil {return nil, it.err}thumbfiles = append(thumbfiles, it.thumbfile)}return thumbfiles, nil
}
使用带 buffer 的 channel 的原因是,如果使用 unbuffered channel,那么 channel 接收第一个消息之后,没有另一个 goroutine 从 channel 当中处理消息,这个 channel 将会一直阻塞下去,导致 goroutine leak。
第二个例子实现了一个新版的 makeThumbnails,它返回新生成文件的大小计数。此例当中没有把文件名放在 slice 当中,而是通过一个 string 的 channel 传过来,因此我们无法对循环的次数进行预测。
为了知道最后一个 goroutine 何时结束,我们需要一个递增的计数器,每个 goroutine 启动时加一。这个特殊的计数器被称为sync.WaitGroup
:
// makeThumbnails6 makes thumbnails for each file received from the channel.
// It returns the number of bytes occupied by the files it creates.
func makeThumbnails6(filenames <-chan string) int64 {sizes := make(chan int64)var wg sync.WaitGroup // number of working goroutinesfor f := range filenames {wg.Add(1) // 计数器自增// workergo func(f string) {defer wg.Done() // 计数器递减thumb, err := thumbnail.ImageFile(f)if err != nil {log.Println(err)return}info, _ := os.Stat(thumb) // OK to ignore errorsizes <- info.Size()}(f)}// closergo func() {wg.Wait() // 阻塞直到所有 goroutines 执行完毕close(sizes)// 所有 goroutine 执行完毕后, 可以关闭通道了}()var total int64for size := range sizes {total += size}return total
}
8.7 基于 select 的多路复用
下面的程序会进行火箭发射的倒计时。time.Tick
函数返回一个 channel,程序会周期性地向 channel 发送事件。每一个事件的值是一个时间戳:
func main() {fmt.Println("Commencing countdown.")tick := time.Tick(1 * time.Second)for countdown := 10; countdown > 0; countdown -- {fmt.Println(countdown)<- tick}launch()
}
现在为这个程序引入“中断发射流程”的功能。首先我们启动一个 goroutine,这个 goroutine 会尝试从标准输入中读入一个单独地 byte,并且,如果读入操作成功了,会向名为 abort 的 channel 发送一个值。
abort := make(chan struct{})
go func() {os.Stdin.Read(make([]byte, 1))abort <- struct{}{}
}()
现在每一次计数循环的迭代都需要等待两个 channel 中的其中一个返回事件了。此时我们需要 select 来帮助我们实现多路复用更能:
select {case <- ch1:// ...case x := <- ch2:// ... use x ...case ch3 <- y:// ...default:// ...
}
上面是 select 语句的一般形式。和 switch 语句稍微有点相似,也会有几个 case 和最后的 default 选择分支。每一个 case 代表一个通信操作(在某个channel上进行发送或者接收),并且会包含一些语句组成的一个语句块。一个接收表达式可能只包含接收表达式自身(译注:不把接收到的值赋值给变量什么的),就像上面的第一个 case ,或者包含在一个简短的变量声明中,像第二个 case 里一样;第二种形式让你能够引用接收到的值。
select 会等待 case 中有能够执行的 case 时去执行。当条件满足时,select 才会去通信并执行 case 之后的语句;这时候其它通信是不会执行的。一个没有任何 case 的 select 语句写作 select{} ,会永远地等待下去。
回到火箭发射程序,time.After
函数会立即返回一个 channel,并起一个新的 goroutine 在经过特定时间后向该 channel 发送一个独立的值。下面的 select 会一直等待两个事件当中的一个到达,无论是 abort 还是一个需要 10s 的事件。如果 10s 经过之后 abort 还没有进入,那么火箭将会发射:
func main() {// ...create abort channel...fmt.Println("Commencing countdown. Press return to abort.")select {case <-time.After(10 * time.Second):// Do nothing.case <-abort:fmt.Println("Launch aborted!")return}launch()
}
下例更为微妙,创建一个 buffer 1 的 channel,它会交替为空和满。所以只有一个 case 可以进行下去,无论 i 是奇数还是偶数,都会打印 0、2、4、6、8。
ch := make(chan int, 1)
for i := 0; i < 10; i++ {select {case x := <-ch:fmt.Println(x) // "0" "2" "4" "6" "8"case ch <- i:}
}
如果多个case同时就绪时,select会随机地选择一个执行,这样来保证每一个channel都有平等的被select的机会。增加前一个例子的buffer大小会使其输出变得不确定,因为当buffer既不为满也不为空时,select语句的执行情况就像是抛硬币的行为一样是随机的。
下面让我们的发射程序打印倒计时。这里的 select 语句会使每次循环迭代等待一秒来执行退出操作。
func main() {// ...create abort channel...fmt.Println("Commencing countdown. Press return to abort.")tick := time.Tick(1 * time.Second)for countdown := 10; countdown > 0; countdown-- {fmt.Println(countdown)select {case <-tick:// Do nothing.case <-abort:fmt.Println("Launch aborted!")return}}launch()
}
上述程序的一个问题在于,当程序返回之后,它会停止从 tick 接收事件,但是由 time.Tick
创建的 goroutine 依然存活,继续徒劳地向名为 tick
的 channel 发送数据,然而此时已经没有其他的 goroutine 会从该 channel 接收值了,这就被称为 goroutine 泄露。
Tick函数挺方便,但是只有当程序整个生命周期都需要这个时间时我们使用它才比较合适。否则的话,我们应该使用下面的这种模式:
ticker := time.NewTicker(1 * time.Second)
<-ticker.C // receive from the ticker's channel
ticker.Stop() // cause the ticker's goroutine to terminate
有时候我们希望能够从channel中发送或者接收值,并避免因为发送或者接收导致的阻塞,尤其是当channel没有准备好写或者读时。select语句就可以实现这样的功能。select会有一个default来设置当其它的操作都不能够马上被处理时程序需要执行哪些逻辑。
8.9 并发的退出
有时候我们需要通知 goroutine 停止它正在干的事情,比如一个正在执行计算的 web 服务,然而它的客户端已经断开了和服务端的连接。
我们应该如何退出两个或任意多个 goroutine?
为了达到上述目的,我们可以通过一个 channel 把消息光比出去,这样 goroutine 们看到这条消息,并且在事件完成之后,就知道这件事已经发生过了。
我们可以对关闭 channel 的机制进行拓展,来实现广播机制:不向 channel 发送值,而是用关闭一个 channel 来进行广播。
首先创建一个退出的 channel,不需要向这个 channel 发送任何值,但是其所在的闭包内要写明程序需要退出。同时定义一个工具函数 cancelled,这个函数在被调用时会轮询退出状态:
var done = make(chan struct{})func cancelled() bool {select {case <- done:return truedefault:return false}
}
下面我们创建一个从标准输入流中读取内容的goroutine,这是一个比较典型的连接到终端的程序。每当有输入被读到(比如用户按了回车键),这个goroutine就会把取消消息通过关闭done的channel广播出去。
// Cancel traversal when input is detected.
go func() {os.Stdin.Read(make([]byte, 1)) // read a single byteclose(done)
}()
现在我们需要使我们的goroutine来对取消进行响应。在main goroutine中,我们添加了select的第三个case语句,尝试从done channel中接收内容。如果这个case被满足的话,在select到的时候即会返回,但在结束之前我们需要把fileSizes channel中的内容“排”空,在channel被关闭之前,舍弃掉所有值。这样可以保证对walkDir的调用不要被向fileSizes发送信息阻塞住,可以正确地完成。
for {select {case <-done:// Drain fileSizes to allow existing goroutines to finish.for range fileSizes {// Do nothing.}returncase size, ok := <-fileSizes:// ...}
}
walkDir这个goroutine一启动就会轮询取消状态,如果取消状态被设置的话会直接返回,并且不做额外的事情。这样我们将所有在取消事件之后创建的goroutine改变为无操作。
func walkDir(dir string, n *sync.WaitGroup, fileSizes chan<- int64) {defer n.Done()if cancelled() {return}for _, entry := range dirents(dir) {// ...}
}func dirents(dir string) []os.FileInfo {select {case sema <- struct{}{}: // acquire tokencase <-done:return nil // cancelled}defer func() { <-sema }() // release token// ...read directory...
}
现在当取消发生时,所有后台的goroutine都会迅速停止并且主函数会返回。当然,当主函数返回时,一个程序会退出,而我们又无法在主函数退出的时候确认其已经释放了所有的资源(译注:因为程序都退出了,你的代码都没法执行了)。这里有一个方便的窍门我们可以一用:取代掉直接从主函数返回,我们调用一个panic,然后runtime会把每一个goroutine的栈dump下来。如果main goroutine是唯一一个剩下的goroutine的话,他会清理掉自己的一切资源。但是如果还有其它的goroutine没有退出,他们可能没办法被正确地取消掉,也有可能被取消但是取消操作会很花时间;所以这里的一个调研还是很有必要的。我们用panic来获取到足够的信息来验证我们上面的判断,看看最终到底是什么样的情况。