一文详解 OceanBase 并行执行引擎实现

2024年 5月 7日 60.6k 0

摘要本文整理自OceanBase TechTalk第四期杭州站由蚂蚁金服OB团队技术专家余璜(花名:晓楚)的演讲,本文将带读者深入了解OceanBase 2.0的并行执行框架。Tips:您可以关注“OceanBase”公众号回复“0512”获取现场PPT

一文详解 OceanBase 并行执行引擎实现-1

一文详解 OceanBase 并行执行引擎实现-2

背景介绍


OceanBase 在公司内外越来越多的场景落地,需求也开始多样化。客户不仅希望 OceanBase 提供优异的 OLTP 能力,还希望无需导出数据到第三方平台,就地提供 OLAP(Online Analytical Processing)能力。


在 OceanBase 1.4 版本中,我们已经提供了基本的分布式执行能力,但由于其设计考量的核心应用场景还是给类 OLTP 查询提供基础分布式数据访问能力,对于大数据量的查询场景,以及分布式复杂查询的支持都比较有限,主要存在以下几个问题:


  1. 扫描粒度固定:只能以分区为基本单位扫描数据,导致大量分区数场景下需要发起大量 RPC;不支持分区内并行
  2. 调度能力有限:单步调度,无法进一步发掘并行能力
  3. 数据不能流水:中间结果落盘,无法形成数据流水线

为了更好地适应客户需求,我们在 OceanBase 2.0 版本中引入了全新的并行执行框架,融合了 1.4 版本的分布式数据访问能力,同时增强了并行执行的能力,支持对 OceanBase 中的海量数据进行在线分析处理,使得 OceanBase 向 OLTP + OLAP 融合型数据库的路上迈出了第一步


并行执行框架

OceanBase 是一个 Share Nothing 的数据库,数据以分片的形式存储于每个节点,节点之间通过千兆、万兆网络通信。

一文详解 OceanBase 并行执行引擎实现-3 

OceanBase 分布式计算架构示意图


一般会在每个节点上部署一个叫做 observer 的进程,它是 OceanBase 对外服务的主体。OceanBase 会根据一定的均衡策略将数据分片均衡到多个 observer 上,于是,一个并行查询,一般需要同时访问多个 OBserver。


一文详解 OceanBase 并行执行引擎实现-4

数据分片示意图


SQL语句并行执行流程


当用户给定的 SQL 语句需要访问的数据位于 2 台或 2 台以上 observer 时,就会启用并行执行,会执行如下步骤:

  1. 用户所连接的这个 observer 将承担查询协调者(Query Coordinator,QC)的角色
  2. QC 预约足够的线程资源
  3. QC 将需要并行的计划拆成多个子计划(下称 DFO, Data Flow Operation),每个 DFO 包含若干个串行执行的算子。例如,一个 DFO 里包含了扫描分区,聚集,发送算子,另外一个 DFO 里包含了收集、聚集算子等。
  4. QC 按照一定的逻辑顺序将 DFO 调度到合适的 observer 上执行,observer 上会临时启动一个辅助协调者(Sub Query Coordinator,SQC),SQC 负责在所在 observer 上为各个 DFO 申请执行资源、构造执行上下文环境等,然后启动 DFO 在各个 observer 上并行执行
  5. 当各个 DFO 都执行完毕,QC 会串行执行剩余部分的计算。如,一个并行的 COUNT 算法,最终需要 QC 将各个机器上的计算结果做一个 SUM 运算。
  6. QC 所在线程将结果返回给客户端 

一文详解 OceanBase 并行执行引擎实现-5

优化器负责决策生成一个怎样的并行计划,QC 负责具体执行该计划。例如,两分区表 JOIN,优化器根据规则和代价信息,可能生成一个分布式的 PARTITION WISE JOIN 计划,也可能生成一个 HASH-HASH 打散的分布式 JOIN 计划。计划一旦确定,QC 就会将计划拆分成多个 DFO,有序调度执行。


并行度与任务划分方法


