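Data migration is a common requirement, and many synchronization tools are available. This post gives a brief walkthrough of DataX. DataX is the open-source version of Alibaba Cloud DataWorks Data Integration, an offline data synchronization tool/platform widely used within Alibaba Group. It provides efficient data synchronization between heterogeneous data sources, including MySQL, Oracle, OceanBase, SqlServer, PostgreSQL, HDFS, Hive, ADS, HBase, TableStore (OTS), MaxCompute (ODPS), Hologres, DRDS, and Databend.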
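DataX architecture
As an offline data synchronization framework, DataX is built on a Framework + plugin architecture. Reading from and writing to a data source are abstracted as Reader and Writer plugins that plug into the overall synchronization framework.
1. Reader: the data collection module. It reads data from the source and sends it to the Framework.
2. Writer: the data writing module. It continuously pulls data from the Framework and writes it to the destination.
3. Framework: connects the Reader and the Writer, serves as the data transfer channel between them, and handles core concerns such as buffering, flow control, concurrency, and data transformation.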
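The division of work between the three roles can be illustrated with a small Python sketch. This is only a toy model and not DataX code: the names reader, writer, run_job and the capacity parameter are hypothetical, and a bounded in-memory queue stands in for the Framework channel that buffers rows and throttles the reader.

```python
import queue
import threading

SENTINEL = object()  # marks the end of the stream


def reader(channel, rows):
    """Reader role: take rows from the source and hand them to the channel."""
    for row in rows:
        channel.put(row)  # blocks when the channel is full (flow control)
    channel.put(SENTINEL)


def writer(channel, sink):
    """Writer role: keep pulling rows from the channel and write them out."""
    while True:
        row = channel.get()
        if row is SENTINEL:
            break
        sink.append(row)


def run_job(rows, capacity=2):
    """Framework role: wire reader and writer together through a bounded queue."""
    channel = queue.Queue(maxsize=capacity)
    sink = []
    r = threading.Thread(target=reader, args=(channel, rows))
    w = threading.Thread(target=writer, args=(channel, sink))
    r.start()
    w.start()
    r.join()
    w.join()
    return sink


result = run_job([(1, "a"), (2, "b"), (3, "c")])
print(result)
```

Because the queue is bounded, a slow writer automatically slows the reader down, which is the kind of buffering and flow control the Framework is described as handling.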
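For a detailed introduction to DataX, see the project page (https://github.com/alibaba/datax). This post focuses on actually running a DataX migration from start to finish.
DataX does not include the OceanBase Reader/Writer plugins by default; they have to be built from source.
Building from source
Download the DataX source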
git clone git@github.com:alibaba/DataX.git
Alternatively, open the repository page and download the source directly:
https://github.com/alibaba/DataX
Build and package with Maven
Download Maven
wget https://dlcdn.apache.org/maven/maven-3/3.8.8/binaries/apache-maven-3.8.8-bin.tar.gz --no-check-certificate
Extract the archive
[root@cdctest opt]# tar -zxvf apache-maven-3.8.8-bin.tar.gz
Create a symbolic link
[root@cdctest opt]# cd apache-maven-3.8.8/bin/
[root@cdctest bin]# ll
总用量 32
-rw-r--r-- 1 root root 228 3月 8 21:58 m2.conf
-rwxr-xr-x 1 root root 5790 3月 8 21:58 mvn
-rw-r--r-- 1 root root 6226 3月 8 21:58 mvn.cmd
-rwxr-xr-x 1 root root 1601 3月 8 21:58 mvnDebug
-rw-r--r-- 1 root root 2082 3月 8 21:58 mvnDebug.cmd
-rwxr-xr-x 1 root root 1532 3月 8 21:58 mvnyjp
[root@cdctest bin]# ln -s /opt/apache-maven-3.8.8/bin/mvn /usr/bin/mvn
Check mvn
[root@cdctest bin]# mvn -v
Apache Maven 3.8.8 (4c87b05d9aedce574290d1acc98575ed5eb6cd39)
Maven home: /opt/apache-maven-3.8.8
Java version: 1.8.0_345, vendor: Red Hat, Inc., runtime: /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.345.b01-1.el7_9.x86_64/jre
Default locale: zh_CN, platform encoding: UTF-8
OS name: "linux", version: "3.10.0-1127.19.1.el7.x86_64", arch: "amd64", family: "unix"
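Install and check Java
Download the jdk-8u291-linux-x64.tar.gz package.
Extract it (the target directory must exist first):
mkdir -p /usr/local/java/ && tar -zxvf jdk-8u291-linux-x64.tar.gz -C /usr/local/java/
Edit the environment variables by adding the following export lines to /etc/profile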
[root@cdctest bin]# vim /etc/profile
export JAVA_HOME=/usr/local/java/jdk1.8.0_291
export JRE_HOME=${JAVA_HOME}/jre
export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib
export PATH=${JAVA_HOME}/bin:$PATH
[root@cdctest bin]# source /etc/profile
[root@cdctest bin]# java -version
openjdk version "1.8.0_345"
OpenJDK Runtime Environment (build 1.8.0_345-b01)
OpenJDK 64-Bit Server VM (build 25.345-b01, mixed mode)
Note: the output above reports OpenJDK 1.8.0_345, the same runtime that mvn -v listed earlier, rather than the jdk1.8.0_291 unpacked above. If the newly installed JDK is the one that should be used, confirm which binary is picked up by running "which java".
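Modify the configuration file
When editing pom.xml, remove the modules for plugins you do not need; otherwise the build output becomes quite large.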
[root@OCP DataX]# pwd
/opt/DataX
[root@OCP DataX]# cat pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-all</artifactId>
<version>0.0.1-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-core</artifactId>
<version>1.3</version>
</dependency>
</dependencies>
<name>datax-all</name>
<packaging>pom</packaging>
<properties>
<jdk-version>1.8</jdk-version>
<datax-project-version>0.0.1-SNAPSHOT</datax-project-version>
<commons-lang3-version>3.3.2</commons-lang3-version>
<commons-configuration-version>1.10</commons-configuration-version>
<commons-cli-version>1.2</commons-cli-version>
<fastjson-version>2.0.23</fastjson-version>
<guava-version>16.0.1</guava-version>
<diamond.version>3.7.2.1-SNAPSHOT</diamond.version>
<!-- slf4j 1.7.10 and logback-classic 1.0.13 work well together -->
<slf4j-api-version>1.7.10</slf4j-api-version>
<logback-classic-version>1.0.13</logback-classic-version>
<commons-io-version>2.4</commons-io-version>
<junit-version>4.13.1</junit-version>
<tddl.version>5.1.22-1</tddl.version>
<swift-version>1.0.0</swift-version>
<project-sourceEncoding>UTF-8</project-sourceEncoding>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<maven.compiler.encoding>UTF-8</maven.compiler.encoding>
<mysql.driver.version>5.1.47</mysql.driver.version>
</properties>
<modules>
<module>common</module>
<module>core</module>
<module>transformer</module>
<!-- reader -->
<module>mysqlreader</module>
<module>cassandrareader</module>
<module>oceanbasev10reader</module>
<!-- writer -->
<module>mysqlwriter</module>
<module>oceanbasev10writer</module>
<module>elasticsearchwriter</module>
<!-- common support module -->
<module>plugin-rdbms-util</module>
<module>plugin-unstructured-storage-util</module>
</modules>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>${commons-lang3-version}</version>
</dependency>
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
<version>${fastjson-version}</version>
</dependency>
<!--<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava-version}</version>
</dependency>-->
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>${commons-io-version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j-api-version}</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>${logback-classic-version}</version>
</dependency>
<dependency>
<groupId>com.taobao.tddl</groupId>
<artifactId>tddl-client</artifactId>
<version>${tddl.version}</version>
<exclusions>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
<exclusion>
<groupId>com.taobao.diamond</groupId>
<artifactId>diamond-client</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.taobao.diamond</groupId>
<artifactId>diamond-client</artifactId>
<version>${diamond.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba.search.swift</groupId>
<artifactId>swift_client</artifactId>
<version>${swift-version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit-version}</version>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<version>1.9.5</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.17.1</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.17.1</version>
</dependency>
</dependencies>
</dependencyManagement>
<repositories>
<repository>
<id>central</id>
<name>Nexus aliyun</name>
<url>https://maven.aliyun.com/repository/central</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
<repository>
<id>spring</id>
<name>spring</name>
<url>https://maven.aliyun.com/repository/spring</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>
<pluginRepositories>
<pluginRepository>
<id>central</id>
<name>Nexus aliyun</name>
<url>https://maven.aliyun.com/repository/central</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</pluginRepository>
</pluginRepositories>
<build>
<resources>
<resource>
<directory>src/main/java</directory>
<includes>
<include>**/*.properties</include>
</includes>
</resource>
</resources>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<finalName>datax</finalName>
<descriptors>
<descriptor>package.xml</descriptor>
</descriptors>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>${jdk-version}</source>
<target>${jdk-version}</target>
<encoding>${project-sourceEncoding}</encoding>
</configuration>
</plugin>
</plugins>
</build>
</project>
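Trimming the module list by hand is error-prone when the original pom.xml lists many plugins. The following Python sketch shows one possible way to keep only the wanted module entries. It is an illustration under assumptions, not part of DataX: the helper name prune_modules is hypothetical, and it assumes each module entry sits on its own line, as in the listing above.

```python
import re

# Matches a line that contains exactly one <module>name</module> entry.
MODULE_LINE = re.compile(r"^\s*<module>([^<]+)</module>\s*$")


def prune_modules(pom_text, keep):
    """Return pom_text with every <module> line removed unless its name is in keep."""
    kept_lines = []
    for line in pom_text.splitlines():
        match = MODULE_LINE.match(line)
        if match and match.group(1).strip() not in keep:
            continue  # drop modules that are not wanted
        kept_lines.append(line)
    return "\n".join(kept_lines)


sample = """<modules>
<module>common</module>
<module>core</module>
<!-- reader -->
<module>mysqlreader</module>
<module>cassandrareader</module>
<!-- writer -->
<module>oceanbasev10writer</module>
</modules>"""

keep = {"common", "core", "mysqlreader", "oceanbasev10writer"}
pruned = prune_modules(sample, keep)
print(pruned)
```

Lines that are not module entries (comments, the surrounding modules tags, everything else in the file) pass through unchanged, so the same function could be applied to the full pom.xml text.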
Run the build
[root@cdctest datax]# mvn -U clean package assembly:assembly -Dmaven.test.skip=true
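The build has succeeded when Maven reports BUILD SUCCESS at the end of its output.
Check the files
After a successful build, the oceanbasev10writer jar can be found in the following directory: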
[root@cdctest oceanbasev10writer]# pwd
/opt/datax/target/datax/datax/plugin/writer/oceanbasev10writer
[root@cdctest oceanbasev10writer]# ll
总用量 72
drwxr-xr-x 2 root root 4096 4月 18 19:35 libs
-rw-r--r-- 1 root root 62155 4月 18 19:35 oceanbasev10writer-0.0.1-SNAPSHOT.jar
-rw-r--r-- 1 root root 219 4月 18 19:35 plugin.json
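Synchronize data
Edit the job template
This example migrates data from MySQL to OceanBase, copying all columns of table t1 in insert mode. DataX does not create tables on the target automatically, so the table structure must be created manually on the target beforehand.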
[root@cdctest bin]# cat mysql2ob.json
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "admin123",
"column": ["*"],
"splitPk": "",
"connection": [
{
"table": [
"t1"
],
"jdbcUrl": [
"jdbc:mysql://172.30.199.217:3306/test?useSSL=false"
]
}
]
}
},
"writer": {
"name": "oceanbasev10writer",
"parameter": {
"column": [
"*"
],
"connection": [
{
"jdbcUrl": "||_dsc_ob10_dsc_||myoceanbase:obtest||_dsc_ob10_dsc_||jdbc:oceanbase://172.30.199.203:2883/test?yearIsDateType=false&ZeroDateTimeBehavior=convertToNull&tinyInt1isBit=false&rewriteBatchedStatements=true",
"table": [
"t1"
]
}
],
"obWriteMode": "insert",
"password": "admin123",
"preSql": [
"truncate table t1;"
],
"username": "root"
}
}
}
],
"setting": {
"speed": {
"channel": "1"
}
}
}
}
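When several tables need the same kind of job, the JSON above can be generated rather than copied by hand. The sketch below is one possible way to do that in Python. The function name build_job and the placeholder hosts SOURCE_HOST and TARGET_HOST are assumptions for illustration; the plugin names and parameter keys are the ones used in the job file above.

```python
import json


def build_job(table, mysql_url, ob_url, user, password, channels=1):
    """Build a MySQL -> OceanBase job dict mirroring the structure shown above."""
    reader = {
        "name": "mysqlreader",
        "parameter": {
            "username": user,
            "password": password,
            "column": ["*"],
            "splitPk": "",
            # The reader takes jdbcUrl as a list, as in the job file above.
            "connection": [{"table": [table], "jdbcUrl": [mysql_url]}],
        },
    }
    writer = {
        "name": "oceanbasev10writer",
        "parameter": {
            "username": user,
            "password": password,
            "column": ["*"],
            "obWriteMode": "insert",
            "preSql": ["truncate table %s;" % table],
            # The writer takes jdbcUrl as a single string, as in the job file above.
            "connection": [{"table": [table], "jdbcUrl": ob_url}],
        },
    }
    return {
        "job": {
            "content": [{"reader": reader, "writer": writer}],
            "setting": {"speed": {"channel": str(channels)}},
        }
    }


job = build_job(
    table="t1",
    mysql_url="jdbc:mysql://SOURCE_HOST:3306/test?useSSL=false",  # placeholder host
    ob_url="jdbc:oceanbase://TARGET_HOST:2883/test",  # placeholder host
    user="root",
    password="change-me",  # placeholder credential
)
print(json.dumps(job, indent=2))
```

The printed JSON can be saved to a file such as mysql2ob.json. Note that the truncate statement in preSql empties the target table before loading, so it should only be kept when that is intended.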
Run the synchronization
[root@cdctest bin]# python datax.py mysql2ob.json
DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.
2023-04-18 19:57:02.395 [main] INFO MessageSource - JVM TimeZone: GMT+08:00, Locale: zh_CN
2023-04-18 19:57:02.396 [main] INFO MessageSource - use Locale: zh_CN timeZone: sun.util.calendar.ZoneInfo[id="GMT+08:00",offset=28800000,dstSavings=0,useDaylight=false,transitions=0,lastRule=null]
2023-04-18 19:57:02.411 [main] INFO VMInfo - VMInfo# operatingSystem class => sun.management.OperatingSystemImpl
2023-04-18 19:57:02.415 [main] INFO Engine - the machine info =>
...............
...............
...............
2023-04-18 19:57:12.867 [job-0] INFO JobContainer - DataX Writer.Job [oceanbasev10writer] do post work.
2023-04-18 19:57:12.867 [job-0] INFO JobContainer - DataX Reader.Job [mysqlreader] do post work.
2023-04-18 19:57:12.867 [job-0] INFO JobContainer - DataX jobId [0] completed successfully.
2023-04-18 19:57:12.868 [job-0] INFO HookInvoker - No hook invoked, because base dir not exists or is a file: /opt/datax/target/datax/datax/hook
2023-04-18 19:57:12.869 [job-0] INFO JobContainer -
[total cpu info] =>
averageCpu | maxDeltaCpu | minDeltaCpu
-1.00% | -1.00% | -1.00%
[total gc info] =>
NAME | totalGCCount | maxDeltaGCCount | minDeltaGCCount | totalGCTime | maxDeltaGCTime | minDeltaGCTime
PS MarkSweep | 0 | 0 | 0 | 0.000s | 0.000s | 0.000s
PS Scavenge | 0 | 0 | 0 | 0.000s | 0.000s | 0.000s
2023-04-18 19:57:12.869 [job-0] INFO JobContainer - PerfTrace not enable!
2023-04-18 19:57:12.869 [job-0] INFO StandAloneJobContainerCommunicator - Total 2 records, 2 bytes | Speed 0B/s, 0 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.000s | All Task WaitReaderTime 0.000s | Percentage 100.00%
2023-04-18 19:57:12.870 [job-0] INFO JobContainer -
任务启动时刻 : 2023-04-18 19:57:02
任务结束时刻 : 2023-04-18 19:57:12
任务总计耗时 : 10s
任务平均流量 : 0B/s
记录写入速度 : 0rec/s
读出记录总数 : 2
读写失败总数 : 0
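When the job finishes, DataX prints the synchronization summary shown above: start time, end time, total elapsed time, average throughput, record write speed, total records read, and total read/write failures.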
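For scripted migrations it can be handy to check the summary automatically instead of reading it by eye. The following Python sketch extracts the two counters from the log text. It assumes the Chinese labels appear exactly as in the output above (读出记录总数 for total records read, 读写失败总数 for total read/write failures); the function name parse_summary and the result keys are hypothetical.

```python
import re

# Map the labels printed by the job summary to result keys (keys are our own naming).
FIELDS = {
    "读出记录总数": "records_read",
    "读写失败总数": "records_failed",
}


def parse_summary(log_text):
    """Extract the record counters from the end-of-job summary text."""
    result = {}
    for label, key in FIELDS.items():
        match = re.search(label + r"\s*:\s*(\d+)", log_text)
        if match:
            result[key] = int(match.group(1))
    return result


sample = """任务总计耗时 : 10s
读出记录总数 : 2
读写失败总数 : 0"""

summary = parse_summary(sample)
print(summary)
```

A wrapper script could treat a non-zero records_failed value, or a missing counter, as a signal to stop and inspect the full log.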