channel 源码解析(5问)

news/2024/11/8 0:32:47/

目录

1.channel底层数据结构是什么

2.channel创建的底层实现

3.channel 的发送过程

4.channel的接受过程

5.关闭 channel


1.channel底层数据结构是什么

channel底层的数据结构是hchan,包括一个循环链表和2个双向链表

type hchan struct {qcount   uint           // total data in the queuedataqsiz uint           // size of the circular queuebuf      unsafe.Pointer // points to an array of dataqsiz elementselemsize uint16closed   uint32elemtype *_type // element typesendx    uint   // send indexrecvx    uint   // receive indexrecvq    waitq  // list of recv waiterssendq    waitq  // list of send waiters// lock protects all fields in hchan, as well as several// fields in sudogs blocked on this channel.//// Do not change another G's status while holding this lock// (in particular, do not ready a G), as this can deadlock// with stack shrinking.lock mutex
}
buf:循环链表

sendx:循环链表中已发送的位置

recvx: 循环链表中已接受的位置

qcount:环形队列的实际大小

dataqsiz:环形队列的容量

elemsize:环列队列中元素的大小

elemtype:环形队列中元素的类型

closed:环形队列是否关闭

recvq:等待接收groutinue链表

sendq:等待发送send groutinue链表

lock:悲观锁

结构图如下所示

2.channel创建的底层实现

创建channel底层调用的是makechan,为新创建的channel分配内存空间,分为下面的三种情况:

  1. 不带缓冲区:只需要给hchan分配内存空间。
  2. 带缓冲区且不包括指针类型:同时给hchan和环形队列缓存buf分配一段连续的内存空间
  3. 带缓冲区且包括指针类型:分别给hchan和环形队列缓存buf分配不同的内存空间。

源码如下:

func makechan64(t *chantype, size int64) *hchan {if int64(int(size)) != size {panic(plainError("makechan: size out of range"))}return makechan(t, int(size))
}func makechan(t *chantype, size int) *hchan {elem := t.elem// compiler checks this but be safe.if elem.size >= 1<<16 {throw("makechan: invalid channel element type")}if hchanSize%maxAlign != 0 || elem.align > maxAlign {throw("makechan: bad alignment")}mem, overflow := math.MulUintptr(elem.size, uintptr(size))if overflow || mem > maxAlloc-hchanSize || size < 0 {panic(plainError("makechan: size out of range"))}// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.// buf points into the same allocation, elemtype is persistent.// SudoG's are referenced from their owning thread so they can't be collected.// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.var c *hchanswitch {case mem == 0:// Queue or element size is zero.c = (*hchan)(mallocgc(hchanSize, nil, true))// Race detector uses this location for synchronization.c.buf = c.raceaddr()case elem.ptrdata == 0:// Elements do not contain pointers.// Allocate hchan and buf in one call.c = (*hchan)(mallocgc(hchanSize+mem, nil, true))c.buf = add(unsafe.Pointer(c), hchanSize)default:// Elements contain pointers.c = new(hchan)c.buf = mallocgc(mem, elem, true)}c.elemsize = uint16(elem.size)c.elemtype = elemc.dataqsiz = uint(size)lockInit(&c.lock, lockRankHchan)if debugChan {print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")}return c
}

3.channel 的发送过程

channel send 发送底层调用的是chansend1函数

channel发送的过程

1.检查 recvq 双向链表 是否为空,如果不为空,说明recvq缓存队列不为空,buffer为空,有大量的recvq在等待,则从 recvq 头部取一个 goroutine,将数据发送过去,并唤醒对应的 goroutine 即可。

2.如果 recvq 为空,代表buffer可能有存储空间,则将数据放入到 buffer 中。

3.如果 buffer 已满,则将要发送的数据和当前 goroutine 打包成 sudog 对象放入到 sendq 中。并将当前 goroutine 置为 waiting 状态。

源码如下