我们用并行度 (Degree Of Parallelism, DOP)的概念指定用多少个线程(Worker)来执行一个 DFO。目前 OceanBase 通过 parallel 这个 hint 来指定并行度。确定并行度后,会将 DOP 拆分到需要运行 DFO 的多个 server 上。


对于包含扫描的 DFO,会计算 DFO 需要访问哪些 partition,这些 partition 分布在哪些 server 上,然后将 DOP 按比例划分给这些 server。例如,DOP = 6,DFO 要访问 120 个 partition,其中 server 1 上有 60 个 partition, server 2 上有 40 个 partition,server 3 上有 20 个 partition,那么,会给 server 1 上分 3 个线程, server 2 上分 2 个线程,server 3 上分 1 个线程,达到平均每个线程可以处理 20 个 partition 的效果。如果 DOP 和 partition 数不能整除,会做一定的调整,达到长尾尽可能短的目的。


如果每个机器上分得的 worker 数远大于分区数,会自动做分区内并行。每个分区会以宏块为边界切分成若干个扫描任务,由多个 worker 争抢执行。


为了将这种划分能力进行抽象和封装,引入 Granule 的概念。每个扫描任务称为一个 Granule,这个扫描任务既可以是扫一个 partition,也可以扫 partition 中的一小块范围。


一文详解 OceanBase 并行执行引擎实现-6

partition 的切分需要把握一个度,既不能且得太粗,也不能太细。太粗,容易出现 worker 工作量不均衡,太细,扫描时反复从一个 granule 切换到下一个 granule 造成的开销太大。目前使用了一个经验值,每个 worker 平均可以拿到 13 个 Granule 是最合适的。多个 Granule 串成一个链表,由各个 worker 从链表上抢任务执行。


一文详解 OceanBase 并行执行引擎实现-7

Worker 争抢 Granule 的例子   


对于不包含扫描任务的 DFO,会分配在 child DFO 所在的机器上,以尽可能减少一些跨机的数据传输。

部分 DFO 不能并行执行,会被打上 LOCAL 标记,QC 会在本地调度这样的 DFO,且强制将其并行度设置为 1。


并行调度方法


优化器生成并行计划后,QC 会将其切分成多个 DFO。如下图,是 t1、t2 表做 HASH JOIN,切分成了 3 个 DFO,DFO 1、DFO 2 负责并行扫描数据,并将数据 HASH 到对应节点,DFO 3 负责做 HASH JOIN,并将最终的 HASH 结果汇总到 QC。

一文详解 OceanBase 并行执行引擎实现-8

QC 会尽量使用 2 组线程来完成计划的调度,例如上面的例子中,QC 首先会调度 DFO 1 和 DFO 3,DFO 1 开始执行后就开始扫数据, 并吐给 DFO 3,DFO 3 开始执行后,首先会阻塞在 HASH JOIN 建 hash table 的步骤上,也就是会一直会从 DFO 1 收数据,直到全部收齐,建立好 hash table。然后 DFO 3 会从右边的 DFO 2 收数据。这时候  DFO 2 还没有被调度起来,所以 DFO 3 会等待在收数据的流程上。DFO 1 在把数据都发送给 DFO 3 后就可以让出线程资源退出了。调度器回收了 DFO 1 的线程资源后,立即会调度 DFO 2。 DFO 2 开始运行后就开始给 DFO 3 发送数据,DFO 3 每收到一行 DFO 2 的数据就回去 hash table 中查表,如果命中,就会立即向上输出给 QC,QC 负责将结果输出给客户端。


读到这里大家通常会有这么两个疑问:


  1. 为什么是先调度 DFO 1,再调度 DFO 2 呢?从右到左调度,先 DFO 2 再 DFO 1 不行吗?
  2. 上面只用 2 组线程就完成了调度,是不是巧合? 如果换了 MERGE JOIN 呢?

首先,所有的算子都遵循一个约定俗成的习惯,都是先 open 左支,再 open 右支,处理首行数据的顺序也是先左后右。这也就使得调度的顺序也要和这种习惯相一致,否则即使调度了,也无法推进。


