Day 03 - Goroutine Basics and Internals
1. Creating and Scheduling Goroutines
1.1 Basic Characteristics of Goroutines
| Characteristic | Description |
| --- | --- |
| Lightweight | Initial stack is only 2KB and grows (and shrinks) dynamically |
| Scheduling | Managed by the Go runtime; cooperative in style, with signal-based preemption added in Go 1.14 (see §3.1) |
| Creation cost | Very cheap to create; hundreds of thousands can run at the same time |
| Communication | Goroutines communicate through channels rather than by sharing memory |
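To make the "lightweight" claim concrete, here is a minimal sketch that launches a large number of goroutines and waits for them all; the count of 100,000 is an arbitrary illustration, not a benchmark:

```go
package main

import (
    "fmt"
    "runtime"
    "sync"
)

func main() {
    const n = 100000 // arbitrary large number, chosen only to illustrate cheap creation

    var wg sync.WaitGroup
    for i := 0; i < n; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            // trivial work; each goroutine starts with a ~2KB stack
        }()
    }
    wg.Wait()
    fmt.Println("all goroutines finished, still running:", runtime.NumGoroutine())
}
```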
1.2 Example: Creating Goroutines
```go
package main

import (
    "fmt"
    "runtime"
    "sync"
    "time"
)

// monitorGoroutines periodically prints the current goroutine count
// until it receives a signal on done.
func monitorGoroutines(duration time.Duration, done chan struct{}) {
    ticker := time.NewTicker(duration)
    defer ticker.Stop()
    for {
        select {
        case <-ticker.C:
            fmt.Printf("current goroutine count: %d\n", runtime.NumGoroutine())
        case <-done:
            return
        }
    }
}

type Worker struct {
    ID int
    wg *sync.WaitGroup
}

func NewWorker(id int, wg *sync.WaitGroup) *Worker {
    return &Worker{
        ID: id,
        wg: wg,
    }
}

// Work consumes jobs until the jobs channel is closed and writes results.
func (w *Worker) Work(jobs <-chan int, results chan<- int) {
    defer w.wg.Done()
    for job := range jobs {
        fmt.Printf("Worker %d processing job %d\n", w.ID, job)
        time.Sleep(100 * time.Millisecond) // simulate work
        results <- job * 2
    }
}

func main() {
    numWorkers := 5
    numJobs := 10
    jobs := make(chan int, numJobs)
    results := make(chan int, numJobs)
    var wg sync.WaitGroup
    done := make(chan struct{})

    go monitorGoroutines(time.Second, done)

    fmt.Printf("creating %d workers\n", numWorkers)
    for i := 1; i <= numWorkers; i++ {
        wg.Add(1)
        worker := NewWorker(i, &wg)
        go worker.Work(jobs, results)
    }

    fmt.Printf("sending %d jobs\n", numJobs)
    for j := 1; j <= numJobs; j++ {
        jobs <- j
    }
    close(jobs)

    // Close results once every worker has exited.
    go func() {
        wg.Wait()
        close(results)
    }()

    for result := range results {
        fmt.Printf("got result: %d\n", result)
    }

    done <- struct{}{}
    fmt.Printf("final goroutine count: %d\n", runtime.NumGoroutine())
}
```
2. The GMP Model in Detail
2.1 GMP Components
| Component | Description | Responsibility |
| --- | --- | --- |
| G (Goroutine) | Abstraction of a goroutine | Holds the goroutine's stack, program counter, and other execution state |
| M (Machine) | Worker thread | The entity that actually executes Gs; corresponds to an OS thread |
| P (Processor) | Logical processor | Maintains a local run queue of Gs and provides the context an M needs to execute Go code |
2.2 GMP Scheduling Flow
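In outline, the scheduling flow works like this:

- A newly created G is placed on the current P's local run queue (or on the global run queue if the local queue is full).
- An M bound to a P repeatedly takes a runnable G from that P's local queue; if the local queue is empty it checks the global queue and the network poller, and finally tries to steal Gs from other Ps.
- When a G blocks in a system call, its M blocks with it; the P detaches and can be handed to another (possibly new) M so that other Gs keep running.
- When a G blocks on a channel or another synchronization primitive, it is parked and the M moves on to the next runnable G.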
2.3 GMP-related Runtime Parameters
- runtime.GOMAXPROCS(n): sets the number of Ps (how many goroutines can execute Go code in parallel) and returns the previous setting; passing a value below 1 only queries the current value
- runtime.NumCPU(): returns the number of logical CPUs usable by the current process
- runtime.NumGoroutine(): returns the number of goroutines that currently exist
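A small sketch of how these three functions are typically used together (the values printed depend on the machine):

```go
package main

import (
    "fmt"
    "runtime"
)

func main() {
    fmt.Println("logical CPUs:", runtime.NumCPU())

    // Passing 0 (or any value < 1) only queries the current setting.
    fmt.Println("current GOMAXPROCS:", runtime.GOMAXPROCS(0))

    // Explicitly pin the number of Ps; the default already equals NumCPU().
    prev := runtime.GOMAXPROCS(runtime.NumCPU())
    fmt.Println("previous GOMAXPROCS:", prev)

    fmt.Println("live goroutines:", runtime.NumGoroutine())
}
```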
3. Concurrency Model Principles
3.1 Characteristics of Go's Concurrency Model
| Characteristic | Description |
| --- | --- |
| CSP model | Share memory by communicating instead of communicating by sharing memory |
| Non-blocking scheduling | A goroutine that yields or blocks does not block other goroutines; the M picks up another runnable G |
| Work stealing | An idle P can steal runnable Gs from another P's local run queue |
| Preemptive scheduling | Signal-based asynchronous preemption (since Go 1.14) keeps long-running goroutines from monopolizing a P |
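The CSP idea of "share memory by communicating" is easiest to see in a tiny sketch where a single goroutine owns a piece of state and everyone else talks to it over channels (names are illustrative, not production code):

```go
package main

import "fmt"

// counterOwner is the only goroutine that touches count, so no mutex is
// needed: increments and reads arrive over channels.
func counterOwner(inc <-chan struct{}, read chan<- int, done <-chan struct{}) {
    count := 0
    for {
        select {
        case <-inc:
            count++
        case read <- count:
        case <-done:
            return
        }
    }
}

func main() {
    inc := make(chan struct{})
    read := make(chan int)
    done := make(chan struct{})
    go counterOwner(inc, read, done)

    for i := 0; i < 5; i++ {
        inc <- struct{}{}
    }
    fmt.Println("count =", <-read) // prints 5
    close(done)
}
```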
3.2 Concurrency Model Example
```go
package main

import (
    "context"
    "fmt"
    "runtime"
    "sync"
    "time"
)

// Pipeline reads integers from input, doubles them, and writes them to output.
type Pipeline struct {
    input  chan int
    output chan int
    done   chan struct{}
}

func NewPipeline() *Pipeline {
    return &Pipeline{
        input:  make(chan int),
        output: make(chan int),
        done:   make(chan struct{}),
    }
}

// Process starts the pipeline stage; it exits when input is closed or the
// context is cancelled, closing output on the way out.
func (p *Pipeline) Process(ctx context.Context) {
    go func() {
        defer close(p.output)
        for {
            select {
            case num, ok := <-p.input:
                if !ok {
                    return
                }
                result := num * 2
                select {
                case p.output <- result:
                case <-ctx.Done():
                    return
                }
            case <-ctx.Done():
                return
            }
        }
    }()
}

// WorkerPool runs submitted tasks on a fixed number of goroutines.
type WorkerPool struct {
    workers int
    tasks   chan func()
    wg      sync.WaitGroup
}

func NewWorkerPool(workers int) *WorkerPool {
    pool := &WorkerPool{
        workers: workers,
        tasks:   make(chan func(), workers*2),
    }
    pool.Start()
    return pool
}

func (p *WorkerPool) Start() {
    for i := 0; i < p.workers; i++ {
        p.wg.Add(1)
        go func(workerID int) {
            defer p.wg.Done()
            for task := range p.tasks {
                fmt.Printf("Worker %d executing task\n", workerID)
                task()
            }
        }(i + 1)
    }
}

func (p *WorkerPool) Submit(task func()) {
    p.tasks <- task
}

func (p *WorkerPool) Stop() {
    close(p.tasks)
    p.wg.Wait()
}

func main() {
    runtime.GOMAXPROCS(runtime.NumCPU())
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    pipeline := NewPipeline()
    pipeline.Process(ctx)
    pool := NewWorkerPool(3)

    // Producer: feed the pipeline, then close its input.
    go func() {
        defer close(pipeline.input)
        for i := 1; i <= 10; i++ {
            select {
            case pipeline.input <- i:
                fmt.Printf("Sent %d to pipeline\n", i)
            case <-ctx.Done():
                return
            }
        }
    }()

    // Consumer: hand pipeline results to the worker pool.
    go func() {
        for result := range pipeline.output {
            result := result // capture the loop variable for the closure (needed before Go 1.22)
            pool.Submit(func() {
                time.Sleep(100 * time.Millisecond)
                fmt.Printf("Processed result: %d\n", result)
            })
        }
        pool.Stop()
    }()

    <-ctx.Done()
    fmt.Println("Main context done")
}
```
4. The Goroutine Lifecycle
4.1 Lifecycle States
| State | Description |
| --- | --- |
| Created | The goroutine has been created and its stack allocated |
| Runnable | Waiting in a run queue to be scheduled |
| Running | Currently being executed by an M |
| In syscall | Blocked in a system call |
| Waiting | Blocked on a channel or another synchronization primitive |
| Dead | Finished executing and waiting to be reclaimed or reused |
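These states can be observed directly: a full goroutine dump from runtime.Stack labels each goroutine with its current status, such as running, runnable, chan receive, or sleep. A minimal sketch:

```go
package main

import (
    "fmt"
    "runtime"
    "time"
)

func main() {
    ch := make(chan int)

    // This goroutine will show up as "chan receive" (the Waiting state).
    go func() {
        <-ch
    }()

    // This goroutine will usually show up as "sleep".
    go func() {
        time.Sleep(time.Second)
    }()

    time.Sleep(100 * time.Millisecond) // let the goroutines reach their blocking points

    buf := make([]byte, 1<<16)
    n := runtime.Stack(buf, true) // true = include all goroutines in the dump
    fmt.Printf("%s\n", buf[:n])

    close(ch)
}
```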
4.2 Lifecycle Example
```go
package main

import (
    "context"
    "fmt"
    "runtime"
    "runtime/debug"
    "sync"
    "time"
)

// GoroutineMonitor records the status and timing of a single task's goroutine.
type GoroutineMonitor struct {
    startTime time.Time
    endTime   time.Time
    status    string
    sync.Mutex
}

func NewGoroutineMonitor() *GoroutineMonitor {
    return &GoroutineMonitor{
        startTime: time.Now(),
        status:    "created",
    }
}

func (g *GoroutineMonitor) UpdateStatus(status string) {
    g.Lock()
    defer g.Unlock()
    g.status = status
    fmt.Printf("goroutine status update: %s, elapsed: %v\n", status, time.Since(g.startTime))
}

func (g *GoroutineMonitor) Complete() {
    g.Lock()
    defer g.Unlock()
    g.endTime = time.Now()
    g.status = "completed"
    fmt.Printf("goroutine finished, total run time: %v\n", g.endTime.Sub(g.startTime))
}

type Task struct {
    ID       int
    Duration time.Duration
    Monitor  *GoroutineMonitor
}

func (t *Task) Execute(ctx context.Context) {
    defer t.Monitor.Complete()
    defer func() {
        if r := recover(); r != nil {
            fmt.Printf("Task %d panic: %v\nStack: %s\n", t.ID, r, debug.Stack())
            t.Monitor.UpdateStatus("panic")
        }
    }()

    t.Monitor.UpdateStatus("running")
    select {
    case <-time.After(t.Duration):
        t.Monitor.UpdateStatus("normal completion")
    case <-ctx.Done():
        t.Monitor.UpdateStatus("cancelled")
        return
    }

    if t.ID%4 == 0 {
        t.Monitor.UpdateStatus("blocked")
        time.Sleep(100 * time.Millisecond)
    } else if t.ID%3 == 0 {
        panic("simulated task panic") // recovered by the deferred handler above
    }
}

// TaskScheduler distributes tasks to a fixed set of worker goroutines.
type TaskScheduler struct {
    tasks    chan Task
    workers  int
    monitors map[int]*GoroutineMonitor
    mu       sync.RWMutex
    wg       sync.WaitGroup
}

func NewTaskScheduler(workers int) *TaskScheduler {
    return &TaskScheduler{
        tasks:    make(chan Task, workers*2),
        workers:  workers,
        monitors: make(map[int]*GoroutineMonitor),
    }
}

func (s *TaskScheduler) AddTask(task Task) {
    s.mu.Lock()
    s.monitors[task.ID] = task.Monitor
    s.mu.Unlock()
    s.tasks <- task
}

func (s *TaskScheduler) Start(ctx context.Context) {
    for i := 0; i < s.workers; i++ {
        s.wg.Add(1)
        go func(workerID int) {
            defer s.wg.Done()
            for task := range s.tasks {
                task.Execute(ctx)
            }
        }(i)
    }
}

// Stop closes the task channel and waits for the workers to drain it.
func (s *TaskScheduler) Stop() {
    close(s.tasks)
    s.wg.Wait()
}

func main() {
    runtime.GOMAXPROCS(4)
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    scheduler := NewTaskScheduler(3)
    scheduler.Start(ctx)

    for i := 1; i <= 10; i++ {
        task := Task{
            ID:       i,
            Duration: time.Duration(i*200) * time.Millisecond,
            Monitor:  NewGoroutineMonitor(),
        }
        scheduler.AddTask(task)
    }

    // Close the queue and wait for every worker to finish its tasks.
    scheduler.Stop()

    fmt.Println("\nfinal statuses:")
    scheduler.mu.RLock()
    for id, monitor := range scheduler.monitors {
        monitor.Lock()
        fmt.Printf("Task %d - status: %s\n", id, monitor.status)
        monitor.Unlock()
    }
    scheduler.mu.RUnlock()
}
```
4.3 Goroutine Lifecycle State Transitions
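The main transitions are: Created → Runnable (queued for scheduling) → Running (picked up by an M). A Running goroutine moves to Waiting when it blocks on a channel or lock, to In syscall when it enters a blocking system call, or back to Runnable when it is preempted or yields. Waiting and In syscall goroutines return to Runnable once they unblock. A goroutine that returns from its function enters Dead, and its resources can later be reused for a new goroutine.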
5. Practical Considerations
5.1 Common Goroutine Leak Scenarios
- Blocking on a channel that never receives a send or a close
```go
func leakyGoroutine() {
    ch := make(chan int)
    go func() {
        val := <-ch // blocks forever: nothing ever sends on ch
        fmt.Println(val)
    }()
    // leakyGoroutine returns, but the goroutine above can never exit
}
```
- Infinite loops with no exit condition (a cancellation-based fix for both scenarios is sketched after this list)
```go
func infiniteLoop() {
    go func() {
        for {
            // spins forever with no exit condition, so the goroutine is never released
        }
    }()
}
```
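Both leaks stem from goroutines that have no way of being told to stop. A common fix, sketched here with a context (the function names are illustrative), is to give every long-lived goroutine an exit path and select on it:

```go
// Assumes "context" and "fmt" are imported.

func nonLeakyGoroutine(ctx context.Context) {
    ch := make(chan int)
    go func() {
        select {
        case val := <-ch:
            fmt.Println(val)
        case <-ctx.Done(): // exit path: the goroutine is released when the context is cancelled
            return
        }
    }()
}

func boundedLoop(ctx context.Context) {
    go func() {
        for {
            select {
            case <-ctx.Done():
                return // explicit stop condition for the loop
            default:
                // do one unit of work per iteration
            }
        }
    }()
}
```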
5.2 Best Practices
| Best practice | Description |
| --- | --- |
| Limit the number of goroutines | Avoid creating goroutines without bound |
| Use context to manage lifetimes | Shut goroutines down gracefully |
| Handle panics | Keep a panicking goroutine from taking down the whole program |
| Release resources promptly | Use defer to guarantee cleanup |
| Set GOMAXPROCS sensibly | Tune the number of Ps to the available CPU cores (the default already matches NumCPU) |
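The context and panic-handling rows combine naturally into one pattern. The sketch below (the safeGo helper is made up for illustration) wraps each goroutine's body with a recover and ties its lifetime to a context:

```go
package main

import (
    "context"
    "fmt"
    "time"
)

// safeGo runs fn in a goroutine, recovers from panics, and hands it a context
// so it can stop when the context is cancelled.
func safeGo(ctx context.Context, name string, fn func(ctx context.Context)) {
    go func() {
        defer func() {
            if r := recover(); r != nil {
                fmt.Printf("goroutine %s recovered from panic: %v\n", name, r)
            }
        }()
        fn(ctx)
    }()
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
    defer cancel()

    safeGo(ctx, "ticker", func(ctx context.Context) {
        for {
            select {
            case <-ctx.Done():
                fmt.Println("ticker: context cancelled, exiting")
                return
            case <-time.After(100 * time.Millisecond):
                fmt.Println("ticker: tick")
            }
        }
    })

    safeGo(ctx, "panicky", func(ctx context.Context) {
        panic("simulated failure") // recovered by safeGo instead of crashing the program
    })

    <-ctx.Done()
    time.Sleep(50 * time.Millisecond) // give the ticker goroutine a moment to print its exit message
}
```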
5.3 Performance Optimization Tips
- Pool goroutines rather than spawning one per task (a usage sketch follows this list)
```go
// Pool runs tasks on at most size goroutines: Submit hands a task to an idle
// worker if one is waiting, otherwise starts a new worker while the semaphore
// still has capacity; once saturated, Submit blocks until a worker frees up.
type Pool struct {
    work chan func()
    sem  chan struct{}
}

func NewPool(size int) *Pool {
    return &Pool{
        work: make(chan func()),
        sem:  make(chan struct{}, size),
    }
}

func (p *Pool) Submit(task func()) {
    select {
    case p.work <- task: // an existing worker is waiting for work
    case p.sem <- struct{}{}: // below the size limit: start another worker
        go p.worker(task)
    }
}

func (p *Pool) worker(task func()) {
    defer func() { <-p.sem }()
    for {
        task()
        task = <-p.work
    }
}
```
- Avoid lock contention; for simple counters, atomic operations are cheaper than a mutex
```go
// Counter uses atomic operations instead of a mutex (requires "sync/atomic").
type Counter struct {
    count int32
}

func (c *Counter) Increment() {
    atomic.AddInt32(&c.count, 1)
}

func (c *Counter) Get() int32 {
    return atomic.LoadInt32(&c.count)
}
```
6. Debugging and Monitoring
6.1 Debugging Tools
- The GODEBUG environment variable
```bash
# Print scheduler state (run queues, idle Ps and Ms, etc.) every 1000 ms
GODEBUG=schedtrace=1000 ./program

# Print a summary line for every garbage collection
GODEBUG=gctrace=1 ./program
```
- The pprof tool
```go
import _ "net/http/pprof" // registers the /debug/pprof handlers on the default mux

// Somewhere during startup (also needs "log" and "net/http"):
go func() {
    log.Println(http.ListenAndServe("localhost:6060", nil))
}()
```
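With that endpoint running, the goroutine profile (and others) can be inspected from the command line, for example:

```bash
# Dump all goroutine stacks in text form
curl http://localhost:6060/debug/pprof/goroutine?debug=2

# Interactively explore the goroutine profile
go tool pprof http://localhost:6060/debug/pprof/goroutine
```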
6.2 Monitoring Metrics
- Number of goroutines
- P utilization
- Number of system calls
- Scheduling latency
- GC impact
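Several of these metrics can be sampled in-process. Here is a minimal sketch that periodically logs the goroutine count and basic GC statistics; the interval and the chosen fields are arbitrary:

```go
package main

import (
    "fmt"
    "runtime"
    "time"
)

// reportMetrics logs goroutine and GC statistics every interval until stop is closed.
func reportMetrics(interval time.Duration, stop <-chan struct{}) {
    ticker := time.NewTicker(interval)
    defer ticker.Stop()
    var m runtime.MemStats
    for {
        select {
        case <-ticker.C:
            runtime.ReadMemStats(&m)
            fmt.Printf("goroutines=%d gc_cycles=%d total_pause=%v heap_alloc=%dKB\n",
                runtime.NumGoroutine(), m.NumGC, time.Duration(m.PauseTotalNs), m.HeapAlloc/1024)
        case <-stop:
            return
        }
    }
}

func main() {
    stop := make(chan struct{})
    go reportMetrics(time.Second, stop)

    time.Sleep(3 * time.Second) // stand-in for real application work
    close(stop)
}
```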
With a solid understanding of how goroutines work and how their lifecycle unfolds, we can:
- Control the behavior of concurrent programs more precisely
- Avoid common concurrency pitfalls
- Optimize program performance
- Troubleshoot concurrency-related problems