func chansend1(c *hchan, elem unsafe.Pointer) {chansend(c, elem, true, getcallerpc())
}/** generic single channel send/recv* If block is not nil,* then the protocol will not* sleep but return if it could* not complete.** sleep can wake up with g.param == nil* when a channel involved in the sleep has* been closed.  it is easiest to loop and re-run* the operation; we'll see that it's now closed.*/
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {if c == nil {if !block {return false}gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)throw("unreachable")}if debugChan {print("chansend: chan=", c, "\n")}if raceenabled {racereadpc(c.raceaddr(), callerpc, funcPC(chansend))}// Fast path: check for failed non-blocking operation without acquiring the lock.//// After observing that the channel is not closed, we observe that the channel is// not ready for sending. Each of these observations is a single word-sized read// (first c.closed and second full()).// Because a closed channel cannot transition from 'ready for sending' to// 'not ready for sending', even if the channel is closed between the two observations,// they imply a moment between the two when the channel was both not yet closed// and not ready for sending. We behave as if we observed the channel at that moment,// and report that the send cannot proceed.//// It is okay if the reads are reordered here: if we observe that the channel is not// ready for sending and then observe that it is not closed, that implies that the// channel wasn't closed during the first observation. However, nothing here// guarantees forward progress. We rely on the side effects of lock release in// chanrecv() and closechan() to update this thread's view of c.closed and full().if !block && c.closed == 0 && full(c) {return false}var t0 int64if blockprofilerate > 0 {t0 = cputicks()}lock(&c.lock)if c.closed != 0 {unlock(&c.lock)panic(plainError("send on closed channel"))}//recvq不为空if sg := c.recvq.dequeue(); sg != nil {// Found a waiting receiver. We pass the value we want to send// directly to the receiver, bypassing the channel buffer (if any).send(c, sg, ep, func() { unlock(&c.lock) }, 3)return true}//if c.qcount < c.dataqsiz {// Space is available in the channel buffer. Enqueue the element to send.qp := chanbuf(c, c.sendx)if raceenabled {racenotify(c, c.sendx, nil)}typedmemmove(c.elemtype, qp, ep)c.sendx++if c.sendx == c.dataqsiz {c.sendx = 0}c.qcount++unlock(&c.lock)return true}if !block {unlock(&c.lock)return false}// Block on the channel. Some receiver will complete our operation for us.gp := getg()mysg := acquireSudog()mysg.releasetime = 0if t0 != 0 {mysg.releasetime = -1}// No stack splits between assigning elem and enqueuing mysg// on gp.waiting where copystack can find it.mysg.elem = epmysg.waitlink = nilmysg.g = gpmysg.isSelect = falsemysg.c = cgp.waiting = mysggp.param = nilc.sendq.enqueue(mysg)// Signal to anyone trying to shrink our stack that we're about// to park on a channel. The window between when this G's status// changes and when we set gp.activeStackChans is not safe for// stack shrinking.atomic.Store8(&gp.parkingOnChan, 1)gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)// Ensure the value being sent is kept alive until the// receiver copies it out. The sudog has a pointer to the// stack object, but sudogs aren't considered as roots of the// stack tracer.KeepAlive(ep)// someone woke us up.if mysg != gp.waiting {throw("G waiting list is corrupted")}gp.waiting = nilgp.activeStackChans = falseclosed := !mysg.successgp.param = nilif mysg.releasetime > 0 {blockevent(mysg.releasetime-t0, 2)}mysg.c = nilreleaseSudog(mysg)if closed {if c.closed == 0 {throw("chansend: spurious wakeup")}panic(plainError("send on closed channel"))}return true
}

4.channel的接受过程

channel底层调用的是chanrecv、chanrecv1、chanrecv2,chanrecv1和chanrecv2底层都调用的是chanrecv。chanrecv1和chanrecv2区别有没有接受数据成功的bool类型

 chanrecv1(c *hchan, elem unsafe.Pointer)chanrecv2(c *hchan, elem unsafe.Pointer) (received bool)

receiver过程分为4种情况

1.从sendq队列头部取一个元素,如果元素不为空,环形队列缓存区已满,说明buffer已满,大量的send goroutine在发送数据,阻塞了,rece从循环队列读取一个元素,,把goroutinue元素放在循环队列中,从sendq队列中唤醒goroutinue。

2.从sendq队列头部取一个元素,如果元素不为空,环形队列为空,并把goroutine中元素copy到receiver中,从sendq队列中唤醒一个goroutine,

3.sendq队列为空,buffer没有满,从buffer中获取一个元素,recex+1

4.sendq对列为空,buffer为空,rece的goroutinue包装成sudog,放在receq队列中。

channel recv的底层源码如下