其次,2 组线程完成调度,这是我们设计出来的,是并行框架设计中的人为限制,我们希望计划不要占用过多的线程组就能推进。为了达到这个目的,对于所有的左深树调度,如果有 DFO 需要同时从左右 DFO 读数据,那么我们会在这个 DFO 中插入一些阻塞性算子(如 Sort、Matierial),强行先把左侧 DFO 中数据全部收取上来。例如 TPCH Q12 的并行计划如下,箭头处的 MERGE JOIN(MJ)需要从左边、右边同时收取数据才能做 JOIN,为了达到这个目的,MERGE JOIN 左侧是一个 MERGE SORT RECEIVE IN(IN.SORT)算子,它是阻塞算子,需要把下面的所有数据都收上来之后,才会向 MERGE JOIN 吐出第一行。


一文详解 OceanBase 并行执行引擎实现-9

另外,还需要注意到一个情况,依然以上图为例,考虑场景:MERGE JOIN 匹配了一行结果,会往上吐,图中吐出的结果会被 HASH GROUPBY(HASH GBY)全部缓存下来,直到 MERGE JOIN 匹配完所有数据后, HASH GROUPBY 才会对上层吐数据。这时读 orders 表、lineitem 表的 DFO 都已经调度结束,最上层的 DFO 被调度,正好能消费 HASH GROUPBY 对上层吐出的数据。


假设 MERGE JOIN 上面没有 HASH GROUP BY 算子怎么办?这时数据会尝试向上吐,但上面的 DFO 还没有被调度,MERGE JOIN 吐出的数据发不出去会导致 MERGE JOIN 被阻塞,整个 Query 都无法推进。为了解决这个问题,我们会在 MERGE JOIN 上面插入一个 Material 算子,用它来缓存 MERGE JOIN 吐出的所有数据。

但也有特殊的情况,存在 2 组线程无论如何都搞不定的场景,这时候我们也支持分配 3 组线程。具体参见 特色功能-调度 章节的讨论。


网络通信方法


一对有关联的 DFO,child DFO 作为生产者分配了 M 个 Worker 线程, parent DFO 作为消费者分配了 N 个 Worker 线程。他们之间的数据传输需要用到 M * N 个网络通道。


一文详解 OceanBase 并行执行引擎实现-10

为了对这种网络通信进行抽象,引入数据传输层(Data Transfer Layer, DTL)的概念,任意两点之间的通信连接用通道(channel)的概念来描述。


通道分为发送端和接收端,在最初的实现中我们允许发送端无限地给接收端发送数据,但发现如果接收端无法立即消费掉这些数据,可能会导致接收端内存爆,所以加入了流控逻辑。每个 channel 接收端预留了三个槽位,当槽位被数据占满时会通知发送端暂停发送数据,当有接收端数据被消费空闲槽位出现时通知发送端继续发送。


资源控制与 Query 排队


PX 是以线程为基本单位分配运行资源,有一个固定大小的共享线程池供每个租户的 PX 请求。当并发请求较多,线程资源不够时,会让请求线程失败的 Query 排队。


分布式场景下,如果一个 Query 已经获取了一部分线程,另一部分线程获取失败,会重试获取线程,如果重试若干秒后依然无法获取到线程资源,说明当前系统繁忙,会让当前 Query 失败。


特色功能

1. 调度


OceanBase 2.0 支持多种形态的计划调度,例如常见的左深树、右深树,以及之字型树(Zig-Zag Tree)

一文详解 OceanBase 并行执行引擎实现-11

树的形态决定了调度需要的线程资源组数。左深树的调度是最简单的,只需要同时启动两组线程就可以驱动左深树的执行,对于右深树,则需要 3 组线程才能驱动。如上图,用数子标注了调度顺序,数子相同时表示对应边会同时调度,上面的右深树中的数字 3、4 分别对应了两条边,两条边连接了 3 个 DFO,这 3 个 DFO 会同时调度。


