来自 《Go 语言从入门到实战》 的并发章节学习笔记,欢迎阅读斧正,感觉该专栏整体来说对有些后端编程经验的来说比无后端编程经验的人更友好。。
Thread VS Groutine
创建时默认 Stack 大小:前者默认 1M,Groutint 的 Stack 初始化大小为 2K
和 KSE(Kernel Space Entity)关系:Java Thread 是 1:1,Groutine 是 M:N
扩展阅读:go语言之行–golang核武器goroutine调度原理、channel详解
import ("fmt""testing""time"
)func TestGroutine(t *testing.T) {for i := 0; i < 10; i++ {// 使用go创建协程,但是需要注意的是:协程函数的 param 作为参数是外部 i 的数据拷贝go func(param int) {fmt.Println(param)}(i)}time.Sleep(time.Microsecond * 50)
}
共享内存并发机制
Lock
import ("sync""testing""time"
)// 非线程安全
func TestCounter(t *testing.T) {counter := 0// 循环出 5000 个协程程序for i := 0; i < 5000; i++ {go func() {// 执行这个协程匿名函数给 counter 自增counter++}()}time.Sleep(1 * time.Second)// 输出的不是 5000,这是因为这 5000 个并发协程都在抢用 countert.Logf("counter = %d", counter)
}// 线程安全
func TestCounterThreadSafe(t *testing.T) {// 增加一个互斥锁var mut sync.Mutexcounter := 0for i := 0; i < 5000; i++ {go func() {defer func() {// 使用 defer 用于释放资源,此处用于解除锁mut.Unlock()}()// counter 自增前加锁mut.Lock()counter++}()}// 加这个延时是担心程序一下就跑完了,甚至协程还没有跑完,time.Sleep(1 * time.Second)// 输出 5000t.Logf("counter = %d", counter)
}
WaitGroup
在 Java 中等待其他线程完成用的是 Thread.join()
方法(主线程等待子线程的终止),WaitGroup 有着类似的功能
// 线程安全
func TestCounterWaitGroup(t *testing.T) {// 增加一个互斥锁var mut sync.Mutex//var wg sync.WaitGroupcounter := 0for i := 0; i < 5000; i++ {go func() {defer func() {// 使用 defer 用于释放资源,此处用于解除锁mut.Unlock()}()// counter 自增前加锁mut.Lock()counter++wg.Done()}()}// 加这个是担心程序一下就跑完了,甚至协程还没有跑完wg.Wait()// 输出 5000t.Logf("counter = %d", counter)
}
CSP 并发机制
Go 语言特有的,通过 channel 交互,channel 有阻塞型的,有 buffer 型的(缓冲)
import ("fmt""testing""time"
)func service() string {fmt.Println("---service---立即执行 50 毫秒延时")time.Sleep(time.Millisecond * 50)return "---service---Done"
}func otherTask() {fmt.Println("---otherTask---working on something else")fmt.Println("---otherTask---立即执行 100 毫秒延时")time.Sleep(time.Millisecond * 100)fmt.Println("---otherTask---Task is Done")
}func AsyncService() chan string { // 该函数返回一个通道, 消息类型是 string// retCh := make(chan string) // 声明一个无缓冲 chan通道, 这个通道的类型是 string, 通道用于协程间的通信// // ⚠️ 使用无缓冲通道时, 如果协程1 没有立即接受协程 2 通过管道发送的消息,就会阻塞,反之亦然retCh := make(chan string, 1) // 声明一个缓冲容量为 1 的chan通道, 这个通道的类型是 string, 通道用于协程间的通信go func() { // 此处执行 匿名协程函数ret := service()fmt.Println("---AsyncService---returned result:", ret)retCh <- ret //通过通道将 service()函数返回的值传递给 通道 retChfmt.Println("---AsyncService---service exited")}()return retCh // 返回这个通道
}func TestAsyncService(t *testing.T) {retCh := AsyncService()otherTask()fmt.Println(<-retCh)
}func TestService(t *testing.T) {fmt.Println(service())otherTask()
}
多路选择和超时控制
多路选择代码:
超时控制代码:
import ("fmt""testing""time"
)func service() string {time.Sleep(time.Millisecond * 500)return "Done"
}func AsyncService() chan string {retCh := make(chan string, 1)//retCh := make(chan string, 1)go func() {ret := service()fmt.Println("returned result.")retCh <- retfmt.Println("service exited.")}()return retCh
}func TestSelect(t *testing.T) {/**select 语句中,case不依赖代码书写顺序。如果case中有1个有消息时,其他case/default则不会执行。如果case中有多个消息时,随机任选1个进行执行,其他不会执行如果所有case都没有消息时,同时含有defalut分之,则会走default分之如果所有case都没有消息时,没有default分之,则会阻塞等待case中返回消息继续执行。*/select {case ret := <-AsyncService():t.Log(ret)case <-time.After(time.Millisecond * 100):t.Error("time out")}
}``# channel
![在这里插入图片描述](https://img-blog.csdnimg.cn/59bfa1b8ede14c66afbd8482f3701e38.png)
```go
import ("fmt""sync""testing"
)func dataProducer(ch chan int, wg *sync.WaitGroup) {go func() {for i := 0; i < 10; i++ {ch <- i}// 数据发送完成,关闭 channelclose(ch)// 往关闭的 channel 发内容会有 panic//ch <- iwg.Done()}()}func dataReceiver(ch chan int, wg *sync.WaitGroup) {go func() {for {if data, ok := <-ch; ok {fmt.Println(data)} else {break}}wg.Done()}()}func TestCloseChannel(t *testing.T) {var wg sync.WaitGroupch := make(chan int)wg.Add(1)dataProducer(ch, &wg)wg.Add(1)dataReceiver(ch, &wg)// wg.Add(1)// dataReceiver(ch, &wg)wg.Wait()}
任务取消
通过关闭 channel 取消
package cancel_by_closeimport ("fmt""testing""time"
)// 任务是否已被取消
// 实现原理:
// 检查是否从 channel 收到一个消息,如果收到一个消息,我们就返回 true,代表任务已经被取消了
// 当没有收到消息,channel 会被阻塞,多路选择机制就会走到 default 分支上去。
func isCancelled(cancelChan chan struct{}) bool {select {case <-cancelChan:return truedefault:return false}
}func cancel_1(cancelChan chan struct{}) {cancelChan <- struct{}{}
}func cancel_2(cancelChan chan struct{}) {close(cancelChan)
}// 利用 CSP, 多路选择机制和 channel 的关闭与广播实现任务取消功能
func TestCancel(t *testing.T) {cancelChan := make(chan struct{}, 0)for i := 0; i < 5; i++ {go func(i int, cancelCh chan struct{}) {for {if isCancelled(cancelCh) {break}time.Sleep(time.Millisecond * 5)}fmt.Println(i, "Cancelled")}(i, cancelChan)}cancel_1(cancelChan)// cancel_2(cancelChan)time.Sleep(time.Second * 1)
}
通过 Context 取消
使用 context 为了解决层级取消的问题,就是取消一个协程的子协程(树)甚至孙子协程(树)的问题
import ("context""fmt""testing""time"
)func isCancelled(ctx context.Context) bool {select {case <-ctx.Done():return truedefault:return false}
}func TestCancel(t *testing.T) {ctx, cancel := context.WithCancel(context.Background())for i := 0; i < 5; i++ {go func(i int, ctx context.Context) {for {if isCancelled(ctx) {break}time.Sleep(time.Millisecond * 5)}fmt.Println(i, "Cancelled")}(i, ctx)}cancel()time.Sleep(time.Second * 1)
}
常见并发任务
只执行一次
使用 sync.Once
完成,类似于 Java 中的单例模式,避免资源的反复初始化。
如果type Singleton struct {} 定义的是空结构体,那么无论用不用once.Do(),最后得到的obj地址都是一样的。当然不用once.Do()时,除了输出10个相同的地址,还会输出10次“Create obj“。
将Singleton结构体里添加内容后,比如type Singleton struct {a bool},再做实验,每次运行就会得到不同的地址了。
空结构体情况:
import ("fmt""sync""testing""unsafe"
)type Singleton struct {
}var singleInstance *Singleton// 主要用于类似于单例的场景中,避免资源的反复初始化
var once sync.Oncefunc GetSingletonObj() *Singleton {once.Do(func() {fmt.Println("Create Obj")singleInstance = new(Singleton)})return singleInstance
}func TestGetSingletonObj(t *testing.T) {var wg sync.WaitGroupfor i := 0; i < 10; i++ {wg.Add(1)go func() {obj := GetSingletonObj()fmt.Printf("%x\n", unsafe.Pointer(obj))wg.Done()}()}
}
结构体非空情况:
import ("fmt""sync""testing""unsafe"
)type Singleton struct {// flag bool
}var singleInstance *Singleton// 主要用于类似于单例的场景中,避免资源的反复初始化
var once sync.Oncefunc GetSingletonObj() *Singleton {once.Do(func() {fmt.Println("Create Obj")singleInstance = new(Singleton)singleInstance.flag = true})return singleInstance
}// 此时 Singleton 的 struct 中不是空结构,加了 once.Do 能确保单例
func TestGetSingletonObj(t *testing.T) {var wg sync.WaitGroupfor i := 0; i < 10; i++ {wg.Add(1)go func() {obj := GetSingletonObj()fmt.Printf("%x\n", unsafe.Pointer(obj))wg.Done()}()}
}
仅需任意任务完成
import ("fmt""runtime""testing""time"
)func runTask(id int) string {time.Sleep(10 * time.Millisecond)return fmt.Sprintf("The result is from %d", id)
}func FirstResponse() string {numOfRunner := 10// 如果不使用 buffer channel,会出现协程泄露的情况:比如说第 3 个协程就返回了,但是还是会创建 11 个ch := make(chan string)// 使用的话,就不会新创建协程执行直接返回//ch := make(chan string, numOfRunner)for i := 0; i < numOfRunner; i++ {go func(i int) {ret := runTask(i)ch <- ret}(i)}return <-ch
}func TestFirstResponse(t *testing.T) {// 系统当前协程数t.Log("Before:", runtime.NumGoroutine())t.Log(FirstResponse())time.Sleep(time.Second * 1)t.Log("After:", runtime.NumGoroutine())}
所有任务完成
对象池
使用 buffered channel 实现:
obj_pool.go:
import ("errors""time"
)type ReusableObj struct {
}type ObjPool struct {bufChan chan *ReusableObj //用于缓冲可重用对象
}func NewObjPool(numOfObj int) *ObjPool {objPool := ObjPool{}objPool.bufChan = make(chan *ReusableObj, numOfObj)for i := 0; i < numOfObj; i++ {objPool.bufChan <- &ReusableObj{}}return &objPool
}func (p *ObjPool) GetObj(timeout time.Duration) (*ReusableObj, error) {select {case ret := <-p.bufChan:return ret, nilcase <-time.After(timeout): //超时控制return nil, errors.New("time out")}}func (p *ObjPool) ReleaseObj(obj *ReusableObj) error {select {case p.bufChan <- obj:return nildefault:return errors.New("overflow")}
}
obj_pool_test.go:
import ("fmt""testing""time"
)func TestObjPool(t *testing.T) {pool := NewObjPool(10)// if err := pool.ReleaseObj(&ReusableObj{}); err != nil { //尝试放置超出池大小的对象// t.Error(err)// }for i := 0; i < 11; i++ {if v, err := pool.GetObj(time.Second * 1); err != nil {t.Error(err)} else {fmt.Printf("%T\n", v)if err := pool.ReleaseObj(v); err != nil {t.Error(err)}}}fmt.Println("Done")
}
sync.Pool 对象缓存
获取
返回
使用
import ("fmt""runtime""sync""testing"
)// 不 GC输出:
// Create a new object.
// 100
// 3
// GC 输出:
// Create a new object.
// 100
// Create a new object.
// 100
func TestSyncPool(t *testing.T) {pool := &sync.Pool{New: func() interface{} {fmt.Println("Create a new object.")return 100},}v := pool.Get().(int)fmt.Println(v)pool.Put(3)runtime.GC() //GC 会清除sync.pool中缓存的对象v1, _ := pool.Get().(int)fmt.Println(v1)
}func TestSyncPoolInMultiGroutine(t *testing.T) {pool := &sync.Pool{New: func() interface{} {fmt.Println("Create a new object.")return 10},}pool.Put(100)pool.Put(100)pool.Put(100)var wg sync.WaitGroupfor i := 0; i < 10; i++ {wg.Add(1)go func(id int) {fmt.Println(pool.Get())wg.Done()}(i)}wg.Wait()
}
对象生命周期
总结
关联阅读
Go 反射
Go 基础