40分钟学 Go 语言高并发:Context包与并发控制

ops/2024/11/24 9:05:44/

Context包与并发控制

学习目标

知识点掌握程度应用场景
context原理深入理解实现机制并发控制和请求链路追踪
超时控制掌握超时设置和处理API请求超时、任务限时控制
取消信号传播理解取消机制和传播链优雅退出、资源释放
context最佳实践掌握使用规范和技巧工程实践中的常见场景

1. Context原理

1.1 Context基本结构和实现

让我们先看一个完整的Context使用示例:

package mainimport ("context""fmt""log""time"
)// 请求追踪信息
type RequestInfo struct {TraceID    stringSessionID  stringStartTime  time.Time
}// 服务接口
type Service interface {HandleRequest(ctx context.Context, req string) (string, error)
}// 业务服务实现
type BusinessService struct {name string
}func NewBusinessService(name string) *BusinessService {return &BusinessService{name: name}
}// 处理请求
func (s *BusinessService) HandleRequest(ctx context.Context, req string) (string, error) {// 获取请求追踪信息info, ok := ctx.Value("request-info").(*RequestInfo)if !ok {return "", fmt.Errorf("request info not found in context")}log.Printf("[%s] Processing request: %s, TraceID: %s, Session: %s\n",s.name, req, info.TraceID, info.SessionID)// 模拟处理过程select {case <-time.After(2 * time.Second):return fmt.Sprintf("Result for %s", req), nilcase <-ctx.Done():return "", ctx.Err()}
}// 请求中间件
func requestMiddleware(next Service) Service {return &middlewareService{next: next}
}type middlewareService struct {next Service
}func (m *middlewareService) HandleRequest(ctx context.Context, req string) (string, error) {// 开始时间startTime := time.Now()// 添加请求信息到contextinfo := &RequestInfo{TraceID:   fmt.Sprintf("trace-%d", time.Now().UnixNano()),SessionID: fmt.Sprintf("session-%d", time.Now().Unix()),StartTime: startTime,}ctx = context.WithValue(ctx, "request-info", info)// 调用下一个处理器result, err := m.next.HandleRequest(ctx, req)// 记录处理时间duration := time.Since(startTime)log.Printf("Request completed in %v, TraceID: %s\n", duration, info.TraceID)return result, err
}func main() {// 创建服务service := requestMiddleware(NewBusinessService("UserService"))// 创建基础contextctx := context.Background()// 添加超时控制ctx, cancel := context.WithTimeout(ctx, 3*time.Second)defer cancel()// 处理请求result, err := service.HandleRequest(ctx, "get user profile")if err != nil {log.Printf("Request failed: %v\n", err)return}log.Printf("Request succeeded: %s\n", result)// 模拟超时场景ctx, cancel = context.WithTimeout(context.Background(), 1*time.Second)defer cancel()result, err = service.HandleRequest(ctx, "get user settings")if err != nil {log.Printf("Request failed: %v\n", err)return}
}

2. 超时控制

让我们实现一个带有超时控制的HTTP服务:

package mainimport ("context""encoding/json""fmt""log""net/http""time"
)// 响应结构
type Response struct {Data  interface{} `json:"data,omitempty"`Error string      `json:"error,omitempty"`
}// 服务配置
type ServiceConfig struct {Timeout        time.DurationMaxConcurrent  intRetryAttempts  intRetryDelay     time.Duration
}// HTTP客户端包装器
type HTTPClient struct {client  *http.Clientconfig  ServiceConfiglimiter chan struct{} // 并发限制器
}// 创建新的HTTP客户端
func NewHTTPClient(config ServiceConfig) *HTTPClient {return &HTTPClient{client: &http.Client{Timeout: config.Timeout,},config: config,limiter: make(chan struct{}, config.MaxConcurrent),}
}// 发送HTTP请求
func (c *HTTPClient) DoRequest(ctx context.Context, method, url string) (*Response, error) {var lastErr errorfor attempt := 0; attempt <= c.config.RetryAttempts; attempt++ {select {case <-ctx.Done():return nil, ctx.Err()case c.limiter <- struct{}{}: // 获取并发许可}// 确保释放并发许可defer func() {<-c.limiter}()// 创建请求req, err := http.NewRequestWithContext(ctx, method, url, nil)if err != nil {return nil, fmt.Errorf("create request failed: %w", err)}// 设置请求超时reqCtx, cancel := context.WithTimeout(ctx, c.config.Timeout)defer cancel()// 执行请求resp, err := c.client.Do(req.WithContext(reqCtx))if err != nil {lastErr = errlog.Printf("Request failed (attempt %d): %v\n", attempt+1, err)// 如果不是最后一次尝试,等待后重试if attempt < c.config.RetryAttempts {select {case <-ctx.Done():return nil, ctx.Err()case <-time.After(c.config.RetryDelay):continue}}continue}defer resp.Body.Close()// 解析响应var result Responseif err := json.NewDecoder(resp.Body).Decode(&result); err != nil {return nil, fmt.Errorf("decode response failed: %w", err)}return &result, nil}return nil, fmt.Errorf("all retry attempts failed, last error: %v", lastErr)
}// 处理HTTP请求的处理器
func handleRequest(w http.ResponseWriter, r *http.Request) {// 创建contextctx := r.Context()// 模拟长时间处理select {case <-time.After(2 * time.Second):response := Response{Data: "Request processed successfully",}json.NewEncoder(w).Encode(response)case <-ctx.Done():response := Response{Error: "Request timeout",}w.WriteHeader(http.StatusGatewayTimeout)json.NewEncoder(w).Encode(response)}
}func main() {// 配置HTTP客户端config := ServiceConfig{Timeout:       5 * time.Second,MaxConcurrent: 10,RetryAttempts: 3,RetryDelay:    time.Second,}client := NewHTTPClient(config)// 创建HTTP服务器http.HandleFunc("/api", handleRequest)// 启动服务器go func() {log.Println("Server starting on :8080")if err := http.ListenAndServe(":8080", nil); err != nil {log.Fatal(err)}}()// 等待服务器启动time.Sleep(time.Second)// 测试请求ctx := context.Background()// 测试正常请求resp, err := client.DoRequest(ctx, "GET", "http://localhost:8080/api")if err != nil {log.Printf("Request failed: %v\n", err)} else {log.Printf("Response: %+v\n", resp)}// 测试超时请求ctx, cancel := context.WithTimeout(ctx, time.Second)defer cancel()resp, err = client.DoRequest(ctx, "GET", "http://localhost:8080/api")if err != nil {log.Printf("Request failed (expected): %v\n", err)} else {log.Printf("Response: %+v\n", resp)}// 保持主程序运行select {}
}
package mainimport ("context""fmt""math/rand""sync""time"
)// 请求处理器
type RequestHandler struct {requests  chan Requestresponses chan Responsedone      chan struct{}wg        sync.WaitGroup
}// 请求结构
type Request struct {ID      intTimeout time.DurationData    string
}// 响应结构
type Response struct {RequestID intResult    stringError     error
}// 创建新的请求处理器
func NewRequestHandler() *RequestHandler {return &RequestHandler{requests:  make(chan Request, 100),responses: make(chan Response, 100),done:      make(chan struct{}),}
}// 启动处理器
func (h *RequestHandler) Start(workers int) {for i := 0; i < workers; i++ {h.wg.Add(1)go h.worker(i)}
}// 工作协程
func (h *RequestHandler) worker(id int) {defer h.wg.Done()for {select {case req, ok := <-h.requests:if !ok {fmt.Printf("Worker %d: request channel closed\n", id)return}// 创建context用于超时控制ctx, cancel := context.WithTimeout(context.Background(), req.Timeout)// 处理请求response := h.processRequest(ctx, req)// 发送响应select {case h.responses <- response:fmt.Printf("Worker %d: sent response for request %d\n", id, req.ID)case <-h.done:cancel()return}cancel() // 清理contextcase <-h.done:fmt.Printf("Worker %d: received stop signal\n", id)return}}
}// 处理单个请求
func (h *RequestHandler) processRequest(ctx context.Context, req Request) Response {// 模拟处理时间processTime := time.Duration(rand.Intn(int(req.Timeout))) + req.Timeout/2select {case <-time.After(processTime):return Response{RequestID: req.ID,Result:   fmt.Sprintf("Processed: %s", req.Data),}case <-ctx.Done():return Response{RequestID: req.ID,Error:    ctx.Err(),}}
}// 提交请求
func (h *RequestHandler) SubmitRequest(req Request) error {select {case h.requests <- req:return nilcase <-h.done:return fmt.Errorf("handler is stopped")}
}// 获取响应
func (h *RequestHandler) GetResponse() (Response, error) {select {case resp := <-h.responses:return resp, nilcase <-h.done:return Response{}, fmt.Errorf("handler is stopped")}
}// 停止处理器
func (h *RequestHandler) Stop() {close(h.done)h.wg.Wait()close(h.requests)close(h.responses)
}func main() {// 创建请求处理器handler := NewRequestHandler()handler.Start(3)// 发送一些测试请求requests := []Request{{ID: 1, Timeout: time.Second, Data: "Fast request"},{ID: 2, Timeout: time.Second * 2, Data: "Normal request"},{ID: 3, Timeout: time.Millisecond * 500, Data: "Quick request"},{ID: 4, Timeout: time.Second * 3, Data: "Slow request"},}// 提交请求for _, req := range requests {if err := handler.SubmitRequest(req); err != nil {fmt.Printf("Failed to submit request %d: %v\n", req.ID, err)continue}fmt.Printf("Submitted request %d\n", req.ID)}// 收集响应var wg sync.WaitGroupwg.Add(1)go func() {defer wg.Done()for i := 0; i < len(requests); i++ {resp, err := handler.GetResponse()if err != nil {fmt.Printf("Failed to get response: %v\n", err)continue}if resp.Error != nil {fmt.Printf("Request %d failed: %v\n", resp.RequestID, resp.Error)} else {fmt.Printf("Request %d succeeded: %s\n", resp.RequestID, resp.Result)}}}()// 等待所有响应处理完成wg.Wait()// 停止处理器handler.Stop()fmt.Println("Main: processing completed")
}

3. 取消信号传播

让我们继续完成取消信号传播的示例代码:

package mainimport ("context""fmt""log""sync""time"
)// 任务定义
type Task struct {ID      stringProcess func(ctx context.Context) (interface{}, error)
}// 工作池
type WorkerPool struct {workers    inttasks      chan Taskresults    chan interface{}errors     chan errordone       chan struct{}wg         sync.WaitGroup
}// 创建新的工作池
func NewWorkerPool(workers int) *WorkerPool {return &WorkerPool{workers: workers,tasks:   make(chan Task, workers*2),results: make(chan interface{}, workers*2),errors:  make(chan error, workers*2),done:    make(chan struct{}),}
}// 启动工作池
func (p *WorkerPool) Start(ctx context.Context) {// 启动workersfor i := 0; i < p.workers; i++ {p.wg.Add(1)go p.worker(ctx, i)}// 等待所有worker完成go func() {p.wg.Wait()close(p.done)close(p.results)close(p.errors)}()
}// worker处理任务
func (p *WorkerPool) worker(ctx context.Context, id int) {defer p.wg.Done()log.Printf("Worker %d started\n", id)for {select {case <-ctx.Done():log.Printf("Worker %d stopped: %v\n", id, ctx.Err())returncase task, ok := <-p.tasks:if !ok {log.Printf("Worker %d: task channel closed\n", id)return}log.Printf("Worker %d processing task %s\n", id, task.ID)// 创建任务专用的contexttaskCtx, cancel := context.WithTimeout(ctx, 5*time.Second)// 执行任务result, err := task.Process(taskCtx)cancel() // 释放任务context资源if err != nil {select {case p.errors <- fmt.Errorf("task %s failed: %w", task.ID, err):case <-ctx.Done():return}} else {select {case p.results <- result:case <-ctx.Done():return}}}}
}// 提交任务
func (p *WorkerPool) Submit(task Task) error {select {case p.tasks <- task:return nilcase <-p.done:return fmt.Errorf("worker pool is closed")}
}// 关闭工作池
func (p *WorkerPool) Close() {close(p.tasks)
}// 获取结果通道
func (p *WorkerPool) Results() <-chan interface{} {return p.results
}// 获取错误通道
func (p *WorkerPool) Errors() <-chan error {return p.errors
}func main() {// 创建根contextctx, cancel := context.WithCancel(context.Background())defer cancel()// 创建工作池pool := NewWorkerPool(3)pool.Start(ctx)// 创建模拟任务tasks := []Task{{ID: "task-1",Process: func(ctx context.Context) (interface{}, error) {select {case <-time.After(2 * time.Second):return "Task 1 completed", nilcase <-ctx.Done():return nil, ctx.Err()}},},{ID: "task-2",Process: func(ctx context.Context) (interface{}, error) {select {case <-time.After(3 * time.Second):return "Task 2 completed", nilcase <-ctx.Done():return nil, ctx.Err()}},},{ID: "task-3",Process: func(ctx context.Context) (interface{}, error) {select {case <-time.After(1 * time.Second):return nil, fmt.Errorf("task 3 failed")case <-ctx.Done():return nil, ctx.Err()}},},}// 提交任务for _, task := range tasks {if err := pool.Submit(task); err != nil {log.Printf("Failed to submit task %s: %v\n", task.ID, err)}}// 等待3秒后取消所有任务go func() {time.Sleep(3 * time.Second)log.Println("Cancelling all tasks...")cancel()}()// 收集结果和错误completed := 0expected := len(tasks)for completed < expected {select {case result, ok := <-pool.Results():if !ok {continue}log.Printf("Got result: %v\n", result)completed++case err, ok := <-pool.Errors():if !ok {continue}log.Printf("Got error: %v\n", err)completed++case <-ctx.Done():log.Printf("Main: context cancelled: %v\n", ctx.Err())completed = expected // 强制退出循环}}// 关闭工作池pool.Close()// 等待工作池完全关闭<-pool.donelog.Println("All workers stopped")
}

3.1 取消信号传播流程图

在这里插入图片描述

4. Context最佳实践

4.1 Context使用规范

  1. 函数调用链传递
// 推荐
func HandleRequest(ctx context.Context, req *Request) error// 不推荐
func HandleRequest(timeout time.Duration, req *Request) error
  1. Context应作为第一个参数
// 推荐
func ProcessTask(ctx context.Context, task *Task) error// 不推荐
func ProcessTask(task *Task, ctx context.Context) error
  1. 不要储存Context在结构体中
// 不推荐
type Service struct {ctx context.Context
}// 推荐
type Service struct {// 其他字段
}func (s *Service) DoWork(ctx context.Context) error

4.2 Context使用注意事项

  1. 不要将nil传递给context参数
// 推荐
ctx := context.Background()
ProcessTask(ctx, task)// 不推荐
ProcessTask(nil, task)
  1. context.Value应该只用于请求作用域数据
// 推荐
ctx = context.WithValue(ctx, "request-id", requestID)// 不推荐 - 配置信息应该通过其他方式传递
ctx = context.WithValue(ctx, "db-config", dbConfig)
  1. 正确处理取消信号
select {
case <-ctx.Done():return ctx.Err()
default:// 继续处理
}

4.3 实践建议

  1. 超时控制
  • 设置合理的超时时间
  • 在不同层级使用不同的超时时间
  • 确保资源正确释放
  1. 错误处理
  • 区分超时和取消错误
  • 传递有意义的错误信息
  • 实现优雅降级
  1. 性能优化
  • 避免创建过多的context
  • 合理使用context.Value
  • 及时取消不需要的操作
  1. 日志追踪
  • 记录关键操作的耗时
  • 追踪请求的完整链路
  • 记录取消原因

总结

关键点回顾

  1. Context原理
  • 继承关系
  • 值传递机制
  • 生命周期管理
  1. 超时控制
  • 设置超时时间
  • 处理超时信号
  • 资源清理
  1. 取消信号传播
  • 信号传递机制
  • 取消处理流程
  • 资源释放
  1. 最佳实践
  • 使用规范
  • 注意事项
  • 优化建议

实践建议

  1. 代码规范
  • 遵循命名约定
  • 合理组织代码结构
  • 添加必要的注释
  1. 错误处理
  • 使用有意义的错误信息
  • 实现错误恢复机制
  • 记录错误日志
  1. 性能优化
  • 减少不必要的context创建
  • 避免context.Value滥用
  • 及时释放资源

怎么样今天的内容还满意吗?再次感谢观众老爷的观看,关注GZH:凡人的AI工具箱,回复666,送您价值199的AI大礼包。最后,祝您早日实现财务自由,还请给个赞,谢谢!


http://www.ppmy.cn/ops/136273.html

相关文章

安科瑞电气股份有限公司环保用电监管综合云平台GetEnterpriseInfoById存在SQL注入漏洞

免责声明: 本文旨在提供有关特定漏洞的深入信息,帮助用户充分了解潜在的安全风险。发布此信息的目的在于提升网络安全意识和推动技术进步,未经授权访问系统、网络或应用程序,可能会导致法律责任或严重后果。因此,作者不对读者基于本文内容所采取的任何行为承担责任。读者在…

躺平成长-腾讯云数据库(又消失了一次)

开源竞争&#xff1a; 当你无法彻底掌握技术的时候&#xff0c;你就开源这个技术&#xff0c;形成更多的技术依赖&#xff0c;你会说 这不就是在砸罐子吗&#xff1f;一个行业里面总会有人砸罐子的&#xff0c;你不如先砸罐子&#xff0c;还能听个响声。 数据库的里面清洁的数据…

「Mac玩转仓颉内测版28」基础篇8 - 元组类型详解

本篇将介绍 Cangjie 中的元组类型&#xff0c;包括元组的定义、创建、访问、数据解构以及应用场景&#xff0c;帮助开发者掌握元组类型的使用。 关键词 元组类型定义元组创建元组访问数据解构应用场景 一、元组类型概述 在 Cangjie 中&#xff0c;元组是一种用于存储多种数据…

Ubuntu24.04LTS设置root用户可远程登录

Ubuntu24.04LTS设置root用户可远程登录 文章目录 Ubuntu24.04LTS设置root用户可远程登录1. 设置root密码2. 设置root用户可远程登录1. 查看ssh服务是否安装2. 安装ssh服务3. 再次查看ssh服务是否安装4. 配置ssh文件5. 重启ssh服务6. root远程登录 1. 设置root密码 Ubuntu安装后…

Python Tornado框架教程:高性能Web框架的全面解析

Python Tornado框架教程&#xff1a;高性能Web框架的全面解析 引言 在现代Web开发中&#xff0c;选择合适的框架至关重要。Python的Tornado框架因其高性能和非阻塞I/O特性而备受青睐。它特别适合处理大量并发连接的应用&#xff0c;比如聊天应用、实时数据处理和WebSocket服务…

标题gitLab如何打标签

标题gitLab打标签 1、首先进入到项目里面&#xff0c;找到Repository下的Tages&#xff0c;点击进入 如果是还没有创建过标签&#xff0c;会提示如何用命令创建 git tag -a v1.4 -m "version 1.4"2、也可以直接在界面创建&#xff0c;点击new Tag按钮 3、填写标签…

java实现生成PDF文件

目录 引言 iText 库 示例代码 Apache PDFBox 库 示例代码 总结 引言 在Java中实现生成PDF文件的功能&#xff0c;可以采用多种库来完成&#xff0c;其中最常用的是iText和Apache PDFBox。下面将详细介绍如何使用这两种库来生成一个简单的PDF文档&#xff0c;并且会包含一些…

维护在线重做日志(二)

迁移和重命名 可以使用操作系统命令重新定位重做日志&#xff0c;然后使用ALTER DATABASE语句使数据库知道它们的新名称&#xff08;位置&#xff09;。这个过程是必要的&#xff0c;例如&#xff0c;如果当前用于一些重做日志文件的磁盘将被删除&#xff0c;或者如果数据文件…