实时计算Flink集成开源连接器TiDB CDC Connector案例实践

2024年 5月 7日 100.1k 0

TIDB部署(阿里云ECS)

1、系统配置

TIDB官方建议使用CentOS7.3及以上版本:

Linux 操作系统 版本
Red Hat Enterprise Linux 7.3 及以上
CentOS 7.3 及以上

本次实验我们选择CentOS 7.6 64位,考虑网络连通,需将TIDB ECS实例与Flink集群部署在相同VPC网络。

实时计算Flink集成开源连接器-TiDB CDC Connector案例实践-1

2、TIDB部署

a、下载并安装 TiUP

curl --proto '=https' --tlsv1.2 -sSf https://tiup-mirrors.pingcap.com/install.sh | sh

实时计算Flink集成开源连接器-TiDB CDC Connector案例实践-2

b. 安装 TiUP 的 cluster 组件

声明全局的环境变量,不然找不到tiup命令:

source .bash_profile

执行安装cluster命令:

tiup cluster

实时计算Flink集成开源连接器-TiDB CDC Connector案例实践-3

c. 调大 sshd 服务的连接数限制(可选)

#调大sshd服务的连接数限制
cat /etc/ssh/sshd_config | grep -n MaxSessions
vim +41 /etc/ssh/sshd_config

#改完后重启sshd
systemctl restart sshd.service

d. 创建并启动集群

按配置模板,编辑配置文件(当前目录vim编辑后保存即可),命名为 topo.yaml,其中:

  • host:部署TIDB的服务器i(ECS内网IP)
  • ssh_port默认是22
  • 官方文件的tikv_servers是3个节点,这里设置成了只有1个节点,原因是本地配置多个节点时只有1个节点能启动成功

模板如下:

global:
 user: "tidb"
 ssh_port: 22
 deploy_dir: "/tidb-deploy"
 data_dir: "/tidb-data"

monitored:
 node_exporter_port: 9100
 blackbox_exporter_port: 9115

server_configs:
 tidb:
   log.slow-threshold: 300
 tikv:
   readpool.storage.use-unified-pool: false
   readpool.coprocessor.use-unified-pool: true
 pd:
   replication.enable-placement-rules: true
   replication.location-labels: ["host"]
 tiflash:
   logger.level: "info"

pd_servers:
 - host: 192.168.0.1

tidb_servers:
 - host: 192.168.0.1

tikv_servers:
 - host: 192.168.0.1
   port: 20160
   status_port: 20180
   config:
     server.labels: { host: "laomo-talkservice-tidb-1" }

tiflash_servers:
 - host: 192.168.0.1

部署集群:

# cluster-name:集群名称
# tidb-version:TiDB版本号,可以通过tiup list tidb这个命令来查看
tiup cluster deploy   ./topo.yaml --user root -p 

# 这里使用 v6.0.0,集群名称叫mytidb-cluster来部署
tiup cluster deploy mytidb-cluster v6.0.0 ./topo.yaml --user root -p P...D

实时计算Flink集成开源连接器-TiDB CDC Connector案例实践-4

启动集群:

#启动刚才部署的集群,集群名称为mytidb-cluster
tiup cluster start mytidb-cluster

实时计算Flink集成开源连接器-TiDB CDC Connector案例实践-5

