1>. 信号量
2>. 限流能力

var sem = make(chan int, MaxOutstanding) func Serve(queue chan *Request) {for req := range queue {req:= reqsem <- 1   go func() {   // 只会开启MaxOutstanding个并发协程process(req)<-sem}()}


① sem 提供了限制服务端并发处理请求的信号量
② queue 提供了一个客户端请求队列,起媒介/解耦的作用

进一步指南给出了信道的另一个用法: 3>. 解多路复用

多路复用是网络编程中一个耳熟能详的概念,nginx redis等高性能web、内存kv都用到了这个技术 。


离散/独立/并发的客户端请求被服务端Serve收敛之后, Serve就起到了多路复用的概念,在Request定义resultChan信道,就给每个客户端请求提供了独立获取请求结果的能力,这便是一种解多路复用。




下面是我的工程化实现, 记录实践中遇到的问题。


  • 信道queue接收客户端请求,解耦客户端和服务器,天然具备排队能力

  • 信号量信道sem提供了并发受限的能力

  • 服务器处理完,向解多路复用信道req.resultChan写入响应结果。

/* 实现一个有请求队列功能的并发请求受限服务器*/package mainimport ("fmt""sync""time"
)var sem = make(chan int, Maxoutstanding)var wg2 sync.WaitGroupfunc server(queue chan *Request) {fmt.Printf("Server is already, listen req \n")for req := range queue {req := reqsem <- 1wg2.Add(1)go func() {defer wg2.Done()process(req)<-sem}()}
}func process(req *Request) {s := sum(req.args)req.resultChan <- s
func sum(a []int) (s int) {for i := 1; i <= a[0]; i++ {s += i}time.Sleep(time.Millisecond * 20)return s

time.Sleep模拟服务器处理请求单次耗时20ms, 输出数字的累加,
eg: input: 100;
output: (1+100)/2*100 =5050

wg2 sync.WaitGroup是一个动态活跃的Goroutine计数器,注意用法和位置,wg2的作用是:等待所有请求处理完成。



  • 每个请求入驻一个独立的Goroutine,独立向信道queue投递请求和接收响应

package mainimport ("fmt""sync"
)type Request struct {args       []intresultChan chan int
}var wg1 sync.WaitGroupfunc clients() {fmt.Printf("start %d concurrency client request\n ", concurrencyClients)for i := 1; i <= concurrencyClients; i++ {r := &Request{args:       []int{i},resultChan: make(chan int),}wg1.Add(1)go ClientReq(r)}wg1.Wait() }func ClientReq(r *Request) {defer wg1.Done()queue <- rgo func() {res := <-r.resultChanfmt.Printf("current args is %d, the result is %d \n", r.args[0], res)}()

wg1 WaitGroup的目的是确保所有的客户端请求都已经发出,之后客户端任务结束,所以此处我们新开Goroutine处理响应结果(这里又有闭包的参与)。



本处clients在主协程整体上是同步发送,如果放在clients()的后面,clients内的wg1可能会有部分请求Goroutine阻塞在信道queue, 且没法唤醒, 运行时会检测出报死锁。

package mainimport ("fmt""time"
)var concurrencyClients = 1000
var queueLength = 100
var queue = make(chan *Request, queueLength) // 请求队列长度
var Maxoutstanding int = 10                  // 服务器并发受限10func main() {go server(queue)var start = time.Now()clients() // 确保所有的请求都已经发出去wg2.Wait() // 确保服务器处理完所有的请求fmt.Printf("客户端并发%d请求,服务器请求队列长度%d,服务器限流%d,总共耗时%d ms \n", concurrencyClients, queueLength, Maxoutstanding, time.Since(start).Milliseconds())

1>.  客户端并发请求数量concurrencyClients=100
2>.  服务器排队队列长度queueLength=100, 会作用到信道queue
3>.  服务器并发受限阈值Maxoutstanding=10

服务器限流阈值 Maxoutstanding































That’s All,本文根据golang有关信道的指南, 实现了一个带有请求队列功能的并发受限服务器, 巩固了信道、WaitGroup的用法。



golang官方指南: https://go.dev/doc/effective_go


完整代码传送门: https://github.com/zwbdzb/go_sample1

本篇文字和图片均为原创,读者可结合图片探索源码, 欢迎反馈 ~。。~。欢迎添加微信号 niumabujuan 交流撕逼。 












