A Complete Guide to the SeaTunnel MySQL CDC Connector: A Powerful Tool for Efficient Data Synchronization

December 7, 2023

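Data volumes keep growing fast. Synchronizing and processing information from many data sources efficiently and in real time has become a major challenge for enterprises and developers.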

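MySQL is one of the most widely used databases, and its change data capture (CDC) capability is key to meeting this challenge. This article takes a close look at the MySQL CDC source connector in the SeaTunnel framework, from basic setup to advanced configuration.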

MySQL CDC Source Connector

Supported Engines

SeaTunnel Zeta
Flink

Key Features

  • [ ] Batch
  • [x] Stream
  • [x] Exactly-once
  • [ ] Column projection
  • [x] Parallelism
  • [x] Support user-defined split

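Description

The MySQL CDC connector reads both snapshot data and incremental data from a MySQL database. This document describes how to set up the MySQL CDC connector to run SQL queries against MySQL databases.

Supported DataSource Info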

  • Datasource: MySQL
  • Supported versions: MySQL 5.6, 5.7, 8.0.x; RDS MySQL 5.6, 5.7, 8.0.x
  • Driver: com.mysql.cj.jdbc.Driver
  • Url: jdbc:mysql://localhost:3306/test
  • Maven: https://mvnrepository.com/artifact/mysql/mysql-connector-java/8.0.28

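Database Dependency

Install the JDBC driver

Download the MySQL driver and place it in the ${SEATUNNEL_HOME}/lib/ directory. For example:

cp mysql-connector-java-xxx.jar $SEATUNNEL_HOME/lib/

Create a MySQL user

You must define a MySQL user with appropriate privileges on every database that the Debezium MySQL connector monitors.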

  1. Create the MySQL user:

mysql> CREATE USER 'user'@'localhost' IDENTIFIED BY 'password';

  2. Grant the required privileges to the user:

mysql> GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user'@'localhost';

  3. Finalize the user's privileges:

mysql> FLUSH PRIVILEGES;
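To confirm that the grants took effect, you can run SHOW GRANTS FOR 'user'@'localhost'; and compare the global grant line with the privileges above. The Python sketch below automates that comparison for a single output line. missing_privileges is a hypothetical helper, not part of SeaTunnel or Debezium. It assumes the privileges are granted globally (ON *.*) and does not account for role-based grants.

```python
import re

# Privileges granted in the GRANT statement above.
REQUIRED = {"SELECT", "RELOAD", "SHOW DATABASES", "REPLICATION SLAVE", "REPLICATION CLIENT"}


def missing_privileges(grant_line: str) -> set[str]:
    """Return the required privileges absent from one `SHOW GRANTS` line.

    Only global grants (ON *.*) are considered, because the replication
    privileges are global. ALL / ALL PRIVILEGES covers everything.
    """
    m = re.match(r"GRANT\s+(.+?)\s+ON\s+\*\.\*\s+TO\s", grant_line, re.IGNORECASE)
    if not m:
        # Not a global grant, so it satisfies none of the required privileges.
        return set(REQUIRED)
    granted = {p.strip().upper() for p in m.group(1).split(",")}
    if granted & {"ALL", "ALL PRIVILEGES"}:
        return set()
    return REQUIRED - granted
```

For example, a line that grants only SELECT and RELOAD on *.* reports the three missing privileges. A database-level grant such as ON `testdb`.* reports all five.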

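Enable the MySQL binlog

Binary logging must be enabled for MySQL replication. The binary log records the transaction updates that replication tools use to propagate changes.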

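  1. Check whether the log-bin option is already enabled:

mysql> show variables where variable_name in ('log_bin', 'binlog_format', 'binlog_row_image', 'gtid_mode', 'enforce_gtid_consistency');
+--------------------------+----------------+
| Variable_name            | Value          |
+--------------------------+----------------+
| binlog_format            | ROW            |
| binlog_row_image         | FULL           |
| enforce_gtid_consistency | ON             |
| gtid_mode                | ON             |
| log_bin                  | ON             |
+--------------------------+----------------+
5 rows in set (0.00 sec)

  2. If your results differ from the above, add the following properties to the [mysqld] group of your MySQL server configuration file ($MYSQL_HOME/mysql.cnf or my.cnf, depending on the installation):

[mysqld]
# Enable the binary replication log and set the prefix, expiration, and log format.
# The prefix is arbitrary. Expiration can be short for integration tests but would be longer on a production system.
# Row-level information is required for ingestion to work.
# A server ID is required; its value will vary on production installations.
# Note: expire_logs_days is deprecated in MySQL 8.0 in favor of binlog_expire_logs_seconds.
server-id         = 223344
log_bin           = mysql-bin
expire_logs_days  = 10
binlog_format     = row
binlog_row_image  = FULL

# Enable GTID mode
gtid_mode = on
enforce_gtid_consistency = on

  3. Restart the MySQL server:

/etc/init.d/mysqld restart

  4. Confirm your changes by checking the binlog status again: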

mysql> show variables where variable_name in ('log_bin', 'binlog_format', 'binlog_row_image', 'gtid_mode', 'enforce_gtid_consistency');
+--------------------------+----------------+
| Variable_name            | Value          |
+--------------------------+----------------+
| binlog_format            | ROW            |
| binlog_row_image         | FULL           |
| enforce_gtid_consistency | ON             |
| gtid_mode                | ON             |
| log_bin                  | ON             |
+--------------------------+----------------+
5 rows in set (0.00 sec)
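Both checks in this procedure, on the running server and on the option file, compare the same settings against the values shown above. The Python sketch below scripts them. parse_show_variables, server_mismatches and option_file_mismatches are hypothetical helpers, not part of SeaTunnel or MySQL. The sketch assumes two things: the query output uses the table layout of the interactive mysql client (mysql --table produces the same layout in batch mode), and the option file is plain INI-style text without !include directives, which Python's configparser cannot parse.

```python
import configparser

# Settings the CDC connector needs, taken from the output and option file above.
# Values are compared case-insensitively.
EXPECTED = {
    "log_bin": "on",
    "binlog_format": "row",
    "binlog_row_image": "full",
    "gtid_mode": "on",
    "enforce_gtid_consistency": "on",
}


def parse_show_variables(output: str) -> dict[str, str]:
    """Parse the ASCII table printed by the mysql client into {name: value}."""
    result = {}
    for line in output.splitlines():
        line = line.strip()
        # Data rows look like "| name | value |"; border rows start with "+".
        if not line.startswith("|"):
            continue
        cells = [c.strip() for c in line.strip("|").split("|")]
        if len(cells) == 2 and cells[0] != "Variable_name":
            result[cells[0]] = cells[1]
    return result


def server_mismatches(output: str) -> dict[str, str]:
    """Return {variable: actual value} for server variables that differ from EXPECTED."""
    actual = parse_show_variables(output)
    return {name: actual.get(name, "<missing>") for name, want in EXPECTED.items()
            if actual.get(name, "").lower() != want}


def option_file_mismatches(text: str) -> dict[str, str]:
    """Run the same check against the [mysqld] group of an option file."""
    parser = configparser.ConfigParser(allow_no_value=True, interpolation=None,
                                       inline_comment_prefixes=("#",))
    parser.read_string(text)
    if not parser.has_section("mysqld"):
        return {"[mysqld]": "<missing>"}
    # MySQL treats '-' and '_' in option names as equivalent.
    opts = {k.replace("-", "_"): (v or "") for k, v in parser.items("mysqld")}
    found = {}
    for name, want in EXPECTED.items():
        if name == "log_bin":
            # In the option file log_bin holds a file prefix; its presence is what matters.
            if name not in opts:
                found[name] = "<missing>"
            continue
        got = opts.get(name, "").strip().lower()
        if got != want:
            found[name] = got or "<missing>"
    if not opts.get("server_id"):
        found["server_id"] = "<missing>"
    return found
```

An empty dict from either function means the corresponding source already matches the expected values.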

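Notes

Set the MySQL session timeout

When an initial consistent snapshot is taken of a large database, the established connection may time out while the tables are being read. You can prevent this by configuring interactive_timeout and wait_timeout in your MySQL configuration file.

  • interactive_timeout: the number of seconds the server waits for activity on an interactive connection before closing it. See the MySQL documentation for details.
  • wait_timeout: the number of seconds the server waits for activity on a non-interactive connection before closing it. See the MySQL documentation for details.

For more database settings, see the Debezium MySQL connector documentation.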

Data Type Mapping

MySQL data type → SeaTunnel data type

  • BIT(1), TINYINT(1) → BOOLEAN
  • TINYINT → TINYINT
  • TINYINT UNSIGNED, SMALLINT → SMALLINT
  • SMALLINT UNSIGNED, MEDIUMINT, MEDIUMINT UNSIGNED, INT, INTEGER, YEAR → INT
  • INT UNSIGNED, INTEGER UNSIGNED, BIGINT → BIGINT
  • BIGINT UNSIGNED → DECIMAL(20,0)
  • DECIMAL(p, s), DECIMAL(p, s) UNSIGNED, NUMERIC(p, s), NUMERIC(p, s) UNSIGNED → DECIMAL(p, s)
  • FLOAT, FLOAT UNSIGNED → FLOAT
  • DOUBLE, DOUBLE UNSIGNED, REAL, REAL UNSIGNED → DOUBLE
  • CHAR, VARCHAR, TINYTEXT, MEDIUMTEXT, TEXT, LONGTEXT, ENUM, JSON → STRING
  • DATE → DATE
  • TIME → TIME
  • DATETIME, TIMESTAMP → TIMESTAMP
  • BINARY, VARBINARY, BIT(p), TINYBLOB, MEDIUMBLOB, BLOB, LONGBLOB → BYTES
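The table can also be read as a lookup function over the column type strings that MySQL reports, for example in information_schema.COLUMNS.COLUMN_TYPE. The Python sketch below, to_seatunnel_type, is a hypothetical helper, not SeaTunnel's own type converter, and it encodes only the rows above. Two cases follow MySQL's defaults: a bare BIT is treated as BIT(1), and DECIMAL or DECIMAL(p) as DECIMAL(10,0) or DECIMAL(p,0). This is an assumption about how the connector sees such columns.

```python
import re

# Signed (or sign-less) MySQL base types -> SeaTunnel types, from the table above.
SIGNED = {
    "TINYINT": "TINYINT", "SMALLINT": "SMALLINT", "MEDIUMINT": "INT",
    "INT": "INT", "INTEGER": "INT", "YEAR": "INT", "BIGINT": "BIGINT",
    "FLOAT": "FLOAT", "DOUBLE": "DOUBLE", "REAL": "DOUBLE",
    "CHAR": "STRING", "VARCHAR": "STRING", "TINYTEXT": "STRING",
    "MEDIUMTEXT": "STRING", "TEXT": "STRING", "LONGTEXT": "STRING",
    "ENUM": "STRING", "JSON": "STRING",
    "DATE": "DATE", "TIME": "TIME", "DATETIME": "TIMESTAMP", "TIMESTAMP": "TIMESTAMP",
    "BINARY": "BYTES", "VARBINARY": "BYTES", "TINYBLOB": "BYTES",
    "MEDIUMBLOB": "BYTES", "BLOB": "BYTES", "LONGBLOB": "BYTES",
}
# UNSIGNED numeric types: integers widen to the next larger SeaTunnel type.
UNSIGNED = {
    "TINYINT": "SMALLINT", "SMALLINT": "INT", "MEDIUMINT": "INT",
    "INT": "BIGINT", "INTEGER": "BIGINT", "BIGINT": "DECIMAL(20,0)",
    "FLOAT": "FLOAT", "DOUBLE": "DOUBLE", "REAL": "DOUBLE",
}

_TYPE_RE = re.compile(r"\s*(\w+)\s*(?:\(([^)]*)\))?\s*(unsigned)?\s*", re.IGNORECASE)


def to_seatunnel_type(column_type: str) -> str:
    """Map a MySQL column type such as 'int(11) unsigned' to a SeaTunnel type name."""
    m = _TYPE_RE.fullmatch(column_type)
    if not m:
        raise ValueError(f"unrecognised column type: {column_type!r}")
    base = m.group(1).upper()
    args = m.group(2).replace(" ", "") if m.group(2) is not None else None
    unsigned = m.group(3) is not None

    if base == "BIT":
        # BIT (= BIT(1)) becomes BOOLEAN; wider BIT(p) becomes BYTES.
        return "BOOLEAN" if args in (None, "1") else "BYTES"
    if base == "TINYINT" and args == "1":
        return "BOOLEAN"
    if base in ("DECIMAL", "NUMERIC"):
        # Precision and scale carry over; MySQL's defaults fill in omitted parts.
        if args is None:
            return "DECIMAL(10,0)"
        return f"DECIMAL({args})" if "," in args else f"DECIMAL({args},0)"

    table = UNSIGNED if unsigned else SIGNED
    if base not in table:
        raise ValueError(f"no mapping listed for {column_type!r}")
    return table[base]
```

Types the table does not list, such as SET or GEOMETRY, raise ValueError rather than guessing a mapping.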

Source Options

  • base-url (String, default: -): The URL of the JDBC connection, for example jdbc:mysql://localhost:3306/test.
  • username (String, default: -): The user name to use when connecting to the database server.
  • password (String, default: -): The password to use when connecting to the database server.
  • database-names (List, default: -): The names of the databases to monitor.
  • table-names (List, default: -): The names of the tables to monitor. Each name must include the database name, for example database_name.table_name.
  • startup.mode (Enum, optional, default: INITIAL): The startup mode of the MySQL CDC consumer. Valid values are initial (synchronize historical data at startup, then synchronize incremental data), earliest (start from the earliest possible offset), latest (start from the latest offset) and specific (start from user-supplied offsets).
  • startup.specific-offset.file (String, optional, default: -): Start from the specified binlog file name. Required when startup.mode is specific.
  • startup.specific-offset.pos (Long, optional, default: -): Start from the specified binlog file position. Required when startup.mode is specific.
  • stop.mode (Enum, optional, default: NEVER): The stop mode of the MySQL CDC consumer. Valid values are never (a real-time job does not stop the source), latest (stop at the latest offset) and specific (stop at a user-supplied offset).
  • stop.specific-offset.file (String, optional, default: -): Stop at the specified binlog file name. Required when stop.mode is specific.
  • stop.specific-offset.pos (Long, optional, default: -): Stop at the specified binlog file position. Required when stop.mode is specific.
  • snapshot.split.size (Integer, optional, default: 8096): The split size (number of rows) of a table snapshot. Captured tables are divided into multiple splits when their snapshot is read.
  • snapshot.fetch.size (Integer, optional, default: 1024): The maximum number of rows fetched per poll when reading a table snapshot.
  • server-id (String, optional, default: -): A numeric ID or numeric ID range for this database client, for example 5400 or '5400-5408'. Every ID must be unique across all currently running database processes in the MySQL cluster, because the connector joins the cluster as another server (with this unique ID) in order to read the binlog. By default a random number between 5400 and 6400 is generated, but setting an explicit value is recommended.
  • server-time-zone (String, optional, default: UTC): The session time zone of the database server. If not set, ZoneId.systemDefault() is used to determine the server time zone.
  • connect.timeout.ms (Duration, optional, default: 30000): The maximum time the connector waits after trying to connect to the database server before timing out.
  • connect.max-retries (Integer, optional, default: 3): The maximum number of times the connector retries building a database server connection.
  • connection.pool.size (Integer, optional, default: 20): The JDBC connection pool size.
  • chunk-key.even-distribution.factor.upper-bound (Double, optional, default: 100): The upper bound of the chunk key distribution factor, which determines whether the table data is evenly distributed. If the distribution factor, (MAX(id) - MIN(id) + 1) / row count, is less than or equal to this upper bound, the table chunks are optimized for even distribution. If it is greater, the table is treated as unevenly distributed, and the sampling-based sharding strategy is used when the estimated shard count exceeds sample-sharding.threshold.
  • chunk-key.even-distribution.factor.lower-bound (Double, optional, default: 0.05): The lower bound of the chunk key distribution factor. If the distribution factor, (MAX(id) - MIN(id) + 1) / row count, is greater than or equal to this lower bound, the table chunks are optimized for even distribution. If it is smaller, the table is treated as unevenly distributed, and the sampling-based sharding strategy is used when the estimated shard count exceeds sample-sharding.threshold.
  • sample-sharding.threshold (Integer, optional, default: 1000): The estimated shard count that triggers the sample sharding strategy. When the distribution factor lies outside the bounds set by chunk-key.even-distribution.factor.upper-bound and chunk-key.even-distribution.factor.lower-bound, and the estimated shard count (approximate row count / chunk size) exceeds this threshold, sample sharding is used. This helps handle large datasets more efficiently.
  • inverse-sampling.rate (Integer, optional, default: 1000): The inverse of the sampling rate used by the sample sharding strategy. For example, a value of 1000 applies a 1/1000 sampling rate. It controls the granularity of sampling, and therefore the final number of shards. It is especially useful for very large datasets where a lower sampling rate is preferred.
  • exactly_once (Boolean, optional, default: true): Enable exactly-once semantics.
  • format (Enum, optional, default: DEFAULT): The output format of MySQL CDC. Valid values are DEFAULT and COMPATIBLE_DEBEZIUM_JSON.
  • debezium (Config, optional, default: -): Debezium properties passed through to the Debezium Embedded Engine, which captures data changes from the MySQL server.
  • common-options (optional, default: -): Source plugin common parameters; see Source Common Options for details.
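The interaction between snapshot.split.size, the two distribution-factor bounds and sample-sharding.threshold can be summarised as a small decision function. The Python sketch below models only the rules stated in the option descriptions; it is not SeaTunnel's splitter. ShardingOptions, choose_strategy, sampled_rows and the labels "even", "sampling" and "uneven" are hypothetical names. "uneven" stands for whatever non-sampling strategy the connector applies to skewed keys, which this document does not describe.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ShardingOptions:
    # Defaults as listed in the option list above.
    split_size: int = 8096             # snapshot.split.size
    upper_bound: float = 100.0         # chunk-key.even-distribution.factor.upper-bound
    lower_bound: float = 0.05          # chunk-key.even-distribution.factor.lower-bound
    sample_threshold: int = 1000       # sample-sharding.threshold
    inverse_sampling_rate: int = 1000  # inverse-sampling.rate


def distribution_factor(min_id: int, max_id: int, row_count: int) -> float:
    """(MAX(id) - MIN(id) + 1) / row count, as defined in the option descriptions."""
    if row_count <= 0:
        raise ValueError("row_count must be positive")
    return (max_id - min_id + 1) / row_count


def choose_strategy(min_id: int, max_id: int, row_count: int,
                    opts: ShardingOptions = ShardingOptions()) -> str:
    """Return 'even', 'sampling' or 'uneven' following the rules described above."""
    factor = distribution_factor(min_id, max_id, row_count)
    if opts.lower_bound <= factor <= opts.upper_bound:
        return "even"  # table treated as evenly distributed
    estimated_shards = row_count / opts.split_size
    if estimated_shards > opts.sample_threshold:
        return "sampling"  # sample 1 in inverse_sampling_rate chunk-key values
    return "uneven"  # non-sampling strategy for skewed keys (not described here)


def sampled_rows(row_count: int, opts: ShardingOptions = ShardingOptions()) -> int:
    """Approximate number of chunk-key values read at a 1/inverse_sampling_rate rate."""
    return row_count // opts.inverse_sampling_rate
```

For example, consider a table whose ids run from 1 to 10^12 but which holds only 10^8 rows. Its distribution factor is 10,000, far above the upper bound of 100, and its estimated shard count is 10^8 / 8096 ≈ 12,352, so the sampling strategy applies.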

Task Example

Simple

Support multi-table reading

env {
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 10000
}

source {
  MySQL-CDC {
    catalog = {
      factory = MySQL
    }
    base-url = "jdbc:mysql://localhost:3306/testdb"
    username = "root"
    password = "root@123"
    table-names = ["testdb.table1", "testdb.table2"]
    
    startup.mode = "initial"
  }
}

sink {
  Console {
  }
}
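Several option rules above are easy to get wrong when writing a job file by hand: qualified table names, the offset options that the specific modes require, and the server-id format. SeaTunnel parses the HOCON file itself. The Python sketch below assumes the source block has already been flattened into a dict keyed by the option names above. validate_mysql_cdc_options is a hypothetical pre-submission check, not a SeaTunnel API. The check that a server-id range holds at least as many IDs as the job's parallelism is an assumption (one ID per parallel reader) and is not stated in the option list.

```python
import re

STARTUP_MODES = {"initial", "earliest", "latest", "specific"}
STOP_MODES = {"never", "latest", "specific"}


def validate_mysql_cdc_options(opts: dict, parallelism: int = 1) -> list[str]:
    """Check a flattened MySQL-CDC option dict against the rules in the option list."""
    errors = []

    for name in opts.get("table-names", []):
        # Table names must be qualified with their database: database_name.table_name.
        if "." not in name:
            errors.append(f"table name {name!r} must look like database_name.table_name")

    for prefix, modes, default in (("startup", STARTUP_MODES, "initial"),
                                   ("stop", STOP_MODES, "never")):
        mode = str(opts.get(f"{prefix}.mode", default)).lower()
        if mode not in modes:
            errors.append(f"{prefix}.mode={mode!r} is not one of {sorted(modes)}")
        elif mode == "specific":
            # The specific mode needs both the binlog file name and the position.
            for key in ("file", "pos"):
                if f"{prefix}.specific-offset.{key}" not in opts:
                    errors.append(f"{prefix}.specific-offset.{key} is required "
                                  f"when {prefix}.mode=specific")

    server_id = opts.get("server-id")
    if server_id is not None:
        m = re.fullmatch(r"(\d+)(?:-(\d+))?", str(server_id).strip())
        if not m:
            errors.append(f"server-id {server_id!r} is neither a number nor a range like 5400-5408")
        else:
            low, high = int(m.group(1)), int(m.group(2) or m.group(1))
            # Assumption: every parallel reader takes its own ID from the range.
            if high - low + 1 < parallelism:
                errors.append(f"server-id {server_id!r} has fewer IDs than parallelism={parallelism}")
    return errors
```

Applied to the source block of the example above (table-names and startup.mode = "initial"), the check returns an empty list.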

Support sending Debezium-compatible format to Kafka

This must be used with the Kafka sink connector. See compatible debezium format for details.

Changelog

next version

  • Add MySQL CDC Source Connector

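A solid understanding of the MySQL CDC source connector helps you grasp the core mechanics of data synchronization and improve the efficiency and accuracy of data processing.

Whether in data integration, real-time analytics or other complex data processing scenarios, the MySQL CDC source connector is a powerful aid for SeaTunnel users. As data technology keeps advancing, we look forward to further innovations and optimizations in future versions that bring developers more convenience and possibilities.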


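Apache SeaTunnel

Apache SeaTunnel is a distributed, high-performance, easily extensible data integration platform for synchronizing and transforming massive amounts of data, both offline and in real time.

Repository:

https://github.com/apache/seatunnel

Website:

https://seatunnel.apache.org/

Apache SeaTunnel download:

https://seatunnel.apache.org/download

Everyone is warmly welcome to join!

We believe that, guided by The Apache Way ("Community Over Code", "Open and Cooperation", "Meritocracy", and diversity and consensus decision-making), we will build a more diverse and inclusive community and advance technology through the spirit of open source!

We invite everyone who wants to help homegrown open source reach the world to join the SeaTunnel contributor family and build open source together!

Submit issues and suggestions:

https://github.com/apache/seatunnel/issues

Contribute code:

https://github.com/apache/seatunnel/pulls

Subscribe to the community development mailing list:

dev-subscribe@seatunnel.apache.org

Development mailing list:

dev@seatunnel.apache.org

Join Slack:

https://join.slack.com/t/apacheseatunnel/shared_invite/zt-1kcxzyrxz-lKcF3BAyzHEmpcc4OSaCjQ

Follow on Twitter:

https://twitter.com/ASFSeaTunnel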