e. 开启CDC(重要:CDC不开启无法捕获数据变更。另外,本示例单节点,多节点集群需要配置节点访问免密

# 创建scale-out.yaml文件添加 TiCDC 节点信息
cdc_servers:
  - host: 192.168.0.1
    gc-ttl: 86400

# TiUP运行scale-out命令
tiup cluster scale-out mytidb-cluster scale-out.yaml

#TiUP 停止和启动 TiCDC
tiup cluster stop -R cdc
tiup cluster start -R cdc
tiup cluster restart -R cdc

f. 访问TiDB

在ECS上安装 MySQL 客户端(通过MySQL客户端访问TiDB):

yum -y install mysql

访问TiDB,密码默认为空

mysql -h 192.168.0.1 -P 4000 -u root

实时计算Flink集成开源连接器-TiDB CDC Connector案例实践-6

Flink TiDB CDC

TiDB CDC

TiDB CDC connector 是一个 Flink Source connector,它会先读取数据库快照,然后在发生故障时以exactly-once 处理继续读取更改事件,支持exactly-once语义

兼容版本

TiDB CDC Version

Flink Version

TiDB Version

2.2.*

1.13.*, 1.14.*

TiDB: 5.1.x, 5.2.x, 5.3.x, 5.4.x, 6.0.0

2.3.*

1.13.*, 1.14.*, 1.15.*, 1.16.0

TiDB: 5.1.x, 5.2.x, 5.3.x, 5.4.x, 6.0.0

参数配置

参数 说明 默认值 数据类型
connector 固定值:tidb-cdc none String
database-name TiDB 服务器的数据库名称 none String
table-name TiDB 服务器的表名称 none String
scan.startup.mode 消费者启动模式,直接读取其底层 TiKV 存储中的
全量数据和增量数据实现数据捕获,读取具体枚举如下:
initial:在第一次启动时,会先扫描历史全量数据,
其中全量部分是通过按 key 划分 range进行读取
latest-offset:不会扫描历史全量数据,
增量部分使用 TiDB 提供的 CDC Client 获取增量变更数据
initial String
pd-addresses TiKV 集群的 PD 地址 none String
tikv.grpc.timeout_in_ms 以毫秒为单位的 TiKV GRPC 超时 none Long
tikv.grpc.scan_timeout_in_ms 以毫秒为单位的 TiKV GRPC 扫描超时 none Long
tikv.batch_get_concurrency TiKV GRPC并发 20 Integer
tikv.* TiDB 客户端的属性 none String

集成TiDB连接器

实时计算Flink版是一套基于Apache Flink构建的⼀站式实时大数据分析平台,支持作业开发、数据调试、运行与监控、自动调优、智能诊断等全生命周期能力。本文案例基于阿里云实时计算Flink版进行演示。

1、版本准备

兼容版本可以参考Flink TiDB CDC小结介绍,本次实验选型如下:

❏ Flink全托管版本:vvr-6.0.4-flink-1.15

❏ TiDB版本:v6.0.0

❏ TiDB CDC Connector:flink-connector-tidb-cdc-2.3.0

2、集群创建

登录阿里云控制台,创建全托管VVP集群,注意集群部署网络环境与TiDB保持一致(同一VPC):

实时计算Flink集成开源连接器-TiDB CDC Connector案例实践-7

3、注册TiDB连接器

实时计算Flink集成开源连接器-TiDB CDC Connector案例实践-8

4创建Session集群

创建一个vvr-6.0.4-flink-1.15版本的Session集群,创建后并启动:

实时计算Flink集成开源连接器-TiDB CDC Connector案例实践-9

5、基于Flink SQL开发作业

注意:代码里没有配置scan.startup.mode参数,默认值initial:在第一次启动时,会先扫描历史全量数据(后续会演示相关效果)

CREATE TEMPORARY TABLE students_tidb_source (
  db_name STRING METADATA FROM 'database_name' VIRTUAL,
  table_name STRING METADATA  FROM 'table_name' VIRTUAL,
  operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
  id INT,
  `name` STRING,
  score INT,
  PRIMARY KEY(id) NOT ENFORCED
) WITH (
  'connector' = 'tidb-cdc',
  'tikv.grpc.timeout_in_ms' = '20000',
  'pd-addresses' = '192.168.0.1:2379',
  'database-name' = 'tidb_source',
  'table-name' = 'student'
);

CREATE TEMPORARY TABLE students_print_sink (
  `db_name` STRING,
  `table_name` STRING,
  operation_ts TIMESTAMP_LTZ(3),
  id INT,
  `name` STRING,
  score INT
) WITH (
  'connector' = 'print',
  'limit' = '100',
  'logger'='true'
);

insert into students_print_sink
select * from students_tidb_source;

6、TiDB数据准备

运行作业前,先添加一条数据:

CREATE DATABASE  tidb_source;
use  tidb_source;
--创建student表
CREATE TABLE student(
  id int NOT NULL ,
  name varchar(100) NULL ,
  score int NULL ,
  primary key(id)
);

--插入数据
insert into student values(1,"lincloud",100);
select * from student;

实时计算Flink集成开源连接器-TiDB CDC Connector案例实践-10

7、验证测试

启动作业,测试

实时计算Flink集成开源连接器-TiDB CDC Connector案例实践-11

相关文章

Oracle如何使用授予和撤销权限的语法和示例
Awesome Project: 探索 MatrixOrigin 云原生分布式数据库
下载丨66页PDF,云和恩墨技术通讯(2024年7月刊)
社区版oceanbase安装
Oracle 导出CSV工具-sqluldr2
ETL数据集成丨快速将MySQL数据迁移至Doris数据库

发布评论