背景
在日常业务开发中,我们经常遇到需要并发处理的场景。例如:
- 依据id列表查询db,获取数据。为了保证查询性能,单次查询的id列表长度最好不要超过50(依据业务来判断),当id列表长度超过50时,拆分成并发请求,减少耗时和提高性能,返回聚合后的结果
- 外部提供的接口不支持批量写入/读取数据,当需要批量处理数据时,为了减少耗时和提高性能,并发请求外部接口
以上处理数据的场景,都可以分成两个阶段:
同步调用,调用耗时增长明显
并发调用,可以减少调用耗时
分析
上面说的处理数据的场景,都可以分成两个阶段:
这种是一种特殊的MapReduce
为了处理这类场景,我们需要明确以下几个部分:
- 列表长度。代表有多少数据需要进行处理
- map函数。并发处理的函数,互不干扰
- reduce函数。同步处理的函数
- 最大并发数。决定需要开多少线程/协程来处理
- 拆分长度。列表长度 / 拆分长度 = 子任务数
由于我在日常开发中常使用golang语言,下面梳理下使用golang来解决这类问题的一个思路
函数签名
func ChunkProcess(length int, procedure func(start, end int) (interface{}, error),
reduce func(partialResult interface{}, partialErr error, start, end int), maxConcurrent int, chunkSize int)
核心逻辑:
-
当最大并发数