背景
flink cdc 1.0 版本早期,还未支持sqlserver ,oracle数据源。基于业务场景需要,对flink cdc项目进行二次开发,增加 sql server 作为数据源的能力。
业务流程是基于flink cdc,开发 sqlserver ---> Iceberg 数据链路。支持先全量同步后增量cdc同步 和 指定LSN开始执行cdc同步 两种模式。
从技术层次来讲,基于 flink + sqlservcer + Iceberg 可以实现端到端的精准一次语义:即Iceberg目的表的数据与源端sqlserver表的数据最终可以保持一致,不丢不重。但是在后期进入测试阶段后出现了Iceberg表数据集重复问题。
测试场景 & 问题
场景
- sqlserver表数据1000w,id 字段不重复!但是该表无主键;
- 同步方式使用先全量同步后增量cdc同步;
- 同步期间启动10个并发的测试任务随机对sqlserver表数据进行更新N条;
- sqlserver源表没有主键,目的端Iceberg表以全部字段作为 equality delete fields ;
测试步骤:
1-10 条 id 连续的数据,更新两个字段。
表的测试任务,并等待 flink cdc 任务同步完成完所有的更新日志;
预期结果: 最后数据更新的测试任务结束后,等flink cdc任务消费完全部的日志数据后,Iceberg表保持1000w数据量,且每条数据都和源表保持一致。
测试结果
Iceberg表出现了16条重复数据。
问题分析过程
分析多出来的数据
首先,通过 spark SQL 执行 SQL:
select count(id) from t17_v2
得到的结果大于 1000W 条数据。整个过程只有更新操作,因此可以推断 iceberg 表出现了 id
重复的数据!
然后,通过以下 sql 找出重复的数据:
Select id, count(id) from t17_v2 group by id having(id) > 1
发现的确出现了部分重复的数据(返回结果共16条)!
id | count |
---|---|
117 | 2 |
119 | 2 |
438 | 2 |
... | ... |
分析 iceberg 元数据
然后进一步分析 iceberg 表的元数据信息。
对应的快照文件。
Spark sql 日志,关注标记红色的部分:
spark-sql> select count(id) from t17_v2 ;
21/12/07 15:43:34 INFO BaseMetastoreTableOperations: Refreshing table metadata from new
version:
hdfs://mycluster/warehouse/tablespace/managed/hive/t17_v2/metadata/00008-730f1294-c209
-4612-84c9-80b2fff44f22.metadata.json
21/12/07 15:43:34 INFO BaseMetastoreCatalog: Table loaded by catalog:
spark_catalog.default.t17_v2
.....
21/12/07 15:43:34 INFO SparkContext: Created broadcast 3 from broadcast at
SparkBatchScan.java:141
21/12/07 15:43:34 INFO BaseTableScan: Scanning table spark_catalog.default.t17_v2 snapshot
3306220549964684376 created at 2021-12-07 15:43:04.765 with filter true
...... 10000016
Time taken: 7.852 seconds, Fetched 1 row(s)
21/12/07 15:43:42 INFO SparkSQLCLIDriver: Time taken: 7.852 seconds, Fetched 1 row(s)
从上面的日志中可以知道当时读取的快照为 3306220549964684376 ,sql执行的结果为 10000016
{
"sequence-number": 8,
"snapshot-id": 3306220549964684376,
"parent-snapshot-id": 6538052867896813584,
"timestamp-ms": 1638862984765,
"summary": {
"operation": "overwrite",
"flink.job-id": "5c2746a3148e0a0941cb84476df50058",
"flink.max-committed-checkpoint-id": "8",
"added-data-files": "1",
"added-delete-files": "2",
"added-records": "1027",
"added-files-size": "27523",
"added-position-deletes": "388",
"added-equality-deletes": "1027",
"changed-partition-count": "1",
"total-records": "10001600",
"total-files-size": "89852637",
"total-data-files": "8",
"total-delete-files": "4",
"total-position-deletes": "525",
"total-equality-deletes": "1600"
},
"manifest-list": "hdfs://mycluster/warehouse/tablespace/managed/hive/t17_v2/metadata/snap-3306220549964684376-1-f0df80fb-aa5f-4fda-b24a-06c388ee4b25.avro ",
"schema - id ": 0
}
可以看到,当前快照的 meta 信息描述中,operation 为 overwrite,表示当前快照出现了删除
数据的操作,而 total-records 为 10001600, total-equality-deletes 为 1600 条,计算可得当
前快照有效记录数为 1000W 条。
但是通过 spark sql 查询总数量为 10000016 条,所以推断equality delete file 中有 16 条记录没有命
中到 data file 中的记录;
分析 iceberg parquet 数据文件
结合上面查询出来的重复数据,选择 id=438 这条数据作为突破口。
找出查询出现重复数据的快照对应的 parquet 文件。Iceberg 的元数据有三层,所以写了个工
具类来列出所有的 data file,delete file 文件。
Data file 列表:
00000-0-647359f6-bb8b-44ce-9946-b6ce52d74a08-00001.parquet
00000-0-647359f6-bb8b-44ce-9946-b6ce52d74a08-00003.parquet
00000-0-647359f6-bb8b-44ce-9946-b6ce52d74a08-00005.parquet
00000-0-647359f6-bb8b-44ce-9946-b6ce52d74a08-00007.parquet
00000-0-647359f6-bb8b-44ce-9946-b6ce52d74a08-00009.parquet
00000-0-647359f6-bb8b-44ce-9946-b6ce52d74a08-00011.parquet
00000-0-647359f6-bb8b-44ce-9946-b6ce52d74a08-00013.parquet
00000-0-647359f6-bb8b-44ce-9946-b6ce52d74a08-00016.parquet
Equality delete file 列表:
00000-0-647359f6-bb8b-44ce-9946-b6ce52d74a08-00014.parquet
00000-0-647359f6-bb8b-44ce-9946-b6ce52d74a08-00017.parquet
Position delete file 列表:
00000-0-647359f6-bb8b-44ce-9946-b6ce52d74a08-00015.parquet
00000-0-647359f6-bb8b-44ce-9946-b6ce52d74a08-00018.parquet
然后通过 spark shell 来读取上面的data file 文件,查找 ID= 438 的重复数据:
进一步分析,看最早的一条 id=438 的数据从什么时候开始写入到 iceberg 的,最终发现在第
一个 parquet 文件(00000-0-647359f6-bb8b-44ce-9946-b6ce52d74a08-00001.parquet)被写入:
查看 meta 和快照元数据文件,得知第一个 parquet 文件(00000-0-647359f6-bb8b-44ce-9946-b6ce52d74a08-00001.parquet)是第一个commit中产生的,此时还在同步全量
数据的阶段。
然后,我们开始找第一条被删除的id= 438 的数据,它的全部字段分别是什么。
通过 spark 读取 equal delete file 文件查找第一条id=438 的数据:
综上信息,第一条被删除的 id=438 的数据,city 字段取值 ‘shanghai’, 所以正常的情况下,
第一条数据被写入的数据,city 字段也应该为 “shanghai” 。
但是从上面的结果来看,第一条被写入的数据是
id | city |
---|---|
438 | moca |
问题原因推测
经过上面的分析,可以大概推断出 两个问题:
分析 SQLserver cdc 日志数据
为了确认 前面推测的问题,于是对 sql server 的 cdc 日志数据进行了一次分析。
结合 flink-cdc 任务的日志,拿到当时做快照时最大的 LSN 为“0x0000117700006f780001”
查询该 LSN 之前的所有日志数据:
select
*
from
(
SELECT
t1.id,
t1.city,
t1.name3,
t1.name1,
t1.[__$operation],
master.sys.fn_varbintohex str(t2.start_lsn) as startLsn,
t2.start_lsn,
t1.[__$seqval],
master.sys.fn_varbintohexstr(t1.[__$seqval]) as sq,
t2.tran_begin_time,
t2.tran_end_time
FROM
cdc.dbo_t17_CT t1
join cdc.lsn_time_mapping t2 on t1.__$start_lsn = t2.start_lsn
where
t2.tran_begin_time & lt; '2021-12-07 16:30:47'
) as t
where
startLsn & gt;= '0x00001177000154280001'
and id = 438
order by
startLsn asc
查询结果如下:
从结果中看出,快照后对于 id=438 这条数据来说,第一个被删除的数据确实是 city=shanghai。
然后后面又看到,数据被多次修改后又重新了设置了一次 city=moca ;
到这里,可以确定写入到equality delete file的第一条数据(id=438,city=shanghai)是正确的,符合sqlserver的日志数据顺序。即后面的增量cdc同步是按照数据库顺序执行的。为了进一步确认这个结论,做了以下源码分析:
分析 flink-cdc-connector 项目源码
为什么 id 字段相同,data file 多出来的数据不能被删除?
当源表不存在主键的情况下,flink-cdc 会把表的所有字段作为equality delete fields ,来比较两条记录是否相等的标准。这个是业务设计逻辑。
由于更新类型的数据会被拆成两条,一条删除一条写入。所以猜测是否会出现以下情况:
(1) Sql server 单个事务内的日志记录是乱序的;
(2) Cdc 的 flink 任务做 snapshot 时,将一个 update 操作的两条数据截断了,刚好在写
入了删除的数据后触发 flink 的快照机制,导致数据不完整。
为了确认上面两个猜想,看了 flink-cdc-connector 的代码,cdc 任务读取 debezium 中的数据时,会竞争 flink 的
snapshot 的锁,所以不会出现数据被处理了一半就触发 flink 快照提交 iceberg 写入的情况;
另外,从 debezium 的源码中分析,可以知道对于 update 类型的操作,被传递到 flink 上层
应用的时候,两条数据会被合并为一条,参考 SqlServerStreamingChangeEventSource#execute
的关键代码:
同时查出来的数据会按照 lsn,seqval,operation 做正排序,所以不会出现数据乱序的情况:
到这里为止,第二种情况已经得到排除;
分析 debezium 项目源码
结合 对flink-cdc源码 的分析,数据的顺序是正确的,cdc 日志消费的顺序也是正确的。所以问题的关
键还是,flink-cdc 为何读取到的第一条 id=438 的数据 city=moca 而不是 city=shanghai。
所以推测出现下面一种情况:数据快照被破坏,全量同步期间其他事务对表的更新操作会被做快
照的事务(即cdc全量同步事务)读取到,导致equality delete file 的记录与 data file 的记录对不上,iceberg 查询出现重复数据。
重新去 debzium 官方翻找资料,找到以下关键配置项信息:
snapshot.isolation.mode
default value: repeatable_read
该配置项的描述信息如下:
Mode to control which transaction isolation level is used and
how long the connector locks tables that are designated for
capture. The following values are supported:
1. read_uncommitted
2. read_committed
3. repeatable_read
4. snapshot
5. exclusive (exclusive mode uses repeatable read isolation
level, however, it takes the exclusive lock on all tables to be
read).
The snapshot, read_committed and read_uncommitted modes do not prevent other transactions from updating table rows during initial snapshot. The exclusive and repeatable_read modes do prevent concurrent updates. Mode choice also affects data consistency.
Only exclusive and snapshot modes guarantee full consistency, that is, initial snapshot and streaming logs constitute a linear history.
In case of repeatable_read and read_committed modes, it might happen that, for instance, a record added appears twice - once in initial snapshot and once in streaming phase.
Nonetheless, that consistency level should do for data mirroring.
For read_uncommitted there are no data consistency guarantees at all (some data might be lost or corrupted).
对于数据快照的方式,debezium 提供了 5 种配置给到我们选择,显然这五种配置最后是映
射到 sqlserver 的 5 种事务隔离级别了。
不同的隔离级别,对于数据一致性的保证也不同。
从配置信息的介绍,snapshot 和 exclusive 两项都可以保证数据的一致性。但是官网提供的
默认配置是 repeatable_read,也就是默认情况下不能保证数据的一致性的。
这里涉及到数据库的隔离级别,所以对数据库隔离级别做一个简单的回顾。
隔离级别 | 解析 |
---|---|
未提交读(Read Uncommitted) | 允许脏读,也就是可能读取到其他会话中未提交事务修改的数据 |
提交读(Read Committed) | 只能读取到已经提交的数据。Oracle 等数据库默认都是该级别 (不重复读) |
可重复读(Repeated Read) | 可重复读。在同一个事务内的查询都是事务开始时刻一致的,InnoDB 默认级别。在SQL 标准中,该隔离级别消除了不可重复读,但是还存在幻象读 |
串行读(Serializable) | 完全串行化的读,每次读都需要获得表级共享锁,读写相互都会阻塞 |
另外,snapshot 是 sqlserver 特有的隔离级别,参考这里
(docs.microsoft.com/zh-cn/dotne…
为了理清数数据库事务隔离级别是如何影响debezium的数据快照操作,分析了 debezium 关于 sql server 做数据快照的源码,下面流程图:
4.7 结论
Debezium 默认使用了可重复读的隔离级别做数据快照,对照上面的流程图,debezium 在获
取到当前表结构之后,就释放了表锁!
对于可重复读,一般数据库都是利用行锁来实现的,一旦被读取过的数据,每一行都会被加
上行锁,防止其他事务对自己已经读取过的数据进行修改,但是由于行锁是锁住一行数据,
如果其他事务还是可以往这些数据中插入新的数据,因为插入操作是在数据之间的间隙插入
(B+树的叶子节点之间),所以它不能解决幻读的情况。
但是对于还没读取到的数据,其他事务是可以被修改的!例如,一张表有 1000W 条记录,
flink-cdc 全量同步,每次读取 1W 条数据,当读取了第一次,1-1W 的数据会被加上行锁,但
未提交读(Read Uncommitted) 允许脏读,也就是可能读取到其他会话中未提交事务修
改的数据
提交读(Read Committed) 只能读取到已经提交的数据。Oracle 等多数数据库默认
都是该级别 (不重复读)
可重复读(Repeated Read) 可重复读。在同一个事务内的查询都是事务开始时刻一
致的,InnoDB 默认级别。在 SQL 标准中,该隔离级别消
除了不可重复读,但是还存在幻象读
串行读(Serializable) 完全串行化的读,每次读都需要获得表级共享锁,读写
相互都会阻塞
是其他的事务仍然可以修改第 9W 条数据。
如图:红色位置的数据不能被其他事务更新,白色的没被读取过的数据仍然可以被其他事务
更新。
所以,可以推断出,id=438 这条数据,是在 debezium 释放了表锁,但是还没开始读的这个
期间,被其他事务做了多次的修改,导致 debezium 当前同步数据的事务读取到了已经被更
新过的数据。导致后面消费的 delete file 中的记录无法匹配到 data file 中的记录。
最后验证,给 flink 打断点,在 debezium 释放表锁之后,开始读取数据之前对数据进行修改。
最后发现测试的 iceberg 表也会出现重复数据。
5. 解决方法