ants连接池概述
参考文档:https://github.com/panjf2000/ants/blob/master/README_ZH.md
虽然Go的Goroutine非常强大,几乎可以随意创建,但是毕竟资源是有限的。当我们大批量重复使用Goroutine的时候,将会占用非常多的系统资源,这个时候Goroutine连接池就非常有用了。
ants是一个非常流行的Goroutine连接池工具,具备以下功能:
- 自动调度海量的 goroutines,复用 goroutines
- 定期清理过期的 goroutines,进一步节省资源
- 提供了大量有用的接口:任务提交、获取运行中的 goroutine 数量、动态调整 Pool 大小、释放 Pool、重启 Pool
- 优雅处理 panic,防止程序崩溃
- 资源复用,极大节省内存使用量;在大规模批量并发任务场景下比原生 goroutine 并发具有更高的性能
- 非阻塞机制
使用默认连接池
如何使用默认连接池提交任务
// 任务子函数 func demoFunc() { time.Sleep(10 * time.Millisecond) fmt.Println("Hello World!") } // 任务函数 var wg sync.WaitGroup syncCalculateSum := func() { demoFunc() wg.Done() } // 提交任务 ants.Submit(syncCalculateSum)
并发执行1000次无参方法
package main import ( "fmt" "sync" "time" "github.com/panjf2000/ants/v2" ) // 核心的方法 func demoFunc() { time.Sleep(10 * time.Millisecond) fmt.Println("Hello World!") } func main() { defer ants.Release() runTimes := 1000 // 使用通用的连接池 var wg sync.WaitGroup syncCalculateSum := func() { demoFunc() wg.Done() } // 提交指定次数的任务 for i := 0; i < runTimes; i++ { wg.Add(1) // 使用默认的连接池 _ = ants.Submit(syncCalculateSum) } wg.Wait() fmt.Printf("运行中的Goroutine数量: %d\n", ants.Running()) fmt.Printf("任务完成.\n") }
使用自定义连接池
package main import ( "fmt" "sync" "sync/atomic" "github.com/panjf2000/ants/v2" ) // 全局变量 var sum int32 // 核心的方法 func myFunc(i interface{}) { n := i.(int32) atomic.AddInt32(&sum, n) fmt.Printf("run with %d\n", n) } func main() { runTimes := 1000 // 使用通用的连接池 var wg sync.WaitGroup // 创建连接池 p, _ := ants.NewPoolWithFunc(10, func(i interface{}) { myFunc(i) wg.Done() }) defer p.Release() // 提交任务 for i := 0; i < runTimes; i++ { wg.Add(1) _ = p.Invoke(int32(i)) } wg.Wait() fmt.Printf("运行中的Goroutine数量: %d\n", ants.Running()) fmt.Println("任务完成:", sum) }
使用连接池并发求数组和
package main import ( "fmt" "math" "math/rand" "sync" "sync/atomic" "github.com/panjf2000/ants/v2" ) // 全局变量 var sum int64 // 核心的方法 func sumArr(i interface{}) { arr := i.([]int64) for _, v := range arr { atomic.AddInt64(&sum, v) } } // 获取随机数组 func getArr(min, max, length int) (arr []int64) { for i := 0; i < length; i++ { num := rand.Float64()*float64(max) + float64(min) numInt := int64(math.Floor(num)) arr = append(arr, numInt) } return } func main() { arr := getArr(0, 100, 100000000) // 使用通用的连接池 var wg sync.WaitGroup // 创建连接池 p, _ := ants.NewPoolWithFunc(10, func(i interface{}) { sumArr(i) wg.Done() }) defer p.Release() // 拆分数组 step := len(arr) / 3 arr1 := arr[:step] arr2 := arr[step : step+step] arr3 := arr[step+step:] // 提交任务 wg.Add(1) _ = p.Invoke(arr1) wg.Add(1) _ = p.Invoke(arr2) wg.Add(1) _ = p.Invoke(arr3) wg.Wait() fmt.Printf("运行中的Goroutine数量: %d\n", ants.Running()) fmt.Println("任务完成:", sum) }
比较连接池和普通方法求和消耗时间
package main import ( "fmt" "math" "math/rand" "sync" "sync/atomic" "time" "github.com/panjf2000/ants/v2" ) // 全局变量 var sum int64 // 核心的方法 func sumArr(i interface{}) { arr := i.([]int64) for _, v := range arr { atomic.AddInt64(&sum, v) } } // 获取随机数组 func getArr(min, max, length int) (arr []int64) { for i := 0; i < length; i++ { num := rand.Float64()*float64(max) + float64(min) numInt := int64(math.Floor(num)) arr = append(arr, numInt) } return } // 求数组和 func getArrSum(arr []int64) (sum int64) { for _, num := range arr { sum += num } return } func main() { var ( arr = getArr(0, 100, 100000000) startTime time.Time spendTime time.Duration wg sync.WaitGroup ) // 创建连接池 p, _ := ants.NewPoolWithFunc(10, func(i interface{}) { sumArr(i) wg.Done() }) defer p.Release() // 拆分数组 startTime = time.Now() step := len(arr) / 3 arr1 := arr[:step] arr2 := arr[step : step+step] arr3 := arr[step+step:] // 提交任务 wg.Add(1) _ = p.Invoke(arr1) wg.Add(1) _ = p.Invoke(arr2) wg.Add(1) _ = p.Invoke(arr3) wg.Wait() spendTime = time.Since(startTime) fmt.Println("使用连接池求和:", sum, spendTime) // 普通方法求和 startTime = time.Now() sum = getArrSum(arr) spendTime = time.Since(startTime) fmt.Println("使用普通方法求和:", sum, spendTime) }
输出结果:
使用连接池求和: 4950340694 1.323886243s 使用普通方法求和: 4950340694 46.091214ms
从结果可以发现,连接池消耗的时间反而比普通方法更多。主要是因为,连接池不仅要拆分数组,每次求和的时候,还需要单独将每个元素累加到全局变量sum上,会有额外的计算步骤。
即就是说,连接池有自己适用的使用场景,并非只要上连接池效率就高于一切,在真实的开发中,要根据实际需求考虑是否需要使用Goroutine连接池。