DDIA:批处理和 MPP 数据库千丝万缕

2023年 12月 21日 67.6k 0

批处理工作流的输出

我们已经讨论了串起 MapReduce 工作流的一些算法,但我们忽略了一个重要的问题:当工作流结束后,处理结果是什么?我们一开始是为什么要跑这些任务来着?

对于数据库查询场景,我们会区分事务型处理场景(OLTP)和分析性场景(OLAP,参见事务型还是分析型)。我们观察到,OLTP 场景下的查询通常只会涉及很小的一个数据子集,因此通常会使用索引加速查询,然后将结果展示给用户(例如,使用网页展示)。另一方面,分析型查询通常会扫描大量的数据记录,执行分组(grouping)和聚集(aggregating)等统计操作,然后以报表的形式呈现给用户:比如某个指标随时间的变化曲线、依据某种排序方式的前十个数据条目、将数据按子类分解并统计其分布。这些报表通常会用于辅助分析员或者经理进行商业决策。

那批处理处于一个什么位置呢?它既不是事务型,也不是分析型。当让,从输入数据量的角度来说,批处理更接近分析型任务。然而,一组 MapReduce 任务组成的执行流通常和用于分析型的 SQL 查询并不相同(参见 Hadoop 和分布式数据库的对比)。批处理的输出通常不是一个报表,而是另外某种格式的数据。

构建查询索引

谷歌发明 MapReduce 大数据处理框架的最初动机就是解决搜索引擎的索引问题,开始时通过 5~10 个 MapReduce 工作流来为搜索引擎来构建索引。尽管谷歌后面将 MapReduce 使用拓展到了其他场景,仔细考察构建搜索引擎索引的过程,有助于深入地了解 MapReduce(当然,即使到今天, Hadoop MapReduce 仍不失为一个给 Lucene/Solr 构建索引的好办法)。

我们在“全文索引和模糊索引”一节粗策略的探讨过像 Lucene 这样的全文索引引擎是如何工作的:倒排索引是一个词表(the term dictionary),利用该词表,你可以针对关键词快速地查出对应文档列表(the postings list)。当然,这是一个很简化的理解,在实践中,索引还需要很多其他信息,包括相关度,拼写订正,同义词合并等等,但其背后的原理是不变的。

