Go 并发

news/2024/11/17 20:30:22/

来自 《Go 语言从入门到实战》 的并发章节学习笔记,欢迎阅读斧正,感觉该专栏整体来说对有些后端编程经验的来说比无后端编程经验的人更友好。。

Thread VS Groutine

创建时默认 Stack 大小:前者默认 1M,Groutint 的 Stack 初始化大小为 2K

和 KSE(Kernel Space Entity)关系:Java Thread 是 1:1,Groutine 是 M:N

扩展阅读:go语言之行–golang核武器goroutine调度原理、channel详解

import ("fmt""testing""time"
)func TestGroutine(t *testing.T) {for i := 0; i < 10; i++ {// 使用go创建协程,但是需要注意的是:协程函数的 param 作为参数是外部 i 的数据拷贝go func(param int) {fmt.Println(param)}(i)}time.Sleep(time.Microsecond * 50)
}

共享内存并发机制

Lock

import ("sync""testing""time"
)// 非线程安全
func TestCounter(t *testing.T) {counter := 0// 循环出 5000 个协程程序for i := 0; i < 5000; i++ {go func() {// 执行这个协程匿名函数给 counter 自增counter++}()}time.Sleep(1 * time.Second)// 输出的不是 5000,这是因为这 5000 个并发协程都在抢用 countert.Logf("counter = %d", counter)
}// 线程安全
func TestCounterThreadSafe(t *testing.T) {// 增加一个互斥锁var mut sync.Mutexcounter := 0for i := 0; i < 5000; i++ {go func() {defer func() {// 使用 defer 用于释放资源,此处用于解除锁mut.Unlock()}()//  counter 自增前加锁mut.Lock()counter++}()}// 加这个延时是担心程序一下就跑完了,甚至协程还没有跑完,time.Sleep(1 * time.Second)// 输出 5000t.Logf("counter = %d", counter)
}

WaitGroup

在 Java 中等待其他线程完成用的是 Thread.join() 方法(主线程等待子线程的终止),WaitGroup 有着类似的功能

// 线程安全
func TestCounterWaitGroup(t *testing.T) {// 增加一个互斥锁var mut sync.Mutex//var wg sync.WaitGroupcounter := 0for i := 0; i < 5000; i++ {go func() {defer func() {// 使用 defer 用于释放资源,此处用于解除锁mut.Unlock()}()//  counter 自增前加锁mut.Lock()counter++wg.Done()}()}// 加这个是担心程序一下就跑完了,甚至协程还没有跑完wg.Wait()// 输出 5000t.Logf("counter = %d", counter)
}

CSP 并发机制

Go 语言特有的,通过 channel 交互,channel 有阻塞型的,有 buffer 型的(缓冲)

import ("fmt""testing""time"
)func service() string {fmt.Println("---service---立即执行 50 毫秒延时")time.Sleep(time.Millisecond * 50)return "---service---Done"
}func otherTask() {fmt.Println("---otherTask---working on something else")fmt.Println("---otherTask---立即执行 100 毫秒延时")time.Sleep(time.Millisecond * 100)fmt.Println("---otherTask---Task is Done")
}func AsyncService() chan string { // 该函数返回一个通道, 消息类型是 string// retCh := make(chan string)    // 声明一个无缓冲 chan通道, 这个通道的类型是 string, 通道用于协程间的通信//                               // ⚠️ 使用无缓冲通道时, 如果协程1 没有立即接受协程 2 通过管道发送的消息,就会阻塞,反之亦然retCh := make(chan string, 1) // 声明一个缓冲容量为 1 的chan通道, 这个通道的类型是 string, 通道用于协程间的通信go func() {                   // 此处执行 匿名协程函数ret := service()fmt.Println("---AsyncService---returned result:", ret)retCh <- ret //通过通道将 service()函数返回的值传递给  通道 retChfmt.Println("---AsyncService---service exited")}()return retCh // 返回这个通道
}func TestAsyncService(t *testing.T) {retCh := AsyncService()otherTask()fmt.Println(<-retCh)
}func TestService(t *testing.T) {fmt.Println(service())otherTask()
}

多路选择和超时控制

多路选择代码:
在这里插入图片描述
超时控制代码:
在这里插入图片描述

import ("fmt""testing""time"
)func service() string {time.Sleep(time.Millisecond * 500)return "Done"
}func AsyncService() chan string {retCh := make(chan string, 1)//retCh := make(chan string, 1)go func() {ret := service()fmt.Println("returned result.")retCh <- retfmt.Println("service exited.")}()return retCh
}func TestSelect(t *testing.T) {/**select 语句中,case不依赖代码书写顺序。如果case中有1个有消息时,其他case/default则不会执行。如果case中有多个消息时,随机任选1个进行执行,其他不会执行如果所有case都没有消息时,同时含有defalut分之,则会走default分之如果所有case都没有消息时,没有default分之,则会阻塞等待case中返回消息继续执行。*/select {case ret := <-AsyncService():t.Log(ret)case <-time.After(time.Millisecond * 100):t.Error("time out")}
}``# channel
![在这里插入图片描述](https://img-blog.csdnimg.cn/59bfa1b8ede14c66afbd8482f3701e38.png)
```go
import ("fmt""sync""testing"
)func dataProducer(ch chan int, wg *sync.WaitGroup) {go func() {for i := 0; i < 10; i++ {ch <- i}// 数据发送完成,关闭 channelclose(ch)// 往关闭的 channel 发内容会有 panic//ch <- iwg.Done()}()}func dataReceiver(ch chan int, wg *sync.WaitGroup) {go func() {for {if data, ok := <-ch; ok {fmt.Println(data)} else {break}}wg.Done()}()}func TestCloseChannel(t *testing.T) {var wg sync.WaitGroupch := make(chan int)wg.Add(1)dataProducer(ch, &wg)wg.Add(1)dataReceiver(ch, &wg)// wg.Add(1)// dataReceiver(ch, &wg)wg.Wait()}

任务取消

通过关闭 channel 取消

package cancel_by_closeimport ("fmt""testing""time"
)// 任务是否已被取消
// 实现原理:
// 检查是否从 channel 收到一个消息,如果收到一个消息,我们就返回 true,代表任务已经被取消了
// 当没有收到消息,channel 会被阻塞,多路选择机制就会走到 default 分支上去。
func isCancelled(cancelChan chan struct{}) bool {select {case <-cancelChan:return truedefault:return false}
}func cancel_1(cancelChan chan struct{}) {cancelChan <- struct{}{}
}func cancel_2(cancelChan chan struct{}) {close(cancelChan)
}// 利用 CSP, 多路选择机制和 channel 的关闭与广播实现任务取消功能
func TestCancel(t *testing.T) {cancelChan := make(chan struct{}, 0)for i := 0; i < 5; i++ {go func(i int, cancelCh chan struct{}) {for {if isCancelled(cancelCh) {break}time.Sleep(time.Millisecond * 5)}fmt.Println(i, "Cancelled")}(i, cancelChan)}cancel_1(cancelChan)// cancel_2(cancelChan)time.Sleep(time.Second * 1)
}

通过 Context 取消

使用 context 为了解决层级取消的问题,就是取消一个协程的子协程(树)甚至孙子协程(树)的问题
在这里插入图片描述

import ("context""fmt""testing""time"
)func isCancelled(ctx context.Context) bool {select {case <-ctx.Done():return truedefault:return false}
}func TestCancel(t *testing.T) {ctx, cancel := context.WithCancel(context.Background())for i := 0; i < 5; i++ {go func(i int, ctx context.Context) {for {if isCancelled(ctx) {break}time.Sleep(time.Millisecond * 5)}fmt.Println(i, "Cancelled")}(i, ctx)}cancel()time.Sleep(time.Second * 1)
}

常见并发任务

只执行一次

使用 sync.Once 完成,类似于 Java 中的单例模式,避免资源的反复初始化。

如果type Singleton struct {} 定义的是空结构体,那么无论用不用once.Do(),最后得到的obj地址都是一样的。当然不用once.Do()时,除了输出10个相同的地址,还会输出10次“Create obj“。
将Singleton结构体里添加内容后,比如type Singleton struct {a bool},再做实验,每次运行就会得到不同的地址了。

空结构体情况:

import ("fmt""sync""testing""unsafe"
)type Singleton struct {
}var singleInstance *Singleton// 主要用于类似于单例的场景中,避免资源的反复初始化
var once sync.Oncefunc GetSingletonObj() *Singleton {once.Do(func() {fmt.Println("Create Obj")singleInstance = new(Singleton)})return singleInstance
}func TestGetSingletonObj(t *testing.T) {var wg sync.WaitGroupfor i := 0; i < 10; i++ {wg.Add(1)go func() {obj := GetSingletonObj()fmt.Printf("%x\n", unsafe.Pointer(obj))wg.Done()}()}
}

结构体非空情况:

import ("fmt""sync""testing""unsafe"
)type Singleton struct {// flag bool
}var singleInstance *Singleton// 主要用于类似于单例的场景中,避免资源的反复初始化
var once sync.Oncefunc GetSingletonObj() *Singleton {once.Do(func() {fmt.Println("Create Obj")singleInstance = new(Singleton)singleInstance.flag = true})return singleInstance
}// 此时 Singleton 的 struct 中不是空结构,加了 once.Do 能确保单例
func TestGetSingletonObj(t *testing.T) {var wg sync.WaitGroupfor i := 0; i < 10; i++ {wg.Add(1)go func() {obj := GetSingletonObj()fmt.Printf("%x\n", unsafe.Pointer(obj))wg.Done()}()}
}

仅需任意任务完成

import ("fmt""runtime""testing""time"
)func runTask(id int) string {time.Sleep(10 * time.Millisecond)return fmt.Sprintf("The result is from %d", id)
}func FirstResponse() string {numOfRunner := 10// 如果不使用 buffer channel,会出现协程泄露的情况:比如说第 3 个协程就返回了,但是还是会创建 11 个ch := make(chan string)// 使用的话,就不会新创建协程执行直接返回//ch := make(chan string, numOfRunner)for i := 0; i < numOfRunner; i++ {go func(i int) {ret := runTask(i)ch <- ret}(i)}return <-ch
}func TestFirstResponse(t *testing.T) {// 系统当前协程数t.Log("Before:", runtime.NumGoroutine())t.Log(FirstResponse())time.Sleep(time.Second * 1)t.Log("After:", runtime.NumGoroutine())}

所有任务完成

对象池

使用 buffered channel 实现:
在这里插入图片描述
obj_pool.go:

import ("errors""time"
)type ReusableObj struct {
}type ObjPool struct {bufChan chan *ReusableObj //用于缓冲可重用对象
}func NewObjPool(numOfObj int) *ObjPool {objPool := ObjPool{}objPool.bufChan = make(chan *ReusableObj, numOfObj)for i := 0; i < numOfObj; i++ {objPool.bufChan <- &ReusableObj{}}return &objPool
}func (p *ObjPool) GetObj(timeout time.Duration) (*ReusableObj, error) {select {case ret := <-p.bufChan:return ret, nilcase <-time.After(timeout): //超时控制return nil, errors.New("time out")}}func (p *ObjPool) ReleaseObj(obj *ReusableObj) error {select {case p.bufChan <- obj:return nildefault:return errors.New("overflow")}
}

obj_pool_test.go:

import ("fmt""testing""time"
)func TestObjPool(t *testing.T) {pool := NewObjPool(10)// if err := pool.ReleaseObj(&ReusableObj{}); err != nil { //尝试放置超出池大小的对象// 	t.Error(err)// }for i := 0; i < 11; i++ {if v, err := pool.GetObj(time.Second * 1); err != nil {t.Error(err)} else {fmt.Printf("%T\n", v)if err := pool.ReleaseObj(v); err != nil {t.Error(err)}}}fmt.Println("Done")
}

sync.Pool 对象缓存

获取

在这里插入图片描述

返回

在这里插入图片描述

使用

import ("fmt""runtime""sync""testing"
)// 不 GC输出:
// Create a new object.
// 100
// 3
// GC 输出:
// Create a new object.
// 100
// Create a new object.
// 100
func TestSyncPool(t *testing.T) {pool := &sync.Pool{New: func() interface{} {fmt.Println("Create a new object.")return 100},}v := pool.Get().(int)fmt.Println(v)pool.Put(3)runtime.GC() //GC 会清除sync.pool中缓存的对象v1, _ := pool.Get().(int)fmt.Println(v1)
}func TestSyncPoolInMultiGroutine(t *testing.T) {pool := &sync.Pool{New: func() interface{} {fmt.Println("Create a new object.")return 10},}pool.Put(100)pool.Put(100)pool.Put(100)var wg sync.WaitGroupfor i := 0; i < 10; i++ {wg.Add(1)go func(id int) {fmt.Println(pool.Get())wg.Done()}(i)}wg.Wait()
}

对象生命周期

在这里插入图片描述

总结

在这里插入图片描述

关联阅读

Go 反射

Go 基础


http://www.ppmy.cn/news/8397.html

相关文章

uniCloud云开发----4、uniCloud云开发进阶使用方法

uniCloud云开发进阶使用方法前言1、云对象的importObject的创建和使用(1&#xff09;创建云对象&#xff08;2&#xff09;编辑云对象&#xff08;3&#xff09;在.vue文件中调用云对象&#xff08;4&#xff09;在.vue文件中调用方法2、客户端直接连接数据库(1)直接在客户端引…

Threejs实现鼠标点击人物行走/镜头跟随人物移动/鼠标点击动画/游戏第三人称/行走动作

1&#xff0c;功能介绍 Threejs获取鼠标点击位置、实现鼠标点击人物行走、人物头顶显示名称标签、镜头跟随人物移动并且镜头围绕人物旋转&#xff0c;类似游戏中第三人称、鼠标点击位置有动画效果&#xff0c;如下效果图 2&#xff0c;功能实现 获取鼠标点击位置&#xff0c;…

《Nuitka打包实战指南》实战打包OpenCV-Python

实战打包OpenCV-Python 打包时解决掉的问题: ModuleNotFoundError: No Module named cv2ImportError: numpy.core.multiarray failed to import打包示例源码: 请看文章末尾 版本信息: opencv-python==4.5.1.48 numpy==1.23.2 Nuitka==0.6.19.1 打包系统: Windows10 64…

【Python】sklearn机器学习之Birch聚类算法

文章目录基本原理sklearn调用基本原理 BIRCH&#xff0c;即Balanced Iterative Reducing and Clustering Using Hierarchies&#xff0c;利用分层的平衡迭代规约和聚类&#xff0c;特点是扫描一次数据就可以实现聚类&#xff0c; 而根据经验&#xff0c;一般这种一遍成功的算…

AlphaGo简易版MuGo源码解析

文章目录前言源码实现MuGo的输入数据模型的搭建模型的训练参考链接结语前言 自从AlphaGo横空出世&#xff0c;战胜李世石后&#xff0c;AI围棋如雨后春笋一般遍地开花。阅读DeepMind的论文有时还是隔靴搔痒&#xff0c;只有钻到代码里&#xff0c;才能一探究竟。于是&#xff…

Spring 核心概念 IOC/DI IOC容器 Bean

目录 一&#xff1a;代码书写现状 二&#xff1a;核心思想 一&#xff1a;代码书写现状 常规操作如上,但存在着问题,将项目上线后&#xff0c;需要将数据层进行更换,更换如下&#xff1a; 数据层更换后&#xff0c;业务层也需要进行更换&#xff0c;更换如下&#xff1a; 数据…

spring boot 实现搜索引擎的设计思想

实现思路 索引构建模块 搜索模块 数据库模块 索引模块 对于搜索一个东西&#xff0c;我们很自然的能想到遍历去查找。比如我要查找一本书叫 《红楼梦》&#xff0c;那么我直接在所有结果中进行遍历查找&#xff0c;当我们找到书名为《红楼梦》的结果时&#xff0c;就代表我们…

俯瞰·明统一战争·北伐中原

目录 背景 北方战略位置、前三次北伐快速俯瞰 实施策略 第一阶段 第二阶段 第三阶段 影响 雪压枝头低&#xff0c;虽低不着泥。一朝红日出&#xff0c;依旧与天齐。 背景 元至正二十七年&#xff08;1367年&#xff09;四月&#xff0c;吴王朱元璋命中书右丞相徐达为征…