GO语言并发编程入门:Goroutine连接池/并发求数组和

2023年 7月 12日 53.2k 0

ants连接池概述

参考文档:https://github.com/panjf2000/ants/blob/master/README_ZH.md

虽然Go的Goroutine非常强大,几乎可以随意创建,但是毕竟资源是有限的。当我们大批量重复使用Goroutine的时候,将会占用非常多的系统资源,这个时候Goroutine连接池就非常有用了。

ants是一个非常流行的Goroutine连接池工具,具备以下功能:

  • 自动调度海量的 goroutines,复用 goroutines
  • 定期清理过期的 goroutines,进一步节省资源
  • 提供了大量有用的接口:任务提交、获取运行中的 goroutine 数量、动态调整 Pool 大小、释放 Pool、重启 Pool
  • 优雅处理 panic,防止程序崩溃
  • 资源复用,极大节省内存使用量;在大规模批量并发任务场景下比原生 goroutine 并发具有更高的性能
  • 非阻塞机制

使用默认连接池

如何使用默认连接池提交任务

// 任务子函数
func demoFunc() {
	time.Sleep(10 * time.Millisecond)
	fmt.Println("Hello World!")
}

// 任务函数
var wg sync.WaitGroup
syncCalculateSum := func() {
    demoFunc()
    wg.Done()
}

// 提交任务
ants.Submit(syncCalculateSum)

并发执行1000次无参方法

package main

import (
	"fmt"
	"sync"
	"time"

	"github.com/panjf2000/ants/v2"
)

// 核心的方法
func demoFunc() {
	time.Sleep(10 * time.Millisecond)
	fmt.Println("Hello World!")
}

func main() {
	defer ants.Release()

	runTimes := 1000

	// 使用通用的连接池
	var wg sync.WaitGroup
	syncCalculateSum := func() {
		demoFunc()
		wg.Done()
	}

	// 提交指定次数的任务
	for i := 0; i < runTimes; i++ {
		wg.Add(1)
		// 使用默认的连接池
		_ = ants.Submit(syncCalculateSum)
	}
	wg.Wait()
	fmt.Printf("运行中的Goroutine数量: %d\n", ants.Running())
	fmt.Printf("任务完成.\n")
}

使用自定义连接池

package main

import (
	"fmt"
	"sync"
	"sync/atomic"

	"github.com/panjf2000/ants/v2"
)

// 全局变量
var sum int32

// 核心的方法
func myFunc(i interface{}) {
	n := i.(int32)
	atomic.AddInt32(&sum, n)
	fmt.Printf("run with %d\n", n)
}

func main() {
	runTimes := 1000

	// 使用通用的连接池
	var wg sync.WaitGroup

	// 创建连接池
	p, _ := ants.NewPoolWithFunc(10, func(i interface{}) {
		myFunc(i)
		wg.Done()
	})
	defer p.Release()

	// 提交任务
	for i := 0; i < runTimes; i++ {
		wg.Add(1)
		_ = p.Invoke(int32(i))
	}
	wg.Wait()

	fmt.Printf("运行中的Goroutine数量: %d\n", ants.Running())
	fmt.Println("任务完成:", sum)
}

使用连接池并发求数组和

package main

import (
	"fmt"
	"math"
	"math/rand"
	"sync"
	"sync/atomic"

	"github.com/panjf2000/ants/v2"
)

// 全局变量
var sum int64

// 核心的方法
func sumArr(i interface{}) {
	arr := i.([]int64)
	for _, v := range arr {
		atomic.AddInt64(&sum, v)
	}
}

// 获取随机数组
func getArr(min, max, length int) (arr []int64) {
	for i := 0; i < length; i++ {
		num := rand.Float64()*float64(max) + float64(min)
		numInt := int64(math.Floor(num))
		arr = append(arr, numInt)
	}
	return
}

func main() {
	arr := getArr(0, 100, 100000000)

	// 使用通用的连接池
	var wg sync.WaitGroup

	// 创建连接池
	p, _ := ants.NewPoolWithFunc(10, func(i interface{}) {
		sumArr(i)
		wg.Done()
	})
	defer p.Release()

	// 拆分数组
	step := len(arr) / 3
	arr1 := arr[:step]
	arr2 := arr[step : step+step]
	arr3 := arr[step+step:]

	// 提交任务
	wg.Add(1)
	_ = p.Invoke(arr1)
    wg.Add(1)
	_ = p.Invoke(arr2)
    wg.Add(1)
	_ = p.Invoke(arr3)
	wg.Wait()

	fmt.Printf("运行中的Goroutine数量: %d\n", ants.Running())
	fmt.Println("任务完成:", sum)
}

比较连接池和普通方法求和消耗时间

package main

import (
	"fmt"
	"math"
	"math/rand"
	"sync"
	"sync/atomic"
	"time"

	"github.com/panjf2000/ants/v2"
)

// 全局变量
var sum int64

// 核心的方法
func sumArr(i interface{}) {
	arr := i.([]int64)
	for _, v := range arr {
		atomic.AddInt64(&sum, v)
	}
}

// 获取随机数组
func getArr(min, max, length int) (arr []int64) {
	for i := 0; i < length; i++ {
		num := rand.Float64()*float64(max) + float64(min)
		numInt := int64(math.Floor(num))
		arr = append(arr, numInt)
	}
	return
}

// 求数组和
func getArrSum(arr []int64) (sum int64) {
	for _, num := range arr {
		sum += num
	}
	return
}

func main() {
	var (
		arr       = getArr(0, 100, 100000000)
		startTime time.Time
		spendTime time.Duration
		wg        sync.WaitGroup
	)

	// 创建连接池
	p, _ := ants.NewPoolWithFunc(10, func(i interface{}) {
		sumArr(i)
		wg.Done()
	})
	defer p.Release()

	// 拆分数组
	startTime = time.Now()
	step := len(arr) / 3
	arr1 := arr[:step]
	arr2 := arr[step : step+step]
	arr3 := arr[step+step:]

	// 提交任务
	wg.Add(1)
	_ = p.Invoke(arr1)
	wg.Add(1)
	_ = p.Invoke(arr2)
	wg.Add(1)
	_ = p.Invoke(arr3)
	wg.Wait()
	spendTime = time.Since(startTime)
	fmt.Println("使用连接池求和:", sum, spendTime)

	// 普通方法求和
	startTime = time.Now()
	sum = getArrSum(arr)
	spendTime = time.Since(startTime)
	fmt.Println("使用普通方法求和:", sum, spendTime)
}

输出结果:

使用连接池求和: 4950340694 1.323886243s
使用普通方法求和: 4950340694 46.091214ms

从结果可以发现,连接池消耗的时间反而比普通方法更多。主要是因为,连接池不仅要拆分数组,每次求和的时候,还需要单独将每个元素累加到全局变量sum上,会有额外的计算步骤。

即就是说,连接池有自己适用的使用场景,并非只要上连接池效率就高于一切,在真实的开发中,要根据实际需求考虑是否需要使用Goroutine连接池。

相关文章

JavaScript2024新功能:Object.groupBy、正则表达式v标志
PHP trim 函数对多字节字符的使用和限制
新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
为React 19做准备:WordPress 6.6用户指南
如何删除WordPress中的所有评论

发布评论