// entry points for <- c from compiled code
//go:nosplit
func chanrecv1(c *hchan, elem unsafe.Pointer) {chanrecv(c, elem, true)
}//go:nosplit
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {_, received = chanrecv(c, elem, true)return
}// chanrecv receives on channel c and writes the received data to ep.
// ep may be nil, in which case received data is ignored.
// If block == false and no elements are available, returns (false, false).
// Otherwise, if c is closed, zeros *ep and returns (true, false).
// Otherwise, fills in *ep with an element and returns (true, true).
// A non-nil ep must point to the heap or the caller's stack.
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {// raceenabled: don't need to check ep, as it is always on the stack// or is new memory allocated by reflect.if debugChan {print("chanrecv: chan=", c, "\n")}if c == nil {if !block {return}gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)throw("unreachable")}// Fast path: check for failed non-blocking operation without acquiring the lock.if !block && empty(c) {// After observing that the channel is not ready for receiving, we observe whether the// channel is closed.//// Reordering of these checks could lead to incorrect behavior when racing with a close.// For example, if the channel was open and not empty, was closed, and then drained,// reordered reads could incorrectly indicate "open and empty". To prevent reordering,// we use atomic loads for both checks, and rely on emptying and closing to happen in// separate critical sections under the same lock.  This assumption fails when closing// an unbuffered channel with a blocked send, but that is an error condition anyway.if atomic.Load(&c.closed) == 0 {// Because a channel cannot be reopened, the later observation of the channel// being not closed implies that it was also not closed at the moment of the// first observation. We behave as if we observed the channel at that moment// and report that the receive cannot proceed.return}// The channel is irreversibly closed. Re-check whether the channel has any pending data// to receive, which could have arrived between the empty and closed checks above.// Sequential consistency is also required here, when racing with such a send.if empty(c) {// The channel is irreversibly closed and empty.if raceenabled {raceacquire(c.raceaddr())}if ep != nil {typedmemclr(c.elemtype, ep)}return true, false}}var t0 int64if blockprofilerate > 0 {t0 = cputicks()}lock(&c.lock)if c.closed != 0 && c.qcount == 0 {if raceenabled {raceacquire(c.raceaddr())}unlock(&c.lock)if ep != nil {typedmemclr(c.elemtype, ep)}return true, false}if sg := c.sendq.dequeue(); sg != nil {// Found a waiting sender. If buffer is size 0, receive value// directly from sender. Otherwise, receive from head of queue// and add sender's value to the tail of the queue (both map to// the same buffer slot because the queue is full).recv(c, sg, ep, func() { unlock(&c.lock) }, 3)return true, true}if c.qcount > 0 {// Receive directly from queueqp := chanbuf(c, c.recvx)if raceenabled {racenotify(c, c.recvx, nil)}if ep != nil {typedmemmove(c.elemtype, ep, qp)}typedmemclr(c.elemtype, qp)c.recvx++if c.recvx == c.dataqsiz {c.recvx = 0}c.qcount--unlock(&c.lock)return true, true}if !block {unlock(&c.lock)return false, false}// no sender available: block on this channel.gp := getg()mysg := acquireSudog()mysg.releasetime = 0if t0 != 0 {mysg.releasetime = -1}// No stack splits between assigning elem and enqueuing mysg// on gp.waiting where copystack can find it.mysg.elem = epmysg.waitlink = nilgp.waiting = mysgmysg.g = gpmysg.isSelect = falsemysg.c = cgp.param = nilc.recvq.enqueue(mysg)// Signal to anyone trying to shrink our stack that we're about// to park on a channel. The window between when this G's status// changes and when we set gp.activeStackChans is not safe for// stack shrinking.atomic.Store8(&gp.parkingOnChan, 1)gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)// someone woke us upif mysg != gp.waiting {throw("G waiting list is corrupted")}gp.waiting = nilgp.activeStackChans = falseif mysg.releasetime > 0 {blockevent(mysg.releasetime-t0, 2)}success := mysg.successgp.param = nilmysg.c = nilreleaseSudog(mysg)return true, success
}

5.关闭 channel

关闭channel底层源码调用的是closechan

除此之外,关闭channel,出现panic的场景如下:

  1. 关闭值为nil的channel
  2. 关闭已经被关闭的channel
  3. 向已经关闭的channel写数据

关闭channel的源码如下

