Test Background
Per company requirements, we need to replace our TiDB database with OceanBase, and the migration must have minimal business impact. That means TiDB data has to be replicated into OceanBase in real time. After surveying the available data synchronization tools, Flink CDC currently looks like the easiest way to meet this requirement. This post runs a simple Flink CDC test that syncs data from TiDB to OceanBase.
Test Environment
TiDB Environment
TiDB version: v6.5.5
TiDB Server, TiKV, and PD are co-located on a single machine; a single TiCDC node is deployed on a separate machine.
| Role | Host | Port |
| --- | --- | --- |
| TiDB Server | 172.24.255.70 | 4000 |
| TiKV | 172.24.255.70 | 20160 |
| TiKV | 172.24.255.70 | 20161 |
| TiKV | 172.24.255.70 | 20162 |
| PD | 172.24.255.70 | 2379 |
| TiCDC | 172.24.255.55 | 8300 |
OceanBase Environment
OceanBase version: V4.2.0_CE
| Role | Host | Port |
| --- | --- | --- |
| OBServer | 172.24.255.56 | 2881 |
| OBServer | 172.24.255.57 | 2881 |
| OBServer | 172.24.255.58 | 2881 |
| OBProxy | 172.24.255.56 | 2883 |
| OBProxy | 172.24.255.57 | 2883 |
Flink Environment
Here is a quick walkthrough of deploying Flink.
Install Flink
Flink can be deployed in cluster mode or single-node (standalone) mode. This test uses a single-node deployment, which is straightforward: just extract the tarball and it is ready to use, with no further configuration required. Once started, visit http://localhost:8081/#/overview to monitor and manage Flink jobs.
1. Download Flink and extract it.
[admin@OCP ~]$ wget https://archive.apache.org/dist/flink/flink-1.16.1/flink-1.16.1-bin-scala_2.12.tgz
[admin@OCP ~]$ tar -zxvf flink-1.16.1-bin-scala_2.12.tgz
2. Edit the flink-conf.yaml configuration file and set the Java environment variable according to the actual installation path.
[admin@OCP ~]$ cd flink-1.16.1/
[admin@OCP flink-1.16.1]$ vim conf/flink-conf.yaml
# Add this setting
env.java.home: /usr/lib/jvm/jre-1.8.0-openjdk-1.8.0.382.b05-1.el7_9.x86_64
Download connectors
Download the flink-sql-connector-tidb-cdc and flink-connector-jdbc components.
# Download the jar files
[admin@OCP flink-1.16.1]$ cd lib/
[admin@OCP lib]$ wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-tidb-cdc/2.3.0/flink-sql-connector-tidb-cdc-2.3.0.jar
[admin@OCP lib]$ wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/1.16.1/flink-connector-jdbc-1.16.1.jar
[admin@OCP lib]$ ll
total 293316
-rw-r--r-- 1 tidb tidb 198857 Jan 19 2023 flink-cep-1.16.1.jar
-rw-r--r-- 1 tidb tidb 516144 Jan 19 2023 flink-connector-files-1.16.1.jar
-rw-r--r-- 1 root root 248892 Jan 19 2023 flink-connector-jdbc-1.16.1.jar
-rw-r--r-- 1 tidb tidb 102470 Jan 19 2023 flink-csv-1.16.1.jar
-rw-r--r-- 1 tidb tidb 117107159 Jan 19 2023 flink-dist-1.16.1.jar
-rw-r--r-- 1 tidb tidb 180248 Jan 19 2023 flink-json-1.16.1.jar
-rw-r--r-- 1 tidb tidb 21052640 Jan 19 2023 flink-scala_2.12-1.16.1.jar
-rw-rw-r-- 1 tidb tidb 10737871 Jan 13 2023 flink-shaded-zookeeper-3.5.9.jar
-rw-r--r-- 1 root root 74930222 Nov 9 2022 flink-sql-connector-tidb-cdc-2.3.0.jar
-rw-r--r-- 1 tidb tidb 15367504 Jan 19 2023 flink-table-api-java-uber-1.16.1.jar
-rw-r--r-- 1 tidb tidb 36249667 Jan 19 2023 flink-table-planner-loader-1.16.1.jar
-rw-r--r-- 1 tidb tidb 3133690 Jan 19 2023 flink-table-runtime-1.16.1.jar
-rw-rw-r-- 1 tidb tidb 208006 Jan 13 2023 log4j-1.2-api-2.17.1.jar
-rw-rw-r-- 1 tidb tidb 301872 Jan 13 2023 log4j-api-2.17.1.jar
-rw-rw-r-- 1 tidb tidb 1790452 Jan 13 2023 log4j-core-2.17.1.jar
-rw-rw-r-- 1 tidb tidb 24279 Jan 13 2023 log4j-slf4j-impl-2.17.1.jar
Start Flink
The following output indicates a successful startup:
[admin@OCP lib]$ cd ../bin
[admin@OCP bin]$ ./start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host OCP.
Starting taskexecutor daemon on host OCP.
The startup output shows two processes: standalonesession, which corresponds to the JobManager, and taskexecutor, which corresponds to the TaskManager.
View the Flink web UI at http://localhost:8081/#/overview.
Synchronization Test
Write test data into TiDB:
MySQL [test]> create database flinktest;
Query OK, 0 rows affected (0.09 sec)
MySQL [test]> USE flinktest;
Database changed
MySQL [flinktest]> CREATE TABLE products (
-> id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
-> name VARCHAR(255) NOT NULL,
-> description VARCHAR(512)
-> ) AUTO_INCREMENT = 101;
Query OK, 0 rows affected (0.10 sec)
MySQL [flinktest]> INSERT INTO products
-> VALUES (default,"scooter","Small 2-wheel scooter"),
-> (default,"car battery","12V car battery"),
-> (default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),
-> (default,"hammer","12oz carpenter's hammer"),
-> (default,"hammer","14oz carpenter's hammer"),
-> (default,"hammer","16oz carpenter's hammer"),
-> (default,"rocks","box of assorted rocks"),
-> (default,"jacket","water resistent black wind breaker"),
-> (default,"spare tire","24 inch spare tire");
Query OK, 9 rows affected (0.01 sec)
Records: 9 Duplicates: 0 Warnings: 0
MySQL [flinktest]> CREATE TABLE orders (
-> order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
-> order_date DATETIME NOT NULL,
-> customer_name VARCHAR(255) NOT NULL,
-> price DECIMAL(10, 5) NOT NULL,
-> product_id INTEGER NOT NULL,
-> order_status BOOLEAN NOT NULL -- Whether order has been placed
-> ) AUTO_INCREMENT = 10001;
Query OK, 0 rows affected (0.10 sec)
MySQL [flinktest]> INSERT INTO orders
-> VALUES (default, '2023-07-30 10:08:22', 'Jark', 50.50, 102, false),
-> (default, '2023-07-30 10:11:09', 'Sally', 15.00, 105, false),
-> (default, '2023-07-30 12:00:30', 'Edward', 25.25, 106, false);
Query OK, 3 rows affected (0.00 sec)
Records: 3 Duplicates: 0 Warnings: 0
Create the Sync Pipeline
Use Flink SQL to synchronize the data. Start the SQL client and enable periodic checkpointing so the streaming jobs can commit their progress:
[root@OMS flink-1.16.1]# ./bin/sql-client.sh
Flink SQL> SET execution.checkpointing.interval = 3s;
[INFO] Session property has been set.
Flink SQL> CREATE TABLE products (
> id INT,
> name STRING,
> description STRING,
> PRIMARY KEY (id) NOT ENFORCED
> ) WITH (
> 'connector' = 'tidb-cdc',
> 'tikv.grpc.timeout_in_ms' = '20000',
> 'pd-addresses' = '172.24.255.70:2379',
> 'database-name' = 'flinktest',
> 'table-name' = 'products'
> );
[INFO] Execute statement succeed.
Flink SQL> CREATE TABLE orders (
> order_id INT,
> order_date TIMESTAMP(3),
> customer_name STRING,
> price DECIMAL(10, 5),
> product_id INT,
> order_status BOOLEAN,
> PRIMARY KEY (order_id) NOT ENFORCED
> ) WITH (
> 'connector' = 'tidb-cdc',
> 'tikv.grpc.timeout_in_ms' = '20000',
> 'pd-addresses' = '172.24.255.70:2379',
> 'database-name' = 'flinktest',
> 'table-name' = 'orders'
> );
[INFO] Execute statement succeed.
Check Data Synchronization
Query the CDC source tables (each SELECT launches a streaming job whose result updates as changes arrive):
Flink SQL> select * from products;
Flink SQL> select * from orders;
Write the joined result into OceanBase through Flink SQL. Note that enriched_orders declares order_date as DATE, so the time-of-day portion is truncated on the target side.
Flink SQL> CREATE TABLE enriched_orders (
> order_id INT,
> order_date DATE,
> customer_name STRING,
> order_status BOOLEAN,
> product_name STRING,
> product_description STRING,
> PRIMARY KEY (order_id) NOT ENFORCED
> ) WITH (
> 'connector' ='jdbc',
> 'url' = 'jdbc:mysql://172.24.255.56:2883/flinktest',
> 'username' = 'root@obtest#myoceanbase',
> 'password' = '11qq@@WW',
> 'table-name' = 'enriched_orders'
> );
[INFO] Execute statement succeed.
Flink SQL> INSERT INTO enriched_orders
> SELECT o.order_id, o.order_date, o.customer_name, o.order_status, p.name, p.description
> FROM orders AS o
> LEFT JOIN products AS p ON o.product_id = p.id;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 24d82a075885fb0c9127c5abb4baa4e6
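The INSERT ... SELECT above submits a continuous job that maintains a left join between orders and products. As a sanity check of the expected initial output, here is a minimal Python sketch of the same join over the sample rows (a hypothetical in-memory stand-in for illustration, not the Flink runtime):

```python
# Sample data mirroring the initial rows (only the product ids the orders
# reference are included here for brevity).
products = {
    101: ("scooter", "Small 2-wheel scooter"),
    102: ("car battery", "12V car battery"),
    105: ("hammer", "14oz carpenter's hammer"),
    106: ("hammer", "16oz carpenter's hammer"),
}

orders = [
    (10001, "2023-07-30", "Jark", False, 102),
    (10002, "2023-07-30", "Sally", False, 105),
    (10003, "2023-07-30", "Edward", False, 106),
]

def enrich(orders, products):
    """LEFT JOIN orders with products on product_id; unmatched ids yield None."""
    rows = []
    for order_id, order_date, customer, status, product_id in orders:
        name, desc = products.get(product_id, (None, None))
        rows.append((order_id, order_date, customer, status, name, desc))
    return rows

for row in enrich(orders, products):
    print(row)
```

The printed rows match the three enriched_orders rows shown below: 10001 joins to "car battery", 10002 and 10003 to the 14oz and 16oz hammers.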
Check the synchronized data on the OceanBase side:
obclient [flinktest]> select * from enriched_orders;
+----------+---------------------+---------------+--------------+--------------+-------------------------+
| order_id | order_date | customer_name | order_status | product_name | product_description |
+----------+---------------------+---------------+--------------+--------------+-------------------------+
| 10001 | 2023-07-30 00:00:00 | Jark | 0 | car battery | 12V car battery |
| 10002 | 2023-07-30 00:00:00 | Sally | 0 | hammer | 14oz carpenter's hammer |
| 10003 | 2023-07-30 00:00:00 | Edward | 0 | hammer | 16oz carpenter's hammer |
+----------+---------------------+---------------+--------------+--------------+-------------------------+
3 rows in set (0.000 sec)
Insert Test
Write a row on the source side:
MySQL [flinktest]> INSERT INTO orders VALUES (default, '2023-10-20 10:08:22', 'Hongbo', 99.50, 101, false);
Query OK, 1 row affected (0.00 sec)
MySQL [flinktest]> select * from orders;
+----------+---------------------+---------------+----------+------------+--------------+
| order_id | order_date | customer_name | price | product_id | order_status |
+----------+---------------------+---------------+----------+------------+--------------+
| 10001 | 2023-07-30 10:08:22 | Jark | 50.50000 | 102 | 0 |
| 10002 | 2023-07-30 10:11:09 | Sally | 15.00000 | 105 | 0 |
| 10003 | 2023-07-30 12:00:30 | Edward | 25.25000 | 106 | 0 |
| 10004 | 2023-10-20 10:08:22 | Hongbo | 99.50000 | 101 | 0 |
+----------+---------------------+---------------+----------+------------+--------------+
4 rows in set (0.00 sec)
Query on the target side:
obclient [flinktest]> select * from enriched_orders;
+----------+---------------------+---------------+--------------+--------------+-------------------------+
| order_id | order_date | customer_name | order_status | product_name | product_description |
+----------+---------------------+---------------+--------------+--------------+-------------------------+
| 10001 | 2023-07-30 00:00:00 | Jark | 0 | car battery | 12V car battery |
| 10002 | 2023-07-30 00:00:00 | Sally | 0 | hammer | 14oz carpenter's hammer |
| 10003 | 2023-07-30 00:00:00 | Edward | 0 | hammer | 16oz carpenter's hammer |
| 10004 | 2023-10-20 00:00:00 | Hongbo | 0 | scooter | Small 2-wheel scooter |
+----------+---------------------+---------------+--------------+--------------+-------------------------+
4 rows in set (0.000 sec)
Update Test
Modify a row on the source side:
MySQL [flinktest]> update orders set product_id=103 where order_id=10004;
Query OK, 1 row affected (0.00 sec)
Rows matched: 1 Changed: 1 Warnings: 0
MySQL [flinktest]> select * from orders;
+----------+---------------------+---------------+----------+------------+--------------+
| order_id | order_date | customer_name | price | product_id | order_status |
+----------+---------------------+---------------+----------+------------+--------------+
| 10001 | 2023-07-30 10:08:22 | Jark | 50.50000 | 102 | 0 |
| 10002 | 2023-07-30 10:11:09 | Sally | 15.00000 | 105 | 0 |
| 10003 | 2023-07-30 12:00:30 | Edward | 25.25000 | 106 | 0 |
| 10004 | 2023-10-20 10:08:22 | Hongbo | 99.50000 | 103 | 0 |
+----------+---------------------+---------------+----------+------------+--------------+
4 rows in set (0.00 sec)
Query on the target side:
obclient [flinktest]> select * from enriched_orders;
+----------+---------------------+---------------+--------------+--------------------+---------------------------------------------------------+
| order_id | order_date | customer_name | order_status | product_name | product_description |
+----------+---------------------+---------------+--------------+--------------------+---------------------------------------------------------+
| 10001 | 2023-07-30 00:00:00 | Jark | 0 | car battery | 12V car battery |
| 10002 | 2023-07-30 00:00:00 | Sally | 0 | hammer | 14oz carpenter's hammer |
| 10003 | 2023-07-30 00:00:00 | Edward | 0 | hammer | 16oz carpenter's hammer |
| 10004 | 2023-10-20 00:00:00 | Hongbo | 0 | 12-pack drill bits | 12-pack of drill bits with sizes ranging from #40 to #3 |
+----------+---------------------+---------------+--------------+--------------------+---------------------------------------------------------+
4 rows in set (0.000 sec)
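The update above lands as an in-place change on the target because the sink is keyed by the primary key: the CDC source emits changelog events and the sink applies them as upserts and deletes. A tiny Python sketch of that semantics (a simplified model for illustration, not the actual JDBC sink):

```python
# Simplified upsert-sink model keyed by primary key:
# "+I" (insert) and "+U" (update-after) upsert the row, "-D" deletes it.
# "-U" (update-before) retractions are ignored here because the
# following "+U" overwrites the same key anyway.
def apply_changelog(events):
    table = {}
    for op, key, row in events:
        if op in ("+I", "+U"):
            table[key] = row
        elif op == "-D":
            table.pop(key, None)
    return table

events = [
    ("+I", 10004, {"customer": "Hongbo", "product": "scooter"}),
    ("-U", 10004, {"customer": "Hongbo", "product": "scooter"}),
    ("+U", 10004, {"customer": "Hongbo", "product": "12-pack drill bits"}),
]
print(apply_changelog(events))
```

After replaying the three events, key 10004 holds the updated product, which mirrors the changed enriched_orders row above; appending a "-D" event would remove the row, as the Delete test below demonstrates end to end.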
Delete Test
Delete a row on the source side:
MySQL [flinktest]> delete from orders where order_id=10004;
Query OK, 1 row affected (0.00 sec)
MySQL [flinktest]> select * from orders;
+----------+---------------------+---------------+----------+------------+--------------+
| order_id | order_date | customer_name | price | product_id | order_status |
+----------+---------------------+---------------+----------+------------+--------------+
| 10001 | 2023-07-30 10:08:22 | Jark | 50.50000 | 102 | 0 |
| 10002 | 2023-07-30 10:11:09 | Sally | 15.00000 | 105 | 0 |
| 10003 | 2023-07-30 12:00:30 | Edward | 25.25000 | 106 | 0 |
+----------+---------------------+---------------+----------+------------+--------------+
3 rows in set (0.00 sec)
Query on the target side:
obclient [flinktest]> select * from enriched_orders;
+----------+---------------------+---------------+--------------+--------------+-------------------------+
| order_id | order_date | customer_name | order_status | product_name | product_description |
+----------+---------------------+---------------+--------------+--------------+-------------------------+
| 10001 | 2023-07-30 00:00:00 | Jark | 0 | car battery | 12V car battery |
| 10002 | 2023-07-30 00:00:00 | Sally | 0 | hammer | 14oz carpenter's hammer |
| 10003 | 2023-07-30 00:00:00 | Edward | 0 | hammer | 16oz carpenter's hammer |
+----------+---------------------+---------------+--------------+--------------+-------------------------+
3 rows in set (0.000 sec)