如何使用DataX在OB和传统数据库之间同步数据

2024年 5月 7日 84.7k 0

作者:庆涛。 DBA,熟悉 Oracle / MySQL / SQLServer / OceanBase , 现主要从事 OceanBase 开源产品和解决方案推广工作。

个人公众号:OceanBase技术闲谈

DataX 简介以及下载编译

DataX 是阿里云 DataWorks 数据集成的开源版本,在阿里巴巴集团内被广泛使用的离线数据同步工具/平台。DataX 实现了包括 MySQL、Oracle、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、Hologres、DRDS 、OceanBase 等各种异构数据源之间高效的数据同步功能。

开源地址:https://github.com/alibaba/datax

OB 企业版客户,可以跟 OB 的技术人员获取 DataX 内部版本(RPM包)。 OB 社区版客户,可以在 DataX 开源网站上下载源码,自行编译。 编译的时候,注意在 pom.xml 中剔除掉不用的数据库插件。否则,编译出来的包非常大。

pom.xml

Maven 配置文件:

<modules><module>common</module><module>core</module><module>transformer</module>

        <!-- reader --><module>mysqlreader</module><module>oraclereader</module><module>txtfilereader</module><module>streamreader</module><module>rdbmsreader</module><module>oceanbasev10reader</module>

        <!-- writer --><module>mysqlwriter</module><module>txtfilewriter</module><module>streamwriter</module><module>oraclewriter</module><module>rdbmswriter</module><module>oceanbasev10writer</module><!-- common support module --><module>plugin-rdbms-util</module><module>plugin-unstructured-storage-util</module><module>kuduwriter</module></modules>

编译方法还是很简单的。

mvn -U clean package assembly:assembly -Dmaven.test.skip=true

