作者简介:鲍贵明,跨越速运高级大数据工程师。负责实时数据研发平台、数据湖的调研和引入,专注于实时/离线数仓架构最佳实践和方案演进,致力于为跨越速运提供低门槛、高效的实时计算和数据湖等大数据基础设施体系。
一、运单查询引发的数据实时处理需求
跨越速运成立于 2007 年,是一家在物流行业颇具影响力的大型综合速运企业。随着国内经济的持续增长,消费者的购物习惯也在不断变化,对物流行业的需求呈现爆发式增长。目前,跨越速运的服务已经覆盖全国 99% 的城市,年服务企业超 100 万家。跨越速运内部有超过 100 位 BI 分析师,日常工作需要借助大数据平台中的服务项目进行数据研发。
跨越速运的数据服务场景中有一万余数据接口,日调用量超过 1000 万次,并保持 99% 查询时延小于 1 秒,我们的大数据平台为 6 万名员工创造了良好的数据查询体验。在本文中,我将介绍跨越速运基于Flink + OceanBase的实时分析解决方案与实践。
众所周知,物流行业最核心的数据分析是围绕运单展开的。一批货物从报单到结单,经过数十个业务系统,走遍整个业务全链路。在这个过程中,我们需要将这些业务系统的基础字段进行整合,使用复杂的关联和分层计算,形成数仓运单域,并通过大数据的各类平台服务对外提供。
早期,用户会满足于使用单表固定维度来进行数据聚合,按指定条件进行点查。即使离线批量更新千万级数据的单表,秒级的查询响应服务也能得到很多用户的支持。
然而,随着物流行业的竞争加剧,用户对数据查询和响应时效要求也在逐渐提高。比如在运单分析中,为了满足用户查询需求,系统需要实时更新过亿行的数据表,达到亚秒级(1GHz/秒)的查询时效,以及多表关联后对任意维度、字段进行聚合。
为了更好的满足用户需求,提升用户体验,跨越速运对数据库提出了以下要求:
- 拥有极致的查询性能。
- 计算引擎能力足够稳定。
- 支持实时写入、更新等。
- 易用,比如标准 SQL、丰富的函数库、活跃的社区,最好可以高度兼容 MySQL 协议。
- 支持多种类型的数据源,确保数据能够进得来、出得去,并且满足使用场景的需求。
二、实时计算引擎和数据库选型背后的思考
基于上述需求,我们需要一款实时计算引擎和性能强劲的数据库作为支撑。
(一)实时计算引擎选型及对比测试
我们对比了不同时期的三款主流实时计算框架:Storm、SparkStreaming 和Flink。从下图可以看出,Storm 和 Flink 的数据延迟属于毫秒级别,而是从状态管理、流批一体、数据集成生态和易用性角度综合对比,Flink 更有优势。因此,Flink 成为了我们计算架构的首要选择。
(二)从业务场景出发,自建 Benchmark 做选型
当计算引擎有了着落后,我们开始进行数据库的选择。如何选择一个适合的数据库?我们的依据是行业常见的数据测试对比标准,比如 TPC-H、TPC-DS、SSB 等,以及各类数据库厂商官网提供的用户实践及案例。
考虑到企业有自身的业务实际场景,在测试时进行了针对性的优化,因此,我们没有完全依赖企业案例中的测试结果进行选择,而是根据业务分析需求自建了一套 Benchmark 标准。该标准包括统一的测试机型与环境、基于运单分析的标准数据集、基于运单分析的标准 SQL 集、基于实际需要的功能测试集等。
依照 Benchmark 标准,我们对 DB-U(某分布式 HTAP 数据库)、OceanBase、 DB-X(某AP实时分析数据库)、Doris、Trino 进行了查询性能测试和对比。从下图可以看出,我们测试选用的是 3 台 32C+128G的固态盘机器,读取的最大表有 1 亿行,数据大小为 35GB 左右。经测试和对比,OceanBase 和 DB-X(某 AP 实时分析数据库)的参数性能表现更优。
(三)综合考量,选择 OceanBase
在测试查询性能后,我们从常用的功能、大数据生态集成、可维护性这三个角度综合对比了上述几款数据库。从下图的测试结果可见,OceanBase 除了不支持 Hive 集成和联邦查询之外,其他表现均非常出色。尽管某 AP 型数据库、Doris、Trino在大数据生态集成方面表现更优,但在可维护性方面相较于 OceanBase 仍有进一步提升的空间。
基于上述对比结果,我们对 OceanBase Connector、JDBC Connector、某 AP 型数据库 Connector 进行了每秒写入速率的性能对比。OceanBase Connector 开启分区写、280 个字段、10 个并发度的条件下,10 分钟写了 1000 万数据,写入速度和某 AP 型数据库 Connector接近,但比 JDBC Connector 写入快两倍左右。这些测试结果进一步证明了 OceanBase 在数据库连接和数据处理性能方面的优势。
因此,综合查询性能、功能支持、写入性能的测试结果,我们认为 OceanBase 相比测试的其他数据库更符合预期。此外, 考虑到 OceanBase 的 HTAP 特性和便捷的运维管理平台,我们最终选择使用 OceanBase 来解决实时分析场景下的业务痛点。
- HTAP 特性:目前我们的业务系统使用 MySQL 支持 OLTP 场景,OLAP 场景使用某 AP 型数据库,但还有许多业务需要兼顾事务处理和实时分析能力。例如实时运单分析场景,既需要实时写入数据、更新数据,又需要良好的实时分析性能。OceanBase 一套系统同时支持 OLTP 和 OLAP 能力,且业务间不影响,正好满足业务需求。
- 便捷的运维管理平台:如果一款数据库需要通过原始的命令行进行管理监控,或需要自研可视化管理监控,就说明它使用复杂,成本较高。某分布式 HTAP 数据库和 OceanBase 均有运维管理平台,但前者的架构更为复杂且不支持行列混存,需要双倍的存储空间;而后者行列混合存储,且具备高压缩比特性,可以降低存储成本。
确定使用 Flink+OceanBase 的解决方案后,我们迅速在企业内部实施落地。
三、Flink + OceanBase 在实时运单分析场景的应用
下图是实时运单公共层的示意图,可见业务数据经过订单、跟单、配载、调度、质控、财务等系统,经过基础字段整合、复杂关联计算,实时写入运单域 DWD 大宽表,并存储于 OceanBase 中,然后通过大数据平台对接铸剑系统,进行实时运单的分析与查询。
借助 OceanBase CDC+Flink 状态管理,我们做分层计算并轻度汇总 DWS,对近 15 天的时效产品分析数据表和各线路走货量进行数据分析,并通过大数据平台的数据接口服务,提供给用户。
最终,我们使用轻度汇总的近 15 天运单时效产品分析统计表,经过高度汇总形成 ADS 层的大盘指标,如近一周的时效达成率、当日的客诉量等指标,通过 QuickBI 做成可视化的数据门户,让数据可以更加直观、实时的展示。
(一)实时运单公共层构建难点
在我们构建实时运单公共层的过程中,遇到了以下难点:
- 数据来源多,需要将不同业务模块的数据实时打宽。
- 数据时效性要求高,数据要求虽然参差不齐,但尽量越快越好。
- 公共层数据需要复用,提供给下游实时打标、数仓分层计算。
在调研实时宽表解决方案后,我们总结了以下 4 个方法:
- 采用 Flink Join,它支持一对多关联,但因为大状态十分影响数据的处理性能,所以需要合理设计状态的 TTL。
- HBase 部分列更新,它支持对数据的合并。可以根据 Rowkey 做高并发的实时写和高效率的随机读。但它需要实现 CDC 服务,同步增量数据给到下游做 OLAP 分析 。
- StarRocks 部分列更新,它可以数据合并和分析一体,但只支持对同一主键进行合并,不支持 CDC。
- OceanBase 部分列更新,解决上述方法不支持 CDC 的问题。
基于上述实时宽表解决方案,我们构建了实时云端分析架构 1.0。
(二)实时运单分析架构优化
基于实时云端分析架构 1.0,实时宽表的实现逻辑可以概括为五个步骤。
第一步,进行数据源配置与监听。为各个业务库的 MySQL 数据库配置 Canal 监听,并将变更数据实时写入 Kafka,让业务数据库的任何数据变动都会被捕获并传输到 Kafka。
第二步,进行数据整合与宽表构建。使用 FlinkSQL 任务读取 Kafka 中的数据,将这些数据写入HBase 的数据表中,通过部分列更新实现宽表的构建。
第三步,配置 HBase CDC 服务配置。可以将数据变更实时写入 Kafka。
第四步,进行实时数据更新与查询。配置另一个 FlinkSQL 任务,该任务实时读取Kafka 中的宽表增量数据,并将其写入 DB-X(某 AP 实时分析数据库),实现宽表的实时更新。
第五步,将数据对外提供使用。前面几步更新的宽表数据可以在大数据的各类服务平台上对外提供使用。
实时云端分析架构 1.0 解决了 Flink 多流 join 状态超大的问题,让数据处理更为高效,降低了系统负担。同时,提升了数据时效性和分析性能,将数据整体处理时间缩短至小于 20 秒,数据分析时间缩短至小于 3 秒,为用户提供了更快速、高效的数据分析服务。
但是,实时云端分析架构需要自研 HBase CDC,我们投入了更多的资源进行维护和更新,增加了研发成本和复杂性。由于实时云端分析架构的复杂性,链路相对较长,排查问题可能变得较为困难,同时,涉及多个组件和技术的集成,需要定期进行更新、优化和故障排除,维护成本变高。
因此,我们升级实时运单分析架构,以下为 2.0 版本。
从架构图中我们可以看到实现逻辑稍有变化。第一步仍然对业务库 MySQL 配置 Canal 监听,生成 Binlong 数据,写入 Kafka;第二步用 FlinkSQL 读取 Kafka 数据。将同主键但不同来源模块的字段写入 OceanBase,此时已经是实时加工的运单宽表,可以直接使用大数据的各类平台对外提供使用;第三步使用 OceanBase CDC+Flink 状态管理做分层计算。通过聚合分层再写入 OceanBase,根据不同需求,使用大数据的各类平台对外提供使用。
经过优化的实时运单分析架构 2.0 带来了较大的收益:
- 数据时效性更高。从数据整体的时效性上而言,架构 2.0 比 1.0 快 5~15 秒。
- 链路简单,排查更方便。架构 1.0 链路复杂,排查问题困难,而 2.0 只需 Flink + OceanBase,排查问题更加方便。
- 现代数据架构,集群成本节约 50%。架构 1.0 需要多维护一套 HBase 集群,多写入 2 次 Kafka,和多写入一份数据到 DB-X(某AP实时分析数据库),而架构 2.0 则只需一套 OceanBase,节约了集群成本。
四、OceanBase提效展望
未来,我们希望在架构侧实现以下三方面设想。
- 探索使用 MySQL CDC 替换 Canal 同步:CDC 能够实时跟踪和记录数据变化,减少数据同步的延迟和复杂性,通过采用 MySQL CDC 来替代 Canal,可以更有效地捕获和处理数据库的变更数据。
- 使用 OceanBase 作为实时维表提供服务:利用 OceanBase 的高性能、高可用性和扩展性,将其作为实时维表可以提高查询效率和数据一致性。
- 使用 OceanBase 替换部分 MySQL 的 TP 应用场景:通过将部分 MySQL 的 TP 应用场景迁移到 OceanBase,可以利用OceanBase在分布式事务处理方面的优势,提高系统的吞吐量和并发性能。
同时,使用 OceanBase 提升开发效能,我们计划:
- 将实时计算平台集成到 OceanBase Catalog,更好地统一管理和调度实时计算任务,提高开发效率和资源利用率;
- 把 FlinkSQL 血缘管理集成到 OceanBase,通过这样的形式进行追踪数据流转和处理过程,能够提高数据治理和运维效率;
- 推动内部的数据平台支持OceanBase数据源,实现数据的统一管理和共享,让数据能进能出,支持不同的流向,提高数据的可用性和利用率。