实战教程第四章4.6:如何使用 DataX 加载 CSV 数据文件到 OceanBase

2024年 5月 7日 97.4k 0

DataX 简介

DataX 是阿里云 DataWorks数据集成的开源版本,在阿里巴巴集团内被广泛使用的离线数据同步工具/平台。DataX 实现了包括 MySQL、Oracle、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、Hologres、DRDS 、OceanBase 等各种异构数据源之间高效的数据同步功能。

开源地址:https://github.com/alibaba/datax 。

OceanBase 企业版客户,可以跟 OceanBase 的技术人员索取 DataX 内部版本(RPM 包)。 OceanBase 社区版客户,可以在 DataX 开源网站上下载源码,自行编译。 编译的时候,注意在 pom.xml 中剔除掉不用的数据库插件。否则,编译出来的包非常大。

DATAX 配置文件

DataX 以任务的形式迁移数据,每个任务只处理一个表,每个任务有一个 json 格式的配置文件。配置文件里会包含 reader 和 writer 两节。具体的 reader 和 writer 都是 DataX 支持的数据库插件,可以随意搭配使用(就跟孩子搭积木一样)

最新版本的 DataX 还提供了一个 WEB 管理界面。

下面是配置文件示例。

{
  "job": {
    "content": [
      {
        "reader": {
          "name": "streamreader",
          "parameter": {
            "sliceRecordCount": 10,
            "column": [
              {
                "type": "long",
                "value": "10"
              },
              {
                "type": "string",
                "value": "hello,你好,世界-DataX"
              }
            ]
          }
        },
        "writer": {
          "name": "streamwriter",
          "parameter": {
            "encoding": "UTF-8",
            "print": true
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": 2
       }
    }
  }
}

将 json 配置文件放到 datax的目录的 job 下,或者自定义路径。执行方法:

$bin/datax.py job/stream2stream.json

输出信息:

<.....>

2021-08-26 11:06:09.217 [job-0] INFO  JobContainer - PerfTrace not enable!
2021-08-26 11:06:09.218 [job-0] INFO  StandAloneJobContainerCommunicator - Total 20 records, 380 bytes | Speed 38B/s, 2 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.000s |  All Task WaitReaderTime 0.000s | Percentage 100.00%
2021-08-26 11:06:09.223 [job-0] INFO  JobContainer -
任务启动时刻                    : 2021-08-26 11:05:59
任务结束时刻                    : 2021-08-26 11:06:09
任务总计耗时                    :                 10s
任务平均流量                    :               38B/s
记录写入速度                    :              2rec/s
读出记录总数                    :                  20
读写失败总数                    :                   0

DataX 任务执行结束都会有个简单的任务报告,关注一下里面平均流量、写入速度和读写失败总数等。

DataX 的 job 的参数 settings 可以指定速度参数、错误记录容忍度等。

"setting": {"speed": {"channel": 10 
            },"errorLimit": {"record": 10,"percentage": 0.1}}

特别说明:

  • speed 还有个限速的设计(bytes),但是有 bug,大家就不要用了。 errorLimit 表示报错记录数的容忍度,超出这个限制后任务就中断退出。
  • channel 是并发数,理论上并发越大,迁移性能越好。实际也要考虑源端的读压力、网络传输性能以及目标端写入性能。

下面是常用数据源(mysqloracle 、csv 和 oceanbase )的读写插件。

txtfilereader 提供了读取本地文件系统数据存储的能力。在底层实现上,txtfilereader 获取本地文件数据,并转换为 DataX 传输协议传递给 Writer。

本地文件内容存放的是一张逻辑意义上的二维表,例如 CSV 格式的文本信息。

txtfilereader 有一些功能限制和参数,请首先阅读官方说明: https://github.com/alibaba/DataX/blob/master/txtfilereader/doc/txtfilereader.md 。

下面是 txtfilereader 的 示例。

"reader": {
                    "name": "txtfilereader",
                    "parameter": {
                        "path": ["/tmp/tpcc/bmsql_oorder"],
                        "fileName": "bmsql_oorder",
                        "encoding": "UTF-8",
                        "column": ["*"],
                        "dateFormat": "yyyy-MM-dd hh:mm:ss" ,
                        "nullFormat": "\\N" ,
                        "fieldDelimiter": ","
                    }
                }

特别说明:

  • path 指定到路径即可,fileName 会是生成的文件前缀,完整的文件名很长,有随机字符串(避免重复)。文件的数量可能是根据记录数来自动分的。
  • column 可以指定为 "*" ,这样所有字段值都作为字符串了。这个虽然方便,但不能保证完全没有问题。目前测试常用数据类型是可以的。
  • nullFormat 指定空值的记录,默认是"null",这个读入oracle的时候会有问题。建议导出文件的时候指定为 "\N" 表示空值和。
  • fieldDelimiter 指定 csv 文件的列分隔符,这个跟导出的时候指定的列分隔符保持一致。通常导出的列内容如果含有列分隔符时,会用双引号进行包含(enclosed)。用逗号(,)也可以,只是太过常见,建议用生僻一点的单字符。如 | 或 ^ 等。

mysqlreader 插件说明:

MysqlReader 插件实现了从Mysql读取数据。在底层实现上,MysqlReader通过JDBC连接远程Mysql数据库,并执行相应的sql语句将数据从mysql库中SELECT出来。

不同于其他关系型数据库,MysqlReader 不支持 FetchSize.

实现原理方面,简而言之,MysqlReader通过JDBC连接器连接到远程的Mysql数据库,并根据用户配置的信息生成查询SELECT SQL语句,然后发送到远程Mysql数据库,并将该SQL执行返回结果使用DataX自定义的数据类型拼装为抽象的数据集,并传递给下游Writer处理。详细功能和参数说明请首先阅读官方说明:https://github.com/alibaba/DataX/blob/master/mysqlreader/doc/mysqlreader.md 。

下面是 mysqlreader 的配置示例。

"reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "tpcc",
                        "password": "********",
                        "column": [
                            "*"
                        ],                        
                        "connection": [
                            {
                                "table": [
                                    "bmsql_oorder"
                                ],
                                "jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/tpccdb?useUnicode=true&characterEncoding=utf8"]
                            }
                        ]
                    }
                }