[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary:
[INFO]
[INFO] datax-all ......................................... SUCCESS [25.353s]
[INFO] datax-common ...................................... SUCCESS [3.219s]
[INFO] datax-transformer ................................. SUCCESS [1.655s]
[INFO] datax-core ........................................ SUCCESS [5.078s]
[INFO] plugin-rdbms-util ................................. SUCCESS [2.020s]
[INFO] mysqlreader ....................................... SUCCESS [1.316s]
[INFO] oraclereader ...................................... SUCCESS [1.250s]
[INFO] plugin-unstructured-storage-util .................. SUCCESS [1.744s]
[INFO] txtfilereader ..................................... SUCCESS [4.572s]
[INFO] streamreader ...................................... SUCCESS [1.244s]
[INFO] rdbmsreader ....................................... SUCCESS [1.367s]
[INFO] oceanbasev10reader ................................ SUCCESS [1.836s]
[INFO] mysqlwriter ....................................... SUCCESS [1.195s]
[INFO] txtfilewriter ..................................... SUCCESS [3.923s]
[INFO] streamwriter ...................................... SUCCESS [1.113s]
[INFO] oraclewriter ...................................... SUCCESS [1.170s]
[INFO] rdbmswriter ....................................... SUCCESS [1.534s]
[INFO] oceanbasev10writer ................................ SUCCESS [2.150s]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 1:02.428s
[INFO] Finished at: Wed Aug 25 11:25:29 CST 2021
[INFO] Final Memory: 144M/5065M
[INFO] ------------------------------------------------------------------------

编译好的目录,请把 target 目录下的 datax 复制到实际工作目录。

DataX 配置文件框架

DataX 迁移数据以任务的形式,每个任务只处理一个表,每个任务有一个 json 格式的配置文件。配置文件里会包含 readerwriter 两节。具体的 readerwriter 都是 DataX 支持的数据库插件,可以随意搭配使用(就跟孩子搭积木一样)

下面是配置文件示例。

{
  "job": {
    "content": [
      {
        "reader": {
          "name": "streamreader",
          "parameter": {
            "sliceRecordCount": 10,
            "column": [
              {
                "type": "long",
                "value": "10"
              },
              {
                "type": "string",
                "value": "hello,你好,世界-DataX"
              }
            ]
          }
        },
        "writer": {
          "name": "streamwriter",
          "parameter": {
            "encoding": "UTF-8",
            "print": true
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": 2
       }
    }
  }
}

json 配置文件放到 datax 的目录的 job 下,或者自定义路径。执行方法:

$bin/datax.py job/stream2stream.json

输出信息:

<.....>

2021-08-26 11:06:09.217 [job-0] INFO  JobContainer - PerfTrace not enable!
2021-08-26 11:06:09.218 [job-0] INFO  StandAloneJobContainerCommunicator - Total 20 records, 380 bytes | Speed 38B/s, 2 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.000s |  All Task WaitReaderTime 0.000s | Percentage 100.00%
2021-08-26 11:06:09.223 [job-0] INFO  JobContainer -
任务启动时刻                    : 2021-08-26 11:05:59
任务结束时刻                    : 2021-08-26 11:06:09
任务总计耗时                    :                 10s
任务平均流量                    :               38B/s
记录写入速度                    :              2rec/s
读出记录总数                    :                  20
读写失败总数                    :                   0

DataX 任务执行结束都会有个简单的任务报告,关注一下里面 平均流量、写入速度和读写失败总数等。

DataX 的 job 的参数 settings 可以指定速度参数、错误记录容忍度等。

  "setting": {"speed": {"channel": 10 
            },"errorLimit": {"record": 10,"percentage": 0.1}}

特别说明:

1.speed 还有个限速的设计(bytes),但是有 bug,大家就不要用了。 errorLimit 表示报错记录数的容忍度,超出这个限制后任务就中断退出。

2.channel 是并发数,理论上并发越大,迁移性能越好。实际也要考虑源端的读压力、网络传输性能以及目标端写入性能。

下面是常用数据源(mysqloraclecsvoceanbase )的读写插件。

reader 插件说明

txtfilereader 插件说明

txtfilereader 提供了读取本地文件系统数据存储的能力。在底层实现上,txtfilereader 获取本地文件数据,并转换为 DataX 传输协议传递给 Writer。

本地文件内容存放的是一张逻辑意义上的二维表,例如CSV格式的文本信息。

txtfilereader 有一些功能限制和参数,请首先阅读官方说明: https://github.com/alibaba/DataX/blob/master/txtfilereader/doc/txtfilereader.md

下面是 txtfilereader 的 示例。

  "reader": {
                    "name": "txtfilereader",
                    "parameter": {
                        "path": ["/tmp/tpcc/bmsql_oorder"],
                        "encoding": "UTF-8",
                        "column": ["*"],
                        "dateFormat": "yyyy-MM-dd hh:mm:ss" ,
                        "nullFormat": "\\N" ,
                        "fieldDelimiter": ","
                    }
                }

特别说明:

1.path 指定到路径即可,fileName 会是生成的文件前缀,完整的文件名很长,有随机字符串(避免重复)。文件的数量可能是根据记录数来自动分的。

2.column 可以指定为 "*" ,这样所有字段值都作为字符串了。这个虽然方便,但不能保证完全没有问题。目前测试常用数据类型是可以的。

3.nullFormat 指定空值的记录,默认是"null",这个读入 oracle 的时候会有问题。建议导出文件的时候指定为 "\N" 表示空值和。

4.fieldDelimiter 指定 csv 文件的列分隔符,这个跟导出的时候指定的列分隔符保持一致。通常导出的列内容如果含有列分隔符时,会用双引号进行包含(enclosed)。用逗号(,)也可以,只是太过常见,建议用生僻一点的单字符。如 |^ 等。

mysqlreader 插件说明

MysqlReader 插件实现了从 Mysql 读取数据。在底层实现上,MysqlReader 通过 JDBC 连接远程 Mysql 数据库,并执行相应的sql语句将数据从 mysql 库中 SELECT 出来。

不同于其他关系型数据库,MysqlReader 不支持 FetchSize。

实现原理方面,简而言之,MysqlReader 通过 JDBC 连接器连接到远程的 Mysql 数据库,并根据用户配置的信息生成查询 SELECT SQL 语句,然后发送到远程 Mysql 数据库,并将该 SQL 执行返回结果使用 DataX 自定义的数据类型拼装为抽象的数据集,并传递给下游 Writer 处理。详细功能和参数说明请首先阅读官方说明:https://github.com/alibaba/DataX/blob/master/mysqlreader/doc/mysqlreader.md

下面是 mysqlreader 的配置示例。

                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "tpcc",
                        "password": "********",
                        "column": [
                            "*"
                        ],                        
                        "connection": [
                            {
                                "table": [
                                    "bmsql_oorder"
                                ],
                                "jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/tpccdb?useUnicode=true&characterEncoding=utf8"]
                            }
                        ]
                    }
                }

特别说明:

1.如果表的主键是单列主键,比如说 id。那么可以在 parameter 下加一个配置: "splitPk": "db_id", 。如果是加在最后,就去掉后面的逗号(,)。

2.column 指定读取的列。通常建议写具体的列,可以在列上用函数做逻辑转换。用 * 就是要时刻确保列跟目标端写入的列能对应上。

oraclereader 插件说明

OracleReader 插件实现了从 Oracle 读取数据。在底层实现上,OracleReader 通过 JDBC 连接远程 Oracle 数据库,并执行相应的 sql 语句将数据从 Oracle 库中 SELECT 出来。

其原理也很简单,OracleReader 通过 JDBC 连接器连接到远程的 Oracle 数据库,并根据用户配置的信息生成查询 SELECT SQL 语句并发送到远程 Oracle 数据库,并将该 SQL 执行返回结果使用 DataX 自定义的数据类型拼装为抽象的数据集,并传递给下游 Writer 处理。详细功能和参数说明请首先阅读官方说明:https://github.com/alibaba/DataX/blob/master/oraclereader/doc/oraclereader.md

下面是 oraclereader 的配置示例。

       "reader": {
                    "name": "oraclereader",
                    "parameter": {
                        "username": "tpcc",
                        "password": "********",
                        "column": [
                            "*"
                        ],                    
                        "connection": [
                            {
                                "table": [
                                    "bmsql_oorder"
                                ],
                                "jdbcUrl": [ "jdbc:oracle:thin:@172.17.0.5:1521:helowin"]
                            }
                        ]
                    }
                }

特别说明:

1.如果表的主键是单列主键,比如说 id。那么可以在 parameter 下加一个配置: "splitPk": "db_id", 。如果是加在最后,就去掉后面的逗号(,)。

2.column 指定读取的列。通常建议写具体的列,可以在列上用函数做逻辑转换。用 * 就是要时刻确保列跟目标端写入的列能对应上。

oceanbasev10reader 插件说明

OceanbaseV10Reader 插件实现了从 Oceanbase 读取数据。在底层实现上,该读取插件通过 java client(jdbc)连接远程 Oceanbase 1.0 数据库,并执行相应的 sql 语句将数据从库中 SELECT 出来。

实现原理

简而言之,Oceanbasev10Reader 通过java client 连接器连接到远程的 Oceanbase 数据库,并根据用户配置的信息生成查询 SELECT SQL 语句,然后发送到远程Oceanbase 数据库,并将该SQL执行返回结果使用 DataX 自定义的数据类型拼装为抽象的数据集,并传递给下游 Writer 处理。 对于用户配置 Table、Column、Where 的信息,OceanbaseV10Reader 将其拼接为 SQL 语句发送到 Oceanbase 数据库;对于用户配置 querySql 信息,OceanbaseReader 直接将其发送到、Oceanbase 数据库。

下面是 oceanbasev10reader 的配置示例。

reader": {
                    "name": "oceanbasev10reader",
                    "parameter": {
                        "where": "",
                        "readBatchSize": 100000,
                        "column": [
                            "*"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": ["||_dsc_ob10_dsc_||obdemo:oboracle||_dsc_ob10_dsc_||jdbc:oceanbase://127.0.0.1:2883/tpcc"],"table": ["bmsql_oorder"
                                ]
                            }
                        ],
                        "username": "tpcc",
                        "password":"********"
                    }
                }

参数说明

供参考,以后可能会变化,请关注 DataX 开源地址说明 (https://github.com/alibaba/DataX/tree/master/oceanbasev10reader)。

1.jdbcUrl: 描述:连接ob使用的jdbc url,格式固定,注意此处为数组,应用中括号([])括起来 必选:是 默认值:无

2.table: 描述:所选取的需要同步的表。使用 JSON 的数组描述,因此支持多张表同时抽取。当配置为多张表时,用户自己需保证多张表是同一 schema 结构,OceanbaseReader 不予检查表是否同一逻辑表。注意,table必须包含在connection配置单元中。 必选:是 默认值:无

3.column: 描述:所配置的表中需要同步的列名集合,使用 JSON 的数组描述字段信息。 支持列裁剪,即列可以挑选部分列进行导出。 支持列换序,即列可以不按照表 schema 信息进行导出,同时支持通配符 *,在使用之前需仔细核对列信息。 必选:是 默认值:无

4.where: 描述:筛选条件,OceanbaseReader 根据指定的 column、table、where 条件拼接 SQL,并根据这个 SQL 进行数据抽取。在实际业务场景中,往往会选择当天的数据进行同步,可以将 where 条件指定为 gmt_create > $bizdate 。这里 gmt_create 不可以是索引字段,也不可以是联合索引的第一个字段。where 条件可以有效地进行业务增量同步。如果不填写 where 语句,包括不提供 where 的 key 或者 value,DataX 均视作同步全量数据。 必选:否 默认值:无

5.splitPk: 描述:OBReader 进行数据抽取时,如果指定 splitPk,表示用户希望使用 splitPk 代表的字段进行数据分片,DataX 因此会启动并发任务进行数据同步,这样可以大大提供数据同步的效能。 推荐 splitPk 用户使用表主键,因为表主键通常情况下比较均匀,因此切分出来的分片也不容易出现数据热点。 目前splitPk 仅支持 int 数据切分,不支持其他类型。如果用户指定其他非支持类型将报错。 splitPk 如果不填写,将视作用户不对单表进行切分,OBReader 使用单通道同步全量数据。 必选:否 默认值:空

6.querySql: 描述:在有些业务场景下,where 这一配置项不足以描述所筛选的条件,用户可以通过该配置型来自定义筛选 SQL。当用户配置了这一项之后,DataX 系统就会忽略 table,column 这些配置型,直接使用这个配置项的内容对数据进行筛选 当用户配置 querySql 时,OceanbaseReader 直接忽略 table、column、where 条件的配置,querySql 优先级大于 table、column、where 选项。 必选:否 默认值:无

7.timeout: 描述:sql 执行的超时时间 单位分钟。目前默认被设置为 48h,所以用户不用设置。 必选:否 默认值:5

8.username: 描述:访问 oceanbase1.0 的用户名 必选:是 默认值:无

9.** password** 描述:访问 oceanbase1.0 的密码 必选:是 默认值:无

10.readByPartition 描述:对分区表是否按照分区切分任务 必选:否 默认值:fasle

11.readBatchSize 描述:一次读取的行数,如果遇到内存不足的情况,可将该值调小 必选:否 默认值:100000

writer 插件说明

txtfilewriter 插件说明

TxtFileWriter 提供了向本地文件写入类 CSV 格式的一个或者多个表文件。TxtFileWriter 服务的用户主要在于 DataX 开发、测试同学。

写入本地文件内容存放的是一张逻辑意义上的二维表,例如 CSV 格式的文本信息。

详细功能和参数说明请首先阅读官方说明:https://github.com/alibaba/DataX/blob/master/txtfilewriter/doc/txtfilewriter.md

下面是 txtfilewriter 的配置示例。

                "writer": {
                    "name": "txtfilewriter",
                    "parameter": {
                        "path": "/tmp/tpcc/bmsql_oorder",
                        "fileName": "bmsql_oorder",
                        "encoding": "UTF-8",
                        "writeMode": "truncate",
                        "dateFormat": "yyyy-MM-dd hh:mm:ss" ,
                        "nullFormat": "\\N" ,
                        "fileFormat": "csv" ,
                        "fieldDelimiter": ","
                    }
                }

特别说明:

1.fileFormat 指定为 csv

2.fieldDelimiter 指定字段分隔符,单字符,尽量使用内容里不会出现的分隔符。默认是逗号(,),推荐用 ^| 等。当内容中出现分隔符的时候,该字段会使用双引号包含起来。 如下面就是字段值包含分隔符,的记录:

32,300000,.1111,LCJgPD6U,"pIScfm2oihf4KrC,1U",kwmFinypFhN,ZJdxWAwUHhx,XI,754811111

3.nullFormat 指定空值的表现形式,默认是 "null" ,建议改为 "\N" 。 如下面就是空值记录:

3,1,2101,1015,\N,13,1,2021-08-25 10:50:56

mysqlwriter 插件说明

MysqlWriter 插件实现了写入数据到 Mysql 主库的目的表的功能。在底层实现上, MysqlWriter 通过 JDBC 连接远程 Mysql 数据库,并执行相应的 insert into ... 或者 ( replace into ...) 的 sql 语句将数据写入 Mysql,内部会分批次提交入库,需要数据库本身采用 innodb 引擎。

实现原理,MysqlWriter 通过 DataX 框架获取 Reader 生成的协议数据,根据你配置的 writeMode 生成。

1.insert into...(当主键/唯一性索引冲突时会写不进去冲突的行) 或者

2.replace into...(没有遇到主键/唯一性索引冲突时,与 insert into 行为一致,冲突时会用新行替换原有行所有字段) 的语句写入数据到 Mysql。出于性能考虑,采用了 PreparedStatement + Batch,并且设置了:rewriteBatchedStatements=true,将数据缓冲到线程上下文 Buffer 中,当 Buffer 累计到预定阈值时,才发起写入请求。

详细功能和参数说明请首先阅读官方说明: https://github.com/alibaba/DataX/blob/master/mysqlwriter/doc/mysqlwriter.md

下面是 mysqlwriter 的配置示例。

               "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "writeMode": "insert",
                        "username": "tpcc",
                        "password": "123456",
                        "column": [
                            "*"
                        ],
                        "session": [
                        	"set session sql_mode='ANSI'"
                        ],
                        "preSql": [
                            "truncate table bmsql_oorder"
                        ],
                        "batchSize": 512,                        
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://127.0.0.1:3306/tpccdb?useUnicode=true&characterEncoding=utf8",
                                "table": [
                                    "bmsql_oorder"
                                ]
                            }
                        ]
                    }
                }

