Flink CDC同步TiDB数据到OceanBase

2024年 5月 7日 84.9k 0

测试背景

应公司要求,需要将TiDB数据库替换成OceanBase数据库,替换过程需要业务影响比较小,这就要求TiDB的数据可以实时同步到OceanBase数据库。经过对各类数据同步工具的调研,目前感觉Flink CDC相对来说更容易实现我们的需求。因此,在这里先对Flink CDC做个简单测试,实现下TiDB到OceanBase的同步。

测试环境

TiDB环境

TiDB版本:v6.5.5

TiDB的部署是在一台单机上混部了TiDB Server、TiKV以及PD,TiCDC单节点部署在另外一台机器上

角色 机器 端口
TiDB Server 172.24.255.70 4000
TiKV 172.24.255.70 20160
TiKV 172.24.255.70 20161
TiKV 172.24.255.70 20162
PD 172.24.255.70 2379
TiCDC 172.24.255.55 8300

Flink CDC同步TiDB数据到OceanBase-1

OceanBase环境

OceanBase版本:V4.2.0_CE

角色 机器 端口
OBServer 172.24.255.56 2881
OBServer 172.24.255.57 2881
OBServer 172.24.255.58 2881
OBProxy 172.24.255.56 2883
OBProxy 172.24.255.57 2883

Flink CDC同步TiDB数据到OceanBase-2

Flink环境

这里简单演示下Flink的部署

安装 Flink

Flink 部署有集群模式和单节点模式。本次测试主要使用单节点部署,单节点部署比较简单,直接解压安装包就可以使用,不用进行其他的配置。启动成功后,访问 http://localhost:8081/#/overview,便可以对 Flink 任务进行监控管理。

1、下载 flink 并解压。

[admin@OCP ~]$ wget https://archive.apache.org/dist/flink/flink-1.16.1/flink-1.16.1-bin-scala_2.12.tgz
[admin@OCP ~]$ tar -zxvf flink-1.16.1-bin-scala_2.12.tgz

2、编flink-conf.yaml配置文件,并根据实际安装路径配置 java 环境变量。

[admin@OCP ~]$ cd flink-1.16.1/
[admin@OCP flink-1.16.1]$ vim conf/flink-conf.yaml
# 添加配置
env.java.home: /usr/lib/jvm/jre-1.8.0-openjdk-1.8.0.382.b05-1.el7_9.x86_64
下载connector

tidb-connectorjdbc-connector组件。

# 下载 jar 包
[admin@OCP flink-1.16.1]$ cd lib/
[admin@OCP lib]$ wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-tidb-cdc/2.3.0/flink-sql-connector-tidb-cdc-2.3.0.jar
[admin@OCP lib]$ wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/1.16.1/flink-connector-jdbc-1.16.1.jar
[admin@OCP lib]$ ll
总用量 293316
-rw-r--r-- 1 tidb tidb    198857 1月  19 2023 flink-cep-1.16.1.jar
-rw-r--r-- 1 tidb tidb    516144 1月  19 2023 flink-connector-files-1.16.1.jar
-rw-r--r-- 1 root root    248892 1月  19 2023 flink-connector-jdbc-1.16.1.jar
-rw-r--r-- 1 tidb tidb    102470 1月  19 2023 flink-csv-1.16.1.jar
-rw-r--r-- 1 tidb tidb 117107159 1月  19 2023 flink-dist-1.16.1.jar
-rw-r--r-- 1 tidb tidb    180248 1月  19 2023 flink-json-1.16.1.jar
-rw-r--r-- 1 tidb tidb  21052640 1月  19 2023 flink-scala_2.12-1.16.1.jar
-rw-rw-r-- 1 tidb tidb  10737871 1月  13 2023 flink-shaded-zookeeper-3.5.9.jar
-rw-r--r-- 1 root root  74930222 11月  9 2022 flink-sql-connector-tidb-cdc-2.3.0.jar
-rw-r--r-- 1 tidb tidb  15367504 1月  19 2023 flink-table-api-java-uber-1.16.1.jar
-rw-r--r-- 1 tidb tidb  36249667 1月  19 2023 flink-table-planner-loader-1.16.1.jar
-rw-r--r-- 1 tidb tidb   3133690 1月  19 2023 flink-table-runtime-1.16.1.jar
-rw-rw-r-- 1 tidb tidb    208006 1月  13 2023 log4j-1.2-api-2.17.1.jar
-rw-rw-r-- 1 tidb tidb    301872 1月  13 2023 log4j-api-2.17.1.jar
-rw-rw-r-- 1 tidb tidb   1790452 1月  13 2023 log4j-core-2.17.1.jar
-rw-rw-r-- 1 tidb tidb     24279 1月  13 2023 log4j-slf4j-impl-2.17.1.jar

