sync包详解(下)
学习目标
知识点 | 掌握程度 | 应用场景 |
---|---|---|
WaitGroup使用 | 熟练使用和理解原理 | 并发任务的同步等待 |
Once实现原理 | 理解底层实现和使用场景 | 单例模式、一次性初始化 |
Pool性能优化 | 掌握对象池的使用和调优 | 高并发下的内存优化 |
Cond应用场景 | 了解条件变量的使用方法 | 多线程协调和通信 |
1. WaitGroup的使用
1.1 WaitGroup基本原理
WaitGroup用于等待一组goroutine完成。它提供三个方法:
- Add(): 增加等待的goroutine数量
- Done(): 标记一个goroutine完成
- Wait(): 等待所有goroutine完成
让我们通过一个完整的例子来理解WaitGroup的使用:
package mainimport ("fmt""sync""time"
)// 任务执行器
type TaskExecutor struct {tasks []func() errorwg sync.WaitGrouperrChan chan errortimeout time.Duration
}func NewTaskExecutor(timeout time.Duration) *TaskExecutor {return &TaskExecutor{tasks: make([]func() error, 0),errChan: make(chan error, 100),timeout: timeout,}
}// 添加任务
func (te *TaskExecutor) AddTask(task func() error) {te.tasks = append(te.tasks, task)
}// 执行所有任务
func (te *TaskExecutor) Execute() []error {te.wg.Add(len(te.tasks))// 启动所有任务for i := range te.tasks {go func(taskID int, task func() error) {defer te.wg.Done()// 执行任务并捕获错误if err := task(); err != nil {select {case te.errChan <- fmt.Errorf("task %d failed: %w", taskID, err):default:// 错误channel满了就打印错误fmt.Printf("Error channel full, task %d error: %v\n", taskID, err)}}}(i, te.tasks[i])}// 等待任务完成或超时done := make(chan struct{})go func() {te.wg.Wait()close(done)}()// 等待完成或超时select {case <-done:// 所有任务完成case <-time.After(te.timeout):fmt.Println("Execution timed out")}// 收集所有错误close(te.errChan)var errors []errorfor err := range te.errChan {errors = append(errors, err)}return errors
}// 模拟任务
func createTask(id int, duration time.Duration, shouldFail bool) func() error {return func() error {fmt.Printf("Task %d started\n", id)time.Sleep(duration)if shouldFail {return fmt.Errorf("task %d failed", id)}fmt.Printf("Task %d completed\n", id)return nil}
}func main() {executor := NewTaskExecutor(5 * time.Second)// 添加一些测试任务executor.AddTask(createTask(1, 2*time.Second, false))executor.AddTask(createTask(2, 1*time.Second, true))executor.AddTask(createTask(3, 3*time.Second, false))executor.AddTask(createTask(4, 2*time.Second, true))executor.AddTask(createTask(5, 1*time.Second, false))// 执行任务并收集错误errors := executor.Execute()// 打印执行结果if len(errors) > 0 {fmt.Println("\nExecution completed with errors:")for _, err := range errors {fmt.Printf("- %v\n", err)}} else {fmt.Println("\nAll tasks completed successfully")}
}
1.2 WaitGroup执行流程
2. Once的实现原理
2.1 Once基本概念
sync.Once保证一个函数只执行一次,常用于单例模式或一次性初始化。
让我们实现一个使用Once的配置加载器:
package mainimport ("encoding/json""fmt""sync""time"
)// 配置结构
type Config struct {DatabaseURL string `json:"database_url"`RedisURL string `json:"redis_url"`APIKey string `json:"api_key"`MaxWorkers int `json:"max_workers"`
}// 配置管理器
type ConfigManager struct {config *Configonce sync.Once
}var (configManager *ConfigManagerinstanceOnce sync.Once
)// 获取ConfigManager单例
func GetConfigManager() *ConfigManager {instanceOnce.Do(func() {configManager = &ConfigManager{config: &Config{},}})return configManager
}// 加载配置
func (cm *ConfigManager) LoadConfig() (*Config, error) {var err errorcm.once.Do(func() {// 模拟从文件或远程加载配置time.Sleep(time.Second) // 模拟IO操作// 模拟JSON配置jsonConfig := `{"database_url": "postgres://user:pass@localhost:5432/db","redis_url": "redis://localhost:6379","api_key": "secret-key-123","max_workers": 10}`err = json.Unmarshal([]byte(jsonConfig), cm.config)if err == nil {fmt.Println("Configuration loaded successfully")}})if err != nil {return nil, fmt.Errorf("failed to load config: %w", err)}return cm.config, nil
}// 模拟配置使用
func useConfig(id int) {cm := GetConfigManager()config, err := cm.LoadConfig()if err != nil {fmt.Printf("Goroutine %d failed to get config: %v\n", id, err)return}fmt.Printf("Goroutine %d using config: DB=%s, Redis=%s, Workers=%d\n",id, config.DatabaseURL, config.RedisURL, config.MaxWorkers)
}func main() {// 模拟多个goroutine同时访问配置var wg sync.WaitGroupfor i := 0; i < 5; i++ {wg.Add(1)go func(id int) {defer wg.Done()time.Sleep(time.Duration(id*100) * time.Millisecond) // 错开启动时间useConfig(id)}(i)}wg.Wait()
}
2.2 Once实现流程图
3. Pool的性能优化
3.1 Pool的作用与原理
sync.Pool用于存储和复用临时对象,减少内存分配和GC压力。
让我们实现一个使用Pool优化的数据处理系统:
package mainimport ("bytes""encoding/json""fmt""sync""time"
)// 数据处理器
type DataProcessor struct {bufferPool *sync.PooljsonPool *sync.Pool
}// 示例数据结构
type DataItem struct {ID int `json:"id"`Name string `json:"name"`Value float64 `json:"value"`Timestamp time.Time `json:"timestamp"`
}// 创建新的数据处理器
func NewDataProcessor() *DataProcessor {return &DataProcessor{// 创建buffer对象池bufferPool: &sync.Pool{New: func() interface{} {return new(bytes.Buffer)},},// 创建数据对象池jsonPool: &sync.Pool{New: func() interface{} {return new(DataItem)},},}
}// 处理数据
func (dp *DataProcessor) ProcessData(data []byte) error {// 从对象池获取bufferbuf := dp.bufferPool.Get().(*bytes.Buffer)defer func() {buf.Reset()dp.bufferPool.Put(buf)}()// 从对象池获取数据对象item := dp.jsonPool.Get().(*DataItem)defer dp.jsonPool.Put(item)// 解析JSON数据if err := json.Unmarshal(data, item); err != nil {return fmt.Errorf("failed to unmarshal data: %w", err)}// 处理数据item.Value = item.Value * 1.1 // 示例处理:增加10%// 使用buffer进行JSON编码if err := json.NewEncoder(buf).Encode(item); err != nil {return fmt.Errorf("failed to encode result: %w", err)}// 在实际应用中,这里可能会将处理结果发送到某个目的地fmt.Printf("Processed: %s", buf.String())return nil
}func main() {processor := NewDataProcessor()var wg sync.WaitGroup// 模拟大量并发请求for i := 0; i < 100; i++ {wg.Add(1)go func(id int) {defer wg.Done()// 创建测试数据testData := DataItem{ID: id,Name: fmt.Sprintf("Item-%d", id),Value: float64(id * 100),Timestamp: time.Now(),}// 序列化测试数据data, err := json.Marshal(testData)if err != nil {fmt.Printf("Failed to marshal test data: %v\n", err)return}// 处理数据if err := processor.ProcessData(data); err != nil {fmt.Printf("Failed to process data: %v\n", err)}}(i)}wg.Wait()
}
3.2 Pool使用注意事项
- 对象的生命周期管理
- 内存泄漏风险
- 性能监控和调优
让我们继续完成Cond的应用场景部分。
4. Cond的应用场景
4.1 Cond基本概念
sync.Cond是一个条件变量,它在共享资源的状态变化时对多个goroutine进行协调。
让我们通过一个生产者-消费者模型来详细理解Cond的使用:
package mainimport ("fmt""sync""time"
)// 任务队列
type TaskQueue struct {mutex sync.Mutexcond *sync.Condtasks []stringmaxSize intclosed bool
}// 创建新的任务队列
func NewTaskQueue(size int) *TaskQueue {tq := &TaskQueue{tasks: make([]string, 0, size),maxSize: size,}tq.cond = sync.NewCond(&tq.mutex)return tq
}// 添加任务
func (tq *TaskQueue) Push(task string) error {tq.mutex.Lock()defer tq.mutex.Unlock()// 检查队列是否已关闭if tq.closed {return fmt.Errorf("task queue is closed")}// 等待队列有空余位置for len(tq.tasks) >= tq.maxSize {tq.cond.Wait()// 再次检查队列状态if tq.closed {return fmt.Errorf("task queue is closed")}}// 添加任务tq.tasks = append(tq.tasks, task)fmt.Printf("Task added: %s, queue size: %d\n", task, len(tq.tasks))// 通知等待的消费者tq.cond.Signal()return nil
}// 获取任务
func (tq *TaskQueue) Pop() (string, error) {tq.mutex.Lock()defer tq.mutex.Unlock()// 等待任务可用for len(tq.tasks) == 0 && !tq.closed {tq.cond.Wait()}// 检查队列是否已关闭且为空if len(tq.tasks) == 0 && tq.closed {return "", fmt.Errorf("task queue is closed and empty")}// 取出任务task := tq.tasks[0]tq.tasks = tq.tasks[1:]fmt.Printf("Task removed: %s, queue size: %d\n", task, len(tq.tasks))// 通知等待的生产者tq.cond.Signal()return task, nil
}// 关闭队列
func (tq *TaskQueue) Close() {tq.mutex.Lock()defer tq.mutex.Unlock()tq.closed = true// 通知所有等待的goroutinetq.cond.Broadcast()
}func main() {// 创建一个容量为3的任务队列queue := NewTaskQueue(3)var wg sync.WaitGroup// 启动3个生产者for i := 0; i < 3; i++ {wg.Add(1)go func(id int) {defer wg.Done()for j := 0; j < 5; j++ {task := fmt.Sprintf("Task-P%d-%d", id, j)err := queue.Push(task)if err != nil {fmt.Printf("Producer %d failed: %v\n", id, err)return}time.Sleep(time.Millisecond * 100) // 模拟生产耗时}}(i)}// 启动2个消费者for i := 0; i < 2; i++ {wg.Add(1)go func(id int) {defer wg.Done()for {task, err := queue.Pop()if err != nil {fmt.Printf("Consumer %d exit: %v\n", id, err)return}fmt.Printf("Consumer %d processed %s\n", id, task)time.Sleep(time.Millisecond * 200) // 模拟消费耗时}}(i)}// 等待生产者完成后关闭队列go func() {wg.Wait()fmt.Println("All producers finished, closing queue...")queue.Close()}()// 等待所有goroutine完成time.Sleep(time.Second * 5)fmt.Println("Main: exit")
}
4.2 Cond的工作流程
4.3 Cond的常见应用场景
-
生产者-消费者模式
- 队列满/空的等待条件
- 批量处理的条件同步
-
资源池管理
- 连接池的可用连接等待
- 工作池的任务分发
-
线程协调
- 启动信号的统一等待
- 任务完成的同步通知
-
状态变更通知
- 配置更新的广播
- 系统状态转换的通知
4.4 性能优化建议
- 条件检查
// 推荐做法
for !condition() {cond.Wait()
}// 不推荐做法
if !condition() {cond.Wait()
}
- 信号通知
// 单个等待者使用Signal
cond.Signal()// 多个等待者使用Broadcast
cond.Broadcast()
- 锁的范围
// 推荐做法
mu.Lock()
// 最小化临界区
mu.Unlock()// 不推荐做法
mu.Lock()
// 大量非临界区操作
mu.Unlock()
总结
知识要点回顾
- WaitGroup
- Add/Done/Wait的正确使用
- 并发任务的同步控制
- 错误处理和超时机制
- Once
- 单例模式实现
- 初始化控制
- 并发安全保证
- Pool
- 对象复用机制
- 内存优化策略
- 性能监控方法
- Cond
- 条件变量的使用
- 生产者-消费者模式
- 多goroutine协调
实践建议
- 代码质量
- 始终使用defer确保解锁
- 检查并处理所有错误情况
- 添加适当的超时机制
- 性能优化
- 最小化锁的范围
- 合理使用对象池
- 避免过度同步
- 调试技巧
- 使用race detector
- 添加详细的日志
- 监控关键指标
怎么样今天的内容还满意吗?再次感谢观众老爷的观看,关注GZH:凡人的AI工具箱,回复666,送您价值199的AI大礼包。最后,祝您早日实现财务自由,还请给个赞,谢谢!