简介
本文主要参考 1994 年 Graefe 的论文《Volcano-An Extensible and Parallel Query Evaluation System》,来解释火山模型中对查询进行并行的方法。
火山模型下,每一种关系代数操作会被抽象为一个算子,从而将整个 SQL 构建成一棵算子树(Operator Tree),从根节点到叶子结点自上而下地递归调用 next()
函数,从而实现计算。
文章虽然很老了,火山模型也由于性能问题也逐渐被向量化引擎和编译执行所取代,但是其中将查询表示为算子树进行并行和优化的思路,现在依然被延用。本文主要聚焦于根据算子树对查询进行并行的方法。
根据算子树实现并行查询主要包括以下两个方向:
火山模型中负责实现并行执行的是 Exchange 运算符,它是一个有 open
, next
, close
方法的 iterator,可以被插到查询执行树中的任何一个或多个位置。图5所示是一个插入了 Exchange 算子的查询计划。
因为主要参考 1994 年的论文,所以下面的表述中不区分进程和线程,统一用进程表示
算子间并行(inter-operator parallelism)
Exchange 的 open
方法用于创建进程,Exchange 算子上方作为父进程,下方作为子进程,例如图5中的查询树执行 open
方法后,创建的进程将如图6所示。
Exchange 采用生产者消费者模型,父进程会作为消费者,子进程会作为生产者,同时在共享内存中创建一个数据结构 port 用于同步和数据交换。例如 Scan 算子会作为生产者,上方的 Join 算子作为消费者。
生产者端的 exchange 算子会作为 driver 驱动查询执行,其输出会放到 packet 里面临时存储。 packet 被填满后,会被放到 port 中,同时发送一个信号量来提醒消费者可以进行消费。
消费者端的 exchange 算子就和普通的迭代器一样,只不过它接收输入时会通过进程间的通信而不是内部的方法调用。
注意,火山模型中所有其他模块都是基于 demand-driven,即 iterator 调用 Next()
方法后,数据流再从下游传到上游,控制流和数据流的方向相反。而 Exchange 算子则是基于data-driven,生产者侧的数据就绪后再通知消费者执行,数据流和控制流的方向相同。可参见下图的 Pull 模型和 Push 模型的比较,容易理解。
这主要有两方面的原因:1. Data-driven 的方式更容易实现算子内的并行,因为算子内并行需要对数据进行分区,然后基于不交叉的数据进行; 2. 这种模式避免了多余的控制流来 Request data,进程间通信时这些不避免要的控制流会导致延迟。
同时,data-driven 的模式下允许流量控制(flow control) 或者说反压(back pressure)。比如说,当生产者的生产速度大于消费者的消费速度时,会导致数据堆积,占用较大内存的问题。这时可以通过消费者端发送一个信号量,告诉生产者降低生产速度或停止生产,等消费者消费完后再进行,从而解决问题。
算子内并行(intra-operator parallelism)
算子内的并行需要对输入数据进行分区,输入数据主要包括数据存储和中间结果。
- 数据存储的分区主要依赖物理分区,比如不同设备,不同文件。
- 中间结果分区则主要依靠在 port 中使用不同的队列。生产者使用分区 support function 来决定放到哪个队列里。
图7中展示了为了实现算子内并行创建的进程,Join 算子有三个进程执行,Scan 算子由一个或两个进程执行。通过规定并发度(degree of parallelism)来确定执行的进程数。因为同时有三个进程在执行 Join 算子,因此必须对 Scan 得到的数据进行重分区,以交给不同的进程执行。
所有的 Scan 进程都可以传递数据给所有的 Join 进程,但是 Join 算子间的数据传递只允许在每个 Join 进程内部进行。此时,如果使用了基于分区的并行 Join 方法,且图7中两个 Join 是针对不同属性进行的,则会导致出现问题。因为第一个 Join 是用属性 1 做的分区,此时属性2 相同的 tuple 可能落在不同的 Join 进程中。这个问题可以使用 exchange 算子的变式来解决,称为 inter-change.
Exchange 算子的变式
目前,我们提到的 exchange 算子都只能在一个进程的顶部或底部出现(要么提供输入,要么进行输出)。除此之外,Exchange 还可以在一个进程的 operator tree 的中间出现,其功能只限于提供一个数据交换的窗口。其 next 方法从下游的算子中获取输入,并可能把它发送给同一个 Group 的其他进程(如果属于自己的分区就自己用)。这种操作模式称为 inter-change.
另外,还有能把输出广播给所有消费者的 exchange 算子, 比如 HashJoin 中广播小表构建的哈希表;根据 producer 把 input 分别存储的 exchange 算子,以便上游可以区分输入的来源。
参考
Volcano - An Extensible and Parallel Query Evaluation System
zhuanlan.zhihu.com/p/219516250
Push vs. Pull-Based Loop Fusion in Query Engines