OceanBase 并行执行学习笔记 1 —— 并行执行概念

2024年 5月 7日 53.4k 0

From 晓楚:
   这是一篇迟来的并行执行系统化产品说明文档。
   自 2019 年以来,并行执行功能被快速应用于各个场景,其重要性一步步提升。
   但是,一直以来,缺少一份详尽的用户使用文档,这给业务团队使用并行执行功能带来很多困难。今天,我们要解决这个困难。

并行执行是让单个 SQL 语句使用多个 CPU 和 I/O 资源的能力。这篇文章将详细介绍并行执行的工作原理,并解释如何控制、管理和监控 OceanBase 数据库中的并行执行。

并行执行系列的内容分为以下七篇博客,本篇是其中的第一篇。

第一篇 并行执行概念
第二篇 设定并行度
第三篇 并发控制与排队
第四篇 并行执行分类
第五篇 并行执行控制参数
第六篇 并行执行诊断及调优技巧
第七篇 并行执行 PoC QuickStart

1 并行执行概念

并行执行是指将一个 SQL 查询任务分成多个子任务,并允许这些子任务在多个处理器上同时运行,以提高整个查询任务的执行效率。在现代计算机系统中,多核处理器、多线程和高速网络连接广泛应用,这使得并行执行成为了一种可行的高效率查询技术。

并行执行能够极大降低计算密集型大查询的响应时间,被广泛应用在离线数据仓库、实时报表、在线大数据分析等业务场景,同时还应用在批量导数,快速构建索引表等领域。

以下场景会从并行执行中获益:

  • 大表扫描,大表连接,大数据量排序,聚合
  • 大表 DDL 操作,如修改主键、改变列类型,建索引等
  • 从已有大数据建表(Create Table As Select)
  • 批量插入、删除、更新

本节包含如下内容:

  • 什么场景适用并行执行
  • 什么场景不适用并行执行
  • 硬件要求
  • 并行执行工作原理
  • 并行执行工作线程
  • 通过均衡负载来优化性能

1.1 什么场景适用并行执行

并行执行通过充分利用多个 CPU 和 IO 资源,以达到降低 SQL 执行时间的目的。

当满足下列条件时,使用并行执行会优于串行执行:

  • 访问的数据量大
  • SQL 并发低
  • 要求低延迟
  • 有充足的硬件资源

并行执行用多个处理器协同并发处理同一任务,在这样的系统中会有收益:

  • 多处理器系统(SMPs)、集群
  • IO 带宽足够
  • 内存富余(可用于处理内存密集型操作,如排序、建 hash 表等)
  • 系统负载不高,或有峰谷特征(如系统负载一般在 30% 以下)

如果你的系统不满足上述特征,那么并行执行可能不会带来显著收益。在一些高负载,小内存,或 IO 能力弱的系统里,并行执行甚至会带来负面效果。

并行执行不仅适用于离线数据仓库、实时报表、在线大数据分析等分析型系统,而且在 OLTP 领域也能发挥作用,可用于加速 DDL 操作、以及数据跑批工作等。但是,对于 OLTP 系统中的普通 SELECT 和 DML 语句,并行执行并不适用。

1.2 什么场景不适用并行执行

串行执行使用单个线程来执行数据库操作,在下面这些场景下使用串行执行优于并行执行:

  • Query 访问的数据量很小
  • 高并发
  • Query 执行时间小于 100 毫秒

并行执行一般不适用于如下场景:

  • 系统中的典型 SQL 执行时间都在毫秒级。并行查询本身有毫秒级的调度开销,对于短查询来说,并行执行带来的收益完全会被调度开销所抵消。
  • 系统负载本就很高。并行执行的设计目标就是去充分利用系统的空余资源,如果系统本身已经没有空余资源,那么并行执行并不能带来额外收益,相反还会影响系统整体性能。

1.3 硬件要求

并行执行对硬件没有特殊要求。需要注意的是,CPU 核数、内存大小、存储 I/O 性能、网络带宽都会影响并行执行性能,其中任意一项成为瓶颈都会拖累并行执行性能。

1.4 并行执行工作原理

