目录
- 1、背景
- 2、go版本
- 3、 selectgo函数解释
- 【1】函数参数解释
- 【2】函数具体解释
- 第一步:遍历pollorder,选出准备好的case
- 第二步:将当前goroutine放到所有case通道中对应的收发队列上
- 第三步:唤醒groutine
- 4、总结
1、背景
select多路复用在go的异步和并发控制场景中非常好用,对于无case和只有单个case的情况,编译器在编译的时候就会对其做优化,无case就相当于调用了一个阻塞函数,单个case就相当于对一个通道进行读写操作,如果单个case中有default分支时,就相当于是一个if else逻辑,对于多个case的情况,是在运行时调用selectgo函数决定的,接下来我们就来研究一下selectgo函数。
2、go版本
$ go version
go version go1.21.4 windows/386
3、 selectgo函数解释
【1】函数参数解释
selectgo函数位于:src/runtime/select.go中,定义如下:
//cas0:case数组地址,按照往通道写数据在前,从通道读数据在后的排列顺序(编译时编译器优化行为操作的)
//nsends:往通道写数据的case数量
//nrecvs:从通道读数据的case数量
//block:是否阻塞
//返回值分别代表选中规定case位置和是否成功从通道接收数据,如果选中的是default,第一个返回值就返回-1
func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, block bool) (int, bool)
select中每一个case都对应一个scase结构,定义如下:
type scase struct {c *hchan //case对应的读或写通道elem unsafe.Pointer //指向要写入元素或存放读取元素的地址
}
【2】函数具体解释
selectgo函数中会遍历所有的case,为确保遍历case的随机性和安全性,有两个关键的顺序:pollorder和lockorder,不用关心其具体实现,明白其的作用就行。
pollorder:随机的case顺序,确保公平的处理每一个case。
lockorder:加锁的case顺序,确保并发安全。
计算出pollorder和lockorder顺序之后,会根据这2个顺序进行遍历分为了3步。
第一步:遍历pollorder,选出准备好的case
第一部分的代码如下:
func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, block bool) (int, bool) {...var casi int //准备好的case位置var cas *scase //case对象var caseSuccess boolvar caseReleaseTime int64 = -1var recvOK bool //如果是从通道读取数据,是否读取成功for _, casei := range pollorder { //遍历随机顺序的casecasi = int(casei) //case的位置cas = &scases[casi] //case对象c = cas.c //case通道if casi >= nsends { //前面讲过,写通道在前,读通道在后,所以这里是读通道casesg = c.sendq.dequeue() //取出往读通道写数据的协程队列中的第一个协程if sg != nil { //如果存在往通道写数据的协程goto recv //从往通道写数据的协程中读取数据并返回case位置和读取结果}if c.qcount > 0 { //如果缓冲区还有数据goto bufrecv //从缓冲区读取数据并返回case位置和读取结果}if c.closed != 0 { //如果通道已关闭goto rclose //释放相关资源}} else { //写通道的caseif raceenabled {racereadpc(c.raceaddr(), casePC(casi), chansendpc)}if c.closed != 0 { //如果通道已经关闭goto sclose //直接panic}sg = c.recvq.dequeue() //从正在往通道读数据的协程队列中取得第一个if sg != nil { //如果往通道读数据的协程存在goto send //发送数据到读通道的协程}if c.qcount < c.dataqsiz { //缓冲区还有位置goto bufsend}}}if !block { //如果不阻塞,也就是带default分支selunlock(scases, lockorder)casi = -1 //case位置为-1goto retc //直接返回,不用进入下一步}...
}
bufrecv标签:
bufrecv:recvOK = true //返回读数据成功qp = chanbuf(c, c.recvx) //缓冲区中要读取数据的地址if cas.elem != nil {typedmemmove(c.elemtype, cas.elem, qp) //将读取的缓冲区数据拷贝到case中的elem位置}typedmemclr(c.elemtype, qp) //清理缓冲区被读的数据c.recvx++ //读取缓冲区的位置+1if c.recvx == c.dataqsiz { //下一个要读取缓冲区的位置如果等于缓冲区大小就将下次要读取的缓冲区位置置为0c.recvx = 0}c.qcount-- //缓冲区中元素个数-1selunlock(scases, lockorder)goto retc
bufsend标签:
bufsend:typedmemmove(c.elemtype, chanbuf(c, c.sendx), cas.elem) //将case中要写入的元素写到缓冲区c.sendx++ //写入缓冲区的位置+1if c.sendx == c.dataqsiz { //如果下次要写入缓冲区的位置等于缓冲区的大小就将缓冲区写入位置置为开头c.sendx = 0}c.qcount++ //缓冲区元素数量+1selunlock(scases, lockorder)goto retc
recv标签:
recv:recv(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2) //从写通道的协程读取数据if debugSelect {print("syncrecv: cas0=", cas0, " c=", c, "\n")}recvOK = true //返回成功读取goto retc
rclose标签:
rclose:selunlock(scases, lockorder)recvOK = false //从通道中读取数据失败if cas.elem != nil {typedmemclr(c.elemtype, cas.elem) //释放case中元素的空间}if raceenabled {raceacquire(c.raceaddr())}goto retc
send标签:
send:send(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2) //发送数据到往通道读数据的协程if debugSelect {print("syncsend: cas0=", cas0, " c=", c, "\n")}goto retc
retc标签:
retc:if caseReleaseTime > 0 {blockevent(caseReleaseTime-t0, 1)}return casi, recvOK //返回case位置和是否从通道成功读取数据
sclose标签:
sclose:selunlock(scases, lockorder)panic(plainError("send on closed channel"))
上面就是selectgo函数第一部分的逻辑,第一部分就是遍历一个随机的case顺序,如果有符合条件的case就返回case的位置并且返回读数据的结果,如果没有case符合条件但是有default分支就返回-1,如果没default分支就进入下一步。
第二步:将当前goroutine放到所有case通道中对应的收发队列上
第二部分的代码如下:
func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, block bool) (int, bool) {...gp = getg() //获取当前协程if gp.waiting != nil {throw("gp.waiting != nil")}nextp = &gp.waitingfor _, casei := range lockorder { //按照对case加锁的顺序遍历casecasi = int(casei) //case的位置cas = &scases[casi] //case对象c = cas.c //case对象中的通道sg := acquireSudog() //初始化一个协程等待结构sg.g = gp //协程等待结构绑定协程sg.isSelect = true //表示该协程等待结构与select操作相关sg.elem = cas.elem sg.releasetime = 0if t0 != 0 {sg.releasetime = -1}sg.c = c*nextp = sgnextp = &sg.waitlinkif casi < nsends { //如果case上是往通道写数据,就将绑定当前协程的等待对象插入当前case通道的发送队列中c.sendq.enqueue(sg) } else { //如果case上是往通道读数据,就将绑定当前协程的等待对象插入当前case通道的接收队列中c.recvq.enqueue(sg)}}...
}
第二部分就是将当前协程放到每个case中的通道对应的收发队列中去。
第三步:唤醒groutine
第三部分代码如下:
func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, block bool) (int, bool) {...sg = (*sudog)(gp.param) //被唤醒的协程等待结构gp.param = nilcasi = -1 //case位置cas = nil //case对象caseSuccess = falsesglist = gp.waiting //lockorder顺序的协程等待结构队列,这里是队列中的第一个协程等待结构for sg1 := gp.waiting; sg1 != nil; sg1 = sg1.waitlink { //清空协程等待结构队列中元素便于进行垃圾回收sg1.isSelect = falsesg1.elem = nilsg1.c = nil}gp.waiting = nilfor _, casei := range lockorder { //根据对case的加锁顺序进行遍历k = &scases[casei] //当前caseif sg == sglist { //唤醒的协程等待结构是当前case的casi = int(casei) //唤醒的case位置cas = k //唤醒的case对象caseSuccess = sglist.success //往通道读取或写数据结果if sglist.releasetime > 0 {caseReleaseTime = sglist.releasetime}} else { //唤醒的协程等待结构不是当前case的c = k.cif int(casei) < nsends { //case为发送通道,就是释放当前case通道里sendq队列的协程等待结构对象c.sendq.dequeueSudoG(sglist)} else { //case为读取通道,就是释放当前case通道里recvq队列的协程等待结构对象c.recvq.dequeueSudoG(sglist)}}sgnext = sglist.waitlink //下一个协程等待结构sglist.waitlink = nilreleaseSudog(sglist) //释放上一个协程等待结构sglist = sgnext}...
}
第三部分就是某一个case上的协程等待结构被唤醒时,会先执行通道上对应的收发操作, 然后去将所有case上的协程等待结构释放掉。
4、总结
select虽然使用起来简单,但其实现逻辑还是比较复杂的,通过熟悉其实现,我们能理解对多个通道进行操作时候,可以为每一个通道创建一个协程去操作,这无疑增加了GC开销,但是使用select采用了多路复用的思想,将一个协程绑定在多个协程等待对象上,而且对case使用了随机顺序,确保每一个case都能公平的被执行。