40分钟学 Go 语言高并发:Goroutine基础与原理

devtools/2024/11/23 21:25:49/

Day 03 - goroutine基础与原理

1. goroutine创建和调度

1.1 goroutine基本特性

特性说明
轻量级初始栈大小仅2KB,可动态增长
调度方式协作式调度,由Go运行时管理
创建成本创建成本很低,可同时运行数十万个
通信方式通过channel进行通信,而不是共享内存

1.2 创建goroutine的示例代码

package mainimport ("fmt""runtime""sync""time"
)// 监控goroutine数量
func monitorGoroutines(duration time.Duration, done chan struct{}) {ticker := time.NewTicker(duration)defer ticker.Stop()for {select {case <-ticker.C:fmt.Printf("当前goroutine数量: %d\n", runtime.NumGoroutine())case <-done:return}}
}// 模拟工作负载
type Worker struct {ID intwg *sync.WaitGroup
}func NewWorker(id int, wg *sync.WaitGroup) *Worker {return &Worker{ID: id,wg: wg,}
}func (w *Worker) Work(jobs <-chan int, results chan<- int) {defer w.wg.Done()for job := range jobs {fmt.Printf("Worker %d 开始处理任务 %d\n", w.ID, job)// 模拟工作负载time.Sleep(100 * time.Millisecond)results <- job * 2}
}func main() {numWorkers := 5numJobs := 10// 创建通道jobs := make(chan int, numJobs)results := make(chan int, numJobs)// 创建WaitGroup来等待所有worker完成var wg sync.WaitGroup// 监控goroutine数量done := make(chan struct{})go monitorGoroutines(time.Second, done)// 创建worker池fmt.Printf("创建 %d 个worker\n", numWorkers)for i := 1; i <= numWorkers; i++ {wg.Add(1)worker := NewWorker(i, &wg)go worker.Work(jobs, results)}// 发送任务fmt.Printf("发送 %d 个任务\n", numJobs)for j := 1; j <= numJobs; j++ {jobs <- j}close(jobs)// 等待所有worker完成go func() {wg.Wait()close(results)}()// 收集结果for result := range results {fmt.Printf("收到结果: %d\n", result)}// 停止监控done <- struct{}{}// 最终统计fmt.Printf("最终goroutine数量: %d\n", runtime.NumGoroutine())
}

2. GMP模型详解

2.1 GMP组件说明

组件说明职责
G (Goroutine)goroutine的抽象包含goroutine的栈、程序计数器等信息
M (Machine)工作线程执行G的实体,对应系统线程
P (Processor)处理器维护G的运行队列,提供上下文环境

2.2 GMP调度流程图

在这里插入图片描述

2.3 GMP相关的运行时参数

runtime.GOMAXPROCS(n) // 设置最大P的数量
runtime.NumCPU()      // 获取CPU核心数
runtime.NumGoroutine() // 获取当前goroutine数量

3. 并发模型原理

3.1 Go并发模型特点

特点说明
CSP模型通过通信来共享内存,而不是共享内存来通信
非阻塞调度goroutine让出CPU时不会阻塞其他goroutine
工作窃取空闲P可以从其他P窃取任务
抢占式调度支持基于信号的抢占式调度

3.2 并发模型示例

package mainimport ("context""fmt""runtime""sync""time"
)// Pipeline 表示一个数据处理管道
type Pipeline struct {input  chan intoutput chan intdone   chan struct{}
}// NewPipeline 创建新的处理管道
func NewPipeline() *Pipeline {return &Pipeline{input:  make(chan int),output: make(chan int),done:   make(chan struct{}),}
}// Process 处理数据
func (p *Pipeline) Process(ctx context.Context) {go func() {defer close(p.output)for {select {case num, ok := <-p.input:if !ok {return}// 模拟处理result := num * 2select {case p.output <- result:case <-ctx.Done():return}case <-ctx.Done():return}}}()
}// WorkerPool 表示工作池
type WorkerPool struct {workers inttasks   chan func()wg      sync.WaitGroup
}// NewWorkerPool 创建新的工作池
func NewWorkerPool(workers int) *WorkerPool {pool := &WorkerPool{workers: workers,tasks:   make(chan func(), workers*2),}pool.Start()return pool
}// Start 启动工作池
func (p *WorkerPool) Start() {for i := 0; i < p.workers; i++ {p.wg.Add(1)go func(workerID int) {defer p.wg.Done()for task := range p.tasks {fmt.Printf("Worker %d executing task\n", workerID)task()}}(i + 1)}
}// Submit 提交任务
func (p *WorkerPool) Submit(task func()) {p.tasks <- task
}// Stop 停止工作池
func (p *WorkerPool) Stop() {close(p.tasks)p.wg.Wait()
}func main() {// 设置使用的CPU核心数runtime.GOMAXPROCS(runtime.NumCPU())// 创建上下文ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)defer cancel()// 创建处理管道pipeline := NewPipeline()pipeline.Process(ctx)// 创建工作池pool := NewWorkerPool(3)// 启动生产者go func() {defer close(pipeline.input)for i := 1; i <= 10; i++ {select {case pipeline.input <- i:fmt.Printf("Sent %d to pipeline\n", i)case <-ctx.Done():return}}}()// 使用工作池处理pipeline输出go func() {for result := range pipeline.output {result := result // 捕获变量pool.Submit(func() {// 模拟处理时间time.Sleep(100 * time.Millisecond)fmt.Printf("Processed result: %d\n", result)})}// 处理完成后停止工作池pool.Stop()}()// 等待上下文结束<-ctx.Done()fmt.Println("Main context done")
}

4. goroutine生命周期

4.1 生命周期状态

状态说明
创建goroutine被创建,分配栈空间
可运行等待被调度执行
运行中正在被M执行
系统调用中阻塞在系统调用上
等待中因channel或同步原语阻塞
死亡执行完成,等待回收

4.2 生命周期示例

package mainimport ("context""fmt""runtime""runtime/debug""sync""time"
)// GoroutineMonitor 用于监控goroutine的状态
type GoroutineMonitor struct {startTime time.TimeendTime   time.Timestatus    stringsync.Mutex
}// NewGoroutineMonitor 创建新的goroutine监控器
func NewGoroutineMonitor() *GoroutineMonitor {return &GoroutineMonitor{startTime: time.Now(),status:    "created",}
}// UpdateStatus 更新goroutine状态
func (g *GoroutineMonitor) UpdateStatus(status string) {g.Lock()defer g.Unlock()g.status = statusfmt.Printf("Goroutine状态更新: %s, 时间: %v\n", status, time.Since(g.startTime))
}// Complete 标记goroutine完成
func (g *GoroutineMonitor) Complete() {g.Lock()defer g.Unlock()g.endTime = time.Now()g.status = "completed"fmt.Printf("Goroutine完成, 总运行时间: %v\n", g.endTime.Sub(g.startTime))
}// Task 代表一个任务
type Task struct {ID       intDuration time.DurationMonitor  *GoroutineMonitor
}// Execute 执行任务
func (t *Task) Execute(ctx context.Context, wg *sync.WaitGroup) {defer wg.Done()defer t.Monitor.Complete()defer func() {if r := recover(); r != nil {fmt.Printf("Task %d panic: %v\nStack: %s\n", t.ID, r, debug.Stack())t.Monitor.UpdateStatus("panic")}}()t.Monitor.UpdateStatus("running")// 模拟任务执行select {case <-time.After(t.Duration):t.Monitor.UpdateStatus("normal completion")case <-ctx.Done():t.Monitor.UpdateStatus("cancelled")return}// 模拟一些可能的状态if t.ID%4 == 0 {t.Monitor.UpdateStatus("blocked")time.Sleep(100 * time.Millisecond)} else if t.ID%3 == 0 {panic("模拟任务panic")}
}// TaskScheduler 任务调度器
type TaskScheduler struct {tasks    chan Taskworkers  intmonitors map[int]*GoroutineMonitormu       sync.RWMutex
}// NewTaskScheduler 创建任务调度器
func NewTaskScheduler(workers int) *TaskScheduler {return &TaskScheduler{tasks:    make(chan Task, workers*2),workers:  workers,monitors: make(map[int]*GoroutineMonitor),}
}// AddTask 添加任务
func (s *TaskScheduler) AddTask(task Task) {s.mu.Lock()s.monitors[task.ID] = task.Monitors.mu.Unlock()s.tasks <- task
}// Start 启动调度器
func (s *TaskScheduler) Start(ctx context.Context) {var wg sync.WaitGroup// 启动worker池for i := 0; i < s.workers; i++ {wg.Add(1)go func(workerID int) {defer wg.Done()for task := range s.tasks {task.Execute(ctx, &wg)}}(i)}go func() {wg.Wait()close(s.tasks)}()
}func main() {// 设置最大P的数量runtime.GOMAXPROCS(4)ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)defer cancel()// 创建调度器scheduler := NewTaskScheduler(3)// 启动调度器scheduler.Start(ctx)// 创建多个任务for i := 1; i <= 10; i++ {task := Task{ID:       i,Duration: time.Duration(i*200) * time.Millisecond,Monitor:  NewGoroutineMonitor(),}scheduler.AddTask(task)}// 等待context结束<-ctx.Done()// 打印最终状态fmt.Println("\n最终状态:")scheduler.mu.RLock()for id, monitor := range scheduler.monitors {monitor.Lock()fmt.Printf("Task %d - 状态: %s\n", id, monitor.status)monitor.Unlock()}scheduler.mu.RUnlock()
}

4.3 Goroutine生命周期状态转换图

在这里插入图片描述

5. 实践注意事项

5.1 goroutine泄露的常见场景

  1. channel阻塞且无法释放
func leakyGoroutine() {ch := make(chan int) // 无缓冲channelgo func() {val := <-ch // 永远阻塞在这里}()// ch没有被写入,goroutine泄露
}
  1. 无限循环
func infiniteLoop() {go func() {for {// 没有退出条件的循环// 应该添加 select 或 检查退出信号}}()
}

5.2 最佳实践表格

最佳实践说明
合理控制goroutine数量避免无限制创建goroutine
使用context控制生命周期优雅管理goroutine的退出
处理panic避免goroutine意外退出影响整个程序
及时清理资源使用defer确保资源释放
合理设置GOMAXPROCS根据CPU核心数调整P的数量

5.3 性能优化建议

  1. goroutine池化
type Pool struct {work chan func()sem  chan struct{}
}func NewPool(size int) *Pool {return &Pool{work: make(chan func()),sem:  make(chan struct{}, size),}
}func (p *Pool) Submit(task func()) {select {case p.work <- task:case p.sem <- struct{}{}:go p.worker(task)}
}func (p *Pool) worker(task func()) {defer func() { <-p.sem }()for {task()task = <-p.work}
}
  1. 避免锁竞争
// 使用atomic替代mutex
type Counter struct {count int32
}func (c *Counter) Increment() {atomic.AddInt32(&c.count, 1)
}func (c *Counter) Get() int32 {return atomic.LoadInt32(&c.count)
}

6. 调试和监控

6.1 调试工具

  1. GODEBUG参数
GODEBUG=schedtrace=1000 ./program # 每1000ms输出调度信息
GODEBUG=gctrace=1 ./program      # 输出GC信息
  1. pprof工具
import _ "net/http/pprof"go func() {log.Println(http.ListenAndServe("localhost:6060", nil))
}()

6.2 监控指标

  1. goroutine数量
  2. P的使用率
  3. 系统调用次数
  4. 调度延迟
  5. GC影响

通过深入理解goroutine的原理和生命周期,我们可以:

  1. 更好地控制并发程序的行为
  2. 避免常见的并发陷阱
  3. 优化程序性能
  4. 排查并发相关问题

怎么样今天的内容还满意吗?再次感谢观众老爷的观看,关注GZH:凡人的AI工具箱,回复666,送您价值199的AI大礼包。最后,祝您早日实现财务自由,还请给个赞,谢谢!


http://www.ppmy.cn/devtools/136386.html

相关文章

小鹏汽车智慧材料数据库系统项目总成数据同步

1、定时任务处理 2、提供了接口 小鹏方面提供的推送的数据表结构&#xff1a; 这几个表总数为100多万&#xff0c;经过条件筛选过滤后大概2万多条数据 小鹏的人给的示例图&#xff1a; 界面&#xff1a; SQL: -- 查询车型 select bmm.md_material_id, bmm.material_num, bm…

vue3 uniapp 扫普通链接或二维码打开小程序并获取携带参数

vue3 uniapp 扫普通链接或二维码打开小程序并获取携带参数 微信公众平台添加配置 微信公众平台 > 开发管理 > 开发设置 > 扫普通链接二维码打开小程序 配置链接规则需要下载校验文档给后端存入服务器中&#xff0c;保存配置的时候会校验一次&#xff0c;确定当前的配…

Cmakelist.txt之win-c-udp-client

1.cmakelist.txt cmake_minimum_required(VERSION 3.16) ​ project(c_udp_client LANGUAGES C) ​ add_executable(c_udp_client main.c) ​ target_link_libraries(c_udp_client wsock32) ​ ​ include(GNUInstallDirs) install(TARGETS c_udp_clientLIBRARY DESTINATION $…

(神领物流)day01项目概述

项目概述要在面试的时候准确的说出整体的项目内容简单介绍&#xff01;&#xff01;&#xff01;&#xff01;至关重要 形成大型的物流公司&#xff0c;车辆的调度等等都交给系统&#xff0c;让我们的操作更加智能化&#xff0c;提升工作效率&#xff1b; &#xff01;&#xf…

vue数据变化但页面不变

记录一下vue中数据变了 但是页面没有变化的几种情况和解决办法 情况一&#xff1a;vue无法检测实例不存在于data中的变量 原因&#xff1a;由于 Vue 会在初始化实例时对data中的数据执行getter/setter转化&#xff0c;所以变量必须在data对象上存在才能让Vue将它转化成响应式…

Flutter:AnimatedSwitcher当子元素改变时,触发动画

AnimatedSwitcher中的子元素 由:CircularProgressIndicator() 改变为:Image.network(https://cdn.uviewui.com/uview/swiper/1.jpg) 则会触发动画class _MyHomePageState extends State<MyHomePage> {bool flag true;overrideWidget build(BuildContext context) {retur…

Elasticsearch 开放推理 API 增加了对 IBM watsonx.ai Slate 嵌入模型的支持

作者&#xff1a;来自 Elastic Saikat Sarkar 使用 Elasticsearch 向量数据库构建搜索 AI 体验时如何使用 IBM watsonx™ Slate 文本嵌入。 Elastic 很高兴地宣布&#xff0c;通过集成 IBM watsonx™ Slate 嵌入模型&#xff0c;我们的开放推理 API 功能得以扩展&#xff0c;这…

android general boot loader(rust安装后的build过程)

按照ahttps://cs.android.com/android/platform/superproject/main//main:bootable/libbootloader/gbl/的guide做android general boot loader下载以及build 注意事项&#xff1a; 1. bazel-bootstrap 安装参考 https://zhuanlan.zhihu.com/p/661422615 2. 根据实际情况确认…