如果你想在一个固定文档集合上构建全文索引,批处理非常合适且高效:

  • Mapper 会将文档集合按合适的方式进行分区
  • Reducer 会对每个分区构建索引
  • 最终将索引文件写回分布式文件系统
  • 构建这种按文档分区(document-partitioned,与 term-partitioned 相对,参见分片和次级索引)的索引,可以很好地并发生成。由于使用关键词进行索引查询是一种只读操作,因此,这些索引文件一旦构建完成,就是不可变的(immutable)。

    如果被索引的文档集发生变动,一种应对策略是,定期针对所有文档重跑全量索引构建工作流(workflow),并在索引构建完时使用新的索引对旧的进行整体替换。如果两次构建之间,仅有一小部分文档发生了变动,则这种方法代价实在有点高。但也有优点,索引构建过程很好理解:文档进去,索引出来。

    当然,我们也可以增量式的构建索引。我们在第三章讨论过,如果你想增加、删除或者更新文档集,Lucene 就会构建新的索引片段,并且异步地将其与原有索引进行归并(merge)和压实(compact)。我们将会在第十一章就增量更新进行更深入的讨论。

    以 KV 存储承接批处理输出

    搜索索引只是批处理工作流一种可能的输出。批处理其他的用途还包括构建机器学习系统,如分类器(classifiers,如 垃圾邮件过滤,同义词检测,图片识别)和推荐系统(recommendation system,如你可能认识的人,可能感兴趣的产品或者相关的检索)。

    这些批处理任务的输出通常在某种程度是数据库:如,一个可以通过用户 ID 来查询其可能认识的人列表的数据库,或者一个可以通过产品 ID 来查询相关产品的数据库。

    web 应用会查询这些数据库来处理用户请求,这些应用通常不会跟 Hadooop 生态部署在一块。那么,如何让批处理的输出写回数据库,以应对 web 应用的查询?

    最直观的做法是,在 Mapper 或者 Reducer 代码逻辑中,使用相关数据库的客户端库,将 Mapper 或者 Reducer 的输出直接写入数据库服务器,每次一个记录。这种方式能够工作(须假设防火墙允许我们直接从 Hadoop 中访问生产环境的数据库服务器),但往往并不是一个好的做法:

    • 吞吐不匹配。正如之前 join 一节中所讨论的,通过网络一条条写入记录的吞吐要远小于一个批处理任务的吞吐。即使数据库的客户端通常支持将多个 record 写入 batch 成一个请求,性能仍然会比较差。
    • 数据库过载。一个 MapReduce 任务通常会并行地跑很多个子任务。如果所有 Mapper 和 Reducer ,以批处理产生输出的速率,并发地将输出写到同一个数据库,则该数据库很容会被打爆(overwhelmed)。此时,其查询性能大概率非常差,也因此难以对外提供正常服务,从而给系统的其他组件带来运维问题。
    • 可能产生副作用。通常来说,MapReduce 对外提供简单的“全有或全无(all-or-nothing)”的输出保证:如果整个任务成功,即使子任务一时失败重试,但最终的输出也会看起来像运行了一次;如果整个任务失败,则没有任何输出。但直接从任务内部将输出写入外部服务,会产生外部可见的副作用。在这种情况下,你就必须考虑任务的部分成功状态可能会暴露给其他系统,并要理解 Hadoop 内部重试和推测执行的复杂机制。

    一个更好的方案是,在批处理任务内部生成全新的数据库,并将其以文件的形式写入分布式系统的文件夹中。一旦任务成功执行,这些数据文件就会称为不可变的(immutable),并且可以批量加载(bulk loading)进只处理只读请求的服务中。很多 KV 存储都支持使用 MapReduce 任务构建数据库文件,比如 Voldemort,Terrapin, ElephantDB 和 HBase bulk loading。另外 RocksDB 支持 ingest SST 文件,也是类似的情况。

    直接构建数据库底层文件,就是一个 MapReduce 应用的绝佳案例:使用 Mapper 抽取 key,然后利用该 key 进行排序,已经覆盖了构建索引中的大部分流程。由于大部 KV 存储都是只读的(通过批处理任务一次写入后,即不可变),这些存储的底层数据结构可以设计的非常简单。例如,不需要 WAL(参见让 B 树更可靠)。

    当数据加载进 Voldemort 时,服务器可以利用老文件继续对外提供服务,新文件会从分布式文件系统中拷贝的 Voldemort 服务本地。一旦拷贝完成,服务器可以立即将外部查询请求原子地切到新文件上。如果导入过程中发生了任何问题,也可以快速地切回,使用老文件提供服务。因为老文件是不可变的,且没有立即被删除。

    批处理输出的哲学

    本章稍早我们讨论过 Unix 的设计哲学,它鼓励在做实验时使用显式的数据流:每个程序都会读取输入,然后将输出写到其他地方。在这个过程中,输入保持不变,先前的输出被变换为新的输出,并且没有任何其他的副作用。这意味着,你可以任意多次的重新跑一个命令,每次可以对命令或者参数进行下微调,或者查看中间结果进行调试,而不用担心对你原来系统的状态造成任何影响。

    MapReduce 任务在处理输出时,遵从同样的哲学。通过不改变输入、不允许副作用(比如输出到外部文件),批处理不仅可以获得较好的性能,同时也变得容易维护:

    • 容忍人为错误。如果你在代码中不小心引入了 bug,使得输出出错,你可以简单地将代码回滚到最近一个正确的版本,然后重新运行任务,则输出就会变正确。或者,更简单地,你可将之前正确的输出保存在其他的文件夹,然后在遇到问题时简单的切回去即可。使用读写事务的数据库是没法具有这种性质的:如果你部署了有 bug 的代码,并且因此往数据库中写入了错误的数据,回滚代码版本也并不能修复这些损坏的数据。(从有 bug 的代码中恢复,称为容忍人为错误,human fault tolerance)。这其实是通过牺牲空间换来的,也是经典的增量更新而非原地更新。
    • 便于敏捷开发。相比可能会造成不可逆损坏的环境,由于能够很方便地进行回滚,可以大大加快功能迭代的速度(因为不需要进行严密的测试即可上生产)。最小化不可逆性(minimizing irreversibility)的原则,有助于敏捷软件开发。
    • 简单重试就可以容错。如果某个 map 或者 reduce 任务失败了,MapReduce 框架会自动在相同输入上对其重新调度。如果失败是由代码 bug 引起的,在重试多次后(可以设置某个阈值),会最终引起任务失败;但如果失败是暂时的,该错误就能够被容忍。这种自动重试的机制之所以安全,是因为输入是不可变的,且失败子任务的输出会被自动抛弃。
    • 数据复用。同一个文件集能够作为不同任务的输入,包括用于计算指标的监控任务、评估任务的输出是否满足预期性质(如,和之前一个任务的比较并计算差异)。
    • 逻辑布线分离。和 Unix 工具一样,MapReduce 也将逻辑和接线分离(通过配置输入、输出文件夹),从而分拆复杂度并且提高代码复用度:一些团队可以专注于实现干好单件事的任务开发;另一些团队可以决定在哪里、在何时来组合跑这些代码。

    在上述方面,Unix 中用的很好地一些设计原则也适用 Hadoop——但 Unix 工具和 Hadoop 也有一些不同的地方。比如,大部分 Unix 工具假设输入输出是无类型的文本,因此不得不花一些时间进行输入解析(比如之前的例子中,需要按空格分割,然后取第 7 个字段,以提取 URL)。在 Hadoop 中,通过使用更结构化的数据格式,消除了底层的一些低价值的语法解析和转换:Avro (参见Avro)和 Parquet 是较常使用的两种编码方式,他们提供基于模式的高效编码方式,并且支持模式版本的演进。

    对比 Hadoop 和分布式数据库

    从之前讨论我们可以感觉到,Hadoop 很像一个分布式形态的 Unix。其中,HDFS 对标 Unix 中的文件系统,MapReduce 类似于 Unix 进程的一个奇怪实现(在 map 阶段和 reduce 阶段间必须要进行排序)。在这些源语之上,我们可以实现各种 join 和 group 语义。

    MapReduce 被提出时,并非是一种全新的思想。在其十多年前,所有前述小节我们提到的一些并行 join 算法都已经被 MPP (massive parallel processing)数据库所实现了。如,Gamma data base machine、Teradata 和 Tandem NonStop SQL 都是这个领域的先驱。

    当然,如果硬要区分的话:

  • MPP 数据库是在一组机器上分布式地、并行执行分析型的 SQL
  • MapReduce 和分布式文件系统提供了一种类似于操作系统的、更为通用的计算方式
  • 存储类型更为多样

    数据库要求用户遵循特定的模式(schema,数据模型,如关系型或者文档型)组织数据,但分布式系统中的文件是面向字节序列(byte arrary,即内容对于系统是黑盒),用户可以使用任何必要的方式进行建模和编码。因此,这些文件既可以是数据库记录的集合,也可以是文本、图像、视频、传感器数值、稀疏矩阵、特征向量、基因序列,或者其他任意类型的数据。

    换句话说,Hadoop 允许你以任意格式的数据灌入 HDFS,将如何处理的灵活性推到之后(对应之前讨论过的 schema-less,或者 schema-on-read )。与之相反,MPP 数据库通常要求用户在数据导入之前,就要针对数据类型和常用查询模式,进行小心的建模(对应 schema-on-write)。

    从完美主义者的角度来说,事先对业务场景进行仔细地建模再导入数据才是正道。只有这样,数据库用户才能够得到更高质量的数据。然而在实践中,即便不管模式快速导入数据,可能会让数据处于奇怪、难用、原始的格式,反而会比事先规划、考究建模后将限制死格式更为有价值。

    这种思想和数据库仓库很像:在大型组织中,将从不同部门来的数据快速聚集到一块非常重要,因为这提供了将原先分离的数据进行联结(join)的各种可能性。MPP 数据库所要求的小心精确地建模,会严重拖慢中心化数据的速度。以原始格式将数据聚集到一块,之后再去考虑如何进行建模,可以大大加速数据收集速度(这个概念有时也被称为数据湖,data lake,或者企业数据中心,enterprise data hub)。

    无脑数据导入其实是将数据理解的复杂度进行了转移:数据生产者无需关心数据会被如何使用,这是数据消费者的问题(类似读时模式,参见文档模型中 Schema 的灵活性)。在数据生产者和消费者处于不同团队、具有不同优先级时,这种方式的优势非常明显。因为可能没有一种通用的理想模型,出于不同目的,会有不同的看待数据方式。将数据以原始方式导入,允许之后不同消费者进行不同的数据变换。这种方式被总结为 sushi 原则:数据越原始越好。

    因此 Hadoop 经常用于 ETL 处理:将数据以某种原始的格式从事务型的处理系统中引入到分布式文件系统中,然后编写 MapReduce 任务以处理这些数据,将其转换回关系形式,进而导入到 MPP 数据仓库汇总以备进一步分析之用。数据建模依然存在,但是拆解到了其他的步骤,从而与数据收集解耦了开来。由于分布式文件系统不关心应用方以何种方式对数据进行编码,仅面向字节数组存储,让这种解耦成为了可能。

    处理模型更为多样

    MPP 数据库是一种将硬盘上的存储布局、查询计划生成、调度和执行等功能模块紧密糅合到一块的整体式软件。这些组件都可以针对数据库的特定需求进行调整和优化,针对目标查询类型,系统在整体上可以获得很好的性能。此外,SQL 作为一种声明式的查询语言, 表达能力很强、语义简洁优雅,让人可以通过图形界面而无需编写代码就可以完成对数据进行访问。

    但从另外的角度来说,并非所有类型的数据处理需求都可以合理地表达为 SQL 查询。例如,如果你想要构建机器学习和推荐系统、支持相关性排序的全文索引引擎、进行图像分析,则可能需要更为通用的数据处理模型。这些类型的数据处理构建通常都和特定应用强耦合(例如,用于机器学习的特征工程、用于机器翻译的自然语言模型、用于欺诈预测的风险预估函数),因此不可避免地需要用通用语言写代码来实现,而不能仅仅写一些查询语句。

    MapReduce 使工程师能够在大型数据集尺度上轻松的运行自己的代码(而不用关心底层分布式的细节)。如果你已经有 HDFS 集群和 MapReduce 计算框架,你可以基于此构建一个 SQL 查询执行引擎, Hive 项目就是这么干的。当然,对于一些不适合表达为 SQL 查询的处理需求,也可以基于 Hadoop 平台来构建一些其他形式的批处理逻辑。

    但后来人们又发现,对于某些类型的数据处理, MapReduce 限制太多、性能不佳,因此基于 Hadoop开发了各种其他的处理模型(在之后 “MapReduce 之外”小节中会提到一些)。仅有 SQL 和 MapReduce 这两种处理模型是不够的,我们需要更多的处理模型!由于Hadoop平台的开放性,我们可以较为容易实现各种处理模型。然而,在 MPP 数据库的限制下,我们想支持更多处理模型基本是不可能的。

    更为重要的是,基于 Hadoop 实现的各种处理模型可以共享集群并行运行,且不同的处理模型都可以访问 HDFS 上的相同文件。在 Hadoop 生态中,无需将数据在不同的特化系统间倒来倒去以进行不同类型的处理:Hadoop 系统足够开放,能够以单一集群支持多种负载类型。无需移动数据让我们更容易的从数据中挖掘价值,也更容易开发新的处理模型。

    Hadoop 生态系统既包括随机访问型的 OLTP 数据库,如HBase(参见“SSTables和LSM-Trees”),也包括 MPP 风格的分析型数据库,例如 Impala。HBase 和 Impala 都不依赖 MapReduce 进行计算,但两者都使用 HDFS 作为底层存储。它们访问数据和处理数据的方式都非常不同,但却可以神奇的并存于 Hadoop 生态中。

    面向频繁出错设计

    在对比 MapReduce 和 MPP 数据库时,我们会发现设计思路上的两个显著差异:

  • 故障处理方式:取决于对处理成本、故障频次的假设
  • 内存磁盘使用:取决于对数据量的假设
  • 相对在线系统,批处理系统对故障的敏感性要低一些。如果批处理任务失败,并不会立即影响用户,而且可以随时重试。

    如果在执行查询请求时节点崩溃,大多数 MPP 数据库会中止整个查询,并让用户进行重试或自动重试。由于查询运行时通常会持续数秒或数分钟,这种简单粗暴的重试的处理错误的方式还可以接受,毕竟成本不算太高。MPP 数据库还倾向将数据尽可能地存在内存里(例如在进行 HashJoin 的 HashBuild 时),以避免读取磁盘的额外损耗。

    与之相对,MapReduce 在遇到某个 map 或 reduce 子任务运行出错时,可以单独、自动地进行重试,而不会引起整个 MapReduce 任务的重试。此外,MapReduce 倾向于将数据(甚至是 map 到 reduce 中间环节的数据)进行落盘,一方面是为了容错,另一方面是因为 MapReduce 在设计时假设面对的数据量足够大,内存通常装不下。

    因此,MapReduce 通常更适合大任务:即那些需要处理大量数据、运行较长时间的任务。而巨量的数据、过长的耗时,都会使得处理过程中遇到故障司空见惯。在这种情况下,由于一个子任务(task) 的故障而重试整个任务(job) 就非常得不偿失。当然,即使只在子任务粒度进行重试,也会让那些并不出错的任务运行的更慢(数据要持久化)。但对于频繁出错的任务场景来说,这个取舍是合理的。

    但这种假设在多大程度上是正确的呢?在大多数集群中,机器确实会故障,但非常低频——甚至可以低到大多任务在运行时不会遇到任何机器故障。在这种情况下,为了容错引入的巨量额外损耗值得吗?

    为了理解 MapReduce 克制使用内存、细粒度重试的设计原因,我们需要回顾下 MapReduce 的诞生历程。当时谷歌内部的数据中心很多都是共享使用的——集群中的同一个机器上,既有在线的生产服务,也有离线的批处理任务。每个任务使用容器(虚拟化)的方式进行(CPU、RAM、Disk Space)资源预留。不同任务之间存在优先级,如果某个高优先级的任务需要更多资源,则该任务所在机器上的低优先级任务可能就会被干掉以让出资源。当然,优先级是和计算资源的价格挂钩的:团队需要为用到的资源付费,高优先级的资源要更贵。

    这种架构设计的好处是,可以面向非线上服务超发(overcommitted)资源(这也是云计算赚钱的理由之一)。因为系统通过优先级跟用户约定了,在必要时这些超发的资源都可以被回收。相比在线离线服务分开部署,这种混合部署、超发资源的方式能够更加充分的利用机器资源。当然代价就是,以低优先级运行的 MapReduce 的任务可能会随时被抢占。通过这种方式,批处理任务能够充分地利用在线任务等高优先级任务留下的资源碎片。

    统计来说,在谷歌当时集群中,为了让位给高优先级任务,持续一小时左右 MapReduce 子任务大约有 5% 的概率被中止。这个概率大概比由于硬件问题、机器重启和其他原因造成的子任务重启要高一个数量级。在这种抢占率下,对于一个包含 100 个子任务、每个子任务持续 10 分钟的 MapReduce 任务来说,在运行过程中,有超过一半的概率会发生至少一个子任务被中止。

    这就是为什么 MapReduce 面向频繁异常中止设计的原因:不是为了解决硬件的故障问题,而是给了系统随意中止子任务的自由,进而在总体上提高计算集群的资源利用率。

    但在开源的集群调度系统中,可抢占调度并不普遍。YARN 的 CapacityScheduler 支持抢占以在不同队列间进行资源的均衡,但到本书写作时,YARN、Mesos、Kubernetes 都不支持更为通用的按优先级抢占调度。在抢占不频繁的系统中,MapReduce 这种设计取舍就不太有价值了。在下一节,我们会考察一些做出不同取舍的 MapReduce 的替代品。

    参考资料

    [1]DDIA 读书分享会: https://ddia.qtmuniao.com/

    相关文章

    Oracle如何使用授予和撤销权限的语法和示例
    Awesome Project: 探索 MatrixOrigin 云原生分布式数据库
    下载丨66页PDF,云和恩墨技术通讯(2024年7月刊)
    社区版oceanbase安装
    Oracle 导出CSV工具-sqluldr2
    ETL数据集成丨快速将MySQL数据迁移至Doris数据库

    发布评论