40分钟学 Go 语言高并发:Select多路复用

devtools/2024/11/24 20:26:38/

Select多路复用

学习目标

知识点掌握程度应用场景
select实现原理深入理解底层机制channel通信和多路选择
超时处理掌握超时控制方法避免阻塞和资源浪费
优先级控制理解优先级实现处理多个channel的顺序
性能考虑了解性能优化点高并发场景优化

1. Select实现原理

让我们通过一个完整的例子来理解select的工作原理:

package mainimport ("fmt""math/rand""sync""time"
)// 数据生产者
type Producer struct {dataChan chan intdone     chan struct{}
}// 创建新的生产者
func NewProducer() *Producer {return &Producer{dataChan: make(chan int, 100),done:     make(chan struct{}),}
}// 启动生产
func (p *Producer) Start() {go func() {defer close(p.dataChan)for {select {case <-p.done:fmt.Println("Producer: received stop signal")returndefault:// 生成随机数据data := rand.Intn(100)select {case p.dataChan <- data:fmt.Printf("Producer: sent data %d\n", data)time.Sleep(time.Millisecond * 100)case <-p.done:fmt.Println("Producer: received stop signal while sending")return}}}}()
}// 停止生产
func (p *Producer) Stop() {close(p.done)
}// 获取数据通道
func (p *Producer) DataChan() <-chan int {return p.dataChan
}// 数据处理器
type Processor struct {producers []*Producerresults   chan intdone      chan struct{}
}// 创建新的处理器
func NewProcessor(producerCount int) *Processor {producers := make([]*Producer, producerCount)for i := 0; i < producerCount; i++ {producers[i] = NewProducer()}return &Processor{producers: producers,results:   make(chan int, producerCount*100),done:      make(chan struct{}),}
}// 启动处理
func (p *Processor) Start() {// 启动所有生产者for i, producer := range p.producers {producer.Start()// 为每个生产者启动一个处理goroutinego func(id int, prod *Producer) {for {select {case data, ok := <-prod.DataChan():if !ok {fmt.Printf("Processor %d: producer channel closed\n", id)return}// 处理数据result := data * 2select {case p.results <- result:fmt.Printf("Processor %d: processed data %d -> %d\n", id, data, result)case <-p.done:return}case <-p.done:fmt.Printf("Processor %d: received stop signal\n", id)return}}}(i, producer)}
}// 停止处理
func (p *Processor) Stop() {close(p.done)for _, producer := range p.producers {producer.Stop()}
}// 获取结果通道
func (p *Processor) Results() <-chan int {return p.results
}func main() {// 创建有3个生产者的处理器processor := NewProcessor(3)// 启动处理器processor.Start()// 创建结果收集器var wg sync.WaitGroupwg.Add(1)go func() {defer wg.Done()count := 0for result := range processor.Results() {fmt.Printf("Collector: received result %d\n", result)count++if count >= 20 { // 收集20个结果后停止processor.Stop()break}}}()// 等待处理完成wg.Wait()fmt.Println("Main: processing completed")
}

1.1 Select执行流程图

在这里插入图片描述

2. 超时处理

让我们实现一个带有超时控制的服务请求处理系统:

package mainimport ("context""fmt""math/rand""sync""time"
)// 请求处理器
type RequestHandler struct {requests  chan Requestresponses chan Responsedone      chan struct{}wg        sync.WaitGroup
}// 请求结构
type Request struct {ID      intTimeout time.DurationData    string
}// 响应结构
type Response struct {RequestID intResult    stringError     error
}// 创建新的请求处理器
func NewRequestHandler() *RequestHandler {return &RequestHandler{requests:  make(chan Request, 100),responses: make(chan Response, 100),done:      make(chan struct{}),}
}// 启动处理器
func (h *RequestHandler) Start(workers int) {for i := 0; i < workers; i++ {h.wg.Add(1)go h.worker(i)}
}// 工作协程
func (h *RequestHandler) worker(id int) {defer h.wg.Done()for {select {case req, ok := <-h.requests:if !ok {fmt.Printf("Worker %d: request channel closed\n", id)return}// 创建context用于超时控制ctx, cancel := context.WithTimeout(context.Background(), req.Timeout)// 处理请求response := h.processRequest(ctx, req)// 发送响应select {case h.responses <- response:fmt.Printf("Worker %d: sent response for request %d\n", id, req.ID)case <-h.done:cancel()return}cancel() // 清理contextcase <-h.done:fmt.Printf("Worker %d: received stop signal\n", id)return}}
}// 处理单个请求
func (h *RequestHandler) processRequest(ctx context.Context, req Request) Response {// 模拟处理时间processTime := time.Duration(rand.Intn(int(req.Timeout))) + req.Timeout/2select {case <-time.After(processTime):return Response{RequestID: req.ID,Result:   fmt.Sprintf("Processed: %s", req.Data),}case <-ctx.Done():return Response{RequestID: req.ID,Error:    ctx.Err(),}}
}// 提交请求
func (h *RequestHandler) SubmitRequest(req Request) error {select {case h.requests <- req:return nilcase <-h.done:return fmt.Errorf("handler is stopped")}
}// 获取响应
func (h *RequestHandler) GetResponse() (Response, error) {select {case resp := <-h.responses:return resp, nilcase <-h.done:return Response{}, fmt.Errorf("handler is stopped")}
}// 停止处理器
func (h *RequestHandler) Stop() {close(h.done)h.wg.Wait()close(h.requests)close(h.responses)
}func main() {// 创建请求处理器handler := NewRequestHandler()handler.Start(3)// 发送一些测试请求requests := []Request{{ID: 1, Timeout: time.Second, Data: "Fast request"},{ID: 2, Timeout: time.Second * 2, Data: "Normal request"},{ID: 3, Timeout: time.Millisecond * 500, Data: "Quick request"},{ID: 4, Timeout: time.Second * 3, Data: "Slow request"},}// 提交请求for _, req := range requests {if err := handler.SubmitRequest(req); err != nil {fmt.Printf("Failed to submit request %d: %v\n", req.ID, err)continue}fmt.Printf("Submitted request %d\n", req.ID)}// 收集响应var wg sync.WaitGroupwg.Add(1)go func() {defer wg.Done()for i := 0; i < len(requests); i++ {resp, err := handler.GetResponse()if err != nil {fmt.Printf("Failed to get response: %v\n", err)continue}if resp.Error != nil {fmt.Printf("Request %d failed: %v\n", resp.RequestID, resp.Error)} else {fmt.Printf("Request %d succeeded: %s\n", resp.RequestID, resp.Result)}}}()// 等待所有响应处理完成wg.Wait()// 停止处理器handler.Stop()fmt.Println("Main: processing completed")
}

3. 优先级控制

让我们实现一个带有优先级控制的任务调度系统:

package mainimport ("fmt""math/rand""sort""sync""time"
)// 优先级级别
const (PriorityHigh = iotaPriorityMediumPriorityLow
)// 任务结构
type Task struct {ID       intPriority intAction   func() error
}// 优先级调度器
type PriorityScheduler struct {highPriority   chan TaskmediumPriority chan TasklowPriority    chan Taskresults        chan errordone           chan struct{}wg             sync.WaitGroup
}// 创建新的调度器
func NewPriorityScheduler() *PriorityScheduler {return &PriorityScheduler{highPriority:   make(chan Task, 100),mediumPriority: make(chan Task, 100),lowPriority:    make(chan Task, 100),results:        make(chan error, 100),done:          make(chan struct{}),}
}// 启动调度器
func (s *PriorityScheduler) Start(workers int) {for i := 0; i < workers; i++ {s.wg.Add(1)go s.worker(i)}
}// 工作协程
func (s *PriorityScheduler) worker(id int) {defer s.wg.Done()for {// 使用优先级顺序处理任务select {case <-s.done:return// 高优先级任务case task := <-s.highPriority:fmt.Printf("Worker %d: processing high priority task %d\n", id, task.ID)s.results <- task.Action()// 如果没有高优先级任务,检查中优先级default:select {case <-s.done:returncase task := <-s.highPriority:fmt.Printf("Worker %d: processing high priority task %d\n", id, task.ID)s.results <- task.Action()case task := <-s.mediumPriority:fmt.Printf("Worker %d: processing medium priority task %d\n", id, task.ID)s.results <- task.Action()// 如果没有中优先级任务,检查低优先级default:select {case <-s.done:returncase task := <-s.highPriority:fmt.Printf("Worker %d: processing high priority task %d\n", id, task.ID)s.results <- task.Action()case task := <-s.mediumPriority:fmt.Printf("Worker %d: processing medium priority task %d\n", id, task.ID)s.results <- task.Action()case task := <-s.lowPriority:fmt.Printf("Worker %d: processing low priority task %d\n", id, task.ID)s.results <- task.Action()}}}}
}// 提交任务
func (s *PriorityScheduler) SubmitTask(task Task) error {var targetChan chan Taskswitch task.Priority {case PriorityHigh:targetChan = s.highPrioritycase PriorityMedium:targetChan = s.mediumPrioritycase PriorityLow:targetChan = s.lowPrioritydefault:return fmt.Errorf("invalid priority level: %d", task.Priority)}select {case targetChan <- task:return nilcase <-s.done:return fmt.Errorf("scheduler is stopped")}
}// 获取结果
func (s *PriorityScheduler) Results() <-chan error {return s.results
}// 停止调度器
func (s *PriorityScheduler) Stop() {close(s.done)s.wg.Wait()close(s.highPriority)close(s.mediumPriority)close(s.lowPriority)close(s.results)
}// 创建模拟任务
func createTask(id int, priority int, duration time.Duration) Task {return Task{ID:       id,Priority: priority,Action: func() error {time.Sleep(duration)if rand.Float32() < 0.1 { // 10%的失败率return fmt.Errorf("task %d failed", id)}return nil},}
}func main() {// 创建调度器scheduler := NewPriorityScheduler()scheduler.Start(3)// 创建一些测试任务var tasks []Taskfor i := 0; i < 15; i++ {priority := i % 3 // 在三个优先级之间循环duration := time.Millisecond * time.Duration(rand.Intn(500)+100)tasks = append(tasks, createTask(i, priority, duration))}// 随机打乱任务顺序rand.Shuffle(len(tasks), func(i, j int) {tasks[i], tasks[j] = tasks[j], tasks[i]})// 提交任务for _, task := range tasks {if err := scheduler.SubmitTask(task); err != nil {fmt.Printf("Failed to submit task %d: %v\n", task.ID, err)continue}fmt.Printf("Submitted task %d with priority %d\n", task.ID, task.Priority)}// 收集结果var wg sync.WaitGroupwg.Add(1)go func() {defer wg.Done()completed := 0failures := 0for err := range scheduler.Results() {if err != nil {failures++fmt.Printf("Task failed: %v\n", err)}completed++if completed >= len(tasks) {break}}fmt.Printf("\nProcessing completed: %d tasks total, %d failures\n",completed, failures)}()// 等待所有任务完成wg.Wait()// 停止调度器scheduler.Stop()fmt.Println("Main: scheduler stopped")
}

让我们继续完成优先级控制的示例代码:

3.1 优先级控制流程图

在这里插入图片描述

4. 性能考虑

4.1 Select性能优化建议

  1. case数量控制
  • select中的case数量会影响性能
  • 建议控制在合理范围内(通常不超过5-10个)
  1. channel缓冲区
  • 适当使用带缓冲的channel可以提高性能
  • 避免频繁的阻塞和唤醒
  1. default分支使用
  • 合理使用default避免无谓的阻塞
  • 考虑轮询间隔,避免CPU空转

让我们实现一个性能优化的示例:

package mainimport ("fmt""runtime""sync""sync/atomic""time"
)// 性能统计
type Stats struct {processed uint64dropped   uint64blocked   uint64
}// 批处理器
type BatchProcessor struct {input     chan interface{}output    chan []interface{}done      chan struct{}stats     *StatsbatchSize intmaxWait   time.Duration
}// 创建新的批处理器
func NewBatchProcessor(batchSize int, maxWait time.Duration) *BatchProcessor {return &BatchProcessor{input:     make(chan interface{}, batchSize*2),output:    make(chan []interface{}, batchSize),done:      make(chan struct{}),stats:     &Stats{},batchSize: batchSize,maxWait:   maxWait,}
}// 启动处理
func (p *BatchProcessor) Start(workers int) {for i := 0; i < workers; i++ {go p.worker(i)}// 启动统计打印go p.printStats()
}// 工作协程
func (p *BatchProcessor) worker(id int) {batch := make([]interface{}, 0, p.batchSize)timer := time.NewTimer(p.maxWait)defer timer.Stop()for {// 重置计时器if !timer.Stop() {select {case <-timer.C:default:}}timer.Reset(p.maxWait)// 优化的批处理逻辑for len(batch) < p.batchSize {select {case <-p.done:returncase item := <-p.input:batch = append(batch, item)atomic.AddUint64(&p.stats.processed, 1)case <-timer.C:// 达到最大等待时间,处理当前批次if len(batch) > 0 {p.processBatch(batch)batch = batch[:0]}atomic.AddUint64(&p.stats.blocked, 1)continuedefault:// 如果输入队列为空且已有数据,立即处理if len(batch) > 0 {p.processBatch(batch)batch = batch[:0]}// 短暂休眠避免CPU空转runtime.Gosched()continue}// 批次满了就处理if len(batch) >= p.batchSize {p.processBatch(batch)batch = batch[:0]}}}
}// 处理批次数据
func (p *BatchProcessor) processBatch(batch []interface{}) {// 创建副本避免数据竞争output := make([]interface{}, len(batch))copy(output, batch)// 尝试发送处理结果select {case p.output <- output:// 成功发送default:// 输出channel满了,增加丢弃计数atomic.AddUint64(&p.stats.dropped, uint64(len(batch)))}
}// 提交数据
func (p *BatchProcessor) Submit(item interface{}) error {select {case p.input <- item:return nilcase <-p.done:return fmt.Errorf("processor is stopped")default:atomic.AddUint64(&p.stats.dropped, 1)return fmt.Errorf("input channel full")}
}// 获取输出通道
func (p *BatchProcessor) Output() <-chan []interface{} {return p.output
}// 定期打印统计信息
func (p *BatchProcessor) printStats() {ticker := time.NewTicker(time.Second)defer ticker.Stop()var lastProcessed, lastDropped, lastBlocked uint64for {select {case <-p.done:returncase <-ticker.C:processed := atomic.LoadUint64(&p.stats.processed)dropped := atomic.LoadUint64(&p.stats.dropped)blocked := atomic.LoadUint64(&p.stats.blocked)fmt.Printf("Stats - Processed: %d/s, Dropped: %d/s, Blocked: %d/s\n",processed-lastProcessed,dropped-lastDropped,blocked-lastBlocked)lastProcessed = processedlastDropped = droppedlastBlocked = blocked}}
}// 停止处理器
func (p *BatchProcessor) Stop() {close(p.done)
}func main() {// 创建批处理器processor := NewBatchProcessor(100, time.Millisecond*50)processor.Start(3)// 模拟高速数据提交var wg sync.WaitGroupfor i := 0; i < 5; i++ {wg.Add(1)go func(id int) {defer wg.Done()for j := 0; j < 10000; j++ {data := fmt.Sprintf("Data-%d-%d", id, j)processor.Submit(data)time.Sleep(time.Microsecond * time.Duration(50+id*10))}}(i)}// 处理输出go func() {for batch := range processor.Output() {// 这里可以进行批量处理,比如写入数据库fmt.Printf("Received batch of size %d\n", len(batch))}}()// 等待提交完成wg.Wait()time.Sleep(time.Second) // 等待最后的处理完成// 停止处理器processor.Stop()fmt.Println("Main: processing completed")
}

4.2 性能优化要点

  1. 避免过度使用select
  • 只在必要的地方使用select
  • 考虑其他并发控制方式
  1. channel设计优化
  • 合理设置缓冲区大小
  • 避免频繁的channel创建和关闭
  1. goroutine管理
  • 控制goroutine数量
  • 实现优雅的退出机制
  1. 内存优化
  • 重用切片和对象
  • 避免不必要的内存分配

总结

核心要点

  1. Select实现原理
  • 随机选择机制
  • 阻塞和非阻塞模式
  • 多路复用特性
  1. 超时处理
  • 超时控制方法
  • 资源释放保证
  • 错误处理机制
  1. 优先级控制
  • 优先级实现方式
  • 任务调度策略
  • 公平性保证
  1. 性能优化
  • select使用建议
  • channel优化
  • 资源管理

怎么样今天的内容还满意吗?再次感谢观众老爷的观看,关注GZH:凡人的AI工具箱,回复666,送您价值199的AI大礼包。最后,祝您早日实现财务自由,还请给个赞,谢谢!


http://www.ppmy.cn/devtools/136633.html

相关文章

光伏电站项目-视频监控、微气象及安全警卫系统

一、项目背景 近年来&#xff0c;我国光伏发电持续快速发展。截止2019年5月装机总容量超过2.043亿千瓦&#xff0c;技术水平不断提升&#xff0c;成本显著降低&#xff0c;开发建设质量和消纳利用明显改善&#xff0c;在部分地区实现了家庭分布式光伏并入电网&#xff0c;为建…

MATLAB 2024a安装包下载及安装教程

[安装环境]: Win 11/Win 10 MATLAB和Mathematica、Maple并称为三大数学软件。它在数学类科技应用软件中在数值计算方面首屈一指。行矩阵运算、绘制函数和数据、实现算法、创建用户界面、连接其他编程语言的程序等。MATLAB的基本数据单位是矩阵&#xff0c;它的指令表达式与数学…

【入门篇】哥德巴赫猜想——多语言求解版

# [NOIP2004 提高组] 津津的储蓄计划 题目描述 津津的零花钱一直都是自己管理。每个月的月初妈妈给津津 300 300 300 元钱&#xff0c;津津会预算这个月的花销&#xff0c;并且总能做到实际花销和预算的相同。 为了让津津学习如何储蓄&#xff0c;妈妈提出&#xff0c;津津…

嵌入式LVGL自定义纯数字键盘

嵌入式LVGL自定义纯数字键盘 一、前言二、设置自定义数字键盘三、使用一、前言 嵌入式UI项目中有时候会使用到纯数字密码的需求,所以打算使用LVGL构建自定义的纯数字键盘。 二、设置自定义数字键盘 参考这个文章,以LV_KEYBOARD_MODE_USER_1为例,增加一个数字键盘,如下图所…

shell

第四章 shell中的变量 4.1 系统变量 1.常用系统变量 $HOME ,$PWD,$SHELL ,$USER 4.2 自定义变量 1.变量值&#xff08;等号两边没有空格&#xff09; 2.撤销变量&#xff1a;unset变量 3.声明静态变量&#xff1a;readonly 变量&#xff0c;注意&#xff1a;不能unset 4.变…

ROS机器视觉入门:从基础到人脸识别与目标检测

前言 从本文开始&#xff0c;我们将开始学习ROS机器视觉处理&#xff0c;刚开始先学习一部分外围的知识&#xff0c;为后续的人脸识别、目标跟踪和YOLOV5目标检测做准备工作。我采用的笔记本是联想拯救者游戏本&#xff0c;系统采用Ubuntu20.04&#xff0c;ROS采用noetic。 颜…

03 —— Webpack 自动生成 html 文件

HtmlWebpackPlugin | webpack 中文文档 | webpack中文文档 | webpack中文网 安装 npm install --save-dev html-webpack-plugin 下载html-webpack-plugin本地软件包 npm i html-webpack-plugin --save-dev 配置webpack.config.js让webpack拥有插件功能 const HtmlWebpack…

Kafka - 消费者程序仅消费一半分区消息的问题

1. 问题描述 修改安全服务状态有时逻辑正常有时候逻辑不正常&#xff0c;排查incident服务的日志发现消息可以正常发送到 kafka topic &#xff0c;但是incident-cron 服务有时候有拉取消息的日志有时候没有日志。 kafka 生产者可以将消息正常发送到 kafka topic &#xff0c…