初试 DataX 迁移 MySQL 到 OceanBase

2024年 5月 7日 76.6k 0

DataX使用

DataX社区版目前可以做MySQL -> OceanBase数据同步,但不包含表结构,所以迁移中表结构需要单独处理,安装使用上手不难:

  • 下载软件 wget https://datax-opensource.oss-cn-hangzhou.aliyuncs.com/202210/datax.tar.gz,下载后直接解压即可,环境需要python、java
  • 准备配置文件,可以将数据导出csv再导入OceanBase,或者直接通过DataX同步MySQL到OceanBase中,具体根据网络情况
  • 插件分别用到:
    • txtfilereader:本地文件读取
    • txtfilewriter:写入本地文件
    • mysqlreader:连接MySQL获取数据
    • oceanbasev10writer:写入OceanBase

DataX就是通过上面这些插件去实现数据获取和写入需求。

迁移步骤

当前了解到DataX开源版应该还不支持增量数据的同步,在实际业务迁移中增量同步是不可少的,迁移通常需要以下几个步骤:

1、源端评估:收集源端数据库数据量、业务量、资源配置,确定目标端数据库架构及资源配置

2、迁移方案:通常为几个阶段:

  • 迁移评估:评估数据库内表,对象,目标端是否兼容,部分需要修改
  • 全量迁移:一般会统计收集源端数据库表大小,将大表和小表进组合创建多个通过进行迁移,是否可以临时调整一些一致性的参数,提高插入性能
  • 全量数据校验
  • 增量迁移:解析增量日志,这里关注是目标端应用速度能否赶上源端产生速度,决定了最后割接时间
  • 增量数据校验

这里也需要多轮的演练,不断优化速度和数据准确性。

3、同步运行:有些客户不会直接切换到新数据库上,会双轨运行一段时间,将一小部分业务流量放到目标端

4、反向同步:这里还需要考虑反向同步,在出现问题时能切换到原库中。

OceanBase">DataX 同步MySQL -> OceanBase

这里主要是尝试下如何用DataX做同步,分为两种:

  • MySQL导出CSV -> OceanBase
  • MySQL -> OceanBase

OceanBase">MySQL导出CSV -> OceanBase

配置文件中主要配置数据库连接信息,表信息、使用到的插件。

下面配置文件就是将通过mysqlreader连接数据库,获取sbtest1表数据并通过txtfilewriter写入到本地/tmp/sbtest/目录下生成CSV文件:

  • 导出MySQL数据到csv文件中
{
    "job": {
        "setting": {
            "speed": {
                "channel": 4
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0.1
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",
                        "password": "letsg0",
                        "column": [
                            "*"
                        ],
                        "connection": [
                            {
                                "table": [
                                    "sbtest1"
                                ],
                                "jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/sbtest?useUnicode=true&characterEncoding=utf8"]
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "txtfilewriter",
                    "parameter": {
                        "path": "/tmp/sbtest/",
                        "fileName": "sbtest1",
                        "encoding": "UTF-8",
                        "writeMode": "truncate",
                        "dateFormat": "yyyy-MM-dd hh:mm:ss" ,
                        "nullFormat": "\\N" ,
                        "fileFormat": "csv" ,
                        "fieldDelimiter": ","
                    }
                }
            }
        ]
    }
}