func closechan(c *hchan) {if c == nil {panic(plainError("close of nil channel"))}lock(&c.lock)if c.closed != 0 {unlock(&c.lock)panic(plainError("close of closed channel"))}if raceenabled {callerpc := getcallerpc()racewritepc(c.raceaddr(), callerpc, funcPC(closechan))racerelease(c.raceaddr())}c.closed = 1var glist gList// release all readersfor {sg := c.recvq.dequeue()if sg == nil {break}if sg.elem != nil {typedmemclr(c.elemtype, sg.elem)sg.elem = nil}if sg.releasetime != 0 {sg.releasetime = cputicks()}gp := sg.ggp.param = unsafe.Pointer(sg)sg.success = falseif raceenabled {raceacquireg(gp, c.raceaddr())}glist.push(gp)}// release all writers (they will panic)for {sg := c.sendq.dequeue()if sg == nil {break}sg.elem = nilif sg.releasetime != 0 {sg.releasetime = cputicks()}gp := sg.ggp.param = unsafe.Pointer(sg)sg.success = falseif raceenabled {raceacquireg(gp, c.raceaddr())}glist.push(gp)}unlock(&c.lock)// Ready all Gs now that we've dropped the channel lock.for !glist.empty() {gp := glist.pop()gp.schedlink = 0goready(gp, 3)}
}

http://www.ppmy.cn/news/45971.html

相关文章

Prometheus - Grafana 监控 MySQLD Linux服务器 demo版

目录 首先是下载Prometheus 下载和安装 配置Prometheus 查看监控数据 监控mysql demo 部署 mysqld_exporter 组件 配置 Prometheus 获取监控数据 -------------------------------------- 安装和使用Grafana 启动Grafana -------------------------------------- 配…

多通道振弦传感器无线采集仪与参数配置工具连接

多通道振弦传感器无线采集仪与参数配置工具连接 VS101~VS432 设备配备了专门的参数配置工具 SETP 来完成设备工作参数的查看和修改工作。 连接前的准备工作 &#xff08; 1&#xff09;数据接口与计算机连接 使用标配的通讯线与计算机 RS232 接口连接。 若需基于手机网络发送数…

BGA封装与PCB差分互连结构的设计与优化

摘要&#xff1a;随着电子系统通信速率的不断提升&#xff0c;BGA封装与PCB互连区域的信号完整性问题越来越突出。 针对高速BGA封装与PCB差分互连结构进行设计与优化&#xff0c;着重分析封装与PCB互连区域差分布线方式&#xff0c;信号布局方式&#xff0c;信号孔/地孔比&…

Linux移动文件和文件夹(目录)命令

命令mv 英文move 翻译移动 mv命令可以移动文件或文件夹&#xff08;目录&#xff09;&#xff0c;也可以重命令&#xff08;覆盖&#xff09;文件。 1. 移动文件/重命名 单纯地移动某一个文件直接使用&#xff1a; mv <源文件名称/地址> <新文件名称/地址>这个方法…

【刷题之路Ⅱ】LeetCode 3381.搜索旋转排序数组ⅠⅡ

【刷题之路Ⅱ】LeetCode 33&81.搜索旋转排序数组Ⅰ&Ⅱ 一、题目描述二、解题1、方法1——暴力法1.1、思路分析1.2、代码实现 2、方法2——二分法2.1、思路分析2.2、代码实现2.3、升级到81题2.3.1、改进思路分析2.3.1、改进代码实现 3、改进二分法3.1、思路分析3.2、代码…

Python词云

词云图wordcloud 1.安装第三方库 j i e b a 库、 m a t p l o t l i b 、 w o r d c l o u d 库 jieba库、matplotlib、wordcloud库 jieba库、matplotlib、wordcloud库 2.过程 1.使用 j i e b a jieba jieba 库对数据进行分词整理&#xff0c;转为 t x t txt txt文件&#…

如何使用数字示波器

本文介绍以鼎阳SIGLENT SDS1122E数字示波器为例。 带了一根电源线&#xff1b;两根信号线&#xff0c;每根信号线都有几个小配件&#xff0c;如下所示&#xff1a; 使用概述 我们都知道万用表&#xff08;又称欧姆表&#xff09;是工程师最常用的调试电路的工具&#xff0c;但万…

CC2642的GGS使用笔记

一、前言 我们了解BLE的GATT之前需要了解一些基本的概念&#xff1a; &#xff08;1&#xff09;Profile,字面意思简介、概述、形象印象、轮廓、配置文件&#xff0c;在BLE中&#xff0c;我们可能把它理解成配置文件较好&#xff0c;Profile有一些是BLE SIG规定的&#xff0c;有…