摘要:分布式数据库在执行 OLAP 查询时,可能会执行数分钟至数小时不等,为了避免执行期间任何节点失效导致计算前功尽弃,系统需要有一定的容错能力。本文通过分析对比 MPI、MapReduce、流式计算、分布式关系模型的容错策略,总结了做好分布式关系型数据库容错的几个要点,希望能给大家带来一些启发。
本文作者
晓楚,现任蚂蚁金服OceanBase技术专家,2011年毕业后加入OceanBase团队,先后从事过SQL引擎、集群管理、并行执行引擎研发工作。
MPI
早期最为流行的并行执行框架是 MPI(Message Passing Interface),主要应用在超级计算环境中;但是在云环境下,如 AWS/Azure/AliCloud 等,MPI 基本没有什么优势,主要有两方面原因:
- MPI 的一大优势是支持各种网络硬件(如 Infiniband),以及网络拓扑结构(如 Cray 公司的 Dragonfly 结构),而云上主流网络硬件是以太网卡;
- MPI 不支持容灾,任何一个节点失败,会导致整个任务失败。
MapReduce
随着互联网数据量的不断增长,人们对数据处理能力的要求也越来越高,以 Google 为代表的互联网公司引入了比 MPI 更适合互联网时代的并行计算模型 MapReduce[1] 。MapReduce 诞生之初就充分考虑了数据的局部性,将大规模的数据分而治之,划分到多个相对廉价的 PC 机节点上并行处理,同时充分考虑到 PC 机的故障率较高,设计了完善的容错处理能力。
让我们来简单回顾一下 MapReduce 处理数据的过程:首先,各个计算节点对本地数据进行过滤、排序、聚集等操作,输出一个一个的 key-value 对,并保存在本地的临时空间中(map)。然后,计算节点将这些 key-value 对按照 key 的值分发到不同的节点上,保证具有相同 key 值的 key-value 对分发到相同的节点(shuffle)。最后,一组计算节点按 key 来并行处理各组数据,得到聚集结果(reduce)。如下图所示:
基于这种架构,MapReduce 的容错处理可以设计得非常清晰简单。
- Map 阶段:每个计算节点都是写中间结果到本地,一旦控制节点(master)探测到计算节点失效,只需要将这个计算节点踢出,并重新标记上面运行的任务为 idle 状态,即可在其它节点上再次调度。
- Reduce 阶段:在一个 reduce 任务完成之前 map 节点故障,如上所述,控制节点会调度一个新的 map 任务,reduce 任务最终也会从新的 map 任务处获取到中间结果;在一个 reduce 任务完成之前 reduce 节点故障,则丢弃中间结果,在新的节点上调度这个 reduce 任务;在一个 reduce 任务完成之后 reduce 节点故障,不需要做什么,因为 reduce 任务的输出写入到了全局文件系统中(GFS)。
最后一步 reduce 任务写输出结果到 GFS 很重要,它能极大简化容错处理。考虑包含了多步 map-reduce 的计算场景,如 map1-reduce1-map2-reduce2,如果 reduce1 任务的输出结果只写在本地,则 map2 必须始终要和 reduce1 调度在一起,当 reduce1 任务所在节点宕机并在新节点上重启任务时,必须把 map2 也重新调度到对应新节点上。
成也萧何败也萧何,MapReduce 具有非常强大的容错能力,但是其容错能力是建立在每一步中间结果都写盘的基础之上。这使得它只适用批量数据处理,对于交互式数据处理,实时数据处理的支持不够。为了应对互联网中大量实时数据处理的需求,流式计算逐步兴起。
流式并行计算
流式计算和 MapReduce 这种批量数据处理最大不同之处在于,流式计算处理的数据是无边界的,数据像水一样连绵不断地流入系统,流式计算平台基于时间阈值、数据量阈值等触发对数据的处理,而 MapReduce 处理的数据则是确定性的,有边界的。这种不同,也使得流式计算在容错方面具有自己的特点。
目前国内互联网企业中应用较广的流式计算框架先后有 Storm、Spark、Flink 等,下面以 Flink 为例简单说明流式计算中的容错处理机制。
Flink[2] 是一个流式计算框架,它支持从多种存储结构中流式读取数据并行处理,然后将结果输出到多种存储结构中,其总体架构如下:
需要注意的是,Flink 本身并没有实现任何存储系统,它使用 消息存储(如 Kafaka)、数据库(如 MySQL)、文件系统(如 HDFS)等作为数据源端(source)或数据目的端(sink);并且,只要这些存储结构能够支持 Exactly-Once 语义,Flink 就能够很自然地支持计算节点的容灾。
所谓 Exactly-Once 语义指的是流入 Flink 的每一行数据会且只会影响最终结果一次,即使出现宕机或程序退出,也不会重复处理数据,或有数据漏处理。[3]
When we say “exactly-once semantics”, what we mean is that each incoming event affects the final results exactly once. Even in case of a machine or software failure, there’s no duplicate data and no data that goes unprocessed.
Flink 通过检查点机制来实现 Exactly-Once 语义。Flink 在每个固定的间隔生成一个检查点,将当前输入流的偏移位置持久化到外部存储中,一旦检查点生成成功,就会将已经输出到目的端的结果提交。检查点生成成功后,如果有 Flink 计算节点宕机,就会丢弃已经输出到数据目的端未提交的结果,选择新的计算节点,让程序按照检查点记录的偏移位置重新读取数据并处理。
总结一下,Flink 将存储层的容错委托给了外部系统,只专注于计算节点的容错,并将容错的数据粒度做到记录级,一旦错误发生,可以非常高效地从上一个检查点恢复执行。
分布式关系模型
并行执行模式下做 OLAP 查询,会将查询划分成多个子计划。
上图是为该查询生成的一种并行计划[4],一共划分了 4 个子计划,最底层是对 customers、sales 基表的扫描,中间两层节点对数据做分组和聚集运算。
OLTP 型的查询容错一般比较简单,因为这种查询执行时间较短,一旦有错误发生时,可以简单地重试整个查询。OLAP 型查询的容错则较为复杂,考虑四种情况:
- 包含基表数据的节点失效
- 包含有中间计算结果的节点失效
- 失效节点既包含基表数据(生产者),也包含中间计算结果(消费者)
- 失效节点包含两层中间结果,且它们为生产者-消费者关系
对于第一种情况:如果底层使用了多副本技术,只需选择备份副本重新读取即可。
对于第二种情况:如果中间计算节点的输入数据源还可读,只需选择新的中间计算节点重新计算即可。
对于第三种情况:选择备份副本重新读取表数据,并重新调度中间计算任务。
对于第四种情况,较难处理:即使能找到新的节点重新调度这两层任务,生产者的数据源也可能已经释放或宕机,无法重建中间结果。
受 MapReduce 模型的启发,为了系统解决上面这些问题,我们需要:
- 全局文件系统,以保存中间结果。有了可靠的全局文件系统,就不用担心中间节点宕机问题
- 在适当时机生成检查点,以确定保存中间结果的时机
以上还没有考虑另外一个问题:生产者发送部分数据给消费者后宕机再恢复时,如何保证 Exactly-Once 语义?有两种解决粒度:
- 粗粒度解决思路:让所有消费者丢弃已经处理的结果,从生产者处拉取数据重新计算
- 细粒度解决思路:可以参考 Flink,定期对生产者的输出状态做检查点,宕机恢复时从检查点处开始恢复
总结
MapReduce 具有非常健壮的容错能力,Flink 具有非常细粒度的容错能力,将这两种能力结合,应用到分布式关系数据库中,可以极大降低 OLAP 查询的运维成本。
同时也要意识到,将中间结果保存到全局共享存储也好,生成宕机恢复检查点也好,都是有代价的。我们究竟愿意为很少发生的硬件故障付出多大的代价?在实际的设计过程中需要细细权衡。
参考资料:
[1] https://research.google.com/archive/mapreduce-osdi04.pdf
[2]https://flink.apache.org/
[3]https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.html
[4]https://docs.oracle.com/en/database/oracle/oracle-database/18/vldbg/parallel-exec-intro.html#GUID-F9A83EDB-42AD-4638-9A2E-F66FE09F2B43