Using DataX
The DataX community edition can currently sync data from MySQL to OceanBase, but it does not handle table schemas, so the schema has to be migrated separately. Installation and basic usage are straightforward:
- Download the package with wget https://datax-opensource.oss-cn-hangzhou.aliyuncs.com/202210/datax.tar.gz and simply extract it; the environment needs Python and Java
- Prepare the job config: you can export the data to CSV first and then import it into OceanBase, or sync MySQL to OceanBase directly with DataX, depending on your network situation
- The plugins used are:
- txtfilereader: reads local files
- txtfilewriter: writes local files
- mysqlreader: connects to MySQL and fetches data
- oceanbasev10writer: writes into OceanBase
DataX implements the read and write sides of the job through the plugins above.
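To make the wiring concrete, a DataX job file is just JSON with one reader plugin and one writer plugin under content. Below is a minimal sketch, built in Python, of that skeleton; the plugin names are the ones listed above, while the empty parameter blocks and the output file name are placeholders:
import json

# Minimal skeleton of a DataX job: one reader plugin feeding one writer plugin.
# Real jobs fill "parameter" with connection info, tables and columns
# (see the full configs later in this article).
job = {
    "job": {
        "setting": {"speed": {"channel": 4}},
        "content": [{
            "reader": {"name": "mysqlreader", "parameter": {}},
            "writer": {"name": "oceanbasev10writer", "parameter": {}},
        }],
    }
}

with open("skeleton.json", "w") as f:
    json.dump(job, f, indent=2)
# Run it with: python bin/datax.py skeleton.json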
Migration Steps
As far as I know, the open-source version of DataX does not yet support incremental data sync, while incremental sync is indispensable in a real business migration. A migration usually involves the following steps:
1. Source assessment: collect the source database's data volume, workload and resource configuration, then decide on the target database architecture and resources
2. Migration plan, usually broken into several phases:
- Migration assessment: check whether the tables and objects in the source database are compatible with the target; some will need modification
- Full migration: typically you collect the sizes of the source tables, group large and small tables into multiple channels to migrate in parallel, and consider temporarily relaxing some consistency-related parameters to improve insert performance
- Full data validation (a minimal row-count check is sketched after this list)
- Incremental migration: parse the incremental logs; the key question is whether the apply speed on the target can keep up with the generation speed on the source, which determines the final cutover window
- Incremental data validation
Several rounds of rehearsal are needed here to keep improving both speed and data accuracy.
3. Parallel run: some customers will not cut over to the new database immediately; they run both systems in parallel for a while and route a small share of business traffic to the target
4. Reverse sync: reverse synchronization also has to be planned, so that you can switch back to the original database if problems appear.
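For the full data validation phase, the crudest first pass is to compare row counts per table between source and target. Below is a minimal sketch assuming the pymysql driver; the hosts, ports, credentials and the user@tenant login format for the OceanBase (MySQL mode) connection are placeholders to adapt to your environment:
import pymysql

# Placeholder connection settings for the source MySQL and the target OceanBase tenant.
SRC = dict(host="127.0.0.1", port=3306, user="root", password="***", database="sbtest")
DST = dict(host="xxxx", port=2883, user="u_sysbench@tenant_2", password="***", database="sbtest")

def count_rows(conn_args, table):
    """Return SELECT COUNT(*) for one table on one side."""
    conn = pymysql.connect(**conn_args)
    try:
        with conn.cursor() as cur:
            cur.execute(f"SELECT COUNT(*) FROM `{table}`")
            return cur.fetchone()[0]
    finally:
        conn.close()

for table in ["sbtest1"]:
    src, dst = count_rows(SRC, table), count_rows(DST, table)
    print(f"{table}: source={src} target={dst} {'OK' if src == dst else 'MISMATCH'}")
A real validation would also compare checksums or sampled rows, but row counts are a quick sanity check after each full load.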
OceanBase">DataX 同步MySQL -> OceanBase
这里主要是尝试下如何用DataX做同步,分为两种:
- MySQL导出CSV -> OceanBase
- MySQL -> OceanBase
OceanBase">MySQL导出CSV -> OceanBase
配置文件中主要配置数据库连接信息,表信息、使用到的插件。
下面配置文件就是将通过mysqlreader连接数据库,获取sbtest1表数据并通过txtfilewriter写入到本地/tmp/sbtest/目录下生成CSV文件:
- 导出MySQL数据到csv文件中
{
    "job": {
        "setting": {
            "speed": {
                "channel": 4
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0.1
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",
                        "password": "letsg0",
                        "column": [
                            "*"
                        ],
                        "connection": [
                            {
                                "table": [
                                    "sbtest1"
                                ],
                                "jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/sbtest?useUnicode=true&characterEncoding=utf8"]
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "txtfilewriter",
                    "parameter": {
                        "path": "/tmp/sbtest/",
                        "fileName": "sbtest1",
                        "encoding": "UTF-8",
                        "writeMode": "truncate",
                        "dateFormat": "yyyy-MM-dd hh:mm:ss",
                        "nullFormat": "\\N",
                        "fileFormat": "csv",
                        "fieldDelimiter": ","
                    }
                }
            }
        ]
    }
}
The config below reads the CSV data through txtfilereader and writes it into OB through oceanbasev10writer.
- Import the CSV into OB
{
    "job": {
        "setting": {
            "speed": {
                "channel": 4
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0.1
            }
        },
        "content": [
            {
                "reader": {
                    "name": "txtfilereader",
                    "parameter": {
                        "path": ["/tmp/sbtest/"],
                        "fileName": "sbtest1",
                        "encoding": "UTF-8",
                        "column": ["*"],
                        "dateFormat": "yyyy-MM-dd hh:mm:ss",
                        "nullFormat": "\\N",
                        "fieldDelimiter": ","
                    }
                },
                "writer": {
                    "name": "oceanbasev10writer",
                    "parameter": {
                        "obWriteMode": "insert",
                        "column": [
                            "*"
                        ],
                        "preSql": [
                            "truncate table sbtest1"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": "||_dsc_ob10_dsc_||ob_cluster:tenant_2||_dsc_ob10_dsc_||jdbc:oceanbase://xxxx:2883/sbtest?useLocalSessionState=true&allowBatch=true&allowMultiQueries=true&rewriteBatchedStatements=true",
                                "table": [
                                    "sbtest1"
                                ]
                            }
                        ],
                        "username": "u_sysbench",
                        "password": "123456",
                        "writerThreadCount": 10,
                        "batchSize": 1000,
                        "memstoreThreshold": "0.9"
                    }
                }
            }
        ]
    }
}
"jdbcUrl": "||dsc_ob10_dsc||ob_cluster:tenant_2||dsc_ob10_dsc||jdbc:oceanbase://xxxx:2883/sbtest?useLocalSessionState=true&allowBatch=true&allowMultiQueries=true&rewriteBatchedStatements=true", 这里ob_cluster是集群名,tenant_2是租户名
preSql 是指定操作之前指定预处理SQL语句,这里设置执行前将表Truncate
貌似一个job只能针对一张表做导入,如果是多个表,需要写个脚本把表名作为参数传入,或者写多个job
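Below is a minimal sketch of such a wrapper in Python. It assumes a template job file shaped like the configs above (the name job/template_mysql2ob.json and the table list are hypothetical), stamps out one job file per table, and calls datax.py for each:
import json
import subprocess

# Hypothetical table list and template job file; the template has the same shape
# as the mysqlreader -> oceanbasev10writer config shown above.
TABLES = ["sbtest1", "sbtest2", "sbtest3"]
TEMPLATE = "job/template_mysql2ob.json"

with open(TEMPLATE) as f:
    template = json.load(f)

for table in TABLES:
    job = json.loads(json.dumps(template))  # cheap deep copy of the template
    content = job["job"]["content"][0]
    content["reader"]["parameter"]["connection"][0]["table"] = [table]
    content["writer"]["parameter"]["connection"][0]["table"] = [table]
    content["writer"]["parameter"]["preSql"] = [f"truncate table {table}"]

    job_file = f"job/{table}_mysql2ob.json"
    with open(job_file, "w") as f:
        json.dump(job, f, indent=2)

    # One DataX run per table.
    subprocess.run(["python", "bin/datax.py", job_file], check=True)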
Export and Import
Before importing data, the table schema has to be created on the target first. Export it with mysqldump; here is the dump of one sbtest test table:
/*!40101 SET @saved_cs_client = @@character_set_client */;
/*!50503 SET character_set_client = utf8mb4 */;
CREATE TABLE `sbtest1` (
`id` int NOT NULL AUTO_INCREMENT,
`k` int NOT NULL DEFAULT '0',
`c` char(120) NOT NULL DEFAULT '',
`pad` char(60) NOT NULL DEFAULT '',
PRIMARY KEY (`id`),
KEY `k_1` (`k`)
) ENGINE=InnoDB AUTO_INCREMENT=20000001 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
/*!40101 SET character_set_client = @saved_cs_client */;
- The /*! ... */ version comments can simply be deleted (a small cleanup script is sketched after this list)
- Watch the collation: MySQL 8.0 defaults to utf8mb4_0900_ai_ci, which can be seen as a variant of utf8mb4_unicode_ci; neither is recognized by OB, so just delete the COLLATE clause and fall back to the default utf8mb4_general_ci
- OB does not support MAX_ROWS and will report an error, so delete it as well
- Of course, in real workloads large tables are often partitioned, which means extra work to add the partitioning syntax
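Since these edits are mechanical, they can be scripted against the mysqldump output. The sketch below assumes a schema-only dump file named sbtest_schema.sql (a hypothetical name) and only covers the cases mentioned above:
import re

# Strip the pieces OceanBase complains about from a schema-only mysqldump:
# /*!...*/ version comments, the COLLATE clause, and MAX_ROWS.
with open("sbtest_schema.sql") as f:
    ddl = f.read()

ddl = re.sub(r"/\*!\d+.*?\*/;?\n?", "", ddl, flags=re.S)  # /*!40101 ... */; lines
ddl = re.sub(r"\s*COLLATE=\w+", "", ddl)                  # COLLATE=utf8mb4_0900_ai_ci etc.
ddl = re.sub(r"\s*MAX_ROWS=\d+", "", ddl)                 # MAX_ROWS, which OB rejects

with open("sbtest_schema_ob.sql", "w") as f:
    f.write(ddl)
# Review the result, then apply it to the target tenant with obclient before loading data.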
obclient [sbtest]> CREATE TABLE `sbtest1` (
-> `id` int NOT NULL AUTO_INCREMENT,
-> `k` int NOT NULL DEFAULT '0',
-> `c` char(120) NOT NULL DEFAULT '',
-> `pad` char(60) NOT NULL DEFAULT '',
-> PRIMARY KEY (`id`),
-> KEY `k_1` (`k`)
-> ) ENGINE=InnoDB AUTO_INCREMENT=20000001 DEFAULT CHARSET=utf8mb4 ;
Query OK, 0 rows affected, 1 warning (0.232 sec)
obclient [sbtest]> show create table sbtest1\G
*************************** 1. row ***************************
Table: sbtest1
Create Table: CREATE TABLE `sbtest1` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`k` int(11) NOT NULL DEFAULT '0',
`c` char(120) NOT NULL DEFAULT '',
`pad` char(60) NOT NULL DEFAULT '',
PRIMARY KEY (`id`),
KEY `k_1` (`k`) BLOCK_SIZE 16384 LOCAL
) AUTO_INCREMENT = 20000001 AUTO_INCREMENT_MODE = 'ORDER' DEFAULT CHARSET = utf8mb4 ROW_FORMAT = DYNAMIC COMPRESSION = 'zstd_1.3.8' REPLICA_NUM = 3 BLOCK_SIZE = 16384 USE_BLOOM_FILTER = FALSE TABLET_SIZE = 134217728 PCTFREE = 0
1 row in set (0.023 sec)
Export and import are run with the following two commands:
python bin/datax.py job/sbtest_mysql2csv.json
python bin/datax.py job/sbtest_csv2ob.json
A problem you may hit here is a connection error when importing into OB: you need to adjust the tenant-level ob_tcp_invited_nodes whitelist, which defaults to 127.0.0.1,::1 and only allows local connections. Change it according to your network setup:
SET GLOBAL ob_tcp_invited_nodes='%';
After the export and the import you can see the records per second and the throughput:
2023-02-28 17:17:41.701 [job-0] INFO JobContainer - PerfTrace not enable!
2023-02-28 17:17:41.702 [job-0] INFO StandAloneJobContainerCommunicator - Total 5000000 records, 958888789 bytes | Speed 22.86MB/s, 125000 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 3.649s | All Task WaitReaderTime 21.872s | Percentage 100.00%
2023-02-28 17:17:41.704 [job-0] INFO JobContainer -
任务启动时刻 : 2023-02-28 17:16:59
任务结束时刻 : 2023-02-28 17:17:41
任务总计耗时 : 42s
任务平均流量 : 22.86MB/s
记录写入速度 : 125000rec/s
读出记录总数 : 5000000
读写失败总数 : 0
2023-02-28 19:57:35.812 [job-0] INFO JobContainer - PerfTrace not enable!
2023-02-28 19:57:35.812 [job-0] INFO StandAloneJobContainerCommunicator - Total 5000000 records, 958888789 bytes | Speed 109.78KB/s, 586 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 8,462.246s | All Task WaitReaderTime 8.226s | Percentage 100.00%
2023-02-28 19:57:35.812 [job-0] INFO JobContainer -
任务启动时刻 : 2023-02-28 17:35:23
任务结束时刻 : 2023-02-28 19:57:35
任务总计耗时 : 8531s
任务平均流量 : 109.78KB/s
记录写入速度 : 586rec/s
读出记录总数 : 5000000
读写失败总数 : 0
My network and server here are both fairly weak, so the throughput is low.
MySQL -> OceanBase directly
Configuration file:
{
    "job": {
        "setting": {
            "speed": {
                "channel": 4
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0.1
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",
                        "password": "123456",
                        "column": [
                            "*"
                        ],
                        "connection": [
                            {
                                "table": [
                                    "sbtest1"
                                ],
                                "jdbcUrl": ["jdbc:mysql://127.0.0.1:5738/sbtest?useUnicode=true&characterEncoding=utf8&useSSL=false"]
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "oceanbasev10writer",
                    "parameter": {
                        "obWriteMode": "insert",
                        "column": [
                            "*"
                        ],
                        "preSql": [
                            "truncate table sbtest1"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": "||_dsc_ob10_dsc_||ob_cluster:tenant_2||_dsc_ob10_dsc_||jdbc:oceanbase://xxx:2883/sbtest?useLocalSessionState=true&allowBatch=true&allowMultiQueries=true&rewriteBatchedStatements=true",
                                "table": [
                                    "sbtest1"
                                ]
                            }
                        ],
                        "username": "u_sysbench",
                        "password": "123456",
                        "writerThreadCount": 10,
                        "batchSize": 1000,
                        "memstoreThreshold": "0.9"
                    }
                }
            }
        ]
    }
}
Simply run python bin/datax.py job/sbtest_mysql2ob.json and the data is synced into OceanBase without staging files on disk.