singleflight
singleflight 是一个用于防止重复函数调用的机制,确保对于同一个键(key),在同一时间内只有一个函数执行,其他请求会等待该函数执行完成并共享结果。这可以大量减少对比如访问数据库操作次数,减轻数据库压力提高性能。
方法
-
Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool)
执行一个函数 fn,如果当前有其他 goroutine 正在为相同的 key 执行该函数,则当前 goroutine 会等待其完成,并共享其结果。- 参数:
- key:用于标识请求的唯一键,相同的 key只会调用一次,并共享结果。
- fn: 需要要执行的函数,返回一个结果和一个错误
- 返回值
- v:函数 fn 的返回结果。
- err:函数 fn 执行过程中产生的错误。
- shared:表示结果是否是共享的,如果为 true 则表示结果是从其他正在执行相同 key 请求的 goroutine 处共享得到的。
- 参数:
-
DoChan(key string, fn func() (interface{}, error)) <-chan Result
与Do方法类似,但是返回channel -
Forget(key string)
从请求组中移除指定 key 的记录,之后再次对该 key 发起请求时,会重新执行函数 fn
示例
package mainimport ("fmt""golang.org/x/sync/singleflight""time"
)var g singleflight.Group// 模拟一个耗时的请求操作
func getData(key string) (interface{}, error) {fmt.Printf("Fetching data for key %s\n", key)time.Sleep(2 * time.Second) // 模拟耗时操作return "Data for " + key, nil
}func main() {// 模拟多个并发请求相同的 keyfor i := 0; i < 5; i++ {go func() {data, err, shared := g.Do("key1", func() (interface{}, error) {return getData("key1")})if err != nil {fmt.Printf("Error: %v\n", err)} else {if shared {fmt.Printf("Result for key1 was shared. Data: %s\n", data.(string))} else {fmt.Printf("Result for key1 was fetched. Data: %s\n", data.(string))}}}()}// 等待所有 goroutine 完成time.Sleep(3 * time.Second)
}
errgroup
errgroup 用于并发执行多个任务,并在其中一个任务出错时快速返回错误,同时等待所有任务完成。
结构
type Group struct {// 取消函数,借助 context.WithCancel 或者 context.WithTimeout 等函数创建上下文时生成。当 Group 里的某个任务返回错误时,cancel 函数会被调用cancel func(error)// go 同步原语wg sync.WaitGroup// sem 是一个通道,用于控制并发任务的数量。sem chan token// 确保第一个出错的任务会被记录到err字段中errOnce sync.Once// 保存第一个出错信息err error
}
方法
- WithContext(ctx context.Context) (*Group, context.Context):返回一个 *Group 指针和一个新的上下文。新的上下文会在 Group 中的某个任务返回错误时被取消。
- (g *Group) Wait() error :会阻塞直到所有goroutine方法完成,如果有其中一个goroutine返回error,Wait会返回该error。如果所有 goroutine 都成功完成,Wait 方法返回 nil
- (g *Group) Go(f func() error) :用于启动一个新的 goroutine 来执行传入的函数。如果该函数返回非 nil 错误,Group 的 Wait 方法会立即返回该错误。
- (g *Group) TryGo(f func() error) bool 与 Go 方法不同的是,TryGo 不会阻塞,如果当前没有可用的并发资源(受到 SetLimit 设置的限制),它会立即返回 false,而不是等待资源可用。下面为你详细介绍其用法:
基本原型 - (g *Group) SetLimit(n int) SetLimit 方法用于设置并发执行任务的最大数量,避免因同时运行过多的 goroutine 而耗尽系统资源
代码示例
package mainimport ("context""fmt""golang.org/x/sync/errgroup""net/http""time"
)// fetchURL 函数用于发起 HTTP 请求,并根据上下文进行取消控制
// 参数 ctx 是上下文,用于控制请求的生命周期
// 参数 url 是要请求的目标 URL
// 返回值是可能出现的错误
func fetchURL(ctx context.Context, url string) error {// 创建一个带有上下文的 HTTP 请求req, err := http.NewRequestWithContext(ctx, "GET", url, nil)if err != nil {return err}// 创建一个 HTTP 客户端,并设置超时时间为 3 秒client := &http.Client{Timeout: 3 * time.Second}// 发起 HTTP 请求resp, err := client.Do(req)if err != nil {return err}// 确保响应体在使用完后关闭,避免资源泄漏defer resp.Body.Close()// 打印请求的 URL 以及响应的状态码fmt.Printf("Fetched %s, status: %s\n", url, resp.Status)return nil
}func main() {// 创建一个带有 5 秒超时的上下文// context.Background() 返回一个空的上下文,作为基础上下文// context.WithTimeout 基于基础上下文创建一个带有 5 秒超时的新上下文// cancel 是一个取消函数,用于手动取消上下文ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)// 确保在 main 函数结束时调用 cancel 函数,避免资源泄漏defer cancel()// 使用 WithContext 创建一个 errgroup.Group 实例和一个新的上下文// 新的上下文会在 errgroup 中的某个任务返回错误时被取消g, ctx := errgroup.WithContext(ctx)// 定义要请求的 URL 列表urls := []string{"https://www.google.com","https://www.github.com","https://www.nonexistentwebsite.com", // 这个 URL 可能会请求失败}// 遍历 URL 列表,为每个 URL 启动一个 goroutine 进行请求for _, url := range urls {// 为了避免闭包问题,创建一个局部变量url := url// 使用 Go 方法启动一个新的 goroutine 来执行 fetchURL 函数// 每个 goroutine 都会并发执行 fetchURL 函数g.Go(func() error {return fetchURL(ctx, url)})}// 等待所有 goroutine 完成// Wait 方法会阻塞,直到所有通过 Go 方法启动的 goroutine 完成// 如果其中有任何一个 goroutine 返回错误,Wait 方法会立即返回该错误if err := g.Wait(); err != nil {fmt.Printf("Encountered an error: %v\n", err)} else {fmt.Println("All requests completed successfully.")}
}
semaphore
信号量(Semaphore),用于控制对有限资源的并发访问。信号量维护了一组许可,线程(goroutine)在访问资源前需要先获取许可,使用完资源后释放许可。
结构
type waiter struct {n int64 //请求的资源数量ready chan<- struct{} // 当资源可用时关闭该通道,通知等待着
}type Weighted struct {size int64 //最大资源数量cur int64 //当前已使用的资源数量mu sync.Mutex //互斥锁,保护共享资源waiters list.List //等待者队列,存储等待获取资源的 waiter
}
方法
- NewWeighted(n int64) *Weighted 创建一个运行并发数为n的信号量
- (s *Weighted) Acquire(ctx context.Context, n int64) 申请n个信号量,如果没有足够的许可,程序会阻塞直到获取到足够的许可或者ctx 被取消或者超时。如果成功返回nil,失败返回err
- (s *Weighted) TryAcquire(n int64) bool 尝试立即获取 n 个权重的许可,如果有足够的许可则获取并返回 true,否则返回 false。
- (s *Weighted) Release(n int64) 释放n个许可
代码示例
package mainimport ("context""fmt""golang.org/x/sync/semaphore""time"
)func main() {// 创建一个信号量,限制同时可以访问共享资源的 goroutine 数量为 2sem := semaphore.NewWeighted(2)// 创建一个上下文,用于控制并发任务的生命周期ctx := context.Background()// 启动 5 个 goroutine 来并发地访问共享资源for i := 0; i < 5; i++ {i := i // 捕获循环变量go func() {// 获取信号量if err := sem.Acquire(ctx, 1); err != nil {fmt.Printf("Goroutine %d failed to acquire semaphore: %v\n", i, err)return}defer sem.Release(1) // 确保在函数退出时释放信号量// 模拟访问共享资源fmt.Printf("Goroutine %d is accessing the shared resource.\n", i)time.Sleep(time.Second)fmt.Printf("Goroutine %d has finished accessing the shared resource.\n", i)}()}// 等待所有 goroutine 完成time.Sleep(6 * time.Second)
}