特别说明:

1.writeMode :有三种取值:insertreplaceupdate ,分别对应的 SQL 类型为 insert intoreplace intoon duplicate key update

2.preSQL: 是在任务开始的时候提前执行一次的 SQL。通常是清空表数据。大表不要使用 delete from @table ,那个很慢。

3.batchSize: 时候一次性批量提交的记录数大小,该值可以极大减少 DataX 与 Mysql 的网络交互次数,并提升整体吞吐量。但是该值设置过大可能会造成 DataX 运行进程 OOM 情况。

oraclewriter 插件说明

OracleWriter 插件实现了写入数据到 Oracle 主库的目的表的功能。在底层实现上,OracleWriter 通过 JDBC 连接远程 Oracle 数据库,并执行相应的 insert into ... sql 语句将数据写入 Oracle,内部会分批次提交入库。

实现原理,OracleWriter 通过 DataX 框架获取 Reader 生成的协议数据,根据你配置生成相应的 SQL 语句。

insert into...(当主键/唯一性索引冲突时会写不进去冲突的行)

详细功能和参数说明请首先阅读官方说明:https://github.com/alibaba/DataX/blob/master/oraclewriter/doc/oraclewriter.md。

下面是 oraclewriter 的配置示例。

                "writer": {
                    "name": "oraclewriter",
                    "parameter": {
                        "username": "tpcc",
                        "password": "********",
                        "column": [
                            "*"
                        ],
                        "preSql": [
                            "truncate table bmsql_oorder"
                        ],
                        "batchSize": 512,
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:oracle:thin:@127.0.0.1:1521:helowin",
                                "table": [
                                    "bmsql_oorder"
                                ]
                            }
                        ]
                    }
                }

特别说明:

1.OracleWriter MysqlWriter 不同,不支持配置 writeMode 参数。

2.preSQL:是在任务开始的时候提前执行一次的 SQL。通常是清空表数据。大表不要使用 delete from @table ,那个很慢。

