40分钟学 Go 语言高并发:sync包详解(下)

devtools/2024/11/24 4:38:18/

sync包详解(下)

学习目标

知识点掌握程度应用场景
WaitGroup使用熟练使用和理解原理并发任务的同步等待
Once实现原理理解底层实现和使用场景单例模式、一次性初始化
Pool性能优化掌握对象池的使用和调优高并发下的内存优化
Cond应用场景了解条件变量的使用方法多线程协调和通信

1. WaitGroup的使用

1.1 WaitGroup基本原理

WaitGroup用于等待一组goroutine完成。它提供三个方法:

  • Add(): 增加等待的goroutine数量
  • Done(): 标记一个goroutine完成
  • Wait(): 等待所有goroutine完成

让我们通过一个完整的例子来理解WaitGroup的使用:

package mainimport ("fmt""sync""time"
)// 任务执行器
type TaskExecutor struct {tasks    []func() errorwg       sync.WaitGrouperrChan  chan errortimeout  time.Duration
}func NewTaskExecutor(timeout time.Duration) *TaskExecutor {return &TaskExecutor{tasks:    make([]func() error, 0),errChan:  make(chan error, 100),timeout:  timeout,}
}// 添加任务
func (te *TaskExecutor) AddTask(task func() error) {te.tasks = append(te.tasks, task)
}// 执行所有任务
func (te *TaskExecutor) Execute() []error {te.wg.Add(len(te.tasks))// 启动所有任务for i := range te.tasks {go func(taskID int, task func() error) {defer te.wg.Done()// 执行任务并捕获错误if err := task(); err != nil {select {case te.errChan <- fmt.Errorf("task %d failed: %w", taskID, err):default:// 错误channel满了就打印错误fmt.Printf("Error channel full, task %d error: %v\n", taskID, err)}}}(i, te.tasks[i])}// 等待任务完成或超时done := make(chan struct{})go func() {te.wg.Wait()close(done)}()// 等待完成或超时select {case <-done:// 所有任务完成case <-time.After(te.timeout):fmt.Println("Execution timed out")}// 收集所有错误close(te.errChan)var errors []errorfor err := range te.errChan {errors = append(errors, err)}return errors
}// 模拟任务
func createTask(id int, duration time.Duration, shouldFail bool) func() error {return func() error {fmt.Printf("Task %d started\n", id)time.Sleep(duration)if shouldFail {return fmt.Errorf("task %d failed", id)}fmt.Printf("Task %d completed\n", id)return nil}
}func main() {executor := NewTaskExecutor(5 * time.Second)// 添加一些测试任务executor.AddTask(createTask(1, 2*time.Second, false))executor.AddTask(createTask(2, 1*time.Second, true))executor.AddTask(createTask(3, 3*time.Second, false))executor.AddTask(createTask(4, 2*time.Second, true))executor.AddTask(createTask(5, 1*time.Second, false))// 执行任务并收集错误errors := executor.Execute()// 打印执行结果if len(errors) > 0 {fmt.Println("\nExecution completed with errors:")for _, err := range errors {fmt.Printf("- %v\n", err)}} else {fmt.Println("\nAll tasks completed successfully")}
}

1.2 WaitGroup执行流程

在这里插入图片描述

2. Once的实现原理

2.1 Once基本概念

sync.Once保证一个函数只执行一次,常用于单例模式或一次性初始化。

让我们实现一个使用Once的配置加载器:

package mainimport ("encoding/json""fmt""sync""time"
)// 配置结构
type Config struct {DatabaseURL string `json:"database_url"`RedisURL    string `json:"redis_url"`APIKey      string `json:"api_key"`MaxWorkers  int    `json:"max_workers"`
}// 配置管理器
type ConfigManager struct {config *Configonce   sync.Once
}var (configManager *ConfigManagerinstanceOnce  sync.Once
)// 获取ConfigManager单例
func GetConfigManager() *ConfigManager {instanceOnce.Do(func() {configManager = &ConfigManager{config: &Config{},}})return configManager
}// 加载配置
func (cm *ConfigManager) LoadConfig() (*Config, error) {var err errorcm.once.Do(func() {// 模拟从文件或远程加载配置time.Sleep(time.Second) // 模拟IO操作// 模拟JSON配置jsonConfig := `{"database_url": "postgres://user:pass@localhost:5432/db","redis_url": "redis://localhost:6379","api_key": "secret-key-123","max_workers": 10}`err = json.Unmarshal([]byte(jsonConfig), cm.config)if err == nil {fmt.Println("Configuration loaded successfully")}})if err != nil {return nil, fmt.Errorf("failed to load config: %w", err)}return cm.config, nil
}// 模拟配置使用
func useConfig(id int) {cm := GetConfigManager()config, err := cm.LoadConfig()if err != nil {fmt.Printf("Goroutine %d failed to get config: %v\n", id, err)return}fmt.Printf("Goroutine %d using config: DB=%s, Redis=%s, Workers=%d\n",id, config.DatabaseURL, config.RedisURL, config.MaxWorkers)
}func main() {// 模拟多个goroutine同时访问配置var wg sync.WaitGroupfor i := 0; i < 5; i++ {wg.Add(1)go func(id int) {defer wg.Done()time.Sleep(time.Duration(id*100) * time.Millisecond) // 错开启动时间useConfig(id)}(i)}wg.Wait()
}

2.2 Once实现流程图

在这里插入图片描述

3. Pool的性能优化

3.1 Pool的作用与原理

sync.Pool用于存储和复用临时对象,减少内存分配和GC压力。

让我们实现一个使用Pool优化的数据处理系统:

package mainimport ("bytes""encoding/json""fmt""sync""time"
)// 数据处理器
type DataProcessor struct {bufferPool *sync.PooljsonPool   *sync.Pool
}// 示例数据结构
type DataItem struct {ID        int       `json:"id"`Name      string    `json:"name"`Value     float64   `json:"value"`Timestamp time.Time `json:"timestamp"`
}// 创建新的数据处理器
func NewDataProcessor() *DataProcessor {return &DataProcessor{// 创建buffer对象池bufferPool: &sync.Pool{New: func() interface{} {return new(bytes.Buffer)},},// 创建数据对象池jsonPool: &sync.Pool{New: func() interface{} {return new(DataItem)},},}
}// 处理数据
func (dp *DataProcessor) ProcessData(data []byte) error {// 从对象池获取bufferbuf := dp.bufferPool.Get().(*bytes.Buffer)defer func() {buf.Reset()dp.bufferPool.Put(buf)}()// 从对象池获取数据对象item := dp.jsonPool.Get().(*DataItem)defer dp.jsonPool.Put(item)// 解析JSON数据if err := json.Unmarshal(data, item); err != nil {return fmt.Errorf("failed to unmarshal data: %w", err)}// 处理数据item.Value = item.Value * 1.1 // 示例处理:增加10%// 使用buffer进行JSON编码if err := json.NewEncoder(buf).Encode(item); err != nil {return fmt.Errorf("failed to encode result: %w", err)}// 在实际应用中,这里可能会将处理结果发送到某个目的地fmt.Printf("Processed: %s", buf.String())return nil
}func main() {processor := NewDataProcessor()var wg sync.WaitGroup// 模拟大量并发请求for i := 0; i < 100; i++ {wg.Add(1)go func(id int) {defer wg.Done()// 创建测试数据testData := DataItem{ID:        id,Name:      fmt.Sprintf("Item-%d", id),Value:     float64(id * 100),Timestamp: time.Now(),}// 序列化测试数据data, err := json.Marshal(testData)if err != nil {fmt.Printf("Failed to marshal test data: %v\n", err)return}// 处理数据if err := processor.ProcessData(data); err != nil {fmt.Printf("Failed to process data: %v\n", err)}}(i)}wg.Wait()
}

3.2 Pool使用注意事项

  1. 对象的生命周期管理
  2. 内存泄漏风险
  3. 性能监控和调优

让我们继续完成Cond的应用场景部分。

4. Cond的应用场景

4.1 Cond基本概念

sync.Cond是一个条件变量,它在共享资源的状态变化时对多个goroutine进行协调。

让我们通过一个生产者-消费者模型来详细理解Cond的使用:

package mainimport ("fmt""sync""time"
)// 任务队列
type TaskQueue struct {mutex     sync.Mutexcond      *sync.Condtasks     []stringmaxSize   intclosed    bool
}// 创建新的任务队列
func NewTaskQueue(size int) *TaskQueue {tq := &TaskQueue{tasks:   make([]string, 0, size),maxSize: size,}tq.cond = sync.NewCond(&tq.mutex)return tq
}// 添加任务
func (tq *TaskQueue) Push(task string) error {tq.mutex.Lock()defer tq.mutex.Unlock()// 检查队列是否已关闭if tq.closed {return fmt.Errorf("task queue is closed")}// 等待队列有空余位置for len(tq.tasks) >= tq.maxSize {tq.cond.Wait()// 再次检查队列状态if tq.closed {return fmt.Errorf("task queue is closed")}}// 添加任务tq.tasks = append(tq.tasks, task)fmt.Printf("Task added: %s, queue size: %d\n", task, len(tq.tasks))// 通知等待的消费者tq.cond.Signal()return nil
}// 获取任务
func (tq *TaskQueue) Pop() (string, error) {tq.mutex.Lock()defer tq.mutex.Unlock()// 等待任务可用for len(tq.tasks) == 0 && !tq.closed {tq.cond.Wait()}// 检查队列是否已关闭且为空if len(tq.tasks) == 0 && tq.closed {return "", fmt.Errorf("task queue is closed and empty")}// 取出任务task := tq.tasks[0]tq.tasks = tq.tasks[1:]fmt.Printf("Task removed: %s, queue size: %d\n", task, len(tq.tasks))// 通知等待的生产者tq.cond.Signal()return task, nil
}// 关闭队列
func (tq *TaskQueue) Close() {tq.mutex.Lock()defer tq.mutex.Unlock()tq.closed = true// 通知所有等待的goroutinetq.cond.Broadcast()
}func main() {// 创建一个容量为3的任务队列queue := NewTaskQueue(3)var wg sync.WaitGroup// 启动3个生产者for i := 0; i < 3; i++ {wg.Add(1)go func(id int) {defer wg.Done()for j := 0; j < 5; j++ {task := fmt.Sprintf("Task-P%d-%d", id, j)err := queue.Push(task)if err != nil {fmt.Printf("Producer %d failed: %v\n", id, err)return}time.Sleep(time.Millisecond * 100) // 模拟生产耗时}}(i)}// 启动2个消费者for i := 0; i < 2; i++ {wg.Add(1)go func(id int) {defer wg.Done()for {task, err := queue.Pop()if err != nil {fmt.Printf("Consumer %d exit: %v\n", id, err)return}fmt.Printf("Consumer %d processed %s\n", id, task)time.Sleep(time.Millisecond * 200) // 模拟消费耗时}}(i)}// 等待生产者完成后关闭队列go func() {wg.Wait()fmt.Println("All producers finished, closing queue...")queue.Close()}()// 等待所有goroutine完成time.Sleep(time.Second * 5)fmt.Println("Main: exit")
}

4.2 Cond的工作流程

在这里插入图片描述

4.3 Cond的常见应用场景

  1. 生产者-消费者模式

    • 队列满/空的等待条件
    • 批量处理的条件同步
  2. 资源池管理

    • 连接池的可用连接等待
    • 工作池的任务分发
  3. 线程协调

    • 启动信号的统一等待
    • 任务完成的同步通知
  4. 状态变更通知

    • 配置更新的广播
    • 系统状态转换的通知

4.4 性能优化建议

  1. 条件检查
// 推荐做法
for !condition() {cond.Wait()
}// 不推荐做法
if !condition() {cond.Wait()
}
  1. 信号通知
// 单个等待者使用Signal
cond.Signal()// 多个等待者使用Broadcast
cond.Broadcast()
  1. 锁的范围
// 推荐做法
mu.Lock()
// 最小化临界区
mu.Unlock()// 不推荐做法
mu.Lock()
// 大量非临界区操作
mu.Unlock()

总结

知识要点回顾

  1. WaitGroup
  • Add/Done/Wait的正确使用
  • 并发任务的同步控制
  • 错误处理和超时机制
  1. Once
  • 单例模式实现
  • 初始化控制
  • 并发安全保证
  1. Pool
  • 对象复用机制
  • 内存优化策略
  • 性能监控方法
  1. Cond
  • 条件变量的使用
  • 生产者-消费者模式
  • 多goroutine协调

实践建议

  1. 代码质量
  • 始终使用defer确保解锁
  • 检查并处理所有错误情况
  • 添加适当的超时机制
  1. 性能优化
  • 最小化锁的范围
  • 合理使用对象池
  • 避免过度同步
  1. 调试技巧
  • 使用race detector
  • 添加详细的日志
  • 监控关键指标

怎么样今天的内容还满意吗?再次感谢观众老爷的观看,关注GZH:凡人的AI工具箱,回复666,送您价值199的AI大礼包。最后,祝您早日实现财务自由,还请给个赞,谢谢!


http://www.ppmy.cn/devtools/136457.html

相关文章

npm/cnpm的使用

npm 1、安装npm 前往nodejs官网下载安装node 验证是否安装成功node node -v node安装npm也会安装 npm -v 2、使用npm 1. 初始化项目 在一个项目文件夹中运行&#xff1a; npm init 根据提示输入项目信息&#xff08;如项目名称、版本号等&#xff09;。 如果你希望快速初…

Linux下Intel编译器oneAPI安装和链接MKL库编译

参考: https://blog.csdn.net/qq_44263574/article/details/123582481 官网下载: https://www.intel.com/content/www/us/en/developer/tools/oneapi/base-toolkit-download.html?packagesoneapi-toolkit&oneapi-toolkit-oslinux&oneapi-linoffline 填写邮件和国家,…

基于spring boot扶贫助农系统设计与实现

摘 要 扶贫助农系统是一种旨在改善农村贫困地区经济发展和居民生活水平的综合信息化平台。该系统通过整合资源、提供信息服务和优化供应链管理&#xff0c;帮助农民增加收入并提升农业生产效率。系统功能包括农产品在线销售、扶贫资讯等等功能。用户界面友好&#xff0c;操作简…

低速接口项目之串口Uart开发(四)——UART串口实现FPGA内部AXILITE寄存器的读写控制

本节目录 一、设计背景 二、设计思路 三、逻辑设计框架 四、仿真验证 五、上板验证 六、往期文章链接本节内容 一、设计背景 通常&#xff0c;芯片手册或者IP都会提供一系列的用户寄存器以及相关的定义&#xff0c;用于软件开发人员进行控制底层硬件来调试&#xff0c;或封装…

Leetcode 每日一题 167 .两数之和

目录 引言 问题背景 输入输出规范 输入 输出 示例解析 示例 1 示例 2 示例 3 算法策略 Java代码实现 复杂度分析 结语 引言 在算法的世界里&#xff0c;有些问题虽然简单&#xff0c;但却是锻炼算法思维的绝佳练习。今天&#xff0c;我们将深入探讨一个在面试中经…

Zabbix:使用CentOS 9,基于LNMP平台,源码部署Zabbix 7。

ZBX&#xff1a;源码部署Zabbix 7 一、Zabbix概述1. 什么是zabbix2. 为什么学习zabbix3. 逻辑架构3. 实验环境4. 软件下载&#xff1a; 二、安装前的系统准备工作1. 配置主机名2. 关闭防火墙3. 关闭selinux4. 配置yum源5. 配置时钟同步6. 优化系统限制7. 安装JDK 三、部署LNMP环…

猎板科技:PCB 特殊定制领域的卓越引领者

一、专业团队&#xff0c;创新设计之源 猎板科技的核心竞争力首先源于其卓越的专业团队。这支队伍汇聚了经验丰富的资深工程师以及行业前沿的技术专家&#xff0c;他们在 PCB 设计领域拥有深厚的造诣和敏锐的洞察力。无论是面对常规 PCB 设计任务&#xff0c;还是应对极具挑战…

SpringBoot学习记录(六)配置文件参数化

SpringBoot学习记录&#xff08;六&#xff09;配置文件参数化 一、参数提取到配置文件中二、yml配置文件三、ConfigurationProperties注解实现批量属性注入 一、参数提取到配置文件中 定义在代码中的参数的值分散在各个不同的文件中&#xff0c;不便于后期维护管理&#xff0…