随着数据技术的快速发展,了解并掌握各种工具和技术变得尤为重要。为此,我们准备在Apache SeaTunnel社区发起如何使用连接器的Demo演示计划,邀请所有热爱数据同步技术的同学分享他们的知识和实操经验!
我们第二期主题是:如何使用MySQL CDC从MySQL同步到Doris,如果您对此计划感兴趣,也欢迎联系社区运营同学参与Demo录制!无论您是数据工程师、开发者还是技术爱好者,都欢迎您参与并展示您的技术才能。
敲重点~敲重点~如果你是用户,想看什么同步场景的Demo!请下滑到最底部留言,我们优先录制大家呼声最高的同步场景Demo!
Demo计划目标
我们的目标是创建一个共享和学习的平台,通过具体的Demo演示和对应的文档帮助社区成员更好地理解和应用各种数据连接器。这些Demo可以帮助新手快速学习,同时也为资深专家提供一个展示创新解决方案的舞台。
本教程详细介绍了 MySQL CDC(变更数据捕获)连接器的设置和使用方法,该连接器支持通过 SeaTunnel Zeta 和 Flink 从 MySQL 数据库中高效读取快照数据和增量数据
如何使用 MySQL CDC 连接器进行高效数据同步
探索如何使用 MySQL CDC 连接器从 MySQL 数据库读取快照和增量数据,以及如何通过 SeaTunnel Zeta 和 Flink 实现高效数据处理。
本视频教程中提及的代码
探索如何使用 MySQL CDC 连接器从 MySQL 数据库读取快照和增量数据,以及如何通过 SeaTunnel Zeta 和 Flink 实现高效数据处理。
env {
job.mode = "STREAMING"
parallelism = 1
}
source {
MySQL-CDC {
base-url = "jdbc:mysql://datasource01:3306/qa_source"
username = "root"
password = "root@123"
table-names = ["qa_source.batch_mysql_to_doris", "qa_source.batch_mysql_to_doris_offline_incremental_where"]
startup.mode = "latest"
}
}
sink {
Doris {
fenodes = "datasource01:8034"
query-port = 9034
username = root
password = "root@123"
schema_save_mode = "RECREATE_SCHEMA"
database = "e2e_sink"
table = "${table_name}_from_mysql"
sink.enable-2pc = "true"
sink.enable-delete = "true"
sink.label-prefix = "test_json"
doris.config = {
format="json"
read_json_by_line="true"
}
}
}
支持的处理引擎
- SeaTunnel Zeta
- Flink
主要特性
- [x] 流式处理
- [x] 精确一次语义
- [x] 支持自定义分片
- [x] 并行处理
- [ ] 批处理
- [ ] 列投影
支持的数据源版本
数据源 | 支持的版本 | 驱动 | 连接示例 | Maven 地址 |
---|---|---|---|---|
MySQL | MySQL 5.6, 5.7, 8.0.x | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test | 下载 |
依赖安装
安装 JDBC 驱动
对于 Flink 引擎
请确保jdbc 驱动包已被放置在 ${SEATUNNEL_HOME}/plugins/
目录下。
对于 SeaTunnel Zeta 引擎
请确保jdbc 驱动包已被放置在 ${SEATUNNEL_HOME}/lib/
目录下。
创建 MySQL 用户
您需要定义一个具有适当权限的MySQL用户,以便Debezium MySQL连接器监控所有数据库。
- 创建 MySQL 用户:
mysql> CREATE USER 'user'@'localhost' IDENTIFIED BY 'password';
- 授予用户所需权限:
mysql> GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user' IDENTIFIED BY 'password';
- 最终确认用户权限:
mysql> FLUSH PRIVILEGES;
启用 MySQL Binlog
必须为 MySQL 复制启用二进制日志记录。二进制日志记录交易更新以便复制工具传播更改。
- 检查 log-bin 选项是否已启用:
mysql> show variables where variable_name in ('log_bin', 'binlog_format', 'binlog_row_image', 'gtid_mode', 'enforce_gtid_consistency');
+--------------------------+----------------+
| Variable_name | Value |
+--------------------------+----------------+
| binlog_format | ROW |
| binlog_row_image | FULL |
| enforce_gtid_consistency | ON |
| gtid_mode | ON |
| log_bin | ON |
+--------------------------+----------------+
5 rows in set (0.00 sec)
- 如果与上述结果不一致,请在您的MySQL服务器配置文件($MYSQL_HOME/mysql.cnf)中设置如下属性:
# Enable binary replication log and set the prefix, expiration, and log format.
# The prefix is arbitrary, expiration can be short for integration tests but would
# be longer on a production system. Row-level info is required for ingest to work.
# Server ID is required, but this will vary on production systems
server-id = 223344
log_bin = mysql-bin
expire_logs_days = 10
binlog_format = row
binlog_row_image = FULL
# enable gtid mode
gtid_mode = on
enforce_gtid_consistency = on
- 重启 MySQL 服务器
/etc/inint.d/mysqld restart
- 通过再次检查 Binlog 状态来确认您的更改
mysql> show variables where variable_name in ('log_bin', 'binlog_format', 'binlog_row_image', 'gtid_mode', 'enforce_gtid_consistency');
+--------------------------+----------------+
| Variable_name | Value |
+--------------------------+----------------+
| binlog_format | ROW |
| binlog_row_image | FULL |
| enforce_gtid_consistency | ON |
| gtid_mode | ON |
| log_bin | ON |
+--------------------------+----------------+
5 rows in set (0.00 sec)
注意事项
设置 MySQL 会话超时
对于大型数据库,当进行初始一致性快照时,已建立的连接可能会超时。您可以通过配置 interactive_timeout
和 wait_timeou
t 来防止此行为。
interactive_timeout:服务器在关闭交互连接前等待活动的秒数。wait_timeout:服务器在关闭非交互连接前等待活动的秒数
数据类型映射
MySQL 数据类型 | SeaTunnel 数据类型 |
---|---|
BIT(1)、TINYINT(1) | BOOLEAN |
TINYINT | TINYINT |
SMALLINT UNSIGNED、MEDIUMINT、MEDIUMINT UNSIGNED、INT、INTEGER、YEAR | INT |
BIGINT UNSIGNED | DECIMAL(20,0) |
DECIMAL(p, s)、NUMERIC(p, s) | DECIMAL(p,s) |
FLOAT、FLOAT UNSIGNED | FLOAT |
DOUBLE、DOUBLE UNSIGNED、REAL、REAL UNSIGNED | DOUBLE |
CHAR、VARCHAR、TINYTEXT、MEDIUMTEXT、TEXT、LONGTEXT、ENUM、JSON | STRING |
DATE | DATE |
TIME | TIME |
DATETIME、TIMESTAMP | TIMESTAMP |
BINARY、VARBINAR、BIT(p)、TINYBLOB、MEDIUMBLOB、BLOB、LONGBLOB | BYTES |
源选项
名称 | 类型 | 必填 | 默认值 | 描述 |
---|---|---|---|---|
base-url | String | 是 | - | JDBC 连接的 URL。例如:jdbc:mysql://localhost:3306:3306/test 。 |
username | String | 是 | - | 连接数据库服务器时使用的数据库用户名。 |
password | String | 是 | - | 连接数据库服务器时使用的密码。 |
database-names | List | 否 | - | 监控的数据库名称。 |
table-names | List | 是 | - | 监控的表名称,需要包含数据库名,例如:database_name.table_name 。 |
table-names-config | List | 否 | - | 表配置列表,例如:[{"table": "db1.schema1.table1","primaryKeys":["key1"]}] 。 |
startup.mode | Enum | 否 | INITIAL | MySQL CDC 消费者的可选启动模式,有效值为 initial 、 earliest 、 latest 和 specific 。 |
startup.specific-offset.file | String | 否 | - | 指定的 binlog 文件名。当 startup.mode 为 specific 时,此选项必需。 |
startup.specific-offset.pos | Long | 否 | - | 指定的 binlog 文件位置。当 startup.mode 为 specific 时,此选项必需。 |
... | ... | ... | ... | ... |
实用示例
简单示例:支持多表读取
env {
parallelism = 1
job.mode = "STREAMING"
checkpoint.interval = 10000
}
source {
MySQL-CDC {
base-url = "jdbc:mysql://localhost:3306/testdb"
username = "root"
password = "root@123"
table-names = ["testdb.table1", "testdb.table2"]
startup.mode = "initial"
}
}
sink {
Console {
}
}
随着技术的迭代,我们将继续优化文档和功能,敬请期待未来的更新。通过本文介绍的 MySQL CDC 连接器,开发者可以实现对 MySQL 数据库的实时变更数据捕获,从而在不中断数据库操作的情况下进行数据同步和分析。
这不仅提高了数据处理的效率,而且也为构建实时数据应用提供了强大的数据支持。无论您是数据工程师还是系统架构师,掌握如何配置和使用 MySQL CDC 连接器都将为您带来极大的便利和效益。