3.batchSize:时候一次性批量提交的记录数大小,该值可以极大减少 DataX 与 Mysql 的网络交互次数,并提升整体吞吐量。但是该值设置过大可能会造成 DataX 运行进程 OOM 情况。

oceanbasev10writer 插件说明

oceanbasev10writer 插件实现了写入数据到 OceanBase 主库的目的表的功能。在底层实现上, OceanbaseV10Writer 通过 java 客户端(底层 MySQL JDBC 或oceanbase client) 连接 obproxy 远程 OceanBase 数据库,并执行相应的 insert sql 语句将数据写入 OceanBase ,内部会分批次提交入库。

实现原理

Oceanbasev10Writer 通过 DataX 框架获取 Reader 生成的协议数据,生成 insert 语句。对于 mysql 租户,在主键或唯一键冲突时,可以选择 replace 模式,更新表中的所有字段。对于 oracle 租户,目前只有 insert 行为。 出于性能考虑,写入采用 batch 方式批量写,当行数累计到预定阈值时,才发起写入请求。

下面是 oceanbasev10writer 的配置示例。

                "writer": {
                    "name": "oceanbasev10writer",
                    "parameter": {
                        "obWriteMode": "insert",
                        "column": [
                            "*"
                        ],
                        "preSql": [
                            "truncate table bmsql_oorder"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": "||_dsc_ob10_dsc_||obdemo:oboracle||_dsc_ob10_dsc_||jdbc:oceanbase://127.0.0.1:2883/tpcc?useLocalSessionState=true&allowBatch=true&allowMultiQueries=true&rewriteBatchedStatements=true",
                                "table": [
                                    "bmsql_oorder"
                                ]
                            }
                        ],
                        "username": "tpcc",
                        "password":"********",
                        "batchSize": 512,
                        "writerThreadCount":10,
                        "memstoreThreshold": "0.9"
                    }
                }

参数说明