下面配置文件通过txtfilereader读取csv数据,通过oceanbasev10writer写入到OB中。

  • 导入csv到OB中
{
    "job": {
        "setting": {
            "speed": {
                "channel": 4
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0.1
            }
        },
        "content": [
            {
                "reader": {
                    "name": "txtfilereader",
                    "parameter": {
                        "path": ["/tmp/sbtest/"],
                        "fileName": "sbtest1",
                        "encoding": "UTF-8",
                        "column": ["*"],
                        "dateFormat": "yyyy-MM-dd hh:mm:ss" ,
                        "nullFormat": "\\N" ,
                        "fieldDelimiter": ","
                    }
                },
                "writer": {
                    "name": "oceanbasev10writer",
                    "parameter": {
                        "obWriteMode": "insert",
                        "column": [
                            "*"
                        ],
                        "preSql": [
                            "truncate table sbtest1"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": "||_dsc_ob10_dsc_||ob_cluster:tenant_2||_dsc_ob10_dsc_||jdbc:oceanbase://xxxx:2883/sbtest?useLocalSessionState=true&allowBatch=true&allowMultiQueries=true&rewriteBatchedStatements=true",
                                "table": [
                                    "sbtest1"
                                ]
                            }
                        ],
                        "username": "u_sysbench",
                        "password":"123456",
                        "writerThreadCount":10,
                        "batchSize": 1000,
                        "memstoreThreshold": "0.9"
                    }
                }
            }
        ]
    }
}

"jdbcUrl": "||dsc_ob10_dsc||ob_cluster:tenant_2||dsc_ob10_dsc||jdbc:oceanbase://xxxx:2883/sbtest?useLocalSessionState=true&allowBatch=true&allowMultiQueries=true&rewriteBatchedStatements=true", 这里ob_cluster是集群名,tenant_2是租户名

preSql 是指定操作之前指定预处理SQL语句,这里设置执行前将表Truncate

貌似一个job只能针对一张表做导入,如果是多个表,需要写个脚本把表名作为参数传入,或者写多个job

导出导入

在导入前还需要先导入表结构,用mysqldump导出表结构,这里导出一张sbtest测试表:

/*!40101 SET @saved_cs_client     = @@character_set_client */;
/*!50503 SET character_set_client = utf8mb4 */;
CREATE TABLE `sbtest1` (
  `id` int NOT NULL AUTO_INCREMENT,
  `k` int NOT NULL DEFAULT '0',
  `c` char(120) NOT NULL DEFAULT '',
  `pad` char(60) NOT NULL DEFAULT '',
  PRIMARY KEY (`id`),
  KEY `k_1` (`k`)
) ENGINE=InnoDB AUTO_INCREMENT=20000001 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
/*!40101 SET character_set_client = @saved_cs_client */;
  • /**/注释可以直接删除掉
  • 字符集排序规则需要注意,MySQL8.0中默认是utf8mb4_0900_ai_ci,算是utf8mb4_unicode_ci的一种,这两种在OB中都不识别,直接删除即可,用默认utf8mb4_general_ci替换
  • MAX_ROWS OB不支持会报错,直接删除
  • 当然在实际业务中,对于一些大表会做分区表处理,那就要做更多处理添加分区语法
obclient [sbtest]> CREATE TABLE `sbtest1` (
    ->   `id` int NOT NULL AUTO_INCREMENT,
    ->   `k` int NOT NULL DEFAULT '0',
    ->   `c` char(120) NOT NULL DEFAULT '',
    ->   `pad` char(60) NOT NULL DEFAULT '',
    ->   PRIMARY KEY (`id`),
    ->   KEY `k_1` (`k`)
    -> ) ENGINE=InnoDB AUTO_INCREMENT=20000001 DEFAULT CHARSET=utf8mb4 ;
Query OK, 0 rows affected, 1 warning (0.232 sec)

obclient [sbtest]> show create table sbtest1\G
*************************** 1. row ***************************
       Table: sbtest1
Create Table: CREATE TABLE `sbtest1` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `k` int(11) NOT NULL DEFAULT '0',
  `c` char(120) NOT NULL DEFAULT '',
  `pad` char(60) NOT NULL DEFAULT '',
  PRIMARY KEY (`id`),
  KEY `k_1` (`k`) BLOCK_SIZE 16384 LOCAL
) AUTO_INCREMENT = 20000001 AUTO_INCREMENT_MODE = 'ORDER' DEFAULT CHARSET = utf8mb4 ROW_FORMAT = DYNAMIC COMPRESSION = 'zstd_1.3.8' REPLICA_NUM = 3 BLOCK_SIZE = 16384 USE_BLOOM_FILTER = FALSE TABLET_SIZE = 134217728 PCTFREE = 0
1 row in set (0.023 sec)

