揭秘!OceanBase 并行执行引擎到底如何工作?

2024年 5月 7日 39.2k 0

摘要:好消息!OceanBase现推出分布式数据库产品模块原理简介系列内容,通过完整13篇文章帮助数据库从业者建立更系统完善的数据库知识体系。第八期我们来聊聊分布式数据库中一个重要的模块——并行执行引擎。Tips:关注OceanBase公众号回复“产品原理”获取OceanBase产品模块原理简介系列已发布的8篇文章合集(该系列持续更新中)揭秘!OceanBase 并行执行引擎到底如何工作?-1

前言


OceanBase是一个Share Nothing的分布式数据库系统,处理场景对比传统数据库来说在数据量,数据分布都有更高的挑战性。集群规模会远大于传统商业数据库的场景,这对于整个SQL并行查询执行引擎会带来极高的挑战。一个能够水平扩展的数库集群,需要一个在高并发下迅速进行复杂HTAP场景查询支持的SQL执行引擎和多机并发资源管理、资源调度系统。下面我们来看下OceanBase的并行执行框架引擎是如何工作的。


概念介绍   

  • PX:并行执行(Parallel eXecution,下称 PX)需要同时调度多组线程资源协同完成一个查询。
  • QC:当用户给定的 SQL 语句需要访问的数据位于 2 台或 2 台以上 OBServer、或者单节点包含多个分区的表,就会启用并行执行,当SQL计划在连接的OBserver执行到并行查询时,主进程在决定并行度后,发送工作线程获取请求到各个机器,此时用户连接的OBServer就会作为QC角色,即查询协调者Query  Coordinator(QC)。
  • SQC:每个处理QC请求的线程,自动成为该查询的Sub Query  Coordinator(SQC)即辅助查询协调者。SQC 负责在所在 OBServer上为各个 DFO 申请执行资源、构造执行上下文环境等,然后启动 DFO 在各个 OBServer上并行执行。
  • DFO:Data Flow Operation(DFO)即子计划,每个DFO包含若干个串行执行的算子。例如,一个DFO里包含了扫描分区,聚集,发送算子,另外一个DFO里包含了收集、聚集算子等。
  • DTL:数据传输服务层 Data Transfer Layer(DTL),该数据服务层负责该机器上所有的并行执行计划的数据通道的建立,对外暴露逻辑上的“通道”。通道的接口类似一个FIFO,生产者负责PUSH,消费者负责POP。通道的实现与接口解耦,通道的两端可以在单机上,也可以跨物理机,不同场景的实现对上层透明化。
  • DOP:OceanBase 使用Degree Of Parallelism(DOP)即并行度来指定用多少个线程(Worker)来执行一个 DFO。目前 OceanBase 通过Parallel这个HINT来指定并行度。确定并行度后,对于包含表扫描的DFO,会根据不同Server的待扫描Partition数来决定给每个Server不同比例的Worker数。

任务调度     


调度会分为两级调度。一级QC是SQL查询被执行的机器,在并行查询执行中,负责总体执行计划的调度和划分机器级别资源,并且收集和发送全局信息到所有机器。一级QC和各机器的SQC通信,不直接和各机的实际工作线程通信。各机的SQC负责各机内部工作线程的调度工作,接受QC的调度命令,在机器内根据QC提供的机器级别任务进行本机的任务划分,并通过调度消息分配给各个本机工作线程。线程组工作参照下图:


揭秘!OceanBase 并行执行引擎到底如何工作?-2

通信机制    


如果通道两端位于同一台物理机,单机通道的实现利用共享内存,就是提供一个本地的FIFO队列,此时也不需要DTL提供额外的线程来同步数据的传输。


揭秘!OceanBase 并行执行引擎到底如何工作?-3


如果通道两端位于同一台物理机,单机通道的实现利用共享内存,就是提供一个本地的FIFO队列,此时也不需要DTL提供额外的线程来同步数据的传输。如果是跨机通信,初次建立将基于现有的RPC机制,握手过程完成后,所有的QC到SQC,以及各自机器SQC到工作线程的通信将依赖于DTL机制建立通信连接,每一个SQC到每台机器的两组工作线程全部都有通信连接,每一个工作线程除了和SQC的通信之外,还和对端工作组的所有工作线程有通信连接。考虑在大并发度下的通信资源问题,DTL在内部实现中实现一定的共享机制,比如将某个查询的工作线程分成几组,一组采用一个共享的消息暂存和发送队列,由一组消息处理线程提供服务。在接收到发送到某一个线程的消息时,先放入相应组的消息队列中,然后再由该组的消息服务线程进行下一步的分发到对应工作线程。