供参考,以后可能会变化,请关注 DataX 开源地址说明 (https://github.com/alibaba/DataX/tree/master/oceanbasev10writer)。

1.jdbcUrl: 描述:目的数据库的连接信息,包含了 ob 的集群、租户、obproxy 的地址和端口以及库名;使用域名可能会报错,建议使用 ip 。 必选:是 默认值:无

2.table: 描述:目的表的表名称。支持写入一个或者多个表。当配置为多张表时,必须确保所有表结构保持一致;表名中一般不含库名; 必选:是 默认值:无

3.column: 描述:目的表需要写入数据的字段,字段之间用英文逗号分隔。例如: "column": ["id","name","age"]。 column配置项必须指定,不能留空! 必选:是 默认值:否

4.preSql: 描述:写入数据到目的表前,会先执行这里的标准语句。如果 Sql 中有你需要操作到的表名称,请使用 @table 表示,这样在实际执行 Sql 语句时,会对变量按照实际表名称进行替换。比如你的任务是要写入到目的端的100个同构分表(表名称为:datax_00,datax01, ... datax_98,datax_99),并且你希望导入数据前,先对表中数据进行删除操作,那么你可以这样配置:"preSql":["delete from @table"],效果是:在执行到每个表写入数据前,会先执行对应的 delete from 对应表名称.只支持 delete 语句 必选:否 默认值:无

5.batchSize: 描述:一次性批量提交的记录数大小,该值可以极大减少 DataX 与 Oceanbase 的网络交互次数,并提升整体吞吐量。但是该值设置过大可能会造成 DataX 运行进程 OOM 情况。 必选:否 默认值:1000

6.memstoreThreshold: 描述:OB 租户的 memstore 使用率,当达到这个阀值的时候暂停导入,等释放内存后继续导入. 防止租户内存溢出。 必选:否 默认值:0.9

7.username: 描述:访问 oceanbase1.0 的用户名,注意,此处不配置 ob 的集群名和租户名。 必选:是 默认值:无

8.** password** 描述:访问 oceanbase1.0 的密码 必选:是 默认值:无

9.writerThreadCount: 描述:每个通道(channel)中写入使用的线程数 必选:否 默认值:1

DataX 的调优建议

DataX 本质上是个数据交换平台,将源端的数据读出,写入到目标端。其数据迁移性能取决于下面几个因素:

1.源端的读性能。可以加并发,制约条件就是对源库的影响、源库的性能瓶颈等。

2.DataX 自身的性能。DataX 是个 Java 程序,其能起的线程数也是有限,受限于所在主机的CPU和内存大小。

3.网络传输性能。并发高的时候,网络传输要留意吞吐量是否达到网卡瓶颈。现在万兆网卡的吞吐量 10000Mb,很难达到。不过占用网络带宽对其他业务可能也会有影响。

4.目标端的写入性能。也可以加并发,制约条件就是目标库写入性能瓶颈、对目标库的影响等。如果目标端是 OB,需要针对 OB 调优。

5.涉及到文件数据源的时候,关注文件所在磁盘 IO 性能。如 iops、吞吐量等。

所以 DataX 的调优就是调节 readerwriter 的各个并行参数,尽可能的把 源和目标端数据库资源能力都利用上,那么整体 DataX 的迁移效率会最好。

此外,如果主机内存够大的话, datax.py 能使用的 JVM 内存也可以调大。编辑脚本,调大 -Xms-Xmx 参数。

vim bin/datax.py

 30 DEFAULT_JVM = "-Xms16g -Xmx16g -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=%s/log" % (DATAX_HOME)

主机资源监控 tsar

tsar 是阿里巴巴开源的主机性能监控工具,非常轻量和实用,阿里内部服务器标配。开源地址:https://github.com/alibaba/tsar/

常用用法:

$tsar --cpu --load --traffic --io -I sdb -l -i 3
Time              -----------------------cpu---------------------- ---------------------traffic-------------------- ------------------------------------------sdb-------------------------------------------  -------------------load-----------------
Time                user     sys    wait    hirq    sirq    util    bytin  bytout   pktin  pktout  pkterr  pktdrp    rrqms   wrqms      rs      ws   rsecs   wsecs  rqsize  qusize   await   svctm    util     load1   load5  load15    runq    plit
26/08/21-16:15:15  27.84    7.08    0.03    0.00    0.01   34.93   632.00    2.1K    9.00    7.00    0.00    0.00     0.00   33.33    2.00  100.00   40.00  561.33    5.90    0.00    0.06    0.04    0.40     15.69   14.30   14.05   27.00    3.9K
26/08/21-16:15:18  26.69    7.21    0.07    0.00    0.03   33.92     1.0K    6.3K   13.00   15.00    0.00    0.00     0.00   38.67    2.00  139.33   40.00   10.4K   75.70    0.00    1.42    0.05    0.73     15.69   14.30   14.05    6.00    3.9K
26/08/21-16:15:21  29.51    7.44    0.03    0.00    0.01   36.96     1.0K    5.7K   12.00   13.00    0.00    0.00     0.00   33.33    2.00  100.33   40.00  562.67    5.89    0.00    0.05    0.04    0.43     15.63   14.31   14.05   30.00    3.9K
26/08/21-16:15:24  26.33    7.34    0.03    0.00    0.01   33.68   720.00    1.4K    9.00    6.00    0.00    0.00     0.00   29.67    2.00   88.67   40.00  501.33    5.97    0.00    0.04    0.03    0.27     15.02   14.21   14.02   22.00    3.9K
^C

$tsar -l -i 3
Time              ---cpu-- ---mem-- ---tcp-- -----traffic---- --sda--- --sdb---  ---load-
Time                util     util   retran    bytin  bytout     util     util     load1
26/08/21-16:15:32  35.66    29.92     0.21     1.0K    6.5K     1.73     0.43     15.98
26/08/21-16:15:35  34.02    29.92     0.00     1.5K    5.3K     1.40     0.40     15.18
26/08/21-16:15:38  34.84    29.92     0.00     1.1K    5.8K     2.73     0.93     15.18
^C

读写并行度

DataX 的配置文件中首先有 speed 的设置,其中 channel 是总的并发数。 OB 的 oceanbasev10writer 还进一步支持 多线程,通过参数 "writerThreadCount":10 指定每个 channel 从源端读取的数据,再分几个并发线程写入。所以 OB 总的写入并发数是 channel * writerThreadCount

每个 writer 里还有个 batchsize,那个是一个事务的记录数数量。通常建议 200-10000 都可以。尽量不要超过 1万。事务太大了,不是很好。在 OB 里事务太大太长,可能会到达事务超时时间(默认120秒)。 这里有趣的是,OB 的 oceanbasev10writer 会把这个 batch insert 合并为 一个 insert 多个 values 子句。所以 batchSize 也不要太大;否则,insert sql 文本太长, 高并发时也可能会报错(内存不足方面的错误)。当列非常多的时候(比如说100 列) 或者 值的内容有大文本的时候,这个 batchSize 控制在 几百左右比较好。

源端数据库读优化

当源端是数据库的时候,如果表有单列主键,并且主键列类型的 数值型 (如 number、bigint、integer、decimal 等),可以在源端 reader 里增加配置 : "splitPk": "id" 。这个时候,DataX 能先对主键进行切片,然后多个 channel 同时并发分段去读取源数据。 如果没有这个设置,那源端只能单并发读取数据。

OB写入的内存调优

OB 的数据读写模型比较特殊,增量都在内存里。当 OB 机器已经是 SSD 盘的时候, IO 不大可能会首先成为 OB 的性能瓶颈,内存和 CPU 更有可能先是瓶颈。大量数据写入的时候,增量对 memtable 内存的消耗会很快。OB 设置不当的情况下,可能会出现内存耗尽,从而写入报错。其他业务写入也会跟着报错。

OB 的内存优化过程比较复杂。这里先给出一个初始的设置,能降低内存写入报错的概率。

alter system set merge_thread_count = 32;  --  增大合并的线程数。
alter system set minor_merge_concurrency = 16;  --  增大转储的线程数,期望提高转储的速度。
alter system set _mini_merge_concurrency = 8;  --  增大mini_merge的线程数,期望提高mini_merge的速度(默认值为3)。调大为8以后,发现会导致压测中CPU使用率有时飙升至90%,对性能有影响。
alter system set memory_limit_percentage = 90;    --  OB占系统总内存的比例,提高OB可用的内存量。
alter system set memstore_limit_percentage = 55;  --  memstore占租户的内存比,尽量增大memstore的空间(但是可能对读操作有负面影响)。
alter system set freeze_trigger_percentage = 40;  --  启动major/minor freeze的时机,让转储(minor freeze)尽早启动,memstore内存尽早释放。
alter system set minor_freeze_times = 100;        --  minor freeze的次数,尽量不在测试期间触发major freeze。
alter system set minor_warm_up_duration_time = 0;  --  加快minor freeze

OB 的 oceanbasev10writer 插件也提供参数 memstoreThreshold 监测增量内存的利用率,如果到达这个阈值,DataX 自动降速。

OB 的增量内存使用也可以监控,关键 SQL 是:

SELECT tenant_id, ip, round(active/1024/1024/1024) active_gb, round(total/1024/1024/1024) total_gb, round(freeze_trigger/1024/1024/1024) freeze_trg_gb, round(mem_limit/1024/1024/1024) mem_limit_gb
    , freeze_cnt , round((active/freeze_trigger),2) freeze_pct, round(total/mem_limit, 2) mem_usage
FROM `gv$memstore`
WHERE tenant_id =1001
ORDER BY tenant_id, ip;

OB 的监控产品里也能监控增量内存的变化。

附录

示例1:MySQL 数据导出到 CSV 文件

$cat job/bmsql_oorder_mysql2csv.json
{
    "job": {
        "setting": {
            "speed": {
                "channel": 4
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0.1
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "tpcc",
                        "password": "********",
                        "column": [
                            "*"
                        ],
                        "connection": [
                            {
                                "table": [
                                    "bmsql_oorder"
                                ],
                                "jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/tpccdb?useUnicode=true&characterEncoding=utf8"]
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "txtfilewriter",
                    "parameter": {
                        "path": "/tmp/tpcc/bmsql_oorder",
                        "fileName": "bmsql_oorder",
                        "encoding": "UTF-8",
                        "writeMode": "truncate",
                        "dateFormat": "yyyy-MM-dd hh:mm:ss" ,
                        "nullFormat": "\\N" ,
                        "fileFormat": "csv" ,
                        "fieldDelimiter": ","
                    }
                }
            }
        ]
    }
}

示例2:CSV 文件导入到 OceanBase

$cat job/bmsql_oorder_csv2ob.json
{
    "job": {
        "setting": {
            "speed": {
                "channel": 4
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0.1
            }
        },
        "content": [
            {
                "reader": {
                    "name": "txtfilereader",
                    "parameter": {
                        "path": ["/tmp/tpcc/bmsql_oorder"],
                        "encoding": "UTF-8",
                        "column": ["*"],
                        "dateFormat": "yyyy-MM-dd hh:mm:ss" ,
                        "nullFormat": "\\N" ,
                        "fieldDelimiter": ","
                    }
                },
                "writer": {
                    "name": "oceanbasev10writer",
                    "parameter": {
                        "obWriteMode": "insert",
                        "column": [
                            "*"
                        ],
                        "preSql": [
                            "truncate table bmsql_oorder"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": "||_dsc_ob10_dsc_||obdemo:oboracle||_dsc_ob10_dsc_||jdbc:oceanbase://127.0.0.1:2883/tpcc?useLocalSessionState=true&allowBatch=true&allowMultiQueries=true&rewriteBatchedStatements=true",
                                "table": [
                                    "bmsql_oorder"
                                ]
                            }
                        ],
                        "username": "tpcc",
                        "password":"********",
                        "writerThreadCount":10,
                        "batchSize": 1000,
                        "memstoreThreshold": "0.9"
                    }
                }
            }
        ]
    }
}

