OceanBasev4.2 Runtime Filter特性说明

2024年 5月 7日 66.1k 0

OceanBase数据库支持Hash Join联接算法,它可以将两个表基于某些字段进行等值匹配连接。然而,当参与连接的表(尤其是参与Probe Table)数据量较大时,Hash Join的性能会受极大影响。为了解决这个问题,通用方法是采用Runtime Filter来提高效率。

本文将向大家介绍OceanBase中的Runtime Filter能力及其在4.2版本的优化细节。

OceanBase中的Runtime Filter能力

Runtime Filter是一种用于优化Hash Join性能的技术,它可以通过减少Hash Join需要Probe的数据量来提高查询的效率。比如,在星形连接的多张维度表和事实表Join的场景中,Runtime Filter是一个非常有效的优化手段。本质上,Runtime Filter是一类过滤器,它可以利用Hash Join的Build过程构建一个轻量的Filter,然后将Filter广播发送给参与Probe的大表,Probe Table可以使用多个Runtime Filter提前在存储层过滤数据,减少真正参与Hash Join及网络传输的数据,从而提高查询效率。

Runtime Filter的数据结构有Bloom Filter、In Filter、Range Filter三种,始终贯穿Runtime Filter的使用类别。下面通过Runtime Filter的各类别来说明Runtime Filter的作用是使用场景。

1、Runtime Filter跨机传输需求场景

首先,按照是否需要跨机传输来看,Runtime Filter可以分为Local和Global两类,无论是Local还是Global,都可以使用这三种结构的Runtime Filter。我们通过explain可以显示当前计划中的Runtime Filter。计划中和Runtime Filter相关的算子有:

JOIN FILTER CREATE: 表示创建 Runtime Filter 的算子,其后面的NAME区分了同一个计划中的多个Runtime Filter
JOIN FILTER USE: 表示使用 Runtime Filter 的算子,拥有相同NAME的JOIN FILTER CREATE和JOIN FILTER USE是一对。由于filter下推逻辑的存在,真正使用Runtime Filter算子一般是其下方的Table Scan算子。

PART JOIN FILTER CREATE:表示创建 Part Join Filter 的算子, 其后面的NAME区分了同一个计划中的多个Runtime Filter
PX PARTITION HASH JOIN-FILTER: 表示使用 Part Join Filter 的算子,拥有相同NAME的PART JOIN FILTER CREATE和PX PARTITION HASH JOIN-FILTER是一对。

(1)Local Runtime Filter

Local Runtime Filter意味着Runtime Filter无需经历网络传输,构建的Filter只需要在本地节点计算过滤,这通常适用于Hash Join Probe侧没有Shuffle的场景。下面为Local Runtime Filter计划:

obclient> create table tt1(v1 int, v2 int) partition by hash(v1) partitions 5;
obclient> create table tt2(v1 int, v2 int) partition by hash(v1) partitions 5;
obclient> explain select /*+ px_join_filter(tt2) parallel(4) */ * from tt1 join tt2 on tt1.v1=tt2.v1;
+------------------------------------------------------------------------------------------------------------------------------------+
| Query Plan                                                                                                                         |
+------------------------------------------------------------------------------------------------------------------------------------+
| ==============================================================                                                                     |
| |ID|OPERATOR                  |NAME    |EST.ROWS|EST.TIME(us)|                                                                     |
| --------------------------------------------------------------                                                                     |
| |0 |PX COORDINATOR            |        |1       |7           |                                                                     |
| |1 |└─EXCHANGE OUT DISTR      |:EX10000|1       |6           |                                                                     |
| |2 |  └─PX PARTITION ITERATOR |        |1       |6           |                                                                     |
| |3 |    └─HASH JOIN           |        |1       |6           |                                                                     |
| |4 |      ├─JOIN FILTER CREATE|:RF0000 |1       |3           |                                                                     |
| |5 |      │ └─TABLE FULL SCAN |tt2     |1       |3           |                                                                     |
| |6 |      └─JOIN FILTER USE   |:RF0000 |1       |4           |                                                                     |
| |7 |        └─TABLE FULL SCAN |tt1     |1       |4           |                                                                     |
| ==============================================================                                                                     |
| Outputs & filters:                                                                                                                 |
| -------------------------------------                                                                                              |
|   0 - output([INTERNAL_FUNCTION(tt1.v1, tt1.v2, tt2.v1, tt2.v2)]), filter(nil), rowset=256                                         |
|   1 - output([INTERNAL_FUNCTION(tt1.v1, tt1.v2, tt2.v1, tt2.v2)]), filter(nil), rowset=256                                         |
|       dop=4                                                                                                                        |
|   2 - output([tt1.v1], [tt2.v1], [tt2.v2], [tt1.v2]), filter(nil), rowset=256                                                      |
|       partition wise, force partition granule                                                                                      |
|   3 - output([tt1.v1], [tt2.v1], [tt2.v2], [tt1.v2]), filter(nil), rowset=256                                                      |
|       equal_conds([tt1.v1 = tt2.v1]), other_conds(nil)                                                                             |
|   4 - output([tt2.v1], [tt2.v2]), filter(nil), rowset=256                                                                          |
|       RF_TYPE(in, range, bloom), RF_EXPR[tt2.v1]                                                                                   |
|   5 - output([tt2.v1], [tt2.v2]), filter(nil), rowset=256                                                                          |
|       access([tt2.v1], [tt2.v2]), partitions(p[0-4])                                                                               |
|       is_index_back=false, is_global_index=false,                                                                                  |
|       range_key([tt2.__pk_increment]), range(MIN ; MAX)always true                                                                 |
|   6 - output([tt1.v1], [tt1.v2]), filter(nil), rowset=256                                                                          |
|   7 - output([tt1.v1], [tt1.v2]), filter([RF_IN_FILTER(tt1.v1)], [RF_RANGE_FILTER(tt1.v1)], [RF_BLOOM_FILTER(tt1.v1)]), rowset=256 |
|       access([tt1.v1], [tt1.v2]), partitions(p[0-4])                                                                               |
|       is_index_back=false, is_global_index=false, filter_before_indexback[false,false,false],                                      |
|       range_key([tt1.__pk_increment]), range(MIN ; MAX)always true                                                                 |
+------------------------------------------------------------------------------------------------------------------------------------+
32 rows in set (0.045 sec)

在计划中显示为JOIN FILTER CREATE 和 JOIN FILTER USE的这一对算子就是Runtime Filter。Name为RF0000的4号和6号算子在该计划中构建了一个Local Runtime FIlter,该Filter无需网络传播,仅在本机使用。

(2)Global Runtime FIlter

Global Runtime Filter意味着该Filter需要广播传输给多个执行节点,并且在计划形态中可以按需将Runtime Filter嵌套下压至计划中的任意位置完成过滤。相比Local Runtime Filter,由于节省的代价不仅包括SQL层投影及计算开销,还包括了网络传输,因此往往能获得不错的执行性能提升。下面为Global Runtime Filter计划:

obclient> create table t1 (C1_RAND int, C2_RAND int, C3_RAND int, C4_RAND int, C5_RAND int) partition by hash(C5_RAND) partitions 5;
obclient> create table t2 (C1_RAND int, C2_RAND int, C3_RAND int, C4_RAND int, C5_RAND int) partition by hash(C5_RAND) partitions 5;
obclient> create table t3 (C1_RAND int, C2_RAND int, C3_RAND int, C4_RAND int, C5_RAND int) partition by hash(C5_RAND) partitions 5;
obclient> explain basic select /*+ leading(a (b  c)) parallel(3) use_hash(b) use_hash(c) pq_distribute(c hash hash) px_join_filter(c a)  px_join_filter(c b) */ count(*) from t1 a, t2 b, t3 c where a.C1_RAND=b.C1_RAND and a.C2_RAND = c.C2_RAND and b.C3_RAND = c.C3_RAND;
+------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Query Plan                                                                                                                                                       |
+------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| ========================================================                                                                                                         |
| |ID|OPERATOR                                  |NAME    |                                                                                                         |
| --------------------------------------------------------                                                                                                         |
| |0 |SCALAR GROUP BY                           |        |                                                                                                         |
| |1 |└─PX COORDINATOR                          |        |                                                                                                         |
| |2 |  └─EXCHANGE OUT DISTR                    |:EX10003|                                                                                                         |
| |3 |    └─MERGE GROUP BY                      |        |                                                                                                         |
| |4 |      └─SHARED HASH JOIN                  |        |                                                                                                         |
| |5 |        ├─JOIN FILTER CREATE              |:RF0001 |                                                                                                         |
| |6 |        │ └─EXCHANGE IN DISTR             |        |                                                                                                         |
| |7 |        │   └─EXCHANGE OUT DISTR (BC2HOST)|:EX10000|                                                                                                         |
| |8 |        │     └─PX BLOCK ITERATOR         |        |                                                                                                         |
| |9 |        │       └─TABLE FULL SCAN         |a       |                                                                                                         |
| |10|        └─HASH JOIN                       |        |                                                                                                         |
| |11|          ├─JOIN FILTER CREATE            |:RF0000 |                                                                                                         |
| |12|          │ └─EXCHANGE IN DISTR           |        |                                                                                                         |
| |13|          │   └─EXCHANGE OUT DISTR (HASH) |:EX10001|                                                                                                         |
| |14|          │     └─PX BLOCK ITERATOR       |        |                                                                                                         |
| |15|          │       └─TABLE FULL SCAN       |b       |                                                                                                         |
| |16|          └─EXCHANGE IN DISTR             |        |                                                                                                         |
| |17|            └─EXCHANGE OUT DISTR (HASH)   |:EX10002|                                                                                                         |
| |18|              └─JOIN FILTER USE           |:RF0000 |                                                                                                         |
| |19|                └─JOIN FILTER USE         |:RF0001 |                                                                                                         |
| |20|                  └─PX BLOCK ITERATOR     |        |                                                                                                         |
| |21|                    └─TABLE FULL SCAN     |c       |                                                                                                         |
| ========================================================                                                                                                         |
| Outputs & filters:                                                                                                                                               |
| -------------------------------------                                                                                                                            |
|   0 - output([T_FUN_COUNT_SUM(T_FUN_COUNT(*))]), filter(nil), rowset=256                                                                                         |
|       group(nil), agg_func([T_FUN_COUNT_SUM(T_FUN_COUNT(*))])                                                                                                    |
|   1 - output([T_FUN_COUNT(*)]), filter(nil), rowset=256                                                                                                          |
|   2 - output([T_FUN_COUNT(*)]), filter(nil), rowset=256                                                                                                          |
|       dop=3                                                                                                                                                      |
|   3 - output([T_FUN_COUNT(*)]), filter(nil), rowset=256                                                                                                          |
|       group(nil), agg_func([T_FUN_COUNT(*)])                                                                                                                     |
|   4 - output(nil), filter(nil), rowset=256                                                                                                                       |
|       equal_conds([a.C1_RAND = b.C1_RAND], [a.C2_RAND = c.C2_RAND]), other_conds(nil)                                                                            |
|   5 - output([a.C2_RAND], [a.C1_RAND]), filter(nil), rowset=256                                                                                                  |
|       RF_TYPE(in, range, bloom), RF_EXPR[a.C2_RAND]                                                                                                              |
|   6 - output([a.C2_RAND], [a.C1_RAND]), filter(nil), rowset=256                                                                                                  |
|   7 - output([a.C2_RAND], [a.C1_RAND]), filter(nil), rowset=256                                                                                                  |
|       dop=3                                                                                                                                                      |
|   8 - output([a.C1_RAND], [a.C2_RAND]), filter(nil), rowset=256                                                                                                  |
|   9 - output([a.C1_RAND], [a.C2_RAND]), filter(nil), rowset=256                                                                                                  |
|       access([a.C1_RAND], [a.C2_RAND]), partitions(p[0-4])                                                                                                       |
|       is_index_back=false, is_global_index=false,                                                                                                                |
|       range_key([a.__pk_increment]), range(MIN ; MAX)always true                                                                                                 |
|  10 - output([b.C1_RAND], [c.C2_RAND]), filter(nil), rowset=256                                                                                                  |
|       equal_conds([b.C3_RAND = c.C3_RAND]), other_conds(nil)                                                                                                     |
|  11 - output([b.C3_RAND], [b.C1_RAND]), filter(nil), rowset=256                                                                                                  |
|       RF_TYPE(in, range, bloom), RF_EXPR[b.C3_RAND]                                                                                                              |
|  12 - output([b.C3_RAND], [b.C1_RAND]), filter(nil), rowset=256                                                                                                  |
|  13 - output([b.C3_RAND], [b.C1_RAND]), filter(nil), rowset=256                                                                                                  |
|       (#keys=1, [b.C3_RAND]), dop=3                                                                                                                              |
|  14 - output([b.C1_RAND], [b.C3_RAND]), filter(nil), rowset=256                                                                                                  |
|  15 - output([b.C1_RAND], [b.C3_RAND]), filter(nil), rowset=256                                                                                                  |
|       access([b.C1_RAND], [b.C3_RAND]), partitions(p[0-4])                                                                                                       |
|       is_index_back=false, is_global_index=false,                                                                                                                |
|       range_key([b.__pk_increment]), range(MIN ; MAX)always true                                                                                                 |
|  16 - output([c.C3_RAND], [c.C2_RAND]), filter(nil), rowset=256                                                                                                  |
|  17 - output([c.C3_RAND], [c.C2_RAND]), filter(nil), rowset=256                                                                                                  |
|       (#keys=1, [c.C3_RAND]), dop=3                                                                                                                              |
|  18 - output([c.C3_RAND], [c.C2_RAND]), filter(nil), rowset=256                                                                                                  |
|  19 - output([c.C3_RAND], [c.C2_RAND]), filter(nil), rowset=256                                                                                                  |
|  20 - output([c.C2_RAND], [c.C3_RAND]), filter(nil), rowset=256                                                                                                  |
|  21 - output([c.C2_RAND], [c.C3_RAND]), filter([RF_IN_FILTER(c.C3_RAND)], [RF_RANGE_FILTER(c.C3_RAND)], [RF_BLOOM_FILTER(c.C3_RAND)], [RF_IN_FILTER(c.C2_RAND)], |
|        [RF_RANGE_FILTER(c.C2_RAND)], [RF_BLOOM_FILTER(c.C2_RAND)]), rowset=256                                                                                   |
|       access([c.C2_RAND], [c.C3_RAND]), partitions(p[0-4])                                                                                                       |
|       is_index_back=false, is_global_index=false, filter_before_indexback[false,false,false,false,false,false],                                                  |
|       range_key([c.__pk_increment]), range(MIN ; MAX)always true                                                                                                 |
+------------------------------------------------------------------------------------------------------------------------------------------------------------------+
70 rows in set (0.103 sec)

5号算子对应RF0001,11号算子对应RF000,均下推到21号table full scan算子上进行过滤,此过程横跨了多个DFO,需要通过网络传输,属于Global Runtime Filter。

2、Runtime Filter生效功能场景

其次,从Runtime Filter生效的功能而言,Runtime Filter可以分为过滤连接数据的Filter和过滤分区的Filter两类。第一类为普通Runtime Filter,支持三种数据结构;第二类为Part Join Filter,当前仅支持Bloom Filter。下面重点介绍Part Join Filter。

Hash Join的执行流程是左侧数据阻塞建哈希表, 右侧逐行匹配数据。左侧建哈希表会获取左侧所有数据。如果可以获取左侧所有数据关于右侧某表的分区分布特征(按照右表的分区方式计算出左表关于右表的分区id), 那么在右侧扫描该表数据时,可以根据已统计的分区分布特征提前过滤掉不必要的分区,从而提升性能,Part Bloom Filter的引入正是为了优化这一场景。

想要在Hash Join左侧计算出按照右侧某表分区方式的具体分区,Join的连接键必须包含右侧该表的分区键,这是生成Part Bloom Filter的前提。

obclient [mysql]> create table t1(v1 int);
obclient [mysql]> create table t2(v1 int) partition by hash(v1) partitions 5;
obclient [mysql]> explain select /*+ parallel(3) leading(t1 t2) px_part_join_filter(t2)*/ * from t1 join t2 on t1.v1=t2.v1;
+-------------------------------------------------------------------------------+
| Query Plan                                                                    |
+-------------------------------------------------------------------------------+
| =======================================================================       |
| |ID|OPERATOR                           |NAME    |EST.ROWS|EST.TIME(us)|       |
| -----------------------------------------------------------------------       |
| |0 |PX COORDINATOR                     |        |1       |5           |       |
| |1 |└─EXCHANGE OUT DISTR               |:EX10001|1       |4           |       |
| |2 |  └─HASH JOIN                      |        |1       |4           |       |
| |3 |    ├─PART JOIN FILTER CREATE      |:RF0000 |1       |1           |       |
| |4 |    │ └─EXCHANGE IN DISTR          |        |1       |1           |       |
| |5 |    │   └─EXCHANGE OUT DISTR (PKEY)|:EX10000|1       |1           |       |
| |6 |    │     └─PX BLOCK ITERATOR      |        |1       |1           |       |
| |7 |    │       └─TABLE FULL SCAN      |t1      |1       |1           |       |
| |8 |    └─PX PARTITION HASH JOIN-FILTER|:RF0000 |1       |3           |       |
| |9 |      └─TABLE FULL SCAN            |t2      |1       |3           |       |
| =======================================================================       |
| Outputs & filters:                                                            |
| -------------------------------------                                         |
|   0 - output([INTERNAL_FUNCTION(t1.v1, t2.v1)]), filter(nil), rowset=256      |
|   1 - output([INTERNAL_FUNCTION(t1.v1, t2.v1)]), filter(nil), rowset=256      |
|       dop=3                                                                   |
|   2 - output([t1.v1], [t2.v1]), filter(nil), rowset=256                       |
|       equal_conds([t1.v1 = t2.v1]), other_conds(nil)                          |
|   3 - output([t1.v1]), filter(nil), rowset=256                                |
|       RF_TYPE(bloom), RF_EXPR[t1.v1]                                          |
|   4 - output([t1.v1]), filter(nil), rowset=256                                |
|   5 - output([t1.v1]), filter(nil), rowset=256                                |
|       (#keys=1, [t1.v1]), dop=3                                               |
|   6 - output([t1.v1]), filter(nil), rowset=256                                |
|   7 - output([t1.v1]), filter(nil), rowset=256                                |
|       access([t1.v1]), partitions(p0)                                         |
|       is_index_back=false, is_global_index=false,                             |
|       range_key([t1.__pk_increment]), range(MIN ; MAX)always true             |
|   8 - output([t2.v1]), filter(nil), rowset=256                                |
|       affinitize                                                              |
|   9 - output([t2.v1]), filter(nil), rowset=256                                |
|       access([t2.v1]), partitions(p[0-4])                                     |
|       is_index_back=false, is_global_index=false,                             |
|       range_key([t2.__pk_increment]), range(MIN ; MAX)always true             |
+-------------------------------------------------------------------------------+

在计划中显示为 PART JOIN FILTER CREATE 的3号算子和 PX PARTITION HASH JOIN-FILTER的8号算子就是一对Part Join Filter,3号算子创建Part Join Filter,他会应用到8号算子中对t2表的分区进行过滤。

手动分配Runtime Filter

在默认配置下,当Join基于连接键的过滤性高于一定条件时(目前约为60%)会默认创建三种Runtime Filter数据结构:

  • In Filter,内部使用hash表进行过滤判定;
  • Range Filter,内部使用最大最小值进行过滤判定;
  • Bloom Filter,自身就是一种过滤器。

在过滤优先级上,由于In Filter具有最准的过滤性,执行器当In Filter被启用时,其他两种Filter会被自适应关闭,在执行器根据实际NDV值确定是否使用In Filter。此外,每种Filter在实际计算中会根据实时过滤性自适应Disable Filter以及重新Enable Filter。

Runtime Filter的使用场景仅限于Hash Join,当连接类型不为Hash Join时,优化器将不会分配Runtime Filter。一般情况下,优化器会自动分配Runtime Filter,如果优化器并未分配Runtime Filter,大家也可以通过hint手动分配Runtime Filter。

我们可以通过px_join_fitler以及px_part_join_filter来手动打开runtime filter/part join filter。

/*+ px_join_filter(join_right_table_name)*/
/*+ px_part_join_filter(join_right_table_name)*/

也可以通过no_px_join_filter以及px_part_join_filter来手动关闭runtime filter/part join filter。

/*+ no_px_join_filter(join_right_table_name)*/
/*+ no_px_part_join_filter(join_right_table_name)*/

需要注意的是,在并行度为1的场景下,不会分配Runtime Filter,我们需要至少指定并行度为2。

/*+ parallel(2) */

在手动分配Runtime Filter的过程中,会涉及一些系统变量,以下提供了4个系统变量用于在需要场景下调整Runtime Filter执行相关策略。

系统变量1:runtime_filter_type。

-- oracle模式
alter system set runtime_filter_type = 'BLOOM_FILTER,RANGE,IN';
-- mysql模式
set runtime_filter_type = 'BLOOM_FILTER,RANGE,IN';

runtime_filter_type的默认值为'BLOOM_FILTER,RANGE,IN',表示启用三种类型的Runtime Filter,当runtime_filter_type='' 时,表示不启用任何类型的Runtime Filter。一般情况下,不需要特别指定runtime_filter_type类型,使用默认值即可,OceanBase会在优化和执行阶段选择最优的Runtime Filter类型进行过滤。

系统变量2:runtime_filter_max_in_num。

默认为1024,代表In Filter使用的默认NDV为1024。若执行时NDV > runtime_filter_max_in_num,则将在执行器自动关闭In filter。一般情况下,不建议用户将该值设置过大,因为在Build表NDV很大的情况下,In Filter的效果不如Bloom Filter。

系统变量3:runtime_filter_wait_time_ms。

Probe端等待Runtime Filter到达的最大等待时间默认10ms,当Runtime Filter到达后,Probe再吐数据。如果超过runtime_filter_wait_time_ms还未到达,则进入by pass阶段,此时不经Runtime Filter过滤直接开始吐数据;当某个时刻Runtime Filter达到后,Runtime Filter仍然会被启用并执行过滤。一般情况下,该值不需要调整,当实际使用Bloom Filter数据很大,且如果知道过滤性比较好的情况下,可以适当调大该值。

系统变量4:runtime_bloom_filter_max_size。

限制Bloom Filter的最大长度默认2048MB。当用户实际使用中build表的数据过多时,默认的Bloom Filter的最大长度将不能容纳数据,会导致Bloom Filter误判率增加,此时需要调大runtime_bloom_filter_max_size以降低其误判率,提高其过滤性。

OceanBasev4.2 Runtime Filter特性

OceanBase自3.1版本以来,支持了Join Bloom Filter用于在Join中执行器扫描数据时快速过滤数据,在4.0版本中,对于Join键为分区列或者分区列前缀场景支持了Partition Bloom FIlter动态过滤分区,之后又在4.1版本支持了多发Bloom Filter使相邻DFO和单个DFO内部均可生成多对Bloom Filter。

在OceanBase的4.2版本中,Join Filter能力再次扩充,在TPCH/TPCDS等场景中进一步获得性能提升,并将其统一称为Runtime Filter(RF)。具体来说,OceanBase的4.2版本新增了以下能力:

  • Global Runtime Filter,支持跨DFO计划中任意位置下压Runtime Filter。区别于之前版本中只能支持相邻DFO和DFO内部传输Runtime Filter,Global Runtime Filter可以在复杂的星形连接中完成横跨CTE及多个DFO下压。
  • 新增In、Range类型的Runtime Filter。在Join左表Join列NDV较小的情况下, 使用In Filter可以精确过滤右侧大表数据,对于数据为数值类型且连续分布通过构建MIN/MAX的Range Filter能够在存储层尤其是列式存储结构中快速过滤数据。

注意事项

以上就是OceanBase中的Runtime Filter能力及其在4.2版本的优化细节。需要注意的是,当Hash Join的过滤性不足时,使用Runtime Filter并不能解决问题,反而可能会导致轻微性能下降。因此,在选择手动开启Runtime Filter的时候,需要对查询场景进行仔细评估,以确定Runtime Filter是否适用。

相关文章

Oracle如何使用授予和撤销权限的语法和示例
Awesome Project: 探索 MatrixOrigin 云原生分布式数据库
下载丨66页PDF,云和恩墨技术通讯(2024年7月刊)
社区版oceanbase安装
Oracle 导出CSV工具-sqluldr2
ETL数据集成丨快速将MySQL数据迁移至Doris数据库

发布评论