目录
- 基础案例:简单的并发下载器
- 进阶案例:高并发网站访问统计
- 实战案例:分布式任务调度系统
基础案例:简单的并发下载器
问题描述
需要同时下载多个文件,使用并发方式提高下载效率。
实现代码
package mainimport ("fmt""io""net/http""os""sync"
)func downloadFile(url string, filename string, wg *sync.WaitGroup) {defer wg.Done()// 创建HTTP请求resp, err := http.Get(url)if err != nil {fmt.Printf("下载 %s 失败: %v\n", filename, err)return}defer resp.Body.Close()// 创建文件file, err := os.Create(filename)if err != nil {fmt.Printf("创建文件 %s 失败: %v\n", filename, err)return}defer file.Close()// 写入文件_, err = io.Copy(file, resp.Body)if err != nil {fmt.Printf("写入文件 %s 失败: %v\n", filename, err)return}fmt.Printf("文件 %s 下载完成\n", filename)
}func main() {urls := []string{"https://example.com/file1.zip","https://example.com/file2.zip","https://example.com/file3.zip",}var wg sync.WaitGroupfor i, url := range urls {wg.Add(1)filename := fmt.Sprintf("file%d.zip", i+1)go downloadFile(url, filename, &wg)}wg.Wait()fmt.Println("所有文件下载完成")
}
关键点解析
- 使用
sync.WaitGroup
管理并发下载任务 - 每个下载任务在独立的goroutine中执行
- 使用defer确保资源正确释放
- 基本的错误处理机制
进阶案例:高并发网站访问统计
问题描述
需要统计网站的实时访问量,包括总访问次数、独立IP数等指标。
实现代码
package mainimport ("fmt""net/http""sync""time"
)type VisitStats struct {mutex sync.RWMutextotalVisits int64uniqueIPs map[string]boollastMinute map[int64]int64 // 按秒记录最近一分钟的访问量
}func NewVisitStats() *VisitStats {return &VisitStats{uniqueIPs: make(map[string]bool),lastMinute: make(map[int64]int64),}
}func (vs *VisitStats) recordVisit(ip string) {vs.mutex.Lock()defer vs.mutex.Unlock()// 更新总访问量vs.totalVisits++// 记录唯一IPvs.uniqueIPs[ip] = true// 记录当前秒的访问量now := time.Now().Unix()vs.lastMinute[now]++// 清理一分钟前的数据vs.cleanOldData(now)
}func (vs *VisitStats) cleanOldData(now int64) {for timestamp := range vs.lastMinute {if now-timestamp > 60 {delete(vs.lastMinute, timestamp)}}
}func (vs *VisitStats) getStats() (int64, int, int64) {vs.mutex.RLock()defer vs.mutex.RUnlock()// 计算最近一分钟的访问量var lastMinuteVisits int64now := time.Now().Unix()for timestamp, count := range vs.lastMinute {if now-timestamp <= 60 {lastMinuteVisits += count}}return vs.totalVisits, len(vs.uniqueIPs), lastMinuteVisits
}func main() {stats := NewVisitStats()// 处理访问请求http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {ip := r.RemoteAddrstats.recordVisit(ip)fmt.Fprintf(w, "Welcome!")})// 定期打印统计信息go func() {for {total, unique, lastMin := stats.getStats()fmt.Printf("总访问量: %d, 唯一IP数: %d, 最近一分钟访问量: %d\n",total, unique, lastMin)time.Sleep(5 * time.Second)}}()http.ListenAndServe(":8080", nil)
}
关键点解析
- 使用读写锁
sync.RWMutex
提高并发性能 - 通过map记录唯一IP和时间戳数据
- 实现了滑动窗口统计最近一分钟的访问量
- 定期清理过期数据
实战案例:分布式任务调度系统
问题描述
实现一个支持高并发的分布式任务调度系统,具备任务分发、执行和监控功能。
实现代码
package mainimport ("context""fmt""sync""time"
)// 任务定义
type Task struct {ID stringPayload interface{}Priority int
}// 工作节点
type Worker struct {ID stringStatus stringTasks chan Task
}// 调度器
type Scheduler struct {workers map[string]*WorkertaskQueue chan TaskworkerPool chan *Workermutex sync.RWMutexctx context.Contextcancel context.CancelFunc
}func NewScheduler(workerCount int) *Scheduler {ctx, cancel := context.WithCancel(context.Background())s := &Scheduler{workers: make(map[string]*Worker),taskQueue: make(chan Task, 1000),workerPool: make(chan *Worker, workerCount),ctx: ctx,cancel: cancel,}// 初始化工作节点for i := 0; i < workerCount; i++ {worker := &Worker{ID: fmt.Sprintf("worker-%d", i),Status: "idle",Tasks: make(chan Task, 10),}s.workers[worker.ID] = workers.workerPool <- worker}return s
}func (s *Scheduler) Start() {// 任务分发go func() {for {select {case <-s.ctx.Done():returncase task := <-s.taskQueue:worker := <-s.workerPools.assignTask(worker, task)}}}()// 监控工作节点状态go s.monitorWorkers()
}func (s *Scheduler) assignTask(worker *Worker, task Task) {s.mutex.Lock()worker.Status = "busy"s.mutex.Unlock()go func() {worker.Tasks <- task// 模拟任务执行time.Sleep(time.Second * time.Duration(task.Priority))s.mutex.Lock()worker.Status = "idle"s.mutex.Unlock()s.workerPool <- worker}()
}func (s *Scheduler) monitorWorkers() {ticker := time.NewTicker(5 * time.Second)defer ticker.Stop()for {select {case <-s.ctx.Done():returncase <-ticker.C:s.mutex.RLock()for id, worker := range s.workers {fmt.Printf("Worker %s status: %s\n", id, worker.Status)}s.mutex.RUnlock()}}
}func main() {scheduler := NewScheduler(5)scheduler.Start()// 模拟提交任务go func() {for i := 0; i < 20; i++ {task := Task{ID: fmt.Sprintf("task-%d", i),Payload: fmt.Sprintf("payload-%d", i),Priority: i % 3 + 1,}scheduler.taskQueue <- tasktime.Sleep(time.Millisecond * 500)}}()// 运行一段时间后退出time.Sleep(time.Second * 30)scheduler.cancel()
}
关键点解析
- 使用context管理goroutine生命周期
- 实现了工作池模式提高资源利用率
- 使用channel实现任务队列和工作节点池
- 采用读写锁保护共享资源
- 实现了基本的监控功能
- 支持任务优先级
总结
通过这三个案例,我们循序渐进地展示了Go语言在并发编程中的应用:
- 基础案例展示了goroutine和WaitGroup的基本用法
- 进阶案例引入了更复杂的并发控制和数据结构
- 实战案例整合了多个并发特性,实现了一个完整的系统
在实际开发中,需要注意:
- 正确使用锁机制避免竞态条件
- 合理设计channel缓冲区大小
- 注意goroutine的生命周期管理
- 实现适当的错误处理和资源清理
- 考虑系统的可扩展性和维护性