示例3:ORACLE 数据同步到 CSV 文件

{
    "job": {
        "setting": {
            "speed": {
                "channel": 4 
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0.1
            }
        },
        "content": [
            {
                "reader": {
                    "name": "oraclereader",
                    "parameter": {
                        "username": "tpcc",
                        "password": "********",
                        "column": [
                            "*"
                        ],                    
                        "connection": [
                            {
                                "table": [
                                    "bmsql_oorder"
                                ],
                                "jdbcUrl": [ "jdbc:oracle:thin:@172.17.0.5:1521:helowin"]
                            }
                        ]                    
                    }
                },
                "writer": {
                    "name": "txtfilewriter",
                    "parameter": {
                        "path": "/tmp/tpcc/bmsql_oorder",
                        "fileName": "bmsql_oorder",
                        "encoding": "UTF-8",
                        "writeMode": "truncate",
                        "dateFormat": "yyyy-MM-dd hh:mm:ss" ,
                        "nullFormat": "\\N" ,
                        "fileFormat": "csv" ,
                        "fieldDelimiter": ","
                    }
                }
            }
        ]
    }
}

示例4:OB 数据同步到 MySQL

{
    "job": {
        "setting": {
            "speed": {
                "channel": 16 
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0.1
            }
        },
        "content": [
            {
                "reader": {
                    "name": "oceanbasev10reader",
                    "parameter": {
                        "where": "",
                        "readBatchSize": 10000,
                        "column": [
                            "*"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": ["||_dsc_ob10_dsc_||obdemo:oboracle||_dsc_ob10_dsc_||jdbc:oceanbase://127.0.0.1:2883/tpcc"],
                                "table": [
                                    "bmsql_oorder"
                                ]
                            }
                        ],
                        "username": "tpcc",
                        "password":"********"
                    }
                },
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "writeMode": "replace",
                        "username": "tpcc",
                        "password": "123456",
                        "column": [
                            "*"
                        ],
                        "session": [
                        	"set session sql_mode='ANSI'"
                        ],
                        "preSql": [
                            "truncate table bmsql_oorder"
                        ],
                        "batchSize": 512,
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://127.0.0.1:3306/tpccdb?useUnicode=true&characterEncoding=utf8",
                                "table": [
                                    "bmsql_oorder"
                                ]
                            }
                        ]
                    }
                }
            }    
        ]
    }
}