通道(Channels )数=M(生产者)* N(消费者),每个消费者会预留3个内存槽,如果内存槽占满,当槽位被数据占满时会通知发送端暂停发送数据,当有接收端数据被消费空闲槽位出现时通知发送端继续发送。

切分粒度


对于基表扫描的任务,在进行并行任务切分时,需要考虑选择一个合适的切分粒度。切分粒度过细,将导致任务过多,每次分发占用的额外开销将占整个任务执行时间较高比重,切分粒度过粗,则出现数据倾斜的可能性增加。任务切分在SQC启动相应包含扫描的子树时启动,每个机器的SQC独立进行。对于Query Range类型的并行。基本思路会是按照计算出的最佳任务大小,切分本地宏块组,将每一个均匀切分的宏块组反向映射到Query  Range每一个任务在切分时不跨越分区边界。


动态均衡


并行执行框架实现了工作线程向SQC要扫描表任务的功能,功能类似Oracle的GRANULE ITERATOR,实现扫描任务的动态负载均衡。我们实现一个GRANULE ITERATOR的算子,它的功能就是从每台机器每个查询的任务参数队列中去拿到一个或多个任务参数,以用于给TABLE SCAN配置参数,在TABLE SCAN返回扫描结束的时候,它将去拿下一个任务参数然后重新启动TABLE SCAN。这个算子一方面避免了多次序列化整个DFO的算子子树给工作线程,提供动态的负载均衡。


分布式查询流程说明


1.  Query发起,所连的Server担当起QC角色2.  QC首先预约足够的线程资源3.  QC将需要并行的计划拆成多个DFO,根据优化器或者用户指定4.  QC按照一定的逻辑顺序将DFO分发到SQC,SQC调度Worker执行5.  当各个DFO都执行完毕,QC 会串行执行剩余部分的计算。如,一个并行的COUNT 算法,最终需要QC 将各个机器上的计算结果做一个SUM 运算6.  QC所在线程将结果返回给客户端 


揭秘!OceanBase 并行执行引擎到底如何工作?-4

通过执行计划看并发执行      

了解了实现逻辑,我们看OceanBase中 一条SQL的执行计划是怎样的

 

环境准备


表名

是否分区

行数

PARTSUPP

80000000

SUPPLIER

1000000

PART

20000000

NATION

5

REGION

25

建表语句:


  •  表PARTSUPP
CREATE TABLE PARTSUPP (
        PS_PARTKEY BIGINT NOT NULL,
        PS_SUPPKEY BIGINT NOT NULL,
        PS_AVAILQTY BIGINT NOT NULL,
        PS_SUPPLYCOST DECIMAL(15, 2) NOT NULL,
        PS_COMMENT VARCHAR(199) NOT NULL,
        PRIMARY KEY (PS_PARTKEY, PS_SUPPKEY),
         INDEX INDEX_1(PS_SUPPKEY) LOCAL,
INDEX INDEX_2(PS_PARTKEY) LOCAL
)

PARTITION BY HASH (PS_PARTKEY) PARTITIONS 256;

  • PART
CREATE TABLE PART (
        P_PARTKEY BIGINT NOT NULL,
        P_NAME VARCHAR(55) NOT NULL,
        P_MFGR CHAR(25) NOT NULL,
        P_BRAND CHAR(10) NOT NULL,
        P_TYPE VARCHAR(25) NOT NULL,
        P_SIZE BIGINT NOT NULL,
        P_CONTAINER CHAR(10) NOT NULL,
        P_RETAILPRICE DECIMAL(15, 2) NOT NULL,
        P_COMMENT VARCHAR(23) NOT NULL,
        PRIMARY KEY (P_PARTKEY)
)
PARTITION BY HASH (P_PARTKEY) PARTITIONS 256;
  • 表SUPPLIER
CREATE TABLE SUPPLIER (
        S_SUPPKEY BIGINT NOT NULL,
        S_NAME CHAR(25) NOT NULL,
        S_ADDRESS VARCHAR(40) NOT NULL,
        S_NATIONKEY BIGINT NOT NULL,
        S_PHONE CHAR(15) NOT NULL,
        S_ACCTBAL DECIMAL(15, 2) NOT NULL,
        S_COMMENT VARCHAR(101) NOT NULL,
        PRIMARY KEY (S_SUPPKEY),
        index index_1(S_NATIONKEY) local
)
PARTITION BY HASH (S_SUPPKEY) PARTITIONS 256;
  • NATION
CREATE TABLE NATION (
        N_NATIONKEY BIGINT NOT NULL,
        N_NAME CHAR(25) NOT NULL,
        N_REGIONKEY BIGINT NOT NULL,
        N_COMMENT VARCHAR(152),
        PRIMARY KEY (N_NATIONKEY),
        INDEX INDEX1(N_REGIONKEY) LOCAL
);
  •  表REGION
CREATE TABLE REGION (
        R_REGIONKEY BIGINT NOT NULL,
        R_NAME CHAR(25) NOT NULL,
        R_COMMENT VARCHAR(152),
        PRIMARY KEY (R_REGIONKEY)
);
  •  SQL

SELECT *

FROM (

   SELECT /*+   parallel(256) */ s_acctbal, s_name, n_name, p_partkey, p_mfgr

      , s_address, s_phone, s_comment

   FROM part, supplier, partsupp, nation, region

   WHERE p_partkey = ps_partkey

      AND s_suppkey = ps_suppkey

      AND p_size = 30

      AND p_type LIKE '%STEEL'

      AND s_nationkey = n_nationkey

      AND n_regionkey = r_regionkey

      AND r_name = 'ASIA'

      AND ps_supplycost = (

         SELECT MIN(ps_supplycost)

         FROM partsupp, supplier, nation,  region

         WHERE p_partkey = ps_partkey

            AND s_suppkey = ps_suppkey

            AND s_nationkey = n_nationkey

            AND n_regionkey = r_regionkey

            AND r_name = 'ASIA'

      )

   ORDER BY s_acctbal DESC, n_name, s_name,  p_partkey

) WHERE rownum <=  100

执行计划

揭秘!OceanBase 并行执行引擎到底如何工作?-5

PX计划树:

揭秘!OceanBase 并行执行引擎到底如何工作?-6

  • 通过HINT将DOP设置为256
  • QC将查询切分成多个DFO ,处理REGION表的DFO通过多Worker并行扫描,获得到相应数据,作为生产者将数据Broadcast通知到其他SQC节点
  • 同上,查询NATION的WORKER作为生产者将数据Broadcast通知到其他SQC节点
  • PART和PARTSUPP是分区表,Worker是在各分区并行做全表扫描。将结果集NLJ后向上传递
  • 处理SUPPLIER表的DFO 全表扫描,与其他DFO的结果集做HASH JOIN
  • 各个DFO的HASH JOIN、聚合、排序都执行完成
  • QC生成最终结果,并将结果返回给请求方


并行执行使用场景和操作指南   

并行执行相关系统变量

变量名 功能 类型 取值范围 级别 命令
parallel_servers_target 设置好后10s内后台自动生效。 int64 [0, 3600] 租户级 set global parallel_servers_target = 24; alter system set parallel_servers_target = 24;
parallel_max_servers parallel_max_servers 可以在运行时动态更改,最小为 0,最大为 3600。租户能够动态感知到parallel_max_servers 值的变化,并根据值的变化来动态扩容/缩容线程池。当取值为 0 时,PX 路径关闭,所有查询都走老并行查询框架。设置好后10s内后台自动生效。 int64 [0,1800] 租户级 set global parallel_max_servers = 24; alter system set parallel_max_servers = 24;

可以使用并行查询的操作


下述各种查询操作都可以使用并行查询:

  • 各种Access Methods

全表扫描(包括分区间并行和分区内并行扫描),索引表扫描

  • 各种表连接操作

包括NESTED LOOP JOIN,MERGE JOIN和HASH JOIN

  • 其他一些SQL操作

包括一些聚合操作,例如GROUP BY,DISTINCT,SUM等,LIMIT算子的下压等

建议使用并行查询的场景


并行查询对于以下情况有显著效果


  • 充足的IO带宽
  • 系统CPU负载较低
  • 充足的内存资源以满足并行查询的需要

如果系统没有充足的资源进行额外的并行处理,使用并行查询或者提高并行度并不能提高执行性能。相反,在系统过载的情况下,操作系统被迫进行更多的调度,上下文切换或者页面交换,可能会导致性能的进一步下降。通常在D(ecision)S(upport)S(ystem)系统,大量分区需要被访问和数据仓库环境下,并行执行能够提升执行响应时间。OLTP系统通常在批量DML操作或者进行SCHEMA维护操作时能够受益,例如进行INDEX的创建等。对于简单的DML操作或者分区内查询以及涉及分区数比较小的查询来说,使用并行查询并不能很明显的提高查询响应时间。

相关文章

Oracle如何使用授予和撤销权限的语法和示例
Awesome Project: 探索 MatrixOrigin 云原生分布式数据库
下载丨66页PDF,云和恩墨技术通讯(2024年7月刊)
社区版oceanbase安装
Oracle 导出CSV工具-sqluldr2
ETL数据集成丨快速将MySQL数据迁移至Doris数据库

发布评论