更多技术交流、求职机会,欢迎关注字节跳动数据平台微信公众号,回复【1】进入官方交流群
文章主要介绍了火山引擎湖仓一体分析服务 LAS Spark(下文以 LAS Spark 指代)在 TPC-DS 上的性能突破与优化策略。TPC-DS 是一个模拟复杂数据仓库环境的测试基准,LAS Spark 通过采用规则优化、缓存优化和运行时优化三类优化策略,实现了超越社区版本的巨大性能提升,且已在内部生产环境得到验证。文末更有专属彩蛋,新人优惠购福利,等着你来解锁!
本篇文章提纲如下:
- TPC-DS 简介
- 性能表现
- 自研优化策略
- 总结
TPC-DS 简介
针对数据库不同的使用场景 TPC 组织发布了多项测试标准。
TPC-DS 采用星型、雪花型等多维数据模式。它包含 7 张事实表,17 张纬度表,平均每张表含有 18 列。其工作负载包含 99 个 SQL 查询,覆盖 SQL 99 和 2003 的核心部分以及 OLAP。这个测试集包含对大数据集的统计、报表生成、联机查询、数据挖掘等复杂应用,测试用的数据和值有倾斜,与真实数据一致。可以说 TPC-DS 是一个与真实场景非常接近的测试集,难度较大,覆盖场景广,能有效反应不同业务的需求。
TPC-DS 的这个特点与大数据的分析挖掘应用非常类似。Hadoop 等大数据分析技术也是对海量数据进行大规模的数据分析和深度挖掘,也包含交互式联机查询和统计报表类应用,同时大数据的数据质量较低,数据分布真实而不均匀。因此 TPC-DS 成为客观衡量多个不同 Hadoop 版本以及 SQL on Hadoop 技术的最佳测试集。这个基准测试有以下几个主要特点:
- 一共 99 个测试案例,遵循 SQL 99 和 SQL 2003 的语法标准,SQL 案例比较复杂
- 分析的数据量大,并且测试案例是在回答真实的商业问题
- 测试案例中包含各种业务模型(如分析报告型,迭代式的联机分析型,数据挖掘型等)
- 几乎所有的测试案例都有很高的 IO 负载和 CPU 计算需求
TPC-DS 数据集的业务模型丰富,在 TPC-DS 数据集上测试 Spark 并验证优化性能,能对 LAS 环境的多个业务方作业带来性能提升。
LAS Spark 在 TPC-DS 测试集的性能表现
我们对比了火山引擎 LAS Spark 3.0 于社区 3.0 版本在 TPC-DS 上的性能表现。
- Spark 3.0 TPC -DS 1T 数据集
TPC-DS 1T 的性能对比中,火山引擎 LAS Spark 3.0 达到了社区 3.0 性能的 2.1x。
- Spark 3.2 TPC -DS 1T 数据集
TPC-DS 1T 的性能对比中,火山引擎 LAS Spark 3.2 达到了社区 3.2 性能的 2.5x。
LAS Spark 团队自研优化
火山引擎 LAS Spark 相比社区有较大的性能提升,这些性能提升一部分来源于厂内已有的性能优化,例如AdaptiveShuffledHashJoin、AdaptiveFileSplit 等;还有一部分来源于对 TPC-DS 数据集的研究和挖掘。在对 TPC-DS 的 workload 的测试和研究中,Spark SQL 团队发现了一些潜在的性能优化点。
火山引擎 LAS Spark 在 TPC-DS 数据集上的性能优化可以分为三种类型,分别是规则优化、缓存优化和运行时优化,下面我们将分别介绍这三类优化,以及具体的优化策略。
3.1 规则优化
规则优化,指的是在 Spark Optimizer 阶段增加了一些规则来优化逻辑计划。我们常说的谓词下推优化就是 Optimizer 阶段的一条优化规则。
3.1.1 Fast Decimal
Decimal 的计算比较耗时,在一些情况下可以把 Decimal 类型先转成 Long 计算,然后再恢复成 Decimal。Spark 现有的优化规则 DecimalAggregates 就是做这样的优化。
DecimalAggregates 针对 window/agg 的聚合函数是对 decimal 的 sum/agg 的场景做了如下优化
Sum(e) => MakeDecimal(Sum(UnScaledValue(e)))
Avg(e) => CastToDecimal(Avg(UnScaledValue(e)))
但是当前这个优化规则还不足够,我们在此基础上做了更多的优化:
当前判断能否把 decimal 转成 Long 是根据 hive schema 里定义的 decimal 类型,但是如果我们已经有了每列的统计信息(最大最小值),我们可以进一步把这个 decimal 的 precision 缩小,进而可以覆盖更多 case。
比如,tpc-ds 里 store_returns 的 sr_fee 的schema 定义是 Decimal(7,2),但是通过 analyze table 之后可以知道,这个列的最大值是 100,那我们就可以把这个 schema 变成 Decimal(5,2)。
DecimalAggregates 规则的更优实现
a. 当前的规则是对 Sum 最外层的表达式把 Decimal 转成了 Long,比如对于 TPCDS Query4 来说,里面有一个 sum 如下
sum((ss_ext_list_price-ss_ext_wholesale_cost-ss_ext_discount_amt+ss_ext_sales_price)/2) year_total
当前规则下, 生成的 plan 是
MakeDecimal(Sum(UnScaledValue((ss_ext_list_price-ss_ext_wholesale_cost-ss_ext_discount_amt+ss_ext_sales_price)/2)))
其中标黄色的部分是按 decimal 计算的,在做 sum 之前转成了 Long。显然这里更高效的做法是
MakeDecimal(Sum((UnScaledValue(ss_ext_list_price)-UnScaledValue(ss_ext_wholesale_cost)-UnScaledValue(ss_ext_discount_amt)+UnScaledValue(ss_ext_sales_price))/2))
除此之外,对于 Sort,并且里面有 Decimal 类型,我们可以直接改成通过 unscaled long 排序;对于两个 Decimal 进行 BinaryComparison,如果他们的 precision 和 scale 都相同,那也可以通过unscaled long 进行对比等等。
Fast Decimal 的中心思想就是避免 Decimal 的计算,尽可能把 Decimal 类型先转成 Long 计算,以达到加速计算的效果。
3.1.2 Push Order Limit Through Agg
对于下面的 AGG + ORDER + Limit
场景的 在 TPC-DS 中比较常见(例如 Query3,Query 7 Query8 等), 可以将 Ordered Limit
限制下推到 Aggregation 中:
select a, b, c, agg_f0, agg_f1, agg_f2
from t
group by a, b, c
order by c, b, [agg_f0]...
limit 100
-- 限制条件: order by 的前缀字段需要是 group by 字段的子集.
一般来讲, 上述的 Query
会生成 Agg
+ Sort
+ Limit
算子,其中 Sort
+ Limit
算子会被优化成 TopK
, 也即 Agg
+ TopK
. 其中 Agg
算子不会感知到任何 limit
或者 order
信息. 但仔细观察上述查询特征, order by
中的最前面几个字段是 group by
字段的子集, 这些字段在Partial
聚合过程已经确定, 因此我们可以利用 Orderd Limit
信息, 在 Partitial
聚合阶段就应用这部分信息, 减少数据聚合. 也即:
input -> partial agg -> exchange -> final agg -> takeOrderedAndProject
-- 将会转换成
input -> partial agg(group key=c,b,a order key=c,b limit=100) -> exchange -> final agg -> takeOrderedAndProject
3.1.3 Window TopK
在对 Query 67 的分析中,我们发现耗时的瓶颈在于 window 内的 rank 计算。Spark 在执行 window 计算之前,为了保证一个 partition 内具有相同分区的字段的数据分布是连续的,会按照分区字段做一次 partition 内的局部排序. 但由于 Q67
中 window 的分区字段 i_category
的基数较少, 导致单个 task 数据较多,执行 Sort + Window
耗时很久。
由于 Query 67 中 window 计算后紧跟着过滤条件: rk