并行执行将一个 SQL 查询任务分解成多个子任务,并调度这些子任务到多个处理器上运行。

本节包含如下内容:

  • SQL 语句的并行执行
  • 生产者消费者流水线模型
  • 并行的粒度
  • 生产者和消费者之间的数据分发方式
  • 生产者和消费者之间的数据传输机制

1.4.1 SQL 语句的并行执行

当一个 SQL 被解析为并行执行计划后,会按照下面的步骤执行:

  1. SQL 主线程(接收、解析SQL的线程)根据计划形态预约并行执行需要的线程资源。这些线程资源可能来自集群中的多台机器。
  2. SQL 主线程打开并行调度算子(PX COORDINATOR)。
  3. 并行调度算子解析计划,将它们切分成多个操作步骤,按照自底向上的顺序调度执行这些操作。每个操作都会尽可能并行执行。
  4. 当所有操作并行执行完成后,并行调度算子会接收计算结果,并将结果吐给它的上层算子(如 Aggregate 算子),串行完成剩余不可并行的计算(如最终的 SUM 计算)。

1.4.2 生产者-消费者流水线模型

并行执行使用生产者-消费者模型来进行流水执行。并行调度算子解析计划,将它们切分成多个操作步骤,每个操作步骤称之为一个 DFO(Data Flow Operation)

一般情况下,并行调度算子在同一时刻会启动两个 DFO,DFO 之间会以生产者-消费者的模式连接起来,这称为 DFO 间的并行执行。每个 DFO 会使用一组线程来执行,这称为 DFO 内的并行执行,这个 DFO 使用的线程数称为 DOP(Degree Of Parallisim)

上一阶段的消费者 DFO 会成为下一阶段的生产者 DFO。在并行调度算子的协调下,会同时启动消费者 DFO 和生产者 DFO。

下图中:

(1)DFO A 生成的数据会立即传输给DFO B 进行计算;

(2)DFO B 完成计算后,会将数据暂存在当前线程中,等待它的上层 DFO C 启动;

(3)当 DFO B 收到 DFO C 启动完成通知后,会将自己的角色转变成生产者,开始向 DFO C 传输数据,DFO C 收到数据后开始计算。

OceanBase 并行执行学习笔记 1 —— 并行执行概念-1

考虑下面的查询:

create table game (round int primary key, team varchar(10), score int)
    partition by hash(round) partitions 3;

insert into game values (1, "CN", 4), (2, "CN", 5), (3, "JP", 3);
insert into game values (4, "CN", 4), (5, "US", 4), (6, "JP", 4);

select /*+ parallel(3) */ team, sum(score) total from game group by team;

查询语句对应的执行计划:

OceanBase(admin@test)>explain select /*+ parallel(3) */ team, sum(score) total from game group by team;
+---------------------------------------------------------------------------------------------------------+
| Query Plan                                                                                              |
+---------------------------------------------------------------------------------------------------------+
| =================================================================                                       |
| |ID|OPERATOR                     |NAME    |EST.ROWS|EST.TIME(us)|                                       |
| -----------------------------------------------------------------                                       |
| |0 |PX COORDINATOR               |        |1       |4           |                                       |
| |1 | EXCHANGE OUT DISTR          |:EX10001|1       |4           |                                       |
| |2 |  HASH GROUP BY              |        |1       |4           |                                       |
| |3 |   EXCHANGE IN DISTR         |        |3       |3           |                                       |
| |4 |    EXCHANGE OUT DISTR (HASH)|:EX10000|3       |3           |                                       |
| |5 |     HASH GROUP BY           |        |3       |2           |                                       |
| |6 |      PX BLOCK ITERATOR      |        |1       |2           |                                       |
| |7 |       TABLE SCAN            |game    |1       |2           |                                       |
| =================================================================                                       |
| Outputs & filters:                                                                                      |
| -------------------------------------                                                                   |
|   0 - output([INTERNAL_FUNCTION(game.team, T_FUN_SUM(T_FUN_SUM(game.score)))]), filter(nil), rowset=256 |
|   1 - output([INTERNAL_FUNCTION(game.team, T_FUN_SUM(T_FUN_SUM(game.score)))]), filter(nil), rowset=256 |
|       dop=3                                                                                             |
|   2 - output([game.team], [T_FUN_SUM(T_FUN_SUM(game.score))]), filter(nil), rowset=256                  |
|       group([game.team]), agg_func([T_FUN_SUM(T_FUN_SUM(game.score))])                                  |
|   3 - output([game.team], [T_FUN_SUM(game.score)]), filter(nil), rowset=256                             |
|   4 - output([game.team], [T_FUN_SUM(game.score)]), filter(nil), rowset=256                             |
|       (#keys=1, [game.team]), dop=3                                                                     |
|   5 - output([game.team], [T_FUN_SUM(game.score)]), filter(nil), rowset=256                             |
|       group([game.team]), agg_func([T_FUN_SUM(game.score)])                                             |
|   6 - output([game.team], [game.score]), filter(nil), rowset=256                                        |
|   7 - output([game.team], [game.score]), filter(nil), rowset=256                                        |
|       access([game.team], [game.score]), partitions(p[0-2])                                             |
|       is_index_back=false, is_global_index=false,                                                       |
|       range_key([game.round]), range(MIN ; MAX)always true                                              |
+---------------------------------------------------------------------------------------------------------+
29 rows in set (0.003 sec)

select语句的执行计划会首先对 game 表做全表扫描,按照 team 做分组求和,然后算出每个 team 的总分数。这个查询的执行示意图如下:

OceanBase 并行执行学习笔记 1 —— 并行执行概念-2

从图中可以看到,这个查询实际使用了 6 个线程。

  • 第一步:前三个线程负责 game 表扫描,并且在每个线程内对 game.team 数据做预聚合;
  • 第二步:后三个线程负责对预聚合的数据做最终聚合;
  • 第三步:最终聚合结果发给并行调度器,由它返回给客户端。

第一步的数据发给第二步时,需要用 game.team 字段做 hash,决定将预聚合数据发给哪个线程。

1.4.3 并行的粒度

并行数据扫描的基本工作单元称为 granule。OceanBase 将表扫描工作划分成多个 granule,每个 granule 描述了一段扫描任务的范围。因为 granule 不会跨表分区 ,所以每段扫描任务一定是位于一个分区内。

根据 granule 的粒度特征,可以划分为两类:

  • Partition Granule

Partition granule 描述的范围是一整个分区,扫描任务涉及到多少个分区,就会划分出多少个 partition granule。这里的分区既可以是主表分区,也可以是索引分区。

Partition Granule 最常用的应用场景是 partition wise join,两张表里对应的分区通过 partition granule 可以确保被同一个工作线程处理。

  • Block Granule

Block granule 描述的范围是一个分区中的一段连续数据。数据扫描场景里,一般都是使用 block granule 划分数据。每个分区都会被划分为若干个 block,这些 block 再以一定的规则串联起来形成一个任务队列,供并行工作线程消费。

OceanBase 并行执行学习笔记 1 —— 并行执行概念-3

在给定并行度的情况下,为了确保扫描任务的均衡,优化器会自动选择将数据划分成分区粒度(Partition Granule)或块粒度(Block Granule)。如果选择了 Block Granule,并行执行框架会在运行时决策 block 的划分,总体原则是确保一个 block 既不会太大,也不会太小。太大,可能导致数据倾斜,让部分线程少干活;太小,会导致频繁的扫描切换开销。

划分好分区粒度后,每个粒度对应一个扫描任务。Table Scan 扫描算子会一个接一个地处理这些扫描任务,处理完一个之后,接着处理下一个,直到所有任务处理完毕。

1.4.4 生产者和消费者之间的数据分发方式

数据分发方式指的是,数据从一组并行执行工作线程(生产者)发送给另一组(消费者)时使用的方法。优化器会使用一系列的优化策略,决策使用哪种数据重分布方式,以达到最优的性能。

并行执行中的常见数据分发方式包括:

  • Hash Distribution

使用 Hash distribution 发送数据时,生产者根据 distribution key 对数据行计算 hash 值并取模,算出发给哪个消费者工作线程。大部分情况下,使用 hash distribution 能将数据较为均匀的分发给多个消费者线程。

  • Pkey Distribution

使用 Pkey Distribution 时,生产者计算出数据行对应的目标表所在分区,然后将行数据发给处理这个分区的消费者线程。

Pkey Distribution 常用于 partitial partitions wise join 场景。在 partitial partitions wise join 场景下,消费者侧的数据不需要做重分布,就可以和生产者侧的数据做 Partition Wise Join。这种方式可以减少网络通信量,提升性能。

  • Pkey Hash Distribution

使用 Pkey Hash Distribution 时,生产者首先需要计算出数据行对应的目标表所在的分区。然后,根据 distribution key 对数据行进行 hash 计算,以便决定将其发给哪一个消费者线程来处理。

Pkey Hash Distribution 常常应用于 Parallel DML 场景中。在这种场景下,一个分区可以被多个线程并发更新,因此需要使用 Pkey Hash Distribution 来确保相同值的数据行被同一个线程处理,不同值的数据行尽可能均分到多个线程处理。

  • Broadcast Distribution

使用 broadcast distribution 时,生产者将每一个数据行发送消费者端的每一个线程,使得消费者端每一个线程都拥有全量的生产者端数据。

Broadcast distribution 常用于将小表数据复制到所有执行 join 的节点,然后做本地 join 操作。这种方式可以减少网络通信量。

  • Broadcast to Host Distribution(简称 BC2HOST)

使用 broadcast to host distribution 时,生产者将每一个数据行发送消费者端的每一个节点上,使得消费者端每一个节点都拥有全量的生产者端数据。然后,节点里的消费者线程协同处理这份数据。

Broadcast to host distribution 常用于 NESTED LOOP JOIN、SHARED HASH JOIN 场景。NESTED LOOP JOIN 场景里,消费端的每个线程会从共享数据里取一部分数据行作为驱动数据,去目标表里做 join 操作;SHARED HASH JOIN 场景里,消费端的每个线程会基于共享数据协同构建 hash 表,避免每个线程独立构建相同 hash 表导致的不必要开销。

  • Range Distribution

使用 Range distribution 时,生产者将数据按照 range 范围做划分,让不同消费者线程处理不同范围的数据。

Range distribution 常用于排序场景,各个消费者线程只需排序好发给自己的数据,数据就能在全局范围内有序。

  • Random Distribution

使用 Random distribution 时,生产者将数据随机打散,发给消费者线程,使得每个消费者线程处理的数据数量几乎一致,从而达到均衡负载的目的。

Random distribution 常用于多线程并行 UNION ALL 场景,该场景只要求数据打散,负载均衡,数据之间无其它关联约束。

  • Hybrid Hash Distribution

Hybrid hash distribtuion 用于自适应的 join 算法。结合收集的统计信息,OceanBase 提供了一组配置项来定义常规值和高频值。Hybrid hash distribtuion 方法将 join 两侧的常规值做 hash 分布,左侧的高频值使用 broadcast 分布,右侧的高频值使用 random 分布。
OceanBase 并行执行学习笔记 1 —— 并行执行概念-4

1.4.5 生产者和消费者之间的数据传输机制

并行调度算子在同一时刻会启动两个 DFO,DFO 之间会以生产者-消费者的模式连接起来并行执行。为了方便在生产者和消费者之间传输数据,需要创建一个传输网络。

例如,生产者 DFO 以 DOP = 2 来做数据扫描,消费者 DFO 以 DOP = 3 来做数据聚合计算,每个生产者线程都会创建 3 个虚拟链接去连接消费者线程,总计会创建 6 个虚拟链接。如下图:

OceanBase 并行执行学习笔记 1 —— 并行执行概念-5

生产者和消费者之间创建的虚拟传输网络被称为数据传输层(Data Transfer Layer,简称 DTL)。OceanBase 并行执行框架中,所有的控制消息和行数据都通过 DTL 进行收发。每个工作线程可以对外建立数千个虚拟链接,具有高度的可扩展性。除此之外,DTL 还具有数据缓冲、批量数据发送和自动流量控制等能力。

当 DTL 链接的两端位于同一个节点时,DTL 会通过内存拷贝的方式来传递消息;当 DTL 链接的两端位于不同节点时,DTL 会通过网络通信的方式来传递消息。

1.5 并行执行工作线程

一个并行查询会使用两类线程:1个主线程,若干个并行工作线程。其中主线程和普通的 TP 查询使用的线程没有任何区别,来自普通工作线程池,并行工作线程来自专用线程池。

OceanBase 使用专用线程池模型来分配并行工作线程。每个租户在其所属的各个节点里都有一个租户专属的并行执行线程池,并行查询工作线程都通过这个线程池来分配。

并行调度算子在调度每个 DFO 之前,会去线程池中申请线程资源。当 DFO 执行完成时,会立即源释放线程资源。

线程池的初始大小为 0,按需增长,没有上限。为了避免空闲线程数过多,线程池引入自动回收机制。对于任意线程:

  • 如果空闲时间超过 10 分钟,并且线程池中剩余线程数大于 8 个,则被回收销毁;
  • 如果空闲时间超过 60 分钟,则无条件销毁

虽然线程池的大小没有上限,但是通过下面两个机制,能在绝大多数场景里形成事实上限:

  1. 并行执行开始执行前,需要通过 Admission 模块预约线程资源,预约成功后才能投入执行。通过这个机制,能限制并发查询数量。Admission 模块的详细内容参考本文中 《3 并发控制与排队》一节。
  2. 查询每次从线程池申请线程时,单次申请的线程数量不会超过 N, N 等于租户 UNIT 的 MIN_CPU 乘以 px_workers_per_cpu_quota,如果超过,则最多只分配 N 个线程。px_workers_per_cpu_quota 是租户级配置项,默认值为 10。例如,一个 DFO 的 DOP = 100,它需要从 A 节点申请 30 个线程,从 B 节点申请 70 个线程,UNIT 的 MIN_CPU = 4,px_workers_per_cpu_quota = 10,那么 N = 4 * 10 = 40。最终这个 DFO 在 A 节点上实际申请到 30 个线程,B 节点上实际申请到 40 个线程,它的实际 DOP 为 70。

1.6 通过均衡负载来优化性能

为了达到最优性能,所有工作线程分到的工作任务应该尽量相等。

SQL 使用 Block Granule 划分任务的时候,工作任务会动态地分配到工作线程之间。这样可以最小化工作负载不均衡问题,即一些工作线程的工作量不会明显超过其它工作线程。SQL 使用 Partition Granule 划分任务时,可以通过让任务数是工作线程数的整数倍来优化性能。这适用于 Partition Wise Join 和并行 DML 场景。

举个例子,假设一个表有 16 个分区,每个分区的数据量差不多。你可以使用 16 工作线程(DOP 等于 16)以大约十六分之一的时间完成工作,你也可以使用五个工作线程以五分之一的时间完成工作,或使用两个线程以一半的时间完成工作。

但是,如果你使用 15 个线程来处理 16 个分区,则第一个线程完成一个分区的工作后,就开始处理第 16 个分区。而其他线程完成工作后,它们变为空闲状态。当每个分区的数据量差不多时,这种配置会导致性能不优;当每个分区的数据量有所差异时,实际性能则会因情况而异。

类似地,假设你使用 6 个线程来处理 16 个分区,每个分区的数据量差不多。在这种情况下,每个线程在完成其第一个分区后,会处理第二个分区,但只有四个线程会处理第三个分区,而其他两个线程会保持空闲。

一般来说,不能假设在给定数量的分区(N)和给定数量的工作线程(P)上执行并行操作所花费的时间等于 N 除以 P。这个公式没有考虑到一些线程可能需要等待其他线程完成最后的分区。但是,通过选择适当的 DOP,可以最小化工作负载不均衡问题并优化性能。

相关文章

Oracle如何使用授予和撤销权限的语法和示例
Awesome Project: 探索 MatrixOrigin 云原生分布式数据库
下载丨66页PDF,云和恩墨技术通讯(2024年7月刊)
社区版oceanbase安装
Oracle 导出CSV工具-sqluldr2
ETL数据集成丨快速将MySQL数据迁移至Doris数据库

发布评论