启动 Flink

如下所示表示启动成功。

[admin@OCP lib]$cd ../bin
[admin@OCP bin]$ ./start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host OCP.
Starting taskexecutor daemon on host OCP.

从启动结果可以看到主要启动了两个进程standalonesessiontaskexecutor,其中standalonesession进程对应的是 JobManager 类型,taskexecutor进程对应的是 TaskManager 类型。

http://localhost:8081/#/overview 中查看 Flink 的 Web 页面。

Flink CDC同步TiDB数据到OceanBase-3

同步测试

TiDB写入测试数据:

MySQL [test]> create database flinktest;
Query OK, 0 rows affected (0.09 sec)

MySQL [test]> USE flinktest;
Database changed
MySQL [flinktest]> CREATE TABLE products (
    -> id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
    -> name VARCHAR(255) NOT NULL,
    -> description VARCHAR(512)
    -> ) AUTO_INCREMENT = 101;
Query OK, 0 rows affected (0.10 sec)

MySQL [flinktest]> INSERT INTO products
    -> VALUES (default,"scooter","Small 2-wheel scooter"),
    ->     (default,"car battery","12V car battery"),
    ->     (default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),
    ->     (default,"hammer","12oz carpenter's hammer"),
    ->     (default,"hammer","14oz carpenter's hammer"),
    ->     (default,"hammer","16oz carpenter's hammer"),
    ->     (default,"rocks","box of assorted rocks"),
    ->     (default,"jacket","water resistent black wind breaker"),
    ->     (default,"spare tire","24 inch spare tire");
Query OK, 9 rows affected (0.01 sec)
Records: 9  Duplicates: 0  Warnings: 0

MySQL [flinktest]> CREATE TABLE orders (
    ->     order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
    ->     order_date DATETIME NOT NULL,
    ->     customer_name VARCHAR(255) NOT NULL,
    ->     price DECIMAL(10, 5) NOT NULL,
    ->     product_id INTEGER NOT NULL,
    ->     order_status BOOLEAN NOT NULL -- Whether order has been placed
    -> ) AUTO_INCREMENT = 10001;
Query OK, 0 rows affected (0.10 sec)

MySQL [flinktest]> INSERT INTO orders
    -> VALUES (default, '2023-07-30 10:08:22', 'Jark', 50.50, 102, false),
    ->       (default, '2023-07-30 10:11:09', 'Sally', 15.00, 105, false),
    ->       (default, '2023-07-30 12:00:30', 'Edward', 25.25, 106, false);
Query OK, 3 rows affected (0.00 sec)
Records: 3  Duplicates: 0  Warnings: 0

创建同步链路

使用flink sql同步数据,启动SQL client

[root@OMS flink-1.16.1]# ./bin/sql-client.sh

Flink SQL> SET execution.checkpointing.interval = 3s;
[INFO] Session property has been set.

Flink SQL> CREATE TABLE products (
>     id INT,
>     name STRING,
>     description STRING,
>     PRIMARY KEY (id) NOT ENFORCED
>   ) WITH (
>     'connector' = 'tidb-cdc',
>     'tikv.grpc.timeout_in_ms' = '20000',
>     'pd-addresses' = '172.24.255.70:2379',
>     'database-name' = 'flinktest',
>     'table-name' = 'products'
>   );
[INFO] Execute statement succeed.