特别说明:

  • 如果表的主键是单列主键,比如说 id。那么可以在 parameter 下加一个配置: "splitPk": "db_id",
  •  。如果是加在最后,就去掉后面的逗号(,)。
  • column 指定读取的列。通常建议写具体的列,可以在列上用函数做逻辑转换。用 * 就是要时刻确保列跟目标端写入的列能对应上。

oceanbasev10writer 插件实现了写入数据到 OceanBase 主库的目的表的功能。在底层实现上, OceanbaseV10Writer 通过 java客户端(底层MySQL JDBC或oceanbase client) 连接obproxy 远程 OceanBase 数据库,并执行相应的 insert sql 语句将数据写入 OceanBase ,内部会分批次提交入库。

实现原理

Oceanbasev10Writer 通过 DataX 框架获取 Reader 生成的协议数据,生成 insert 语句。对于mysql 租户,在主键或唯一键冲突时,可以选择 replace 模式,更新表中的所有字段。对于oracle 租户,目前只有 insert 行为。 出于性能考虑,写入采用 batch 方式批量写,当行数累计到预定阈值时,才发起写入请求。

下面是 oceanbasev10writer 的配置示例。

"writer": {
                    "name": "oceanbasev10writer",
                    "parameter": {
                        "obWriteMode": "insert",
                        "column": [
                            "*"
                        ],
                        "preSql": [
                            "truncate table bmsql_oorder"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": "||_dsc_ob10_dsc_||obdemo:oboracle||_dsc_ob10_dsc_||jdbc:oceanbase://127.0.0.1:2883/tpcc?useLocalSessionState=true&allowBatch=true&allowMultiQueries=true&rewriteBatchedStatements=true",
                                "table": [
                                    "bmsql_oorder"
                                ]
                            }
                        ],
                        "username": "tpcc",
                        "password":"********",
                        "batchSize": 512,
                        "writerThreadCount":10,
                        "memstoreThreshold": "0.9"
                    }
                }

参数说明

供参考,以后可能会变化,请关注DataX 开源地址说明 (https://github.com/alibaba/DataX/tree/master/oceanbasev10writer)。

  • jdbcUrl : 描述:目的数据库的连接信息,包含了ob的集群、租户、obproxy的地址和端口以及库名;使用域名可能会报错,建议使用 ip 。 必选:是 默认值:无
  • table: 描述:目的表的表名称。支持写入一个或者多个表。当配置为多张表时,必须确保所有表结构保持一致;表名中一般不含库名; 必选:是 默认值:无
  • column: 描述:目的表需要写入数据的字段,字段之间用英文逗号分隔。例如: "column": ["id","name","age"]。 column配置项必须指定,不能留空! 必选:是 默认值:否
  • preSql: 描述:写入数据到目的表前,会先执行这里的标准语句。如果 Sql 中有你需要操作到的表名称,请使用 @table 表示,这样在实际执行 Sql 语句时,会对变量按照实际表名称进行替换。比如你的任务是要写入到目的端的100个同构分表(表名称为:datax_00,datax01, ... datax_98,datax_99),并且你希望导入数据前,先对表中数据进行删除操作,那么你可以这样配置:"preSql":["delete from @table"],效果是:在执行到每个表写入数据前,会先执行对应的 delete from 对应表名称.只支持delete语句 必选:否 默认值:无
  • batchSize: 描述:一次性批量提交的记录数大小,该值可以极大减少DataX与Oceanbase的网络交互次数,并提升整体吞吐量。但是该值设置过大可能会造成DataX运行进程OOM情况。 必选:否 默认值:1000
  • memstoreThreshold: 描述:OB租户的memstore使用率,当达到这个阀值的时候暂停导入,等释放内存后继续导入. 防止租户内存溢出。 必选:否 默认值:0.9
  • username: 描述:访问oceanbase1.0的用户名,注意,此处不配置ob的集群名和租户名。 必选:是 默认值:无
  • ** password** 描述:访问oceanbase1.0的密码 必选:是 默认值:无
  • writerThreadCount: 描述:每个通道(channel)中写入使用的线程数 必选:否 默认值:1

MySQL 数据导出到 CSV 文件

先将 MySQL 数据导出到 CSV 文件。

配置文件如下:

$cat job/bmsql_oorder_mysql2csv.json
{
    "job": {
        "setting": {
            "speed": {
                "channel": 4
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0.1
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "tpcc",
                        "password": "********",
                        "column": [
                            "*"
                        ],
                        "connection": [
                            {
                                "table": [
                                    "bmsql_oorder"
                                ],
                                "jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/tpccdb?useUnicode=true&characterEncoding=utf8"]
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "txtfilewriter",
                    "parameter": {
                        "path": "/tmp/tpcc/bmsql_oorder",
                        "fileName": "bmsql_oorder",
                        "encoding": "UTF-8",
                        "writeMode": "truncate",
                        "dateFormat": "yyyy-MM-dd hh:mm:ss" ,
                        "nullFormat": "\\N" ,
                        "fileFormat": "csv" ,
                        "fieldDelimiter": ","
                    }
                }
            }
        ]
    }
}

CSV 文件导入到 OceanBase

将源端导出的 CSV 文件复制到目标端的 DATAX 服务器上,然后导入到目标端 OceanBase 中。

配置文件如下:

$cat job/bmsql_oorder_csv2ob.json
{
    "job": {
        "setting": {
            "speed": {
                "channel": 4
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0.1
            }
        },
        "content": [
            {
                "reader": {
                    "name": "txtfilereader",
                    "parameter": {
                        "path": ["/tmp/tpcc/bmsql_oorder"],
                        "fileName": "bmsql_oorder",
                        "encoding": "UTF-8",
                        "column": ["*"],
                        "dateFormat": "yyyy-MM-dd hh:mm:ss" ,
                        "nullFormat": "\\N" ,
                        "fieldDelimiter": ","
                    }
                },
                "writer": {
                    "name": "oceanbasev10writer",
                    "parameter": {
                        "obWriteMode": "insert",
                        "column": [
                            "*"
                        ],
                        "preSql": [
                            "truncate table bmsql_oorder"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": "||_dsc_ob10_dsc_||obdemo:oboracle||_dsc_ob10_dsc_||jdbc:oceanbase://127.0.0.1:2883/tpcc?useLocalSessionState=true&allowBatch=true&allowMultiQueries=true&rewriteBatchedStatements=true",
                                "table": [
                                    "bmsql_oorder"
                                ]
                            }
                        ],
                        "username": "tpcc",
                        "password":"********",
                        "writerThreadCount":10,
                        "batchSize": 1000,
                        "memstoreThreshold": "0.9"
                    }
                }
            }
        ]
    }
}

附录:

  • 4.1 OceanBase 的 MySQL 兼容性简介
  • 4.2 如何使用 mysqldump 迁移 MySQL 表 OceanBase
  • 4.3 如何使用 dbcat 迁移 MySQL 表结构到 OceanBase
  • 4.4 如何把 MySQL 表数据导出到 CSV 文件
  • 4.5 如何使用 OceanBase 的 LOAD 命令加载 csv 数据文件 OceanBase
  • 4.6 如何使用 DataX 加载 CSV 数据文件到 OceanBase
  • 4.7 如何使用 DATAX 迁移 MySQL数据到 OceanBase
  • 4.8 如何使用 OBDUMPER / OBLOADER 工具导出/导入 OceanBase 数据
  • 4.9 如何使用 DATAX 迁移 OceanBase 数据到 MySQL/ORACLE
  • 4.10 如何使用 CANAL 将 MySQL 数据实时同步到 OceanBase

结束语

加入教程直播群方式一:钉钉群号3255 4020

加入教程直播群方式二:扫码下方二维码加入

实战教程第四章4.6:如何使用 DataX 加载 CSV 数据文件到 OceanBase-1

相关文章

Oracle如何使用授予和撤销权限的语法和示例
Awesome Project: 探索 MatrixOrigin 云原生分布式数据库
下载丨66页PDF,云和恩墨技术通讯(2024年7月刊)
社区版oceanbase安装
Oracle 导出CSV工具-sqluldr2
ETL数据集成丨快速将MySQL数据迁移至Doris数据库

发布评论