最近在学《OceanBase 数据库源码解析》,发现其中和 SQL 执行器相关的内容很少,所以想写几篇博客,作为这本书的一个补充。上一篇博客《OceanBase 执行引擎的自适应技术》中给大家介绍了执行器里几个比较具有代表性的自适应技术,但是已经假设大家对 hash group by 中的两阶段下压技术有所了解。如果大家对执行器的多阶段下压技术还不是特别熟悉,欢迎阅读这篇博客,来一起学习一下 OceanBase 中比较常见的几种自适应分布式下压技术。
什么是分布式下压
在分布式执行的过程中,为了更好地利用并行的能力,降低 CPU 和网络的开销,优化器生成计划的过程中,往往会将部分算子下压到更下层的各个计算节点上。目的是为了充分利用集群的计算资源,提升执行效率。这次就来介绍下 OceanBase 里面最常见的集中分布式下压技术。
LIMIT 下压
我们先介绍一下 limit 的下压。举一个简单的例子,这两条 SQL 是创建一个 orders 表,并从 orders 表中读 100 行数据。
CREATE TABLE `orders` (
`o_orderkey` bigint(20) NOT NULL,
`o_custkey` bigint(20) NOT NULL,
`o_orderdate` date NOT NULL,
PRIMARY KEY (`o_orderkey`, `o_orderdate`, `o_custkey`),
KEY `o_orderkey` (`o_orderkey`) LOCAL BLOCK_SIZE 16384
) partition by range columns(o_orderdate)
subpartition by hash(o_custkey) subpartitions 64
(partition ord1 values less than ('1992-01-01'),
partition ord2 values less than ('1992-02-01'),
partition ord3 values less than ('1992-03-01'),
partition ord77 values less than ('1998-05-01'),
partition ord78 values less than ('1998-06-01'),
partition ord79 values less than ('1998-07-01'),
partition ord80 values less than ('1998-08-01'),
partition ord81 values less than (MAXVALUE));
select * from orders limit 100;
图中的计划是分布式下压的一个很常见的场景:
explain select * from orders limit 100;
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Query Plan |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------+
| ================================================================= |
| |ID|OPERATOR |NAME |EST.ROWS|EST.TIME(us)| |
| ----------------------------------------------------------------- |
| |0 |LIMIT | |1 |2794 | |
| |1 |└─PX COORDINATOR | |1 |2794 | |
| |2 | └─EXCHANGE OUT DISTR |:EX10000|1 |2793 | |
| |3 | └─LIMIT | |1 |2792 | |
| |4 | └─PX PARTITION ITERATOR| |1 |2792 | |
| |5 | └─TABLE FULL SCAN |orders |1 |2792 | |
| ================================================================= |
| Outputs & filters: |
| ------------------------------------- |
| 0 - output([orders.o_orderkey], [orders.o_custkey], [orders.o_orderdate]), filter(nil) |
| limit(100), offset(nil) |
| 1 - output([orders.o_orderkey], [orders.o_custkey], [orders.o_orderdate]), filter(nil) |
| 2 - output([orders.o_orderkey], [orders.o_custkey], [orders.o_orderdate]), filter(nil) |
| dop=1 |
| 3 - output([orders.o_orderkey], [orders.o_custkey], [orders.o_orderdate]), filter(nil) |
| limit(100), offset(nil) |
| 4 - output([orders.o_orderkey], [orders.o_orderdate], [orders.o_custkey]), filter(nil) |
| force partition granule |
| 5 - output([orders.o_orderkey], [orders.o_orderdate], [orders.o_custkey]), filter(nil) |
| access([orders.o_orderkey], [orders.o_orderdate], [orders.o_custkey]), partitions(p0sp[0-63], p1sp[0-63], p2sp[0-63], p3sp[0-63], p4sp[0-63], p5sp[0-63], |
| p6sp[0-63], p7sp[0-63]) |
| limit(100), offset(nil), is_index_back=false, is_global_index=false, |
| range_key([orders.o_orderkey], [orders.o_orderdate], [orders.o_custkey]), range(MIN,MIN,MIN ; MAX,MAX,MAX)always true |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------+
可以看到计划中有两个 limit 算子(1 号和 3 号)。通过下压生成 3 号 limit 算子,可以降低对 5 号 table scan 对 orders 每个分区的扫描行数,让每个 table scan 的线程最多只扫描 100 行数据,这样可以降低 table scan 扫描数据的开销以及发送数据到 1 号算子进行汇总的网络开销。目前 OB 的一个 exchange 算子是从下层收到 64K 数据以后发一个包,limit 如果不下压的话可能会多扫描很多的数据,并且带来很大的网络开销。
真实的场景中,limit 往往伴随着 order by。如果在前面的例子中加上 order by 关键字,order by 加 limit 会在计划中生成一个 top-n sort 算子,它的性能是比 sort 要好很多的。
explain select * from orders order by o_orderdate limit 100;
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Query Plan |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------+
| ================================================================= |
| |ID|OPERATOR |NAME |EST.ROWS|EST.TIME(us)| |
| ----------------------------------------------------------------- |
| |0 |LIMIT | |1 |2794 | |
| |1 |└─PX COORDINATOR MERGE SORT | |1 |2794 | |
| |2 | └─EXCHANGE OUT DISTR |:EX10000|1 |2793 | |
| |3 | └─TOP-N SORT | |1 |2792 | |
| |4 | └─PX PARTITION ITERATOR| |1 |2792 | |
| |5 | └─TABLE FULL SCAN |orders |1 |2792 | |
| ================================================================= |
| Outputs & filters: |
| ------------------------------------- |
| 0 - output([orders.o_orderkey], [orders.o_custkey], [orders.o_orderdate]), filter(nil) |
| limit(100), offset(nil) |
| 1 - output([orders.o_orderkey], [orders.o_custkey], [orders.o_orderdate]), filter(nil) |
| sort_keys([orders.o_orderdate, ASC]) |
| 2 - output([orders.o_orderkey], [orders.o_custkey], [orders.o_orderdate]), filter(nil) |
| dop=1 |
| 3 - output([orders.o_orderkey], [orders.o_custkey], [orders.o_orderdate]), filter(nil) |
| sort_keys([orders.o_orderdate, ASC]), topn(100) |
| 4 - output([orders.o_orderkey], [orders.o_orderdate], [orders.o_custkey]), filter(nil) |
| force partition granule |
| 5 - output([orders.o_orderkey], [orders.o_orderdate], [orders.o_custkey]), filter(nil) |
| access([orders.o_orderkey], [orders.o_orderdate], [orders.o_custkey]), partitions(p0sp[0-63], p1sp[0-63], p2sp[0-63], p3sp[0-63], p4sp[0-63], p5sp[0-63], |
| p6sp[0-63], p7sp[0-63]) |
| is_index_back=false, is_global_index=false, |
| range_key([orders.o_orderkey], [orders.o_orderdate], [orders.o_custkey]), range(MIN,MIN,MIN ; MAX,MAX,MAX)always true |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------+
如果上面的 limit 不下压的话,3 号算子就会变成 sort 算子,每个线程需要将自己扫描的所有数据排序以后发送给上层的 DFO(DFO 就是一个子计划,相邻的 DFO 之间以 exchange 算子作为分割,详见:https://www.oceanbase.com/docs/common-oceanbase-database-1000000000034037)。
limit 下压的作用,就是能够提前结束执行,减少计算和网络的开销。
AGGREGATION 下压
下面介绍一下聚合中的分布式下压,以这条 group by 语句为例:
select count(o_totalprice), sum(o_totalprice) from orders group by o_orderdate;
这条 SQL 查询了每一天的订单数和销售额,如果希望并行地执行这条 SQL 的话,最直接的想法肯定是让表中数据根据 group by 列(o_orderdate)的 hash 值进行数据的分发,因为这样可以确保 o_orderdate 值相同的行都被发送到了同一个线程,各个线程可以并行地对收到的数据去进行聚合。
但是这个计划的一个弊端是要对表中所有的数据都要做一次 shuffle 网络的开销可能很大;还有一个问题是如果表中存在数据倾斜比如某一天的订单特别多,那么处理负责处理这一天订单的线程的工作量就会比其他线程多很多,这个长尾的任务可能直接导致这个查询的执行时间特别长。
为了解决上述这些问题,我们会对 group by 算子进行下压,生成这样一个计划:
explain select count(o_totalprice), sum(o_totalprice) from orders group by o_orderdate;
+-----------------------------------------------------------------------------------------------------------------------------------------------------------+
| Query Plan |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------+
| ===================================================================== |
| |ID|OPERATOR |NAME |EST.ROWS|EST.TIME(us)| |
| --------------------------------------------------------------------- |
| |0 |PX COORDINATOR | |1 |2796 | |
| |1 |└─EXCHANGE OUT DISTR |:EX10001|1 |2795 | |
| |2 | └─HASH GROUP BY | |1 |2795 | |
| |3 | └─EXCHANGE IN DISTR | |1 |2794 | |
| |4 | └─EXCHANGE OUT DISTR (HASH)|:EX10000|1 |2794 | |
| |5 | └─HASH GROUP BY | |1 |2793 | |
| |6 | └─PX PARTITION ITERATOR| |1 |2792 | |
| |7 | └─TABLE FULL SCAN |orders |1 |2792 | |
| ===================================================================== |
| Outputs & filters: |
| ------------------------------------- |
| 0 - output([INTERNAL_FUNCTION(T_FUN_COUNT_SUM(T_FUN_COUNT(orders.o_totalprice)), T_FUN_SUM(T_FUN_SUM(orders.o_totalprice)))]), filter(nil) |
| 1 - output([INTERNAL_FUNCTION(T_FUN_COUNT_SUM(T_FUN_COUNT(orders.o_totalprice)), T_FUN_SUM(T_FUN_SUM(orders.o_totalprice)))]), filter(nil) |
| dop=1 |
| 2 - output([T_FUN_COUNT_SUM(T_FUN_COUNT(orders.o_totalprice))], [T_FUN_SUM(T_FUN_SUM(orders.o_totalprice))]), filter(nil) |
| group([orders.o_orderdate]), agg_func([T_FUN_COUNT_SUM(T_FUN_COUNT(orders.o_totalprice))], [T_FUN_SUM(T_FUN_SUM(orders.o_totalprice))]) |
| 3 - output([orders.o_orderdate], [T_FUN_COUNT(orders.o_totalprice)], [T_FUN_SUM(orders.o_totalprice)]), filter(nil) |
| 4 - output([orders.o_orderdate], [T_FUN_COUNT(orders.o_totalprice)], [T_FUN_SUM(orders.o_totalprice)]), filter(nil) |
| (#keys=1, [orders.o_orderdate]), dop=1 |
| 5 - output([orders.o_orderdate], [T_FUN_COUNT(orders.o_totalprice)], [T_FUN_SUM(orders.o_totalprice)]), filter(nil) |
| group([orders.o_orderdate]), agg_func([T_FUN_COUNT(orders.o_totalprice)], [T_FUN_SUM(orders.o_totalprice)]) |
| 6 - output([orders.o_orderdate], [orders.o_totalprice]), filter(nil) |
| force partition granule |
| 7 - output([orders.o_orderdate], [orders.o_totalprice]), filter(nil) |
| access([orders.o_orderdate], [orders.o_totalprice]), partitions(p0sp[0-63], p1sp[0-63], p2sp[0-63], p3sp[0-63], p4sp[0-63], p5sp[0-63], p6sp[0-63], |
| p7sp[0-63]) |
| is_index_back=false, is_global_index=false, |
| range_key([orders.o_orderkey], [orders.o_orderdate], [orders.o_custkey]), range(MIN,MIN,MIN ; MAX,MAX,MAX)always true |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------+
在这个计划里面,每个线程在进行数据的分发之前会先对自己读取的这部分数据进行预聚合,也就是计划里面的 5 号 group by 算子做的工作。然后 5 号算子将聚合的结果发送给上层的算子,之后上层的 2号 group by 算子会对收到的数据再进一次聚合。因为经过这个 5 号 group by 算子的提前聚合之后,数据量一般都会大幅降低,这样即可以降低数据 shuffle 带来的网络开销,也能降低数据倾斜对执行时间的影响。
接下来展示一下具体的执行过程来进行说明,还是刚才那条熟悉的 SQL,求每一天的订单数和销售额。
select count(o_totalprice), sum(o_totalprice) from orders group by o_orderdate;
原始的数据共有 7 行,每笔订单的销售额都是 10 元,它分布在 1、2、3 号这三天里面。
下图中展示了执行的过程,这里我们把并行度设置为 2:
我们从左往右看,可以看到左上方的第一个线程扫描了 3 行的数据,左下方的第二个线程扫描了 4 行数据。日期相同的数据,也就是同一组的数据,都被标成了相同的颜色。
先看左侧,第一个线程对其扫描的 3 行数据去进行聚合,这 3 行数据分布在两个 group 里面,6 月 1 号有 2 行,6 月 3 号有 1 行。因为 6 月 1 号有 2 行,所以它的 count 是 2,销售额是 20。6 月 3 号有 1 行,它的 count 是 1,销售额是10。第二个线程扫描的 4 行数据也分布在两个 group 里面,聚合后也生成了两行数据,这里不再赘述。这部分工作对应计划里的 5 号算子。
然后这两个线程利用 o_orderdate 列的 hash 值进行数据的分发,让同一天的数据都发送到同一个线程。这部分工作对应计划里的 3 号和 4 号算子。
右侧的每个线程对收到的数据会再进行一次聚合。可以看到左边两个线程中 6 月 3 号的数据(红色)都被发送到了右下方这个线程里,这两行从左侧不同线程发过来的 6 月 3 号的数据被右侧的算子再次进行聚合,count 和 sum 都再次被相加,count 变成了2,sum 变成了 20,最终被聚合成了一行。这部分工作对应计划里的 2 号算子。
然后所有的数据都会发送给协调者,协调者对数据进行汇总之后,将最终的计算结果发送给客户端。
JOIN FILTER 下压
join 算子中也会把左表的过滤条件 join filter 下压到右表,对右表的数据进行提前过滤和分区裁剪。
提前过滤
hash join 在执行的时候,总是先读左表的数据,建立一个哈希表。然后用右侧的数据去探测这个哈希表,如果探测成功的话就会把这个数据发送给上层算子。这里存在的一个问题就是如果 hash join 的右侧存在一个数据 reshuffle(重分布)的话,网络的开销可能比较大,这个开销标取决于右表的数据量大小。在这种情况下,我们可以通过 join filter 来降低数据 shuffle 的网络开销。
以这个计划为例:
在上面这个计划中,2 号的 hash join 算子从左侧读数据,读的时候会使用 t1.c1 这个连接键创建一个 join filter,就是计划中的这个 3 号 join filter create 算子。join filter 最常见的一个形式是 bloom filter,join filter 创建完成以后会被发送到 hash join 右侧这个 DFO(6 号算子以及更下层的算子)。
可以看到,10 号的这个 table scan 上面有一个过滤条件 sys_op_bloom_filter(t2.c1),表示会用 bloom_filter 对 hash join 右表 t2.c1 的值去进行一个快速的探测。如果探测失败的话说明不存在 t2.c1 跟这个 t1.c1 相等,那么这行数据可以直接被提前过滤掉,不需要向上发送给 hash join。
分区裁剪
join filter 不仅可以对行进行过滤,还可以用于分区裁剪,即对分区进行过滤。如果 t1 是一个分区表,并且连接键是它的分区键的话,那么可以生成这样的计划:
可以看到这个计划里,3 号是一个 partition join filter create 算子,它会感知 hash join 右边的 t1 表的分区方式,它每从下层获取一行左表的数据,就会用 c1 的值去计算这行数据在右表 t1 表里的哪个分区里面,并将这个 partition id 记录到 join filter 里。最终这个 partition id 的 join filter 会在 8 号算子上用于 hash join 右表的分区裁剪。右表扫描每一个分区之前都会检查这个 partition id 是否存在于 join filter 中,如果不存在的话,可以直接跳过整分区的扫描。
join filter 可以提前对数据进行过滤、提前对分区进行裁剪,降低了扫描数据、网络传输和探测 hash 表的开销。目前 4.2 之前只支持 bloom filter 这一种类型的 join filter。4.2上新支持了 in filter 和 range filter 这两种类型的 join filter,这两种新的 join filter 在一些场景中对性能有很好的提升,特别是在左表不同值的个数较少或者是左表值连续的场景。
其他的分布式下压
除了上述介绍的几个比较常见也比较易于理解的分布式下压技术,OceanBase 还支持更多的自适应分布式下压,例如: window function 的自适应两阶段下压、三阶段的聚合下压等等。
OceanBase 中这些更为复杂的分布式下压技术,由于精力所限,就不再一一详细介绍。下面会贴一下刚才提到的两种分布式下压的执行计划,供有兴趣的同学去进行更深入的研究。
window function 的自适应两阶段下压:
select /*+parallel(3) */
c1, sum(c2) over (partition by c1) from t1 order by c1;
Query Plan
===================================================
|ID|OPERATOR |NAME |
---------------------------------------------------
|0 |PX COORDINATOR MERGE SORT | |
|1 | EXCHANGE OUT DISTR |:EX10001|
|2 | MATERIAL | |
|3 | WINDOW FUNCTION CONSOLIDATOR | |
|4 | EXCHANGE IN MERGE SORT DISTR | |
|5 | EXCHANGE OUT DISTR (HASH HYBRID)|:EX10000|
|6 | WINDOW FUNCTION | |
|7 | SORT | |
|8 | PX BLOCK ITERATOR | |
|9 | TABLE SCAN |t1 |
===================================================
三阶段的聚合下压:
select /*+ parallel(2) */
c1, sum(distinct c2),count(distinct c3), sum(c4) from t group by c1;
Query Plan
===========================================================================
|ID|OPERATOR |NAME |EST.ROWS|EST.TIME(us)|
---------------------------------------------------------------------------
|0 |PX COORDINATOR | |1 |8 |
|1 |└─EXCHANGE OUT DISTR |:EX10002|1 |7 |
|2 | └─HASH GROUP BY | |1 |6 |
|3 | └─EXCHANGE IN DISTR | |2 |6 |
|4 | └─EXCHANGE OUT DISTR (HASH) |:EX10001|2 |6 |
|5 | └─HASH GROUP BY | |2 |4 |
|6 | └─EXCHANGE IN DISTR | |2 |4 |
|7 | └─EXCHANGE OUT DISTR (HASH)|:EX10000|2 |3 |
|8 | └─HASH GROUP BY | |2 |2 |
|9 | └─PX BLOCK ITERATOR | |1 |1 |
|10| └─TABLE FULL SCAN |t |1 |1 |
===========================================================================
下回预告
这篇博客给大家介绍了 OceanBase 执行器中几个比较具有代表性的分布式下压技术,但是已经假设大家对数据库的分布式执行技术有所了解。如果大家对执行器的并行执行技术还不是特别了解,请期待下一篇博客《OceanBase 并行执行技术》。