OceanBase 并行执行学习笔记 2 —— 设定并行度

2024年 5月 7日 103.5k 0

并行度(degree of parallelism,简称 DOP)指的是单个 DFO 在执行时使用的工作线程数。并行执行的设计目的就是为了高效利用多核资源。OceanBase 并行执行框架提供了多种方式指定并行度,既可以手工指定,也可以利用 Auto DOP 的能力让数据库帮你自动选择。本篇博客主要介绍如何通过手工来指定并行度。

并行执行系列的内容分为以下七篇博客,本篇是其中的第二篇。

第一篇 并行执行概念
第二篇 设定并行度
第三篇 并发控制与排队
第四篇 并行执行分类
第五篇 并行执行控制参数
第六篇 并行执行诊断及调优技巧
第七篇 并行执行 PoC QuickStart

2.1 手工指定并行度

可以对一张表指定并行度,使得对这张表的扫描总是使用并行执行。

2.1.1 指定并行度的几种方式

表属性指定并行度

下面的语句分别指定一张主表和一个索引的扫描并行度。

ALTER TABLE table_name PARALLEL 4;
ALTER TABLE table_name ALTER INDEX index_name PARALLEL 2;

如果一个 SQL 中只涉及一张表,当 SQL 中查询了主表时,除了主表所在的 DFO,其余 DFO 也会使用并行度 4 来执行;当 SQL 查询了索引表时,除了索引表所在的 DFO,其余 DFO 也会使用并行度 2 来执行。

如果一个 SQL 中涉及多张表,则会使用 PARALLEL 最大值作为整个计划的 DOP。

PARALLEL HINT 指定并行度

可以通过全局 PARALLEL HINT 来指定整个 SQL 的并行度,也可以通过表级的 PARALLEL HINT 来指定特定表的并行度。当一个 SQL 里给定了多个表的 PARALLEL HINT,那么各个表所在 DFO 的 DOP 由各个表的 Parallel 值决定。如果一个 DFO 里包含多张表,则会取他们的 PARALLEL 最大值作为 DFO 的 DOP。

OceanBase(TEST@TEST)>create table t1 (c1 int, c2 int);
Query OK, 0 rows affected (0.167 sec)

OceanBase(TEST@TEST)>explain select /*+ parallel(3) */ sum(c1) from t1;
+----------------------------------------------------------------------+
| Query Plan                                                           |
+----------------------------------------------------------------------+
| =========================================================            |
| |ID|OPERATOR             |NAME    |EST.ROWS|EST.TIME(us)|            |
| ---------------------------------------------------------            |
| |0 |SCALAR GROUP BY      |        |1       |2           |            |
| |1 | PX COORDINATOR      |        |3       |2           |            |
| |2 |  EXCHANGE OUT DISTR |:EX10000|3       |2           |            |
| |3 |   MERGE GROUP BY    |        |3       |1           |            |
| |4 |    PX BLOCK ITERATOR|        |1       |1           |            |
| |5 |     TABLE SCAN      |T1      |1       |1           |            |
| =========================================================            |
| Outputs & filters:                                                   |
| -------------------------------------                                |
|   0 - output([T_FUN_SUM(T_FUN_SUM(T1.C1))]), filter(nil), rowset=256 |
|       group(nil), agg_func([T_FUN_SUM(T_FUN_SUM(T1.C1))])            |
|   1 - output([T_FUN_SUM(T1.C1)]), filter(nil), rowset=256            |
|   2 - output([T_FUN_SUM(T1.C1)]), filter(nil), rowset=256            |
|       dop=3                                                          |
|   3 - output([T_FUN_SUM(T1.C1)]), filter(nil), rowset=256            |
|       group(nil), agg_func([T_FUN_SUM(T1.C1)])                       |
|   4 - output([T1.C1]), filter(nil), rowset=256                       |
|   5 - output([T1.C1]), filter(nil), rowset=256                       |
|       access([T1.C1]), partitions(p0)                                |
|       is_index_back=false, is_global_index=false,                    |
|       range_key([T1.__pk_increment]), range(MIN ; MAX)always true    |
+----------------------------------------------------------------------+
24 rows in set (0.002 sec)
OceanBase(TEST@TEST)>create table t1 (c1 int, c2 int);
Query OK, 0 rows affected (0.167 sec)

