1.背景
近一年交易对系统的技术架构做了重大升级和改造,其中包括订单、运单、账单模型升级,升级过程中很重要的一部分是数据迁移。在数据迁移过程中面临如下挑战:
由于是交易的核心数据的迁移,并且迁移过程中业务并行运行,所以需要保障在迁移中和迁移后都不能影响业务。
数据量大:账单和订单数据规模庞大(订单、账单相关数据在百亿级以上),迁移过程需要高效处理大规模数据,同时也要高质量保障数据无丢失及准确性。
异构数据复杂:新老数据库表结构差异很大,在迁移中需要做大量且复杂的数据转换。
业务运行要稳:由于是交易的核心数据的迁移,并且迁移过程中业务并行运行,所以需要保障在迁移中和迁移后都不能影响业务。
2.迁移工具选型
2.1 常见迁移工具
Sqoop:apache开源的一款在关系数据库和Hadoop之间传输数据的工具,它更偏向于关系型数据库与Hadoop间的迁移。
yugong:愚公移山,阿里为去IOE开发的一款从Oracle到Mysql的数据迁移工具,它偏向单表间的数据迁移,扩展性相对较弱。
Datax:阿里开源的一个被广泛使用的异构数据源离线同步工具,它提供MySQL、Oracle、PgSQL、HDFS、Hive、HBase等各种异构数据源之间高效同步数据的功能,它偏向于离线数据同步。
2.2 现状分析
表分片:交易的数据库表是水平分表的,一张逻辑表对应若干分片表,因此迁移需要考虑到表分片的问题。
自定义模型转换 :交易数据迁移中最为复杂、工作量最大的一块是数据模型的转换,无法用SQL来完成,需要编写大量的数据读取、转换、持久化的代码。
分布式任务调度:因为数据量庞大,为了提高数据迁移速率,需要拆分为多个任务并分配给多台机器并行执行。
灵活流程控制:迁移周期长,在迁移过程中会涉及代码改动,以适应不同的业务诉求和数据场景,需支持可延伸、可中断、可恢复。
任务编排:我们的数据是按时间段拆分顺序迁移,当同时切分多个时段的任务时,需要编排这些任务的执行顺序。
稳定性保障:数据迁移过程中是要与业务并行运行,需具备可限流、可降级、可熔断、可重试和告警能力,确保万无一失。
2.3 选型结果
工具对比:
在面对交易数据迁移的这些特点时,业内常见的迁移工具都不能很好的支持,加之工具使用成本高、难以定制化等方面的考量,因此决定自建一个**适合交易
**的数据迁移工具。
3. 自建迁移框架
架构图:
3.1 角色
Supervisor:一个JVM实例,负责任务生成、拆分、分发、编排、管理等,supervisor_id为ip:port
。
Worker:JVM实例,负责具体任务的执行,执行完成后通知Supervisor,worker_id为group:uuid:ip:port
。
3.2 注册中心
Supervisor与Worker通过注册中心相互发现。
注册中心:通过Redis的Zset和pub/sub实现,Zset做角色的注册及发现,pub/sub做上线/下线的通知。
register:定期保持心跳,更新socre为当前毫秒时间戳。
discovery:定期去更新对方角色列表,使用zremrangebyscore
移除过期死亡的角色。
pub/sub:当角色上下线时,会发送事件通知对方角色实时更新discovery列表。
3.3 核心流程
3.3.1 创建job
job是用来定义任务配置的表数据,通过job来指定何时触发做何事,使用方可通过Openapi接口来创建job
以下是job的一份配置样例数据(只列了一些主要字段)
{
"jobGroup": "test",
"jobName": "XXX数据迁移【20210101~20210331】",
"jobHandler": "xxx.xxxxxx.xxx.xxx.xxx.DataMigrateJobHandler",
"jobParam": "{"batch_size":200,"parallelism":50,"sharding_number":1024,"from_create_time":"2021-01-01","to_create_time":"2021-03-31"}",
"triggerType": 2,
"triggerConf": "2022-08-21 21:40:00",
"routeStrategy": 1
}
jobHandler:由使用方编写的自己的任务处理器
triggerType:触发器类型,具体的值在triggerConf
字段中配置,支持多种类型
1:Cron表达式,如0 0 23 * * ?
2:指定时间触发一次,如2022-08-21 21:40:00
3:任务依赖,配置父jobId,当父任务执行完成会触发执行子任务(多个父jobId以逗号分隔)
routeStrategy:分发任务给Worker的路由策略
1:轮询
2:随机
3:一致性哈希
4:本地优先,当一个JVM同时是Supervisor和Worker时,Supervisor会优先分发给本地的Worker
3.3.2 生成任务
Supervisor会定时去扫描job表,获取1分钟内将要触发执行的job数据,然后生成instance以及拆分为多个task。
3.3.3 分发任务
Supervisor会按Job配置的路由策略把task分发给Worker执行。分发方式使用Redis List数据结构实现。分发后如果Worker宕机且未执行,Supervisor有一个定时任务会扫描未执行的task重新分发。
worker_key: tasks.dispatch.group:uuid:ip:port
。
任务分发:redis.rPush({worker_key}, {task_data})
。
3.3.4 接收任务
Worker有一个定时任务从redis list中接收task,然后放入时间轮中。当到达触发时间后,会把task提交到自定义的线程池WorkerThreadPool
中,交由WorkerThread
线程执行。
Redis lua脚本
local ret = redis.call('lrange', {worker_key}, 0, {batch_size}-1);
redis.call('ltrim', {worker_key}, {batch_size}, -1);
return ret;
任务的流转过程
3.3.5 执行任务
WorkerThread
会创建JobHandler对象并执行JobHandler#execute
方法,execute方法就是使用方的具体业务代码。
3.4 Checkpoint
任务执行期间可以使用Checkpoint来保存task的执行快照,以便在宕机/重启/暂停
后的恢复时能继续执行,Checkpoint有以下一些使用场景:
1.如果是在循环体中处理业务,可每隔一定周期调用checkpoint保存执行快照。
2.发生异常时,在中断任务前调用checkpoint保存异常前的执行快照。
3.可以在业务处理完的最后调用checkpoint保存最终的执行结果数据。
3.5 暂停/恢复
暂停:客户端可以调用openapi接口来暂停正在执行
的任务,暂停时可使用Checkpoint保存任务当前的执行快照。
恢复:客户端可以调用openapi接口来恢复已被暂停
的任务,恢复时先读取暂停前的执行快照,然后从上一次的执行快照中恢复继续执行。
3.6 限流
在数据迁移过程中,为保障平稳运行不影响业务,需要对迁移速率做限流:根据DB资源情况调整流量
、按业务高低峰分时段限流
、异常时自动限流
、压测时手动限流
等场景。
{
"flow_limit":{
"07": 100,
"12": 50, // 中午12点后,限制每个POD每秒最多迁移50个订单
"20": 200,
"22": 500,
"00": 700
}
}
常见的限流算法有漏桶算法、令牌桶算法等,迁移框架是基于Guava库的令牌桶算法来做的二次封装。
漏桶算法:当请求进入漏桶中,漏桶以一定速度响应,请求速度过大直接溢出丢弃,速率平滑。缺点是无法很好的处理突发流量
令牌桶算法:请求获取令牌,当没有令牌可拿时阻塞或者拒绝服务,允许一定程度的突发调用。
3.7 错误率熔断
通过bit set来统计最近N条数据迁移的错误率,当错误率高于指定的阈值则自动暂停
任务。
1.创建一个指定长度(如10240位)的bit set,初始化全为1(成功)。
2.每批次迁移N条数据,对应迁移成功或失败的结果写入bit set。
3.统计迁移失败的比率,如果大于阈值,则直接自动暂停任务。
4. 迁移实践
方案图:
4.1 迁移流程
数据迁移流程分为4个步骤:
源数据扫描:基于各个业务的基表的物理分片进行源数据扫描,可按照时间范围、业务类型等条件筛选。
数据模型转换:根据各个业务需求,将源数据结构转换为目标数据的结构。
目标数据持久化:将转换好的目标数据,持久化到目标数据库中。
结果数据核对:持久化目标数据后,对目标数据进行数据质量核对。
4.2 迁移保障
4.2.1 失败重试
当迁移的任务出现异常时,会立即进行一次重试,重试失败后, 我们会将失败的任务数据记录到重试表中,当达到最大重试次数后仍失败时,系统将触发告警,通知相关人员介入处理。
我们通过SQL语法特性来支持数据的可重复迁移
INSERT INTO target_table (...) VALUES (...) ON DUPLICATE KEY UPDATE ...
4.2.2 数据核对
为了确保数据迁移的准确性,我们使用了以下核对机制:
**迁移前离线核对:**在双写后,我们通过大禹核对新老库数据是否一致,保障双写准确性以及为后续迁移核对做铺垫。
**迁移中实时核对:**在迁移过程中,我们通过调用业务新老查询接口,将迁移数据重新转换为源数据的异源同构数据后,进行字段核对。
**迁移后离线核对:**在迁移完成后,我们通过数仓抽取全量的数据,在大禹进行全量数据的核对,确保数据的一致性。
迁移后流量回放:在迁移完成后,我们将从线上录制真实的旧链路请求,并回放到新链路的对应接口中,对比新旧链路结果的差异,确保差异都是预期内的,以确认线上查询请求的结果符合预期。
4.3 迁移调控
为了确保在迁移过程中不影响业务系统,我们通过以下方法进行调控:
**任务中断恢复:**异常情况下,可中断迁移任务以避免进一步影响业务流程。在问题解决后,可以重新启动新的迁移任务或恢复中断的任务,确保迁移流程的稳定性和连续性。
**速率动态调整:**对于迁移过程中的性能指标和异常情况,可以通过实时配置,调整读取源数据的速率,降低迁移对业务系统的影响。
**错误率阈值熔断:**可以设定迁移错误率的阈值,如果错误率超过阈值,将终止迁移任务,待问题确认或修复后再重新进行迁移。在错误率未达到阈值之前,迁移错误不会影响整个迁移任务的执行。
4.4 迁移策略
通过配置多个任务,指定任务的执行优先级,指定任务迁移数据的时间、业务类型等限制范围,按照下面的策略进行数据迁移:
**测试数据:**迁移测试地区全量数据,确认测试地区数据迁移无问题。
**早期数据:**逐步迁移最早半年10%的数据,用于灰度观察及验证,确认早期数据迁移无问题。
**近期数据:**逐步迁移最近半年10%的数据,用于灰度观察及验证,确认近期数据迁移无问题。
**全量数据:**从前往后按时间段分多批次迁移所有数据。
4.5 降级预案
当迁移后如果发现数据有问题时,我们需要有快速的方案做降级止血,防止问题进一步扩大影响,我们提供了两种方案降级:
**打标降级:**在迁移后可批量或按时间范围对订单的双写标记置为false,使订单沿用旧链路逻辑操作。
**删除降级:**在迁移后通过接口能批量删除订单新库数据及打标信息,使订单沿用旧链路逻辑操作。
5、总结与展望
通过本次大规模数据迁移探索与实践,我们沉淀了一套通用、高效、可靠的迁移工具及方案。尽管不同的迁移任务可能在复杂程度和侧重点上有所差异,但总体的迁移诉求及过程基本相似。通过这套工具及方案的实施,能高效的完成大规模数据迁移及保障迁移中数据的完整性、准确性。
在未来我们重点会去探索迁移工具平台化,当前主要是在面对不同任务需要针对性的编码,无法快速高效的复用到其他团队,未来我们将建成平台,让不同业务团队可以通过平台动态配置化的能力来自动完成迁移。