下面略微变形的左深树、之字型树虽然看起来复杂不少,它们本质和上面的的 3 种树型是一样的,调度上依然可以轻松支持。


一文详解 OceanBase 并行执行引擎实现-12

除了上面介绍的树型之外,都称之为 Bushy Tree,目前 Bushy Tree 的调度不支持流水执行。当遇到 Bushy Tree 时,回退为单步执行模式,每次只调度执行一个 DFO,写中间结果。


一文详解 OceanBase 并行执行引擎实现-13

2. 流水与落盘


上面提到单步调度、双 DFO 调度、 3-DFO 调度,那么不同的调度方式,对数据的流水有什么影响呢?

首先看单步调度,每一步计算都写中间结果,数据无法流水。


再看双 DFO 调度,由于同时调度 2 层 DFO,下层 DFO 作为生产者,上层 DFO 作为消费者,可以形成一个局部流水线,下层生产的数据无需落盘,可以直接通过网络推送给上层消费。


但是,双 DFO 调度推进若干步后,还是会面临落盘的场景。例如下面的左深树中,红色节点表示当前正在被调度的 DFO,灰色表示已经调度完成的 DFO,白色表示尚未调度的 DFO。左图中,HASH JOIN 1 的 hash 表已经通过 t1 送来的数据建好,从右边 t2 表每读入一行,就可以立即向上面输出匹配的数据。但是,HASH JOIN 2 还没有开始调度执行,无法接受 HASH JOIN 1 的数据。作为应对,需要将 HASH JOIN 1 的结果暂存在内存中,甚至落盘。当 HASH JOIN 2 被调度后,才开始消费暂存的数据,如右图所示。


一文详解 OceanBase 并行执行引擎实现-14

我们还有一种策略避免落盘,即 3-DFO 调度。还是用上面的例子来说明,SCAN t2、HASH JOIN 1、HASH JOIN 2 同时调度,就可以避免在 HASH JOIN 1 中强制落盘。这么做的代价是:需要启动更多组线程。


一文详解 OceanBase 并行执行引擎实现-15

是不是还可以有 N-DFO 调度策略呢?理论上是可以的,但通常这么做没什么意义。处在高层的 DFO 在调度执行后,会很长一段时间无事可做,直到首行输入到来。


综上所述,DFO 的并行调度层数和流水的长度紧密相关,到底并行调度多少层 DFO 最合适,需要根据系统的资源富裕程度来合理决策。考虑到 SQL 中很多算子(如 Sort、Group By 等)天然具备暂存结果的能力,两层 DFO 就可以满足 OceanBase 大部分场景的需求。


3. 可变并行度


并行度用于控制用多少线程来执行一个 DFO,一种简单策略是每个 DFO 的并行度都一样,但在部分场景下这不是最优选择,考虑下面的场景:t1, t2 做连接,其中 t1 是大表, t2 是小表,但 t1 经过复杂条件过滤后输出的行数很少,广播到 t2 表上做连接。那么可以考虑给 t1 表扫描分配较大并行度,t2 扫描以及 join 分配较小的并行度。如下图:

一文详解 OceanBase 并行执行引擎实现-16

虽然可变并行度的设计是很符合直觉的,但并不是所有的数据库都支持这么做。允许可变并行度,可以让执行优化具备更多的灵活性,在不损失效率的基础上更加节省资源。


未来工作

目前,OceanBase 并行框架还远远没有达到成熟,未来我们会在以下几个方面继续夯实基础:

  • 资源管控,包括更好的 CPU 资源管控策略,memory 管控策略等
  • 性能分析与诊断框架
  • 容错策略
  • 算子、表达式性能优化,数据迭代模式的优化
  • 网络框架优化
  • 存储优化

相关文章

Oracle如何使用授予和撤销权限的语法和示例
Awesome Project: 探索 MatrixOrigin 云原生分布式数据库
下载丨66页PDF,云和恩墨技术通讯(2024年7月刊)
社区版oceanbase安装
Oracle 导出CSV工具-sqluldr2
ETL数据集成丨快速将MySQL数据迁移至Doris数据库

发布评论