使用 Canal 同步数据
目标
操作步骤
说明
软件版本:canal.deployer-1.1.7、canal.admin-1.1.7、canal-adapter-1.1.7
执行过程中发现该版本无法直接在 admin 启停 canal-server,确认为 bug,不想升级或更换版本的话,手动启停 canal-server 即可。
本地安装方式
1、安装启动canal-admin。
成功后,可以通过 http://127.0.0.1:8089/ */ 访问,默认密码:admin/123456
2、安装启动canal-server
这里使用远程配置方式(即通过canal-admin进行配置的编辑管理,数据保存在数据库)。
下载软件包到本地后,对 conf/canary.properties 进行必要的修改,如下:
#
canal.register.ip = 127.0.0.1
# canal admin config
canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
# 对应密码123456
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
#admin auto register
# canal-server 启动后会从 canal-admin 拉取配置信息(canal.properties),如果为true,当配置不存在时会自动注册 canal-server 的信息,并自动在数据库中创建一个关联的配置记录
canal.admin.register.auto = true
canal.admin.register.cluster =
canal.admin.register.name = local_demo
按照上面的配置,在 canal-server 启动成功后,会自动注册 server 到 canal-admin,然后就可以直接在 canal-admin 管理和配置该 canal-server 了。
注意:如果 canal-server 启动时拉取配置失败,记得检查 canal.register.ip 的配置,确保是 canal-admin 可以访问的 IP。
3、安装启动 canal-adapter
前面安装 canal-admin 时已经创建了 canal-manager 数据库,所以这里依然选择使用远程配置的方式:
- 在 canal_config 表中创建 id=2 的数据对应 adapter 下的 application.yml 文件。
- 在 canal_adapter_config 表对应每个adapter的子配置文件。
注意: 目前这个远程配置功能似乎并没有实装,所以后面依然采用修改本地配置的方式。
4、测试同步(全量+增量)
准备两个数据库:test_a 和 test_b,在 test_a 创建一张 user 表并插入一批数据,在 test_b 创建同样的表但是不写入任何数据。现在需要将 test_a 中 user 表的数据同步到 test_b 的对应表,然后测试增量同步。
操作步骤:
outerAdapterKey : syncUser
关联到 adaptercreate schema test_a;
-- 创建表
CREATE TABLE test_a.user (
id INT AUTO_INCREMENT PRIMARY KEY,
username VARCHAR(50) NOT NULL,
password VARCHAR(50) NOT NULL,
email VARCHAR(100),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- 插入数据
INSERT INTO test_a.user (username, password, email) VALUES
('john_doe', 'password123', 'john.doe@example.com'),
('jane_smith', 'securepass', 'jane.smith@example.com'),
('alice_jones', 'alice123', 'alice.jones@example.com'),
('bob_brown', 'password456', 'bob.brown@example.com'),
('carol_gray', 'carolpass', 'carol.gray@example.com');
-- 创建备份库和表
create schema test_b;
CREATE TABLE test_b.user (
id INT AUTO_INCREMENT PRIMARY KEY,
username VARCHAR(50) NOT NULL,
password VARCHAR(50) NOT NULL,
email VARCHAR(100),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
application.yml
server:
port: 8081
spring:
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
default-property-inclusion: non_null
canal.conf:
mode: tcp #tcp kafka rocketMQ rabbitMQ
flatMessage: true
zookeeperHosts:
syncBatchSize: 1000
retries: -1
timeout: 100
accessKey:
secretKey:
consumerProperties:
# canal tcp consumer
canal.tcp.server.host: 127.0.0.1:11111
canal.tcp.zookeeper.hosts:
canal.tcp.batch.size: 500
canal.tcp.username:
canal.tcp.password:
# kafka consumer
kafka.bootstrap.servers: 127.0.0.1:9092
kafka.enable.auto.commit: false
kafka.auto.commit.interval.ms: 1000
kafka.auto.offset.reset: latest
kafka.request.timeout.ms: 40000
kafka.session.timeout.ms: 30000
kafka.isolation.level: read_committed
kafka.max.poll.records: 1000
# rocketMQ consumer
rocketmq.namespace:
rocketmq.namesrv.addr: 127.0.0.1:9876
rocketmq.batch.size: 1000
rocketmq.enable.message.trace: false
rocketmq.customized.trace.topic:
rocketmq.access.channel:
rocketmq.subscribe.filter:
# rabbitMQ consumer
rabbitmq.host:
rabbitmq.virtual.host:
rabbitmq.username:
rabbitmq.password:
rabbitmq.resource.ownerId:
srcDataSources:
defaultDS:
url: jdbc:mysql://127.0.0.1:3306/test_a?useUnicode=true
username: test
password: test
canalAdapters: # 适配器列表。
- instance: instance_test_a # canal instance Name or mq topic name
groups: # 一份数据可以被多个group同时消费, group之间并行执行, 一个group内部串行执行多个outerAdapters
- groupId: g1
outerAdapters:
- name: rdb # 指定为rdb类型同步,同步到test_b库
key: syncUser # 指定adapter的唯一key, 与表映射配置中outerAdapterKey对应
properties:
jdbc.driverClassName: com.mysql.jdbc.Driver
jdbc.url: jdbc:mysql://127.0.0.1:3306/test_b?useUnicode=true
jdbc.username: test
jdbc.password: test
druid.stat.enable: false
druid.stat.slowSqlMillis: 1000
test_user.yml
dataSourceKey: defaultDS
destination: instance_test_a
groupId: g1
outerAdapterKey: syncUser
concurrent: true
dbMapping:
database: test_a # 源数据库
table: user # 源表
targetTable: user #目标表
targetPk:
id: id
mapAll: true
commitBatch: 3000 # 批量提交的大小
注意
测试发现当前版本(v1.1.7)adapter 远程配置不生效,所以依然采用编辑本地配置文件(application.yml 和 conf/rdb/test_user.yml)的方式。
手动执行ETL:
curl http://127.0.0.1:8081/etl/rdb/test_user.yml -X POST
> {"succeeded":true,"resultMessage":"导入RDB 数据:6 条"}
后面直接在 test_a.user 表执行 DML 操作就会自动同步到 test_b.user 了。注意,DDL 不会同步哦!