记一次flinkcdc sqlservericeberg 数据重复问题排查过程

2023年 8月 18日 48.8k 0

背景

flink cdc 1.0 版本早期,还未支持sqlserver ,oracle数据源。基于业务场景需要,对flink cdc项目进行二次开发,增加 sql server 作为数据源的能力。

业务流程是基于flink cdc,开发 sqlserver ---> Iceberg 数据链路。支持先全量同步后增量cdc同步 和 指定LSN开始执行cdc同步 两种模式。

从技术层次来讲,基于 flink + sqlservcer + Iceberg 可以实现端到端的精准一次语义:即Iceberg目的表的数据与源端sqlserver表的数据最终可以保持一致,不丢不重。但是在后期进入测试阶段后出现了Iceberg表数据集重复问题。

测试场景 & 问题

场景

  • sqlserver表数据1000w,id 字段不重复!但是该表无主键;
  • 同步方式使用先全量同步后增量cdc同步;
  • 同步期间启动10个并发的测试任务随机对sqlserver表数据进行更新N条;
  • sqlserver源表没有主键,目的端Iceberg表以全部字段作为 equality delete fields ;

测试步骤:

  • 启动更新数据程序,对 sqlserver 源表数据进行更新,5 个并发,每次更新生成随机
    1-10 条 id 连续的数据,更新两个字段。
  • 启动配置好的 cdc,开始同步。
  • 观测 flink 任务日志, 当同步完 1000W 条数据后(大约 15 分钟),关闭更新数据库
    表的测试任务,并等待 flink cdc 任务同步完成完所有的更新日志;
  • 通过 spark sql 查询对应的 iceberg 表,观测结果是否含有 1000W 条数据;
  • 预期结果: 最后数据更新的测试任务结束后,等flink cdc任务消费完全部的日志数据后,Iceberg表保持1000w数据量,且每条数据都和源表保持一致。

    测试结果

    Iceberg表出现了16条重复数据。

    问题分析过程

    分析多出来的数据
    首先,通过 spark SQL 执行 SQL:

    select count(id) from t17_v2
    

    得到的结果大于 1000W 条数据。整个过程只有更新操作,因此可以推断 iceberg 表出现了 id
    重复的数据!
    然后,通过以下 sql 找出重复的数据:

    Select id, count(id)  from t17_v2 group by id having(id) > 1
    

    发现的确出现了部分重复的数据(返回结果共16条)!

    id count
    117 2
    119 2
    438 2
    ... ...

    分析 iceberg 元数据

    然后进一步分析 iceberg 表的元数据信息。

  • 通过分析 spark SQL 的日志,可以获取到当前执行的 sql 读取的 iceberg 表的快照。分析
    对应的快照文件。
    Spark sql 日志,关注标记红色的部分:
  • spark-sql> select count(id) from t17_v2 ;
    
    21/12/07 15:43:34 INFO BaseMetastoreTableOperations: Refreshing table metadata from new
    version:
    hdfs://mycluster/warehouse/tablespace/managed/hive/t17_v2/metadata/00008-730f1294-c209
    -4612-84c9-80b2fff44f22.metadata.json
    
    21/12/07 15:43:34 INFO BaseMetastoreCatalog: Table loaded by catalog:
    spark_catalog.default.t17_v2
    .....
    
    21/12/07 15:43:34 INFO SparkContext: Created broadcast 3 from broadcast at
    SparkBatchScan.java:141
    
    
    21/12/07 15:43:34 INFO BaseTableScan: Scanning table spark_catalog.default.t17_v2 snapshot
    3306220549964684376 created at 2021-12-07 15:43:04.765 with filter true
    
    
    ...... 10000016
    Time taken: 7.852 seconds, Fetched 1 row(s)
    21/12/07 15:43:42 INFO SparkSQLCLIDriver: Time taken: 7.852 seconds, Fetched 1 row(s)
    

    从上面的日志中可以知道当时读取的快照为 3306220549964684376 ,sql执行的结果为 10000016

  • 从 meta 文件中摘取对应的快照(3306220549964684376 )信息:
  • {
    	"sequence-number": 8,
    	"snapshot-id": 3306220549964684376,
    	"parent-snapshot-id": 6538052867896813584,
    	"timestamp-ms": 1638862984765,
    	"summary": {
    		"operation": "overwrite",
    		"flink.job-id": "5c2746a3148e0a0941cb84476df50058",
    		"flink.max-committed-checkpoint-id": "8",
    		"added-data-files": "1",
    		"added-delete-files": "2",
    		"added-records": "1027",
    		"added-files-size": "27523",
    		"added-position-deletes": "388",
    		"added-equality-deletes": "1027",
    		"changed-partition-count": "1",
    		"total-records": "10001600",
    		"total-files-size": "89852637",
    		"total-data-files": "8",
    		"total-delete-files": "4",
    		"total-position-deletes": "525",
    		"total-equality-deletes": "1600"
    	},
    	"manifest-list": "hdfs://mycluster/warehouse/tablespace/managed/hive/t17_v2/metadata/snap-3306220549964684376-1-f0df80fb-aa5f-4fda-b24a-06c388ee4b25.avro ",
    	"schema - id ": 0
    }
    

    可以看到,当前快照的 meta 信息描述中,operation 为 overwrite,表示当前快照出现了删除
    数据的操作,而 total-records 为 10001600, total-equality-deletes 为 1600 条,计算可得当
    前快照有效记录数为 1000W 条。
    但是通过 spark sql 查询总数量为 10000016 条,所以推断equality delete file 中有 16 条记录没有命
    中到 data file 中的记录;

    分析 iceberg parquet 数据文件

    结合上面查询出来的重复数据,选择 id=438 这条数据作为突破口。
    找出查询出现重复数据的快照对应的 parquet 文件。Iceberg 的元数据有三层,所以写了个工
    具类来列出所有的 data file,delete file 文件。

    Data file 列表:

    00000-0-647359f6-bb8b-44ce-9946-b6ce52d74a08-00001.parquet
    00000-0-647359f6-bb8b-44ce-9946-b6ce52d74a08-00003.parquet
    00000-0-647359f6-bb8b-44ce-9946-b6ce52d74a08-00005.parquet
    00000-0-647359f6-bb8b-44ce-9946-b6ce52d74a08-00007.parquet
    00000-0-647359f6-bb8b-44ce-9946-b6ce52d74a08-00009.parquet
    00000-0-647359f6-bb8b-44ce-9946-b6ce52d74a08-00011.parquet
    00000-0-647359f6-bb8b-44ce-9946-b6ce52d74a08-00013.parquet
    00000-0-647359f6-bb8b-44ce-9946-b6ce52d74a08-00016.parquet
    

    Equality delete file 列表:

    00000-0-647359f6-bb8b-44ce-9946-b6ce52d74a08-00014.parquet
    00000-0-647359f6-bb8b-44ce-9946-b6ce52d74a08-00017.parquet
    

    Position delete file 列表:

    00000-0-647359f6-bb8b-44ce-9946-b6ce52d74a08-00015.parquet
    00000-0-647359f6-bb8b-44ce-9946-b6ce52d74a08-00018.parquet
    

    然后通过 spark shell 来读取上面的data file 文件,查找 ID= 438 的重复数据:

    image.png

    进一步分析,看最早的一条 id=438 的数据从什么时候开始写入到 iceberg 的,最终发现在第
    一个 parquet 文件(00000-0-647359f6-bb8b-44ce-9946-b6ce52d74a08-00001.parquet)被写入:

    image.png

    查看 meta 和快照元数据文件,得知第一个 parquet 文件(00000-0-647359f6-bb8b-44ce-9946-b6ce52d74a08-00001.parquet)是第一个commit中产生的,此时还在同步全量
    数据的阶段。

    然后,我们开始找第一条被删除的id= 438 的数据,它的全部字段分别是什么。

    通过 spark 读取 equal delete file 文件查找第一条id=438 的数据:

    记一次flink-cdc sqlserver-iceberg 数据重复问题排查过程-1

    综上信息,第一条被删除的 id=438 的数据,city 字段取值 ‘shanghai’, 所以正常的情况下,
    第一条数据被写入的数据,city 字段也应该为 “shanghai” 。

    但是从上面的结果来看,第一条被写入的数据是

    id city
    438 moca

    问题原因推测

    经过上面的分析,可以大概推断出 两个问题:

  • 要么flink-cdc在全量同步期间,数据表的全局快照被破坏了,导致数据一致性问题,出现重复数据。
  • 要么就是 增量同步期间读取到的日志数据是错误的,它读取到的第一条删除数据应该是 id=438 , city = 'moca' ;
  • 分析 SQLserver cdc 日志数据

    为了确认 前面推测的问题,于是对 sql server 的 cdc 日志数据进行了一次分析。
    结合 flink-cdc 任务的日志,拿到当时做快照时最大的 LSN 为“0x0000117700006f780001”

    查询该 LSN 之前的所有日志数据:

    select 
      * 
    from 
      (
        SELECT 
          t1.id, 
          t1.city, 
          t1.name3, 
          t1.name1, 
          t1.[__$operation], 
          master.sys.fn_varbintohex str(t2.start_lsn) as startLsn, 
          t2.start_lsn, 
          t1.[__$seqval], 
          master.sys.fn_varbintohexstr(t1.[__$seqval]) as sq, 
          t2.tran_begin_time, 
          t2.tran_end_time 
        FROM 
          cdc.dbo_t17_CT t1 
          join cdc.lsn_time_mapping t2 on t1.__$start_lsn = t2.start_lsn 
        where 
          t2.tran_begin_time & lt; '2021-12-07 16:30:47'
      ) as t 
    where 
      startLsn & gt;= '0x00001177000154280001' 
      and id = 438 
    order by 
      startLsn asc
    

    查询结果如下:

    image.png

    从结果中看出,快照后对于 id=438 这条数据来说,第一个被删除的数据确实是 city=shanghai。
    然后后面又看到,数据被多次修改后又重新了设置了一次 city=moca ;

    到这里,可以确定写入到equality delete file的第一条数据(id=438,city=shanghai)是正确的,符合sqlserver的日志数据顺序。即后面的增量cdc同步是按照数据库顺序执行的。为了进一步确认这个结论,做了以下源码分析:

    分析 flink-cdc-connector 项目源码

    为什么 id 字段相同,data file 多出来的数据不能被删除?

  • 当源表不存在主键的情况下,flink-cdc 会把表的所有字段作为equality delete fields ,来比较两条记录是否相等的标准。这个是业务设计逻辑。

  • 由于更新类型的数据会被拆成两条,一条删除一条写入。所以猜测是否会出现以下情况:

  • (1) Sql server 单个事务内的日志记录是乱序的;
    (2) Cdc 的 flink 任务做 snapshot 时,将一个 update 操作的两条数据截断了,刚好在写
    入了删除的数据后触发 flink 的快照机制,导致数据不完整。
    

    为了确认上面两个猜想,看了 flink-cdc-connector 的代码,cdc 任务读取 debezium 中的数据时,会竞争 flink 的
    snapshot 的锁,所以不会出现数据被处理了一半就触发 flink 快照提交 iceberg 写入的情况;

    image.png

    另外,从 debezium 的源码中分析,可以知道对于 update 类型的操作,被传递到 flink 上层
    应用的时候,两条数据会被合并为一条,参考 SqlServerStreamingChangeEventSource#execute
    的关键代码:

    image.png

    同时查出来的数据会按照 lsn,seqval,operation 做正排序,所以不会出现数据乱序的情况:

    image.png

    到这里为止,第二种情况已经得到排除;

    分析 debezium 项目源码

    结合 对flink-cdc源码 的分析,数据的顺序是正确的,cdc 日志消费的顺序也是正确的。所以问题的关
    键还是,flink-cdc 为何读取到的第一条 id=438 的数据 city=moca 而不是 city=shanghai。

    所以推测出现下面一种情况:数据快照被破坏,全量同步期间其他事务对表的更新操作会被做快
    照的事务(即cdc全量同步事务)读取到,导致equality delete file 的记录与 data file 的记录对不上,iceberg 查询出现重复数据。

    重新去 debzium 官方翻找资料,找到以下关键配置项信息:

    snapshot.isolation.mode

    default value: repeatable_read

    该配置项的描述信息如下:

    Mode to control which transaction isolation level is used and
    how long the connector locks tables that are designated for
    capture. The following values are supported:
    1. read_uncommitted
    2. read_committed
    3. repeatable_read
    4. snapshot
    5. exclusive (exclusive mode uses repeatable read isolation
    level, however, it takes the exclusive lock on all tables to be
    read). 
    
    The snapshot, read_committed and read_uncommitted modes do not prevent other transactions from updating table rows during initial snapshot. The exclusive and repeatable_read modes do prevent concurrent updates. Mode choice also affects data consistency. 
    Only exclusive and snapshot modes guarantee full consistency, that is, initial snapshot and streaming logs constitute a linear history. 
    In case of repeatable_read and read_committed modes, it might happen that, for instance, a record added appears twice - once in initial snapshot and once in streaming phase. 
    Nonetheless, that consistency level should do for data mirroring. 
    For read_uncommitted there are no data consistency guarantees at all (some data might be lost or corrupted).
    

    对于数据快照的方式,debezium 提供了 5 种配置给到我们选择,显然这五种配置最后是映
    射到 sqlserver 的 5 种事务隔离级别了。

    不同的隔离级别,对于数据一致性的保证也不同。
    从配置信息的介绍,snapshot 和 exclusive 两项都可以保证数据的一致性。但是官网提供的
    默认配置是 repeatable_read,也就是默认情况下不能保证数据的一致性的。

    这里涉及到数据库的隔离级别,所以对数据库隔离级别做一个简单的回顾。

    隔离级别 解析
    未提交读(Read Uncommitted) 允许脏读,也就是可能读取到其他会话中未提交事务修改的数据
    提交读(Read Committed) 只能读取到已经提交的数据。Oracle 等数据库默认都是该级别 (不重复读)
    可重复读(Repeated Read) 可重复读。在同一个事务内的查询都是事务开始时刻一致的,InnoDB 默认级别。在SQL 标准中,该隔离级别消除了不可重复读,但是还存在幻象读
    串行读(Serializable) 完全串行化的读,每次读都需要获得表级共享锁,读写相互都会阻塞

    另外,snapshot 是 sqlserver 特有的隔离级别,参考这里
    (docs.microsoft.com/zh-cn/dotne…

    image.png

    为了理清数数据库事务隔离级别是如何影响debezium的数据快照操作,分析了 debezium 关于 sql server 做数据快照的源码,下面流程图:

    image.png

    4.7 结论

    Debezium 默认使用了可重复读的隔离级别做数据快照,对照上面的流程图,debezium 在获
    取到当前表结构之后,就释放了表锁!
    对于可重复读,一般数据库都是利用行锁来实现的,一旦被读取过的数据,每一行都会被加
    上行锁,防止其他事务对自己已经读取过的数据进行修改,但是由于行锁是锁住一行数据,
    如果其他事务还是可以往这些数据中插入新的数据,因为插入操作是在数据之间的间隙插入
    (B+树的叶子节点之间),所以它不能解决幻读的情况。

    但是对于还没读取到的数据,其他事务是可以被修改的!例如,一张表有 1000W 条记录,
    flink-cdc 全量同步,每次读取 1W 条数据,当读取了第一次,1-1W 的数据会被加上行锁,但
    未提交读(Read Uncommitted) 允许脏读,也就是可能读取到其他会话中未提交事务修
    改的数据
    提交读(Read Committed) 只能读取到已经提交的数据。Oracle 等多数数据库默认
    都是该级别 (不重复读)
    可重复读(Repeated Read) 可重复读。在同一个事务内的查询都是事务开始时刻一
    致的,InnoDB 默认级别。在 SQL 标准中,该隔离级别消
    除了不可重复读,但是还存在幻象读
    串行读(Serializable) 完全串行化的读,每次读都需要获得表级共享锁,读写
    相互都会阻塞
    是其他的事务仍然可以修改第 9W 条数据。

    image.png

    如图:红色位置的数据不能被其他事务更新,白色的没被读取过的数据仍然可以被其他事务
    更新。
    所以,可以推断出,id=438 这条数据,是在 debezium 释放了表锁,但是还没开始读的这个
    期间,被其他事务做了多次的修改,导致 debezium 当前同步数据的事务读取到了已经被更
    新过的数据。导致后面消费的 delete file 中的记录无法匹配到 data file 中的记录。
    最后验证,给 flink 打断点,在 debezium 释放表锁之后,开始读取数据之前对数据进行修改。
    最后发现测试的 iceberg 表也会出现重复数据。

    5. 解决方法

  • 修改 debezium 的代码,同步数据的过程中不释放表锁,直到最后同步完数据后再释放。缺点是需要锁表,对业务产生一定的影响。
  • 上层的flink-cdc在调用debezium的接口时使用snapshot隔离级别。
  • 相关文章

    JavaScript2024新功能:Object.groupBy、正则表达式v标志
    PHP trim 函数对多字节字符的使用和限制
    新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
    使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
    为React 19做准备:WordPress 6.6用户指南
    如何删除WordPress中的所有评论

    发布评论