实战教程第四章4.10:如何使用 CANAL 将 MySQL 数据实时同步到 OceanBase
Canal 是 Alibaba 开源的一个产品,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。开源项目地址:https://github.com/alibaba/canal 。
架构原理
canal 主要提供了 4 个组件:
canal deployer:canal的 server 端,进行binlog到CanalEntry的转换。canal admin:canal的配置管理服务,提供 web 页面管理canal的server端服务。canal adapter:canal的客户端适配器,解析CanalEntry并将增量变动同步到目的端。canal example:canal的client端示例,用户可以基于该部分代码实现自己的消费逻辑。

图中 canal-admin 是部署用的,可选的。canal-server 就是 canal deployer 软件,adapter 就是canal adapter 软件。源端数据库是 mariadb, 目标端是 oceanbase 的 mysql 租户。
Canal Deployer
Canal Deployer 的服务中有 Server 和 Instance 的概念,一个 server 代表一个 deployer 服务,一个 instance 代表一个实际的数据同步通路,在 Canal Server 中,一个 Server 可以有多个 Instance。
Canal Instance 由 Spring 在运行时创建,其配置信息 canal deployer 的 conf/canal.properties 中指定。Canal 本身提供了几种可以直接使用的配置,存放在 conf/spring 目录下。
Canal Instance 在解析完日志信息后,得到的 CanalEntry 数据会放入内存等待消费。Canal 提供了两种消费方式供用户选择:
- TCP 模式:直接使用客户端连接Canal消费数据。
- MQ 模式:先将
Canal内存中的数据写入 MQ ,用户可以使用客户端连接 MQ 进行数据消费。
Canal 中有两种位点信息,一个是解析位点,即日志转化 Entry 的过程记录的位点,由 LogPositionManager 管理,另一个是客户端消费的位点,由 MetaManager 管理。两者同样是在 instance 的 spring xml 文件进行配置。
Canal Adapter
Canal Adapter 用于消费 CanalEntry,并写入对应的目的容器。adapter 与 deployer 一样有 instance 的概念,实际运行时,adapter 本身由 adapter launcher 服务启动,并根据用户配置生成 adapter instance,由 instance 执行具体的 CanalEntry 读取和目的端写入的工作。
Canal Admin
Canal Admin 就是为了简化部署操作而引入的一个管理平台服务。Canal Deployer 和 Canal Instance 都分别支持单机部署和高可用集群部署两种模式,通过 Canal Admin,用户可以通过 web 页面来方便地管理 Canal Deployer 和 Canal Instance 的部署,同样也是支持单机部署和高可用集群化部署。
部署示例
MySQL 准备
这里使用的是 mariadb。
yum install mariadb mariadb-server
- 对于自建 MySQL , 需要先开启 Binlog 写入功能,配置
binlog-format为 ROW 模式,my.cnf中配置如下:
[mysqld] log-bin=mysql-bin # 开启 binlog binlog-format=ROW # 选择 ROW 模式 server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
- 授权 canal 链接 MySQL 账号具有作为
MySQL slave的权限, 如果已有账户可直接grant。
CREATE USER canal IDENTIFIED BY 'canal'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ; FLUSH PRIVILEGES;
然后初始化了一个业务数据库(TPCH,后面介绍性能测试的时候再重点介绍)。
部署 Canal Admin
用 Canal Admin 部署 Canal Deployer 会方便一些,这个不是必须的。
本文就使用 Canal Admin。
- 下载
Canal Admin,访问地址:https://github.com/alibaba/canal/releases
wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.admin-1.1.5.tar.gz
- 解压缩到指定目录
mkdir ~/canal-admin && tar zxvf canal.admin-1.1.5.tar.gz -C ~/canal-admin/
- 修改配置文件
cd ~/canal-admin && vim conf/application.yml
内容如下:
server:port: 8089
spring:jackson:date-format: yyyy-MM-dd HH:mm:sstime-zone: GMT+8
spring.datasource:address: 127.0.0.1:3306database: canal_managerusername: canalpassword: canaldriver-class-name: com.mysql.jdbc.Driverurl: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=falsehikari:maximum-pool-size: 30minimum-idle: 1
canal:adminUser: adminadminPasswd: admin
初次测试时,不要修改上面密码 adminPasswd,以免后面密码修改不对导致连接不上。
- 初始化元数据库
mysql -h127.1 -uroot -P3306 -p source conf/canal_manager.sql
脚本会自动创建相关表,如下:
show tables; MariaDB [tpch]> use canal_manager; Database changed MariaDB [canal_manager]> show tables; +-------------------------+ | Tables_in_canal_manager | +-------------------------+ | canal_adapter_config | | canal_cluster | | canal_config | | canal_instance_config | | canal_node_server | | canal_user | +-------------------------+ 6 rows in set (0.00 sec)
- 启动 web 服务
cd ~/canal-admin && bin/startup.sh
正常情况下,启动成功会监听 8089 端口。
[root@obce00 adapter]# netstat -ntlp |grep 15973 tcp 0 0 0.0.0.0:8089 0.0.0.0:* LISTEN 15973/java
启动如果有问题,可以查看日志。
vim logs/admin.log +
- 登录 web 界面
Canal Admin 的 web 访问地址:http://127.0.0.1:8089/
登录用户名:admin
登录密码:123456
部署 Canal Deployer
- 下载
canal,访问地址:https://github.com/alibaba/canal/releases。
wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz
- 解压缩
mkdir ~/canal && tar zxvf canal.deployer-1.1.5.tar.gz -C ~/canal
- 修改配置
如果不使用 Canal Admin 部署,则使用默认的配置文件 conf/canal.properties 和 conf/example/instance.properties。这个是默认创建了一个 instance 叫 example。需要修改 example 的实例配置文件,修改数据库连接地址、用户名和密码。
vi conf/example/instance.properties # mysql serverId canal.instance.mysql.slaveId = 1234 #position info,需要改成自己的数据库信息 canal.instance.master.address = 127.0.0.1:3306 canal.instance.master.journal.name = canal.instance.master.position = canal.instance.master.timestamp = #canal.instance.standby.address = #canal.instance.standby.journal.name = #canal.instance.standby.position = #canal.instance.standby.timestamp = #username/password,需要改成自己的数据库信息 canal.instance.dbUsername = canal canal.instance.dbPassword = canal canal.instance.defaultDatabaseName = canal.instance.connectionCharset = UTF-8 #table regex canal.instance.filter.regex = .\*\\\\..\*
canal.instance.connectionCharset 代表数据库的编码方式对应到 java 中的编码类型,比如 UTF-8,GBK,ISO-8859-1 。 如果系统是1个 cpu,需要将 canal.instance.parser.parallel 设置为 false 。
如果使用 Canal Admin 部署 server 和 instance,则使用配置文件 conf/canal_local.properties 替换 conf/canal.properties。需要修改 conf/canal.properties 里的 manager 地址,其他参数值可以保持默认。
[root@obce00 canal]# cat conf/canal.properties # register ip canal.register.ip = # canal admin config canal.admin.manager = 127.0.0.1:8089 canal.admin.port = 11110 canal.admin.user = admin canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441 # admin auto register canal.admin.register.auto = true canal.admin.register.cluster = canal.admin.register.name =
注意:passwd 后面的字符串是 admin 在 MySQL 里的密文。这个密码跟前面 Canal Admin 配置文件里的密码保持一致。如果前面密码改了,这里也要相应修改,密文的值可以通过 MySQL 的 password 方法获取。
MariaDB [canal_manager]> select password('admin');
+-------------------------------------------+
| password('admin') |
+-------------------------------------------+
| *4ACFE3202A5FF5CF467898FC58AAB1D615029441 |
+-------------------------------------------+
1 row in set (0.00 sec)
- 启动 Cananl Server
不管是那种部署方法,配置文件修改好后,就可以启动服务。
sh bin/startup.sh
- 图形化部署 Canal server 和 Canal instance
如果使用 Canal Admin 管理 Canal server,则登录 admin 的管理界面http://172.24.50.39:8089/#/canalServer/nodeServers,选择 Canal Server>Server 管理,单击 新建 Server。