OceanBase(TEST@TEST)>create table t2 (c1 int, c2 int);
Query OK, 0 rows affected (0.150 sec)

OceanBase(TEST@TEST)>select /*+ parallel(t1 3) */ * from t1, t2 where t1.c2 = t2.c1;
Empty set (0.041 sec)

OceanBase(TEST@TEST)>explain select /*+ parallel(t1 3) */ * from t1, t2 where t1.c2 = t2.c1;
+----------------------------------------------------------------------------------------+
| Query Plan                                                                             |
+----------------------------------------------------------------------------------------+
| =================================================================                      |
| |ID|OPERATOR                     |NAME    |EST.ROWS|EST.TIME(us)|                      |
| -----------------------------------------------------------------                      |
| |0 |PX COORDINATOR               |        |1       |7           |                      |
| |1 | EXCHANGE OUT DISTR          |:EX10002|1       |6           |                      |
| |2 |  HASH JOIN                  |        |1       |5           |                      |
| |3 |   EXCHANGE IN DISTR         |        |1       |2           |                      |
| |4 |    EXCHANGE OUT DISTR (HASH)|:EX10000|1       |2           |                      |
| |5 |     PX BLOCK ITERATOR       |        |1       |1           |                      |
| |6 |      TABLE SCAN             |T1      |1       |1           |                      |
| |7 |   EXCHANGE IN DISTR         |        |1       |4           |                      |
| |8 |    EXCHANGE OUT DISTR (HASH)|:EX10001|1       |4           |                      |
| |9 |     PX PARTITION ITERATOR   |        |1       |2           |                      |
| |10|      TABLE SCAN             |T2      |1       |2           |                      |
| =================================================================                      |
| Outputs & filters:                                                                     |
| -------------------------------------                                                  |
|   0 - output([INTERNAL_FUNCTION(T1.C1, T1.C2, T2.C1, T2.C2)]), filter(nil), rowset=256 |
|   1 - output([INTERNAL_FUNCTION(T1.C1, T1.C2, T2.C1, T2.C2)]), filter(nil), rowset=256 |
|       dop=3                                                                            |
|   2 - output([T1.C2], [T2.C1], [T1.C1], [T2.C2]), filter(nil), rowset=256              |
|       equal_conds([T1.C2 = T2.C1]), other_conds(nil)                                   |
|   3 - output([T1.C2], [T1.C1]), filter(nil), rowset=256                                |
|   4 - output([T1.C2], [T1.C1]), filter(nil), rowset=256                                |
|       (#keys=1, [T1.C2]), dop=3                                                        |
|   5 - output([T1.C2], [T1.C1]), filter(nil), rowset=256                                |
|   6 - output([T1.C2], [T1.C1]), filter(nil), rowset=256                                |
|       access([T1.C2], [T1.C1]), partitions(p0)                                         |
|       is_index_back=false, is_global_index=false,                                      |
|       range_key([T1.__pk_increment]), range(MIN ; MAX)always true                      |
|   7 - output([T2.C1], [T2.C2]), filter(nil), rowset=256                                |
|   8 - output([T2.C1], [T2.C2]), filter(nil), rowset=256                                |
|       (#keys=1, [T2.C1]), dop=1                                                        |
|   9 - output([T2.C1], [T2.C2]), filter(nil), rowset=256                                |
|       force partition granule                                                          |
|  10 - output([T2.C1], [T2.C2]), filter(nil), rowset=256                                |
|       access([T2.C1], [T2.C2]), partitions(p0)                                         |
|       is_index_back=false, is_global_index=false,                                      |
|       range_key([T2.__pk_increment]), range(MIN ; MAX)always true                      |
+----------------------------------------------------------------------------------------+
39 rows in set (0.002 sec)

对于 DML 语句,使用上面的 HINT,只会使得 DML 语句的查询部分走并行,写入部分还是串行。如果希望写入部分也并行,则需要还添加一个 HINT ENABLE_PARALLEL_DML,例如:

insert /*+ parallel(3) enable_parallel_dml */ into t3 select * from t1;

注意:并行 DML 必须指定全局 parallel hint。仅指定表级 parallel hint 无法让写入部分串行。例如,下面的 SQL 不会开启 DML 并行:

insert /*+ parallel(t3 3) enable_parallel_dml */ into t3 select * from t1;

SESSION 上指定并行度

在当前 session 上指定并行度后,session 上的所有查询语句都将以这个并行度执行。需要注意,即使是单行查询的 SQL,也会使用该并行度,可能导致性能下降。

set _force_parallel_query_dop = 3

对于 DML 语句,使用上面的命令,只会使得 DML 语句的查询部分走并行,写入部分还是串行。如果希望写入部分也并行,则需要用下面的命令:

set _force_parallel_dml_dop = 3

2.1.2 并行度优先级

全局 HINT > TABLE HINT > SESSION 并行度 > TABLE DOP

对比全局 HINT 和 TABLE HINT 的优先级。可以看到,有全局 HINT 时,TABLE HINT 不生效。

OceanBase(TEST@TEST)>explain select /*+ parallel(2) parallel(t1 3) */ * from t1;
+-------------------------------------------------------------------+
| Query Plan                                                        |
+-------------------------------------------------------------------+
| =======================================================           |
| |ID|OPERATOR           |NAME    |EST.ROWS|EST.TIME(us)|           |
| -------------------------------------------------------           |
| |0 |PX COORDINATOR     |        |1       |2           |           |
| |1 | EXCHANGE OUT DISTR|:EX10000|1       |2           |           |
| |2 |  PX BLOCK ITERATOR|        |1       |1           |           |
| |3 |   TABLE SCAN      |T1      |1       |1           |           |
| =======================================================           |
| Outputs & filters:                                                |
| -------------------------------------                             |
|   0 - output([INTERNAL_FUNCTION(T1.C1)]), filter(nil), rowset=256 |
|   1 - output([INTERNAL_FUNCTION(T1.C1)]), filter(nil), rowset=256 |
|       dop=2                                                       |
|   2 - output([T1.C1]), filter(nil), rowset=256                    |
|   3 - output([T1.C1]), filter(nil), rowset=256                    |
|       access([T1.C1]), partitions(p0)                             |
|       is_index_back=false, is_global_index=false,                 |
|       range_key([T1.__pk_increment]), range(MIN ; MAX)always true |
+-------------------------------------------------------------------+
18 rows in set (0.002 sec)

OceanBase(TEST@TEST)>explain select /*+ parallel(t1 3) */ * from t1;
+-------------------------------------------------------------------+
| Query Plan                                                        |
+-------------------------------------------------------------------+
| =======================================================           |
| |ID|OPERATOR           |NAME    |EST.ROWS|EST.TIME(us)|           |
| -------------------------------------------------------           |
| |0 |PX COORDINATOR     |        |1       |2           |           |
| |1 | EXCHANGE OUT DISTR|:EX10000|1       |1           |           |
| |2 |  PX BLOCK ITERATOR|        |1       |1           |           |
| |3 |   TABLE SCAN      |T1      |1       |1           |           |
| =======================================================           |
| Outputs & filters:                                                |
| -------------------------------------                             |
|   0 - output([INTERNAL_FUNCTION(T1.C1)]), filter(nil), rowset=256 |
|   1 - output([INTERNAL_FUNCTION(T1.C1)]), filter(nil), rowset=256 |
|       dop=3                                                       |
|   2 - output([T1.C1]), filter(nil), rowset=256                    |
|   3 - output([T1.C1]), filter(nil), rowset=256                    |
|       access([T1.C1]), partitions(p0)                             |
|       is_index_back=false, is_global_index=false,                 |
|       range_key([T1.__pk_increment]), range(MIN ; MAX)always true |
+-------------------------------------------------------------------+
18 rows in set (0.002 sec)

对比 session 和 table hint 的优先级。可以看到,使用 table 级的 HINT 时,session 设定的 DOP 不生效。

OceanBase(TEST@TEST)>alter session force parallel query parallel 4;
Query OK, 0 rows affected (0.001 sec)

OceanBase(TEST@TEST)>explain select /*+ parallel(t1 3) */ * from t1;
+-------------------------------------------------------------------+
| Query Plan                                                        |
+-------------------------------------------------------------------+
| =======================================================           |
| |ID|OPERATOR           |NAME    |EST.ROWS|EST.TIME(us)|           |
| -------------------------------------------------------           |
| |0 |PX COORDINATOR     |        |1       |2           |           |
| |1 | EXCHANGE OUT DISTR|:EX10000|1       |1           |           |
| |2 |  PX BLOCK ITERATOR|        |1       |1           |           |
| |3 |   TABLE SCAN      |T1      |1       |1           |           |
| =======================================================           |
| Outputs & filters:                                                |
| -------------------------------------                             |
|   0 - output([INTERNAL_FUNCTION(T1.C1)]), filter(nil), rowset=256 |
|   1 - output([INTERNAL_FUNCTION(T1.C1)]), filter(nil), rowset=256 |
|       dop=3                                                       |
|   2 - output([T1.C1]), filter(nil), rowset=256                    |
|   3 - output([T1.C1]), filter(nil), rowset=256                    |
|       access([T1.C1]), partitions(p0)                             |
|       is_index_back=false, is_global_index=false,                 |
|       range_key([T1.__pk_increment]), range(MIN ; MAX)always true |
+-------------------------------------------------------------------+
18 rows in set (0.002 sec)

OceanBase(TEST@TEST)>explain select * from t1;
+-------------------------------------------------------------------+
| Query Plan                                                        |
+-------------------------------------------------------------------+
| =======================================================           |
| |ID|OPERATOR           |NAME    |EST.ROWS|EST.TIME(us)|           |
| -------------------------------------------------------           |
| |0 |PX COORDINATOR     |        |1       |1           |           |
| |1 | EXCHANGE OUT DISTR|:EX10000|1       |1           |           |
| |2 |  PX BLOCK ITERATOR|        |1       |1           |           |
| |3 |   TABLE SCAN      |T1      |1       |1           |           |
| =======================================================           |
| Outputs & filters:                                                |
| -------------------------------------                             |
|   0 - output([INTERNAL_FUNCTION(T1.C1)]), filter(nil), rowset=256 |
|   1 - output([INTERNAL_FUNCTION(T1.C1)]), filter(nil), rowset=256 |
|       dop=4                                                       |
|   2 - output([T1.C1]), filter(nil), rowset=256                    |
|   3 - output([T1.C1]), filter(nil), rowset=256                    |
|       access([T1.C1]), partitions(p0)                             |
|       is_index_back=false, is_global_index=false,                 |
|       range_key([T1.__pk_increment]), range(MIN ; MAX)always true |
+-------------------------------------------------------------------+
18 rows in set (0.002 sec)

对比 session 和 table parallel 的优先级。可以看到 table parallel 属性的优先级低于 session 指定 dop 的优先级。

OceanBase(TEST@TEST)>alter table t1 parallel 5;
Query OK, 0 rows affected (0.135 sec)

OceanBase(TEST@TEST)>explain select * from t1;
+-------------------------------------------------------------------+
| Query Plan                                                        |
+-------------------------------------------------------------------+
| =======================================================           |
| |ID|OPERATOR           |NAME    |EST.ROWS|EST.TIME(us)|           |
| -------------------------------------------------------           |
| |0 |PX COORDINATOR     |        |1       |1           |           |
| |1 | EXCHANGE OUT DISTR|:EX10000|1       |1           |           |
| |2 |  PX BLOCK ITERATOR|        |1       |1           |           |
| |3 |   TABLE SCAN      |T1      |1       |1           |           |
| =======================================================           |
| Outputs & filters:                                                |
| -------------------------------------                             |
|   0 - output([INTERNAL_FUNCTION(T1.C1)]), filter(nil), rowset=256 |
|   1 - output([INTERNAL_FUNCTION(T1.C1)]), filter(nil), rowset=256 |
|       dop=5                                                       |
|   2 - output([T1.C1]), filter(nil), rowset=256                    |
|   3 - output([T1.C1]), filter(nil), rowset=256                    |
|       access([T1.C1]), partitions(p0)                             |
|       is_index_back=false, is_global_index=false,                 |
|       range_key([T1.__pk_increment]), range(MIN ; MAX)always true |
+-------------------------------------------------------------------+
18 rows in set (0.002 sec)

OceanBase(TEST@TEST)>alter session force parallel query parallel 4;
Query OK, 0 rows affected (0.000 sec)

OceanBase(TEST@TEST)>explain select * from t1;
+-------------------------------------------------------------------+
| Query Plan                                                        |
+-------------------------------------------------------------------+
| =======================================================           |
| |ID|OPERATOR           |NAME    |EST.ROWS|EST.TIME(us)|           |
| -------------------------------------------------------           |
| |0 |PX COORDINATOR     |        |1       |1           |           |
| |1 | EXCHANGE OUT DISTR|:EX10000|1       |1           |           |
| |2 |  PX BLOCK ITERATOR|        |1       |1           |           |
| |3 |   TABLE SCAN      |T1      |1       |1           |           |
| =======================================================           |
| Outputs & filters:                                                |
| -------------------------------------                             |
|   0 - output([INTERNAL_FUNCTION(T1.C1)]), filter(nil), rowset=256 |
|   1 - output([INTERNAL_FUNCTION(T1.C1)]), filter(nil), rowset=256 |
|       dop=4                                                       |
|   2 - output([T1.C1]), filter(nil), rowset=256                    |
|   3 - output([T1.C1]), filter(nil), rowset=256                    |
|       access([T1.C1]), partitions(p0)                             |
|       is_index_back=false, is_global_index=false,                 |
|       range_key([T1.__pk_increment]), range(MIN ; MAX)always true |
+-------------------------------------------------------------------+
18 rows in set (0.002 sec)

2.1.3 并行度取值

先讲一个简单原则:1. parallel 的设定目的是为了把 CPU 用满。2. parallel 不是越大越好。举个例子:当租户 CPU 为 20 的时:

  • 对于单表简单操作(如单表扫描过滤、单表增删改)parallel 理论值为 20。
  • 对于多表 join 的查询、带全局索引的 PDML,parallel 理论值为 10。
  • 对于一些更加复杂的计划,形态为右深树时,parallel 理论值为 7 左右。

对上面的例子做解释

  • 对于单表操作,只有一个 DFO,可以把 20 个 CPU 全部给这个 DFO 用。
  • 对于多表 join,同一时间会启动 2 个 DFO,形成数据流水线(一个 DFO 是 producer、一个 DFO 是consumer),所有每个 DFO 分 10 个 CPU。
  • 对于更加复杂的右深树计划,我们同一时间会启动 3 个 DFO,此时每个 DFO 大约分 7 个 CPU 可以保证他们都能跑得比较快。

但是,以上只是一般原则,实际操作中还是应该做微调。解释如下:

  • 对于单表操作,只有一个 DFO,可以把 20 个 CPU 全部给这个 DFO 用。

假设这个 DFO 大部分时间是做 IO,那么不妨把并发度设置得比 20 稍高一些,比如 25,这样可以充分利用 CPU

  • 对于多表 join,同一时间会启动 2 个 DFO,形成数据流水线(一个 DFO 是 producer、一个 DFO 是consumer),所有每个 DFO 分 10 个 CPU。

我们实际并不能保证每个 DFO 都能把分给他的 CPU 压满。所以,把并发度适当稍微调整高一些,比如 15 也许能更充分地利用 CPU。 ** 但是,不能无限往上加。把并发度加到 50,根本不会有收益,相反还会增加线程调度开销、框架调度开销。

  • 对于更加复杂的右深树计划,我们同一时间会启动 3 个 DFO,此时每个 DFO 大约分 7 个 CPU 可以保证他们都能跑得比较快。

一个计划里,并不是处处都要启动 3 个 DFO,3 DFO 的情况可能是局部的。大部分时候可能还是同时启动 2 个 DFO。 所以这时候把并发设置为 10 也许比 7 要好。

微调后,当租户 CPU 为 20 的时,一个可能的情况如下:

  • 对于单表简单操作(如单表扫描过滤、单表增删改)parallel 实践值为 30。
  • 对于多表 join 的查询、带全局索引的 PDML,parallel 实践值为 15。
  • 对于一些更加复杂的计划,形态为右深树时,parallel 实践值为 10 ~ 15 左右。

相关文章

Oracle如何使用授予和撤销权限的语法和示例
Awesome Project: 探索 MatrixOrigin 云原生分布式数据库
下载丨66页PDF,云和恩墨技术通讯(2024年7月刊)
社区版oceanbase安装
Oracle 导出CSV工具-sqluldr2
ETL数据集成丨快速将MySQL数据迁移至Doris数据库

发布评论