Flink SQL> CREATE TABLE orders (
>    order_id INT,
>    order_date TIMESTAMP(3),
>    customer_name STRING,
>    price DECIMAL(10, 5),
>    product_id INT,
>    order_status BOOLEAN,
>    PRIMARY KEY (order_id) NOT ENFORCED
>  ) WITH (
>     'connector' = 'tidb-cdc',
>     'tikv.grpc.timeout_in_ms' = '20000',
>     'pd-addresses' = '172.24.255.70:2379',
>     'database-name' = 'flinktest',
>     'table-name' = 'orders'
> );
[INFO] Execute statement succeed.

检查数据同步

Flink SQL> select * from products;

Flink CDC同步TiDB数据到OceanBase-4

Flink SQL> select * from orders;

Flink CDC同步TiDB数据到OceanBase-5

通过Flink SQL 写入OceanBase数据库

Flink SQL> CREATE TABLE enriched_orders (
  >    order_id INT,
  >    order_date DATE,
  >    customer_name STRING,
  >    order_status BOOLEAN,
  >    product_name STRING,
  >    product_description STRING,
  >    PRIMARY KEY (order_id) NOT ENFORCED
  >  ) WITH (
    >      'connector' ='jdbc',
    >      'url' = 'jdbc:mysql://172.24.255.56:2883/flinktest',
    >      'username' = 'root@obtest#myoceanbase',
    >      'password' = '11qq@@WW',
    >      'table-name' = 'enriched_orders'
    >  );
[INFO] Execute statement succeed.

Flink SQL>  INSERT INTO enriched_orders
>   SELECT o.order_id, o.order_date, o.customer_name, o.order_status, p.name, p.description
>   FROM orders AS o
>   LEFT JOIN products AS p ON o.product_id = p.id;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 24d82a075885fb0c9127c5abb4baa4e6

OceanBase端查看同步过来的数据

obclient [flinktest]> select * from enriched_orders;
+----------+---------------------+---------------+--------------+--------------+-------------------------+
| order_id | order_date          | customer_name | order_status | product_name | product_description     |
+----------+---------------------+---------------+--------------+--------------+-------------------------+
|    10001 | 2023-07-30 00:00:00 | Jark          |            0 | car battery  | 12V car battery         |
|    10002 | 2023-07-30 00:00:00 | Sally         |            0 | hammer       | 14oz carpenter's hammer |
|    10003 | 2023-07-30 00:00:00 | Edward        |            0 | hammer       | 16oz carpenter's hammer |
+----------+---------------------+---------------+--------------+--------------+-------------------------+
3 rows in set (0.000 sec)

Insert测试

源端写入

MySQL [flinktest]> INSERT INTO orders VALUES (default, '2023-10-20 10:08:22', 'Hongbo', 99.50, 101, false);
Query OK, 1 row affected (0.00 sec)

MySQL [flinktest]> select * from orders;
+----------+---------------------+---------------+----------+------------+--------------+
| order_id | order_date          | customer_name | price    | product_id | order_status |
+----------+---------------------+---------------+----------+------------+--------------+
|    10001 | 2023-07-30 10:08:22 | Jark          | 50.50000 |        102 |            0 |
|    10002 | 2023-07-30 10:11:09 | Sally         | 15.00000 |        105 |            0 |
|    10003 | 2023-07-30 12:00:30 | Edward        | 25.25000 |        106 |            0 |
|    10004 | 2023-10-20 10:08:22 | Hongbo        | 99.50000 |        101 |            0 |
+----------+---------------------+---------------+----------+------------+--------------+
4 rows in set (0.00 sec)

目标端查询:

obclient [flinktest]> select * from enriched_orders;
+----------+---------------------+---------------+--------------+--------------+-------------------------+
| order_id | order_date          | customer_name | order_status | product_name | product_description     |
+----------+---------------------+---------------+--------------+--------------+-------------------------+
|    10001 | 2023-07-30 00:00:00 | Jark          |            0 | car battery  | 12V car battery         |
|    10002 | 2023-07-30 00:00:00 | Sally         |            0 | hammer       | 14oz carpenter's hammer |
|    10003 | 2023-07-30 00:00:00 | Edward        |            0 | hammer       | 16oz carpenter's hammer |
|    10004 | 2023-10-20 00:00:00 | Hongbo        |            0 | scooter      | Small 2-wheel scooter   |
+----------+---------------------+---------------+--------------+--------------+-------------------------+
4 rows in set (0.000 sec)

Update测试

源端修改

MySQL [flinktest]> update orders set product_id=103 where order_id=10004;
Query OK, 1 row affected (0.00 sec)
Rows matched: 1  Changed: 1  Warnings: 0

MySQL [flinktest]> select * from orders;
+----------+---------------------+---------------+----------+------------+--------------+
| order_id | order_date          | customer_name | price    | product_id | order_status |
+----------+---------------------+---------------+----------+------------+--------------+
|    10001 | 2023-07-30 10:08:22 | Jark          | 50.50000 |        102 |            0 |
|    10002 | 2023-07-30 10:11:09 | Sally         | 15.00000 |        105 |            0 |
|    10003 | 2023-07-30 12:00:30 | Edward        | 25.25000 |        106 |            0 |
|    10004 | 2023-10-20 10:08:22 | Hongbo        | 99.50000 |        103 |            0 |
+----------+---------------------+---------------+----------+------------+--------------+
4 rows in set (0.00 sec)

目标端查询


obclient [flinktest]> select * from enriched_orders;
+----------+---------------------+---------------+--------------+--------------------+---------------------------------------------------------+
| order_id | order_date          | customer_name | order_status | product_name       | product_description                                     |
+----------+---------------------+---------------+--------------+--------------------+---------------------------------------------------------+
|    10001 | 2023-07-30 00:00:00 | Jark          |            0 | car battery        | 12V car battery                                         |
|    10002 | 2023-07-30 00:00:00 | Sally         |            0 | hammer             | 14oz carpenter's hammer                                 |
|    10003 | 2023-07-30 00:00:00 | Edward        |            0 | hammer             | 16oz carpenter's hammer                                 |
|    10004 | 2023-10-20 00:00:00 | Hongbo        |            0 | 12-pack drill bits | 12-pack of drill bits with sizes ranging from #40 to #3 |
+----------+---------------------+---------------+--------------+--------------------+---------------------------------------------------------+
4 rows in set (0.000 sec)

Delete测试

源端删除

MySQL [flinktest]> delete from orders where order_id=10004;
Query OK, 1 row affected (0.00 sec)

MySQL [flinktest]> select * from orders;
+----------+---------------------+---------------+----------+------------+--------------+
| order_id | order_date          | customer_name | price    | product_id | order_status |
+----------+---------------------+---------------+----------+------------+--------------+
|    10001 | 2023-07-30 10:08:22 | Jark          | 50.50000 |        102 |            0 |
|    10002 | 2023-07-30 10:11:09 | Sally         | 15.00000 |        105 |            0 |
|    10003 | 2023-07-30 12:00:30 | Edward        | 25.25000 |        106 |            0 |
+----------+---------------------+---------------+----------+------------+--------------+
3 rows in set (0.00 sec)

目标端查询

obclient [flinktest]> select * from enriched_orders;
+----------+---------------------+---------------+--------------+--------------+-------------------------+
| order_id | order_date          | customer_name | order_status | product_name | product_description     |
+----------+---------------------+---------------+--------------+--------------+-------------------------+
|    10001 | 2023-07-30 00:00:00 | Jark          |            0 | car battery  | 12V car battery         |
|    10002 | 2023-07-30 00:00:00 | Sally         |            0 | hammer       | 14oz carpenter's hammer |
|    10003 | 2023-07-30 00:00:00 | Edward        |            0 | hammer       | 16oz carpenter's hammer |
+----------+---------------------+---------------+--------------+--------------+-------------------------+
3 rows in set (0.000 sec)

相关文章

Oracle如何使用授予和撤销权限的语法和示例
Awesome Project: 探索 MatrixOrigin 云原生分布式数据库
下载丨66页PDF,云和恩墨技术通讯(2024年7月刊)
社区版oceanbase安装
Oracle 导出CSV工具-sqluldr2
ETL数据集成丨快速将MySQL数据迁移至Doris数据库

发布评论