示例5:OB 数据同步到 ORACLE

{
    "job": {
        "setting": {
            "speed": {
                "channel": 16 
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0.1
            }
        },
        "content": [
            {
                "reader": {
                    "name": "oceanbasev10reader",
                    "parameter": {
                        "where": "",
                        "readBatchSize": 10000,
                        "column": [
                            "*"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": ["||_dsc_ob10_dsc_||obdemo:oboracle||_dsc_ob10_dsc_||jdbc:oceanbase://127.0.0.1:2883/tpcc"],
                                "table": [
                                    "bmsql_oorder"
                                ]
                            }
                        ],
                        "username": "tpcc",
                        "password":"********"
                    }
                },
                "writer": {
                    "name": "oraclewriter",
                    "parameter": {
                        "username": "tpcc",
                        "password": "********",
                        "column": [
                            "*"
                        ],
                        "preSql": [
                            "truncate table bmsql_oorder"
                        ],
                        "batchSize": 512,
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:oracle:thin:@127.0.0.1:1521:helowin",
                                "table": [
                                    "bmsql_oorder"
                                ]
                            }
                        ]
                    }
                }
            }    
        ]
    }
}

示例6:ORACLE 数据同步到 OB

{
    "job": {
        "setting": {
            "speed": {
                "channel": 4 
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0.1
            }
        },
        "content": [
            {
                "reader": {
                    "name": "oraclereader",
                    "parameter": {
                        "username": "tpcc",
                        "password": "********",
                        "column": [
                            "*"
                        ],                    
                        "connection": [
                            {
                                "table": [
                                    "bmsql_oorder"
                                ],
                                "jdbcUrl": [ "jdbc:oracle:thin:@172.17.0.5:1521:helowin"]
                            }
                        ]                    
                    }
                },
                "writer": {
                    "name": "oceanbasev10writer",
                    "parameter": {
                        "obWriteMode": "insert",
                        "column": [
                            "*"
                        ],
                        "preSql": [
                            "truncate table bmsql_oorder"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": "||_dsc_ob10_dsc_||obdemo:oboracle||_dsc_ob10_dsc_||jdbc:oceanbase://127.0.0.1:2883/tpcc?useLocalSessionState=true&allowBatch=true&allowMultiQueries=true&rewriteBatchedStatements=true",
                                "table": [
                                    "bmsql_oorder"
                                ]
                            }
                        ],
                        "username": "tpcc",
                        "password":"********",
                        "writerThreadCount":10,
                        "batchSize": 1000,
                        "memstoreThreshold": "0.9"
                    }
                }
            }
        ]
    }
}

示例7:DB2 数据同步到 OB

{
    "job": {
        "setting": {
            "speed": {
                "channel": 8,
            },
            "errorLimit": {
                "record": 0
            }
        },
        "content": [
            {
                "reader": {
                    "name": "db2reader",
                    "parameter": {
                        "username": "db2inst1",
                        "password": "db2inst1",
                        "column": [
                            "c1",
                            "c2"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:db2://127.0.0.1:50000/testdb:currentSchema=JXXD;"
                                ],
                                "table": [
                                    "blob_test"
                                ]
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "oceanbasev10writer",
                    "parameter": {
                        "writeMode": "insert",
                        "username": "jxxd",
                        "password": "123456",
                        "column": [
                            "c1",
                            "c2"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": "||_dsc_ob10_dsc_||obdemo:oboracle||_dsc_ob10_dsc_||jdbc:mysql://127.0.0.1:2883/jxxd",
                                "table": [
                                    "blob_test"
                                ]
                            }
                        ]
                    }
                }
            }
        ]
    }
}

如果您有任何疑问,可以通过以下方式与我们进行交流:

微信群:扫码添加小助手,将拉你进群哟~

如何使用DataX在OB和传统数据库之间同步数据-1

钉钉群:33254054

如何使用DataX在OB和传统数据库之间同步数据-2

相关文章

Oracle如何使用授予和撤销权限的语法和示例
Awesome Project: 探索 MatrixOrigin 云原生分布式数据库
下载丨66页PDF,云和恩墨技术通讯(2024年7月刊)
社区版oceanbase安装
Oracle 导出CSV工具-sqluldr2
ETL数据集成丨快速将MySQL数据迁移至Doris数据库

发布评论