在弹出的 新建Server信息 界面单击 确定 按钮。
之后,选择 Canal Server>Instance 管理,单击 新建 Instance。

单击 载入模板,显示一个配置文件,跟前面看到的类似。修改配置文件中的跟源端 canal 和数据库有关的信息。

单击 保存,命名为 mariadb。
保存后的 instance 状态为 停止,单击 操作>启动 可启动 instance,启动后,instance 状态为 启动。

- 查看 server 日志
可以命令行下查看日志,或者在 Canal Admin 里查看 server 的日志。
vi logs/canal/canal.log
- 查看 instance 的日志
可以命令行下查看日志,或者在 Canal Admin 里查看 instance 的日志。
tail -f logs/canal/canal.log tail -f logs/example/example.log tail -f logs/mariadb/mariadb.log
- 停止服务
sh bin/stop.sh
部署 RDB 适配器
Canal Adapter 提供了对多种目标容器的支持,对于 OceanBase 社区版来说,主要使用它的 rdb 模块,目的端容器为 MySQL 或社区版 OceanBase 。
Adapter 的部署需要手动部署。
- 修改启动器配置: application.yml,这里以 OceanBase 目标库为例。
首先指定 adapter 源端类型,通过 mode 指定。这里选择 tcp。后面就要指定 canal.tcp 相关属性,包括 canal server 的 IP 和 端口,数据库的连接用户和密码。
然后指定 adapter 目标端连接信息。instance 是源端实例名称,在 canal 部署的时候定义的。如果没有用 Canal Admin 部署,沿用的是 example 这个名称;如果用了 Canal Admin 部署 instance,前面命名的是 mariadb。
key 是自定义,名字后面有用。jdbc 相关属性是目标端 OceanBase MySQL 的连接方式,可以使用 MySQL 自带的驱动。
mode: tcp #tcp kafka rocketMQ rabbitMQ
flatMessage: true
zookeeperHosts:
syncBatchSize: 1000retries: 0timeout:
accessKey:
secretKey:
consumerProperties:
# canal tcp consumer
canal.tcp.server.host: 127.0.0.1:11111
canal.tcp.zookeeper.hosts:
canal.tcp.batch.size: 500
canal.tcp.username: tpch
canal.tcp.password: Dg0gIexJAV
canalAdapters:
- instance: mariadb # canal instance Name or mq topic name
groups:
- groupId: g1
outerAdapters:
- name: logger
- name: rdb
key: obmysql
properties:
jdbc.driverClassName: com.mysql.jdbc.Driver
jdbc.url: jdbc:mysql://127.0.0.1:2883/tpch?useUnicode=true
jdbc.username: tpch@obmysql#obdemo
jdbc.password: Dg0gIexJAV
- RDB 映射文件
修改 conf/rdb/mytest_user.yml 文件。
映射有两种:一是按表映射;二是整库映射。下面示例是整库映射。
[root@obce00 adapter]# cat conf/rdb/mytest_user.yml
#dataSourceKey: defaultDS
#destination: example
#groupId: g1
#outerAdapterKey: mysql1
#concurrent: true
#dbMapping:
# database: mytest
# table: user
# targetTable: mytest2.user
# targetPk:
# id: id
## mapAll: true
# targetColumns:
# id:
# name:
# role_id:
# c_time:
# test1:
# etlCondition: "where c_time>={}"
# commitBatch: 3000 # 批量提交的大小
# Mirror schema synchronize config
dataSourceKey: defaultDS
destination: mariadb
groupId: g1
outerAdapterKey: obmysql
concurrent: true
dbMapping:
mirrorDb: true
database: tpch
commitBatch: 1000
其中,destination 指定的是 canal instance 名称;outerAdapterKey 是前面定义的 key;mirrorDb 指定数据库级别 DDL 和 DML 镜像同步。
导入的类型以目标表的元类型为准, 将自动进行类型转换。
- 启动 RDB
如果使用了 OceanBase 的驱动,则将目标库 OceanBase 驱动包放入 lib文件夹。
启动 canal-adapter 启动器。
bin/startup.sh
验证修改 mysql mytest.user 表的数据, 将会自动同步到 MySQL 的 MYTEST.TB_USER 表下面, 并会打出 DML 的 log。
- 停止 RDB
bin/stop.sh
- 查看 RDB 日志
tail -f logs/adapter/adapter.log
2021-12-09 09:56:04.148 [pool-6-thread-1] DEBUG c.a.o.canal.client.adapter.rdb.service.RdbSyncService - DML: {"data":{"s_suppkey":99995,"s_name":null,"s_address":null,"s_nationkey":null,"s_phone":null,"s_acctbal":null,"s_comment":null},"database":"tpch","destination":"mariadb","old":null,"table":"supplier2","type":"INSERT"}
2021-12-09 09:56:04.149 [pool-6-thread-1] DEBUG c.a.o.canal.client.adapter.rdb.service.RdbSyncService - DML: {"data":{"s_suppkey":99998,"s_name":null,"s_address":null,"s_nationkey":null,"s_phone":null,"s_acctbal":null,"s_comment":null},"database":"tpch","destination":"mariadb","old":null,"table":"supplier2","type":"INSERT"}
2021-12-09 10:13:35.915 [Thread-3] INFO c.a.o.canal.client.adapter.rdb.monitor.RdbConfigMonitor - Change a rdb mapping config: mytest_user.yml of canal adapter
同步测试
可以对源端 MySQL 数据库 tpch 做 DML 和 DDL 测试,都可以同步到目标端。这里就不详细展开,只介绍一些已知的功能限制。
- 同步的表必须有主键。否则,源端删除无主键表的任意一笔记录,同步到目标端会导致整个表被删除。
- DDL 支持新建表、新增列。但受 OceanBase MySQL 租户功能限制,不支持后期加主键、修改列的类型(指大类型变更,如数值、字符串、日期之间类型变化)。
附录:
- 4.1 OceanBase 的 MySQL 兼容性简介
- 4.2 如何使用 mysqldump 迁移 MySQL 表 OceanBase
- 4.3 如何使用 dbcat 迁移 MySQL 表结构到 OceanBase
- 4.4 如何把 MySQL 表数据导出到 CSV 文件
- 4.5 如何使用 OceanBase 的 LOAD 命令加载 csv 数据文件 OceanBase
- 4.6 如何使用 DataX 加载 CSV 数据文件到 OceanBase
- 4.7 如何使用 DATAX 迁移 MySQL数据到 OceanBase
- 4.8 如何使用 OBDUMPER / OBLOADER 工具导出/导入 OceanBase 数据
- 4.9 如何使用 DATAX 迁移 OceanBase 数据到 MySQL/ORACLE
- 4.10 如何使用 CANAL 将 MySQL 数据实时同步到 OceanBase
结束语
加入教程直播群方式一:钉钉群号3255 4020
加入教程直播群方式二:扫码下方二维码加入