导出和导入可通过以下两条命令:

python bin/datax.py job/sbtest_mysql2csv.json
python bin/datax.py  job/sbtest_csv2ob.json

这里可能会遇到问题就是导入OB时连接报错,需要修改下租户下ob_tcp_invited_nodes报名单设置,默认是127.0.0.1,::1只允许本机访问,根据自身网络情况进行修改:

SET GLOBAL ob_tcp_invited_nodes='%';

导出导入后能看到,每秒钟条目数和速率:

2023-02-28 17:17:41.701 [job-0] INFO  JobContainer - PerfTrace not enable!
2023-02-28 17:17:41.702 [job-0] INFO  StandAloneJobContainerCommunicator - Total 5000000 records, 958888789 bytes | Speed 22.86MB/s, 125000 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 3.649s |  All Task WaitReaderTime 21.872s | Percentage 100.00%
2023-02-28 17:17:41.704 [job-0] INFO  JobContainer -
任务启动时刻                    : 2023-02-28 17:16:59
任务结束时刻                    : 2023-02-28 17:17:41
任务总计耗时                    :                 42s
任务平均流量                    :           22.86MB/s
记录写入速度                    :         125000rec/s
读出记录总数                    :             5000000
读写失败总数                    :                   0

2023-02-28 19:57:35.812 [job-0] INFO  JobContainer - PerfTrace not enable!
2023-02-28 19:57:35.812 [job-0] INFO  StandAloneJobContainerCommunicator - Total 5000000 records, 958888789 bytes | Speed 109.78KB/s, 586 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 8,462.246s |  All Task WaitReaderTime 8.226s | Percentage 100.00%
2023-02-28 19:57:35.812 [job-0] INFO  JobContainer -
任务启动时刻                    : 2023-02-28 17:35:23
任务结束时刻                    : 2023-02-28 19:57:35
任务总计耗时                    :               8531s
任务平均流量                    :          109.78KB/s
记录写入速度                    :            586rec/s
读出记录总数                    :             5000000
读写失败总数                    :                   0

我这里网络和服务器性能都比较差,所以速度很慢~~

OceanBase">直接同步MySQL -> OceanBase

配置文件

{
    "job": {
        "setting": {
            "speed": {
                "channel": 4
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0.1
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",
                        "password": "123456",
                        "column": [
                            "*"
                        ],
                        "connection": [
                            {
                                "table": [
                                    "sbtest1"
                                ],
                                "jdbcUrl": ["jdbc:mysql://127.0.0.1:5738/sbtest?useUnicode=true&characterEncoding=utf8&useSSL=false"]
                            }
                        ]
                    }
                },

                "writer": {
                    "name": "oceanbasev10writer",
                    "parameter": {
                        "obWriteMode": "insert",
                        "column": [
                            "*"
                        ],
                        "preSql": [
                            "truncate table sbtest1"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": "||_dsc_ob10_dsc_||ob_cluster:tenant_2||_dsc_ob10_dsc_||jdbc:oceanbase://xxx:2883/sbtest?useLocalSessionState=true&allowBatch=true&allowMultiQueries=true&rewriteBatchedStatements=true",
                                "table": [
                                    "sbtest1"
                                ]
                            }
                        ],
                        "username": "u_sysbench",
                        "password": "123456",
                        "writerThreadCount":10,
                        "batchSize": 1000,
                        "memstoreThreshold": "0.9"
                    }
                }
            }
        ]
    }
}

直接运行 python bin/datax.py job/sbtest_mysql2ob.json 即可不落地文件,同步数据到OceanBase中。

相关文章

Oracle如何使用授予和撤销权限的语法和示例
Awesome Project: 探索 MatrixOrigin 云原生分布式数据库
下载丨66页PDF,云和恩墨技术通讯(2024年7月刊)
社区版oceanbase安装
Oracle 导出CSV工具-sqluldr2
ETL数据集成丨快速将MySQL数据迁移至Doris数据库

发布评论