PostgreSQL 在很多场景下会启用并行执行计划,创建多个并行工作子进程,提升查询效率。
一个常用的并行表扫描的例子:
从下面的执行计划可以看出,Parallel Seq Scan 并行表扫描,并发工作进程数为 2,最上层的执行计划节点名称为 Gather。
postgres=# explain select * from tt where product_item_id=12836242;
QUERY PLAN
----------------------------------------------------------------------
Gather (cost=1000.00..46068.10 rows=1 width=41)
Workers Planned: 2
-> Parallel Seq Scan on tt (cost=0.00..45068.00 rows=1 width=41)
Filter: (product_item_id = 12836242)
并行表扫描相关的参数:
执行计划是否采用并行执行,受很多因素影响,其中一些 guc 参数影响较大,如下:
- max_parallel_workers_per_gather,一个 gather 节点能够使用的最大并行数量
- min_parallel_table_scan_size,表扫描数据量的最小阈值,超过该值才会考虑使用并行
- min_parallel_index_scan_size,索引扫描数据量的最小阈值,超过该值才会考虑使用并行
- force_parallel_mode,强制使用并行模式
- parallel_leader_participation,并行的 leader 是否参与并行,默认为 on
生成并行执行计划的相关函数:
以 Parallel Seq Scan 为例,介绍生成并行表扫描计划所涉及的一些重要函数。
(1)compute_parallel_worker()
根据表信息、表的 heap pages,index pages 数量,以及一些 guc 参数,计算并行的 worker 数量。
(2)set_rel_consider_parallel()
用来判断一个表是否可以并行扫描,即 rel->consider_parallel 是 true 或者 false ,由该函数判定。
(3)create_plain_partial_paths()
创建并行的 T_SeqScan 类型的 Path
(4)create_gather_path()
创建 T_Gather 类型的 Path(GatherPath)
(5)create_gather_plan()
创建 T_Gather 类型的 Plan(Gather),通过函数调用 ExecInitNode() -->ExecInitGather() 初始化 T_Gather 类型的执行计划,生成 T_GatherState。Gather 最终的执行函数为:ExecGather()
支持并行执行计划的扫描方法:
见函数 ExecParallelInitializeDSM()
- T_SeqScanState
- T_IndexScanState
- T_IndexOnlyScanState
- T_ForeignScanState
- T_AppendState
- T_CustomScanState
- T_BitmapHeapScanState
- T_HashJoinState
- T_HashState
- T_SortState
多个子进程如何并发的扫描 tuple?
pg 表数据存储在共享内存中,并行执行所需要的信息也是通过共享内存在所有 parallel 进程中共享,具体函数调用栈如下:
ExecGather()
ExecInitParallelPlan()
ExecParallelInitializeDSM()
ExecSeqScanInitializeDSM()
heap_parallelscan_initialize()
多个 parallel 进程顺序扫描表,ParallelHeapScanDesc target 是在共享内存中,并且与多个并发子进程共享,控制并发扫描的 page。多个进程并发调用 heap_parallelscan_nextpage() 函数,通过 pg_atomic_fetch_add_u64() 函数,多个进程原子化获取扫描的页,多进程并发扫描的粒度是页(page)级别。
nallocated = pg_atomic_fetch_add_u64(¶llel_scan->phs_nallocated, 1);
if (nallocated >= scan->rs_nblocks)
page = InvalidBlockNumber; /* all blocks have been allocated */
else
page = (nallocated + parallel_scan->phs_startblock) % scan->rs_nblocks;
多个 parallel 进程如何向 backend 进程发送 tuple 扫描结果?
parallel 进程调用 shm_mq_send(),将 tuple 数据发送给主进程:
ExecParallelGetReceiver()
CreateTupleQueueDestReceiver()
tqueueReceiveSlot()
shm_mq_send()
backend 进程通过 shm_mq_receive() 接收 tuple 数据:
ExecGather()
gather_getnext()
gather_readnext()
TupleQueueReaderNext()
shm_mq_receive()
消息队列如何与多个 parallel 子进程关联?
parallel 进程与 backend 进程通过消息队列传送 tuple 数据,那么多个 parallel 进程就会有多个消息队列存在,那么每个消息队列如何与相应的 parallel 进程关联呢?在创建 parallel 进程时,会将该进程的编号放入 worker.bgw_extra 中,该变量最终被拷贝到共享内存 BackgroundWorkerData 中,代码如下:
for (i = 0; i nworkers; ++i)
{
memcpy(worker.bgw_extra, &i, sizeof(int));
if (!any_registrations_failed &&
RegisterDynamicBackgroundWorker(&worker,
&pcxt->worker[i].bgwhandle))
...
}
在 parallel 进程中,拿到进程的编号,存放在 ParallelWorkerNumber 变量中,如下:
void
ParallelWorkerMain(Datum main_arg)
{
memcpy(&ParallelWorkerNumber, MyBgworkerEntry->bgw_extra, sizeof(int));
}
有了进程编号 ParallelWorkerNumber,就可以拿到进程对应的消息队列,如下:
static DestReceiver *
ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc)
{
char *mqspace;
shm_mq *mq;
mqspace = shm_toc_lookup(toc, PARALLEL_KEY_TUPLE_QUEUE, false);
mqspace += ParallelWorkerNumber * PARALLEL_TUPLE_QUEUE_SIZE;
mq = (shm_mq *) mqspace;
shm_mq_set_sender(mq, MyProc);
return CreateTupleQueueDestReceiver(shm_mq_attach(mq, seg, NULL));
}
backend 进程是否参与 tuple 扫描?
在并发扫描执行计划中,会创建多个 parallel 进程进行表数据扫描,那么 backend 进程是否也参与到 tuple 扫描呢?实际上会通过 gatherstate->need_to_scan_locally 来标记是否在 backend 进程本地进行表扫描。
函数调用栈如下:
ExecGather()
gather_getnext()
相关代码如下:
static TupleTableSlot *
gather_getnext(GatherState *gatherstate)
{
...
if (gatherstate->need_to_scan_locally)
{
EState *estate = gatherstate->ps.state;
/* Install our DSA area while executing the plan. */
estate->es_query_dsa =
gatherstate->pei ? gatherstate->pei->area : NULL;
outerTupleSlot = ExecProcNode(outerPlan);
estate->es_query_dsa = NULL;
if (!TupIsNull(outerTupleSlot))
return outerTupleSlot;
gatherstate->need_to_scan_locally = false;
}
...
}
need_to_scan_locally 的取值与几个因素有关:
(1)如果所有 parallel 进程启动失败,那么 need_to_scan_locally 必须为 true,只能本地 backend 进程扫描 tuple 了。
(2)guc 参数 parallel_leader_participation 为 true 并且 single_copy 为 false。
以上两个条件满足一个,即可启动 backend 本地扫描 tuple,具体代码片断如下:
node->need_to_scan_locally = (node->nreaders == 0)
|| (!gather->single_copy && parallel_leader_participation);
关于 single_copy 的含义如下:
typedef struct GatherPath
{
Path path;
Path *subpath; /* path for each worker */
bool single_copy; /* don't execute path more than once */
int num_workers; /* number of workers sought to help */
} GatherPath;
single_copy 主要由并发数决定,即 path 路径是否支持并发,如果 parallel_workers = 0,则 single_copy = true。
pathnode->subpath = subpath;
pathnode->num_workers = subpath->parallel_workers;
pathnode->single_copy = false;
if (pathnode->num_workers == 0)
{
pathnode->path.pathkeys = subpath->pathkeys;
pathnode->num_workers = 1;
pathnode->single_copy = true;
}