实战教程第四章4.10:如何使用 CANAL 将 MySQL 数据实时同步到 OceanBase

2024年 5月 7日 58.2k 0

Canal 是 Alibaba 开源的一个产品,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。开源项目地址:https://github.com/alibaba/canal 。

架构原理

canal 主要提供了 4 个组件:

  • canal deployercanal 的 server 端,进行 binlog 到 CanalEntry 的转换。
  • canal admincanal 的配置管理服务,提供 web 页面管理 canal 的 server 端服务。
  • canal adaptercanal 的客户端适配器,解析 CanalEntry 并将增量变动同步到目的端。
  • canal examplecanal 的 client 端示例,用户可以基于该部分代码实现自己的消费逻辑。

实战教程第四章4.10:如何使用 CANAL 将 MySQL 数据实时同步到 OceanBase-1

图中 canal-admin 是部署用的,可选的。canal-server 就是 canal deployer 软件,adapter 就是canal adapter 软件。源端数据库是 mariadb, 目标端是 oceanbase 的 mysql 租户。

Canal Deployer

Canal Deployer 的服务中有 Server 和 Instance 的概念,一个 server 代表一个 deployer 服务,一个 instance 代表一个实际的数据同步通路,在 Canal Server 中,一个 Server 可以有多个 Instance

Canal Instance 由 Spring 在运行时创建,其配置信息 canal deployer 的 conf/canal.properties 中指定。Canal 本身提供了几种可以直接使用的配置,存放在 conf/spring 目录下。

Canal Instance 在解析完日志信息后,得到的 CanalEntry 数据会放入内存等待消费。Canal 提供了两种消费方式供用户选择:

  • TCP 模式:直接使用客户端连接Canal消费数据。
  • MQ 模式:先将 Canal 内存中的数据写入 MQ ,用户可以使用客户端连接 MQ 进行数据消费。

Canal 中有两种位点信息,一个是解析位点,即日志转化 Entry 的过程记录的位点,由 LogPositionManager 管理,另一个是客户端消费的位点,由 MetaManager 管理。两者同样是在 instance 的 spring xml 文件进行配置。

Canal Adapter

Canal Adapter 用于消费 CanalEntry,并写入对应的目的容器。adapter 与 deployer 一样有 instance 的概念,实际运行时,adapter 本身由 adapter launcher 服务启动,并根据用户配置生成 adapter instance,由 instance 执行具体的 CanalEntry 读取和目的端写入的工作。

Canal Admin

Canal Admin 就是为了简化部署操作而引入的一个管理平台服务。Canal Deployer 和 Canal Instance 都分别支持单机部署和高可用集群部署两种模式,通过 Canal Admin,用户可以通过 web 页面来方便地管理 Canal Deployer 和 Canal Instance 的部署,同样也是支持单机部署和高可用集群化部署。

部署示例

MySQL 准备

这里使用的是 mariadb

yum install mariadb mariadb-server
  • 对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下:
[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
  • 授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant 。
CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

然后初始化了一个业务数据库(TPCH,后面介绍性能测试的时候再重点介绍)。

部署 Canal Admin

用 Canal Admin 部署 Canal Deployer 会方便一些,这个不是必须的。

本文就使用 Canal Admin

  • 下载 Canal Admin,访问地址:https://github.com/alibaba/canal/releases
wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.admin-1.1.5.tar.gz
  • 解压缩到指定目录
mkdir ~/canal-admin && tar zxvf canal.admin-1.1.5.tar.gz -C ~/canal-admin/
  • 修改配置文件
cd ~/canal-admin && vim conf/application.yml

内容如下:

server:port: 8089
spring:jackson:date-format: yyyy-MM-dd HH:mm:sstime-zone: GMT+8
spring.datasource:address: 127.0.0.1:3306database: canal_managerusername: canalpassword: canaldriver-class-name: com.mysql.jdbc.Driverurl: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=falsehikari:maximum-pool-size: 30minimum-idle: 1
canal:adminUser: adminadminPasswd: admin

初次测试时,不要修改上面密码 adminPasswd,以免后面密码修改不对导致连接不上。

  • 初始化元数据库
mysql -h127.1 -uroot -P3306 -p
source conf/canal_manager.sql

脚本会自动创建相关表,如下:

show tables;
MariaDB [tpch]> use canal_manager;
Database changed
MariaDB [canal_manager]> show tables;
+-------------------------+
| Tables_in_canal_manager |
+-------------------------+
| canal_adapter_config    |
| canal_cluster           |
| canal_config            |
| canal_instance_config   |
| canal_node_server       |
| canal_user              |
+-------------------------+
6 rows in set (0.00 sec)
  • 启动 web 服务
cd ~/canal-admin && bin/startup.sh

正常情况下,启动成功会监听 8089 端口。

[root@obce00 adapter]# netstat -ntlp |grep 15973
tcp        0      0 0.0.0.0:8089            0.0.0.0:*               LISTEN      15973/java

启动如果有问题,可以查看日志。

vim logs/admin.log +
  • 登录 web 界面

Canal Admin 的 web 访问地址:http://127.0.0.1:8089/

登录用户名:admin

登录密码:123456

部署 Canal Deployer

  • 下载 canal,访问地址:https://github.com/alibaba/canal/releases。
wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz
  • 解压缩
mkdir ~/canal && tar zxvf canal.deployer-1.1.5.tar.gz  -C ~/canal
  • 修改配置

如果不使用 Canal Admin 部署,则使用默认的配置文件 conf/canal.properties 和 conf/example/instance.properties。这个是默认创建了一个 instance 叫 example。需要修改 example 的实例配置文件,修改数据库连接地址、用户名和密码。

vi conf/example/instance.properties

# mysql serverId
canal.instance.mysql.slaveId = 1234
#position info,需要改成自己的数据库信息
canal.instance.master.address = 127.0.0.1:3306 
canal.instance.master.journal.name = 
canal.instance.master.position = 
canal.instance.master.timestamp = 
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#username/password,需要改成自己的数据库信息
canal.instance.dbUsername = canal  
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8
#table regex
canal.instance.filter.regex = .\*\\\\..\*

canal.instance.connectionCharset 代表数据库的编码方式对应到 java 中的编码类型,比如 UTF-8GBKISO-8859-1 。 如果系统是1个 cpu,需要将 canal.instance.parser.parallel 设置为 false 。

如果使用 Canal Admin 部署 server 和 instance,则使用配置文件 conf/canal_local.properties 替换 conf/canal.properties。需要修改 conf/canal.properties 里的 manager 地址,其他参数值可以保持默认。

[root@obce00 canal]# cat conf/canal.properties
# register ip
canal.register.ip =
# canal admin config
canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
canal.admin.register.auto = true
canal.admin.register.cluster =
canal.admin.register.name =

注意:passwd 后面的字符串是 admin 在 MySQL 里的密文。这个密码跟前面 Canal Admin 配置文件里的密码保持一致。如果前面密码改了,这里也要相应修改,密文的值可以通过 MySQL 的 password 方法获取。

MariaDB [canal_manager]> select password('admin');
+-------------------------------------------+
| password('admin')                         |
+-------------------------------------------+
| *4ACFE3202A5FF5CF467898FC58AAB1D615029441 |
+-------------------------------------------+
1 row in set (0.00 sec)
  • 启动 Cananl Server

不管是那种部署方法,配置文件修改好后,就可以启动服务。

sh bin/startup.sh
  • 图形化部署 Canal server 和 Canal instance

如果使用 Canal Admin 管理 Canal server,则登录 admin 的管理界面http://172.24.50.39:8089/#/canalServer/nodeServers,选择 Canal Server>Server 管理,单击 新建 Server

实战教程第四章4.10:如何使用 CANAL 将 MySQL 数据实时同步到 OceanBase-2

在弹出的 新建Server信息 界面单击 确定 按钮。

之后,选择 Canal Server>Instance 管理,单击 新建 Instance

实战教程第四章4.10:如何使用 CANAL 将 MySQL 数据实时同步到 OceanBase-3

单击 载入模板,显示一个配置文件,跟前面看到的类似。修改配置文件中的跟源端 canal 和数据库有关的信息。

实战教程第四章4.10:如何使用 CANAL 将 MySQL 数据实时同步到 OceanBase-4

单击 保存,命名为 mariadb

保存后的 instance 状态为 停止,单击 操作>启动 可启动 instance,启动后,instance 状态为 启动

实战教程第四章4.10:如何使用 CANAL 将 MySQL 数据实时同步到 OceanBase-5

  • 查看 server 日志

可以命令行下查看日志,或者在 Canal Admin 里查看 server 的日志。

vi logs/canal/canal.log
  • 查看 instance 的日志

可以命令行下查看日志,或者在 Canal Admin 里查看 instance 的日志。

tail -f logs/canal/canal.log
tail -f logs/example/example.log
tail -f logs/mariadb/mariadb.log
  • 停止服务
sh bin/stop.sh

部署 RDB 适配器

Canal Adapter 提供了对多种目标容器的支持,对于 OceanBase 社区版来说,主要使用它的 rdb 模块,目的端容器为 MySQL 或社区版 OceanBase 。

Adapter 的部署需要手动部署。

  • 修改启动器配置: application.yml,这里以 OceanBase 目标库为例。

首先指定 adapter 源端类型,通过 mode 指定。这里选择 tcp。后面就要指定 canal.tcp 相关属性,包括 canal server 的 IP 和 端口,数据库的连接用户和密码。

然后指定 adapter 目标端连接信息。instance 是源端实例名称,在 canal 部署的时候定义的。如果没有用 Canal Admin 部署,沿用的是 example 这个名称;如果用了 Canal Admin 部署 instance,前面命名的是 mariadb

key 是自定义,名字后面有用。jdbc 相关属性是目标端 OceanBase MySQL 的连接方式,可以使用 MySQL 自带的驱动。

mode: tcp #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000retries: 0timeout:
  accessKey:
  secretKey:
  consumerProperties:
   # canal tcp consumer
    canal.tcp.server.host: 127.0.0.1:11111
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username: tpch
    canal.tcp.password: Dg0gIexJAV
  canalAdapters:
  - instance: mariadb # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
    - name: logger
      - name: rdb
        key: obmysql
        properties:
          jdbc.driverClassName: com.mysql.jdbc.Driver
          jdbc.url: jdbc:mysql://127.0.0.1:2883/tpch?useUnicode=true
          jdbc.username: tpch@obmysql#obdemo
          jdbc.password: Dg0gIexJAV   
  • RDB 映射文件

修改 conf/rdb/mytest_user.yml 文件。

映射有两种:一是按表映射;二是整库映射。下面示例是整库映射。

[root@obce00 adapter]# cat conf/rdb/mytest_user.yml
#dataSourceKey: defaultDS
#destination: example
#groupId: g1
#outerAdapterKey: mysql1
#concurrent: true
#dbMapping:
#  database: mytest
#  table: user
#  targetTable: mytest2.user
#  targetPk:
#    id: id
##  mapAll: true
#  targetColumns:
#    id:
#    name:
#    role_id:
#    c_time:
#    test1:
#  etlCondition: "where c_time>={}"
#  commitBatch: 3000 # 批量提交的大小
# Mirror schema synchronize config
dataSourceKey: defaultDS
destination: mariadb
groupId: g1
outerAdapterKey: obmysql
concurrent: true
dbMapping:
  mirrorDb: true
  database: tpch
  commitBatch: 1000

其中,destination 指定的是 canal instance 名称;outerAdapterKey 是前面定义的 keymirrorDb 指定数据库级别 DDL 和 DML 镜像同步。

导入的类型以目标表的元类型为准, 将自动进行类型转换。

  • 启动 RDB

如果使用了 OceanBase 的驱动,则将目标库 OceanBase 驱动包放入 lib文件夹。

启动 canal-adapter 启动器。

bin/startup.sh

验证修改 mysql mytest.user 表的数据, 将会自动同步到 MySQL 的 MYTEST.TB_USER 表下面, 并会打出 DML 的 log。

  • 停止 RDB
bin/stop.sh
  • 查看 RDB 日志
tail -f logs/adapter/adapter.log
2021-12-09 09:56:04.148 [pool-6-thread-1] DEBUG c.a.o.canal.client.adapter.rdb.service.RdbSyncService - DML: {"data":{"s_suppkey":99995,"s_name":null,"s_address":null,"s_nationkey":null,"s_phone":null,"s_acctbal":null,"s_comment":null},"database":"tpch","destination":"mariadb","old":null,"table":"supplier2","type":"INSERT"}
2021-12-09 09:56:04.149 [pool-6-thread-1] DEBUG c.a.o.canal.client.adapter.rdb.service.RdbSyncService - DML: {"data":{"s_suppkey":99998,"s_name":null,"s_address":null,"s_nationkey":null,"s_phone":null,"s_acctbal":null,"s_comment":null},"database":"tpch","destination":"mariadb","old":null,"table":"supplier2","type":"INSERT"}
2021-12-09 10:13:35.915 [Thread-3] INFO  c.a.o.canal.client.adapter.rdb.monitor.RdbConfigMonitor - Change a rdb mapping config: mytest_user.yml of canal adapter

同步测试

可以对源端 MySQL 数据库 tpch 做 DML 和 DDL 测试,都可以同步到目标端。这里就不详细展开,只介绍一些已知的功能限制。

  • 同步的表必须有主键。否则,源端删除无主键表的任意一笔记录,同步到目标端会导致整个表被删除。
  • DDL 支持新建表、新增列。但受 OceanBase MySQL 租户功能限制,不支持后期加主键、修改列的类型(指大类型变更,如数值、字符串、日期之间类型变化)。

附录:

  • 4.1 OceanBase 的 MySQL 兼容性简介
  • 4.2 如何使用 mysqldump 迁移 MySQL 表 OceanBase
  • 4.3 如何使用 dbcat 迁移 MySQL 表结构到 OceanBase
  • 4.4 如何把 MySQL 表数据导出到 CSV 文件
  • 4.5 如何使用 OceanBase 的 LOAD 命令加载 csv 数据文件 OceanBase
  • 4.6 如何使用 DataX 加载 CSV 数据文件到 OceanBase
  • 4.7 如何使用 DATAX 迁移 MySQL数据到 OceanBase
  • 4.8 如何使用 OBDUMPER / OBLOADER 工具导出/导入 OceanBase 数据
  • 4.9 如何使用 DATAX 迁移 OceanBase 数据到 MySQL/ORACLE
  • 4.10 如何使用 CANAL 将 MySQL 数据实时同步到 OceanBase

结束语

加入教程直播群方式一:钉钉群号3255 4020

加入教程直播群方式二:扫码下方二维码加入

实战教程第四章4.10:如何使用 CANAL 将 MySQL 数据实时同步到 OceanBase-6

相关文章

Oracle如何使用授予和撤销权限的语法和示例
Awesome Project: 探索 MatrixOrigin 云原生分布式数据库
下载丨66页PDF,云和恩墨技术通讯(2024年7月刊)
社区版oceanbase安装
Oracle 导出CSV工具-sqluldr2
ETL数据集成丨快速将MySQL数据迁移至Doris数据库

发布评论