使用Canal实现MySQL到MySQL的全量、增量同步
一、简要说明
canal的使用需要一个服务端canal-deploy 和客户端canal-adapter,简单来说,服务端来监听源数据库的bin-log变化并解析为 sql等待客户端消费;客户端连接服务端来进行sql消费。
二、MySQL配置
参考官方文档:https://github.com/alibaba/canal/wiki/QuickStart?utm_source=www.jeeinn.com
2.1 安装 MySQL
Mysql8.0安装部分-主备库都需要安装MySQL
一、环境准备:
0.服务器规划
服务器名称 服务器IP 安装组件
master-db1 192.168.100.202 master, canal-deploy,canal-adapter,canal-admin
slave-db2 192.168.100.203 slave
1.目录创建:
mkdir -p /data/{software,mysql}
mkdir -p /data/mysql/{data,log,tmp}
2.用户创建:
useradd mysql
3.安装依赖包:
yum install perl perl-devel perl-Data-Dumper libaio-devel -y
二、安装:
1.下载包文件:
cd /data/software/
wget https://cdn.mysql.com/archives/mysql-8.0/mysql-8.0.26-el7-x86_64.tar.gz
tar -xvf /data/software/mysql-8.0.26-el7-x86_64.tar.gz
ln -s /data/software/mysql-8.0.26-el7-x86_64 /usr/local/mysql
2.权限赋值:
chown -R mysql:mysql /usr/local/mysql /data/mysql/*
三、配置文件:
参数可以根据实际需求进行调配
vi /etc/my.cnf
[client]
port = 3307
socket = /data/mysql/tmp/mysql.sock
[mysql]
prompt="\u@db \R:\m:\s [\d]> "
no-auto-rehash
[mysqld]
user = mysql
port = 3307
basedir = /usr/local/mysql
datadir = /data/mysql/data
tmpdir = /data/mysql/tmp
socket = /data/mysql/tmp/mysql.sock
pid-file = /data/mysql/tmp/mysql.pid
character-set-server=utf8mb4
collation-server = utf8mb4_general_ci
sql_mode='NO_UNSIGNED_SUBTRACTION,NO_ENGINE_SUBSTITUTION'
open_files_limit = 65535
innodb_open_files = 65535
back_log=1024
max_connections = 512
max_connect_errors=1000000
interactive_timeout=300
wait_timeout=300
max_allowed_packet = 1024M
secure_file_priv=''
log-error=/data/mysql/log/error.log
slow_query_log=ON
slow_query_log_file=/data/mysql/log/slow_mysql.log
long_query_time=2
innodb_flush_log_at_trx_commit=1
innodb_log_file_size =1G
innodb_log_files_in_group=3
innodb_log_group_home_dir=./
log-bin-trust-function-creators=1
sync_binlog = 1
binlog_cache_size = 16M
max_binlog_cache_size = 1G
max_binlog_size=1G
expire_logs_days = 30
log-bin= /data/mysql/log/binlog-mysql
binlog_format=row
binlog_row_image=full
server-id = 1
default_authentication_plugin =mysql_native_password
# 大小根据实际系统内存情况而定
innodb_buffer_pool_size=4G
innodb_buffer_pool_instances=2
四、初始化
1初始化操作
/usr/local/mysql/bin/mysqld --defaults-file=/etc/my.cnf --basedir=/usr/local/mysql --datadir=/data/mysql/data --user=mysql --initialize
#查看初始化后数据库文件:
[root@localhost log]# ls /data/mysql/data
auto.cnf ca.pem client-key.pem ibdata1 ib_logfile1 #innodb_temp mysql.ibd private_key.pem server-cert.pem sys undo_002
ca-key.pem client-cert.pem ib_buffer_pool ib_logfile0 ib_logfile2 mysql performance_schema public_key.pem server-key.pem undo_001
2查看随机密码:初始密码以 error.log 中的实际输出为准(本例为 wAdJXLFSV7>P)
[root@localhost redis]# cat /data/mysql/log/error.log | grep password
[Note] [MY-010454] [Server] A temporary password is generated for root@localhost: wAdJXLFSV7>P
五、最后我们启动mysql:
/usr/local/mysql/bin/mysqld_safe --defaults-file=/etc/my.cnf &
#建议软链接
ln -s /data/mysql/tmp/mysql.sock /tmp/mysql.sock
六、修改密码,设置环境变量以及自启动
# 配置环境变量
[root@localhost log]# echo 'export PATH=/usr/local/mysql/bin:$PATH' >> /etc/profile
[root@localhost ~]# source /etc/profile
# 修改密码
[root@localhost ~]# /usr/local/mysql/bin/mysqladmin -uroot -p password
Enter password: 8y/,JHui*y;a
New password: Password123
Confirm new password: Password123
# 设置自启动
[root@localhost ~]# cp /usr/local/mysql/support-files/mysql.server /etc/init.d/mysqld
[root@localhost ~]# chkconfig --add mysqld
[root@localhost ~]# chkconfig mysqld on
# 登入Mysql
[root@localhost bin]# mysql -uroot -pPassword123
Enter password:
Welcome to the MySQL monitor. Commands end with ; or \g.
Your MySQL connection id is 12
Server version: 8.0.26 MySQL Community Server - GPL
Copyright © 2000, 2021, Oracle and/or its affiliates.
Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective
owners.
Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.
#赋予远程访问权限
select host, user, authentication_string, plugin from mysql.user;
update mysql.user set host='%' where user='root';
grant all privileges on *.* to 'root'@'%' with grant option; -- 执行两次
#刷新权限
flush privileges;
#登陆
mysql -uroot -pPassword123 -h192.168.100.202 -P3307
2.2 修改 my.cnf 配置文件(Windows 下为 my.ini)
[mysqld]
log-bin=mysql-bin # 开启binlog
binlog-format=ROW # 选择row模式
server_id=1 # serverid唯一
2.3 授权
-- MySQL 8.0 已不支持 GRANT ... IDENTIFIED BY,需先创建用户再授权
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
2.4 重启数据库
service mysqld restart
三、部署 canal
3.1 安装 java
yum -y install java
验证安装
java -version
openjdk version "1.8.0_312"
OpenJDK Runtime Environment (build 1.8.0_312-b07)
OpenJDK 64-Bit Server VM (build 25.312-b07, mixed mode)
3.2 下载 canal
wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz
mkdir -p /opt/canal/deploy
tar xf canal.deployer-1.1.5.tar.gz -C /opt/canal/deploy
3.3 配置 canal
cd /opt/canal/deploy
vim conf/example/instance.properties
需要修改的参数:canal.instance.master.address,canal.instance.dbUsername,canal.instance.dbPassword,canal.instance.filter.regex
#################################################
## mysql serverId , v1.0.26+ will autoGen server-id 自动生成
# canal.instance.mysql.slaveId=0
# enable gtid use true/false 是否启用gtid
canal.instance.gtidon=false
# position info 位置信息
# 源端 MySQL 的 IP 和端口(properties 文件不支持行尾注释,说明须单独成行)
canal.instance.master.address=192.168.100.202:3307
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=
# rds oss binlog rds oss信息
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=
# table meta tsdb info 时序数据库相关信息
# 启用时序数据库,默认使用h2数据库保存
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal
# canal 备机信息
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=
# username/password 账号密码
# 用户名
canal.instance.dbUsername=canal
# 密码
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==
# table regex
# 需要同步的表
canal.instance.filter.regex=mytest.t1
# table black regex
canal.instance.filter.black.regex=mysql\\.slave_.*
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch
# mq config
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
#################################################
3.4 运行
./bin/startup.sh
运行之后会监听11110、11111、11112端口,检查
[root@master-db1 rdb]# ss -tanpl |grep java
LISTEN 0 100 *:8089 *:* users:(("java",pid=6453,fd=106))
LISTEN 0 50 *:11110 *:* users:(("java",pid=9528,fd=93))
LISTEN 0 50 *:11111 *:* users:(("java",pid=9528,fd=91))
LISTEN 0 3 *:11112 *:* users:(("java",pid=9528,fd=74))
LISTEN 0 100 *:8081 *:* users:(("java",pid=13288,fd=107))
四、部署 canal-adapter
主库的配置通过 adapter 进行管理
4.1 下载 canal-adapter
wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.adapter-1.1.5.tar.gz
mkdir /opt/canal/adapter
tar xf canal.adapter-1.1.5.tar.gz -C /opt/canal/adapter
4.2 配置 canal-adapter
cd /opt/canal/adapter
需要修改的参数:canal.tcp.server.host,kafka.bootstrap.servers,rocketmq.namesrv.addr,srcDataSources的url,username,password;
canalAdapters的srcDataSources的url,username,password;
vim conf/application.yml
server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null
canal.conf:
  mode: tcp #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    # canal tcp consumer
    canal.tcp.server.host: 192.168.100.202:11111 #填写canal-deploy(canal server)服务器的ip地址加端口号
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
    # kafka consumer
    kafka.bootstrap.servers: 192.168.100.202:9092 #填写kafka服务器的ip地址加端口号
    kafka.enable.auto.commit: false
    kafka.auto.commit.interval.ms: 1000
    kafka.auto.offset.reset: latest
    kafka.request.timeout.ms: 40000
    kafka.session.timeout.ms: 30000
    kafka.isolation.level: read_committed
    kafka.max.poll.records: 1000
    # rocketMQ consumer
    rocketmq.namespace:
    rocketmq.namesrv.addr: 192.168.100.202:9876 #填写rocketmq namesrv的ip地址加端口号
    rocketmq.batch.size: 1000
    rocketmq.enable.message.trace: false
    rocketmq.customized.trace.topic:
    rocketmq.access.channel:
    rocketmq.subscribe.filter:
    # rabbitMQ consumer
    rabbitmq.host:
    rabbitmq.virtual.host:
    rabbitmq.username:
    rabbitmq.password:
    rabbitmq.resource.ownerId:
…
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://192.168.100.202:3307/mytest?useUnicode=true #填写源端数据库的ip加端口以及数据库名称
      username: root #登录源端数据库用户名
      password: 'Password123' #登录源端数据库密码
  canalAdapters:
  - instance: example # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
      - name: rdb
        key: mysql1
        properties:
          jdbc.driverClassName: com.mysql.jdbc.Driver
          jdbc.url: jdbc:mysql://192.168.100.203:3307/mytest?useUnicode=true #填写目标端数据库的ip加端口以及数据库名称 #源端和目标端数据库一致,确保同步无报错
          jdbc.username: root #登录目标端数据库用户名
          jdbc.password: 'Password123' #登录目标端数据库密码
记得给用户授权,请确保上面的账号有写入权限
这里以同步 mytest 数据库中的 t1 表为例
需要修改参数值database,table,targetTable,id,mapAll
vim conf/rdb/mytest_user.yml
...
dataSourceKey: defaultDS
destination: example #参数不变
groupId: g1 #参数不变
outerAdapterKey: mysql1 #参数不变
concurrent: true
dbMapping:
  database: mytest #源端和目标端数据库名一致
  table: t1 #源端同步表
  targetTable: t1 #目标端同步表
  targetPk:
    id: id
  mapAll: true #源端和目标端表结构一致
#  targetColumns:
#    id:
#    name:
#    role_id:
#    c_time:
#    test1:
#  etlCondition: "where c_time>={}"
#  commitBatch: 3000 # 批量提交的大小
…
4.3 运行 canal-adapter
./bin/startup.sh
程序运行起来之后监听 8081 端口。
4.4 测试数据同步
CREATE TABLE t1(id INT auto_increment PRIMARY KEY, name VARCHAR(32));
INSERT INTO t1(name) VALUES("tom"),("jerry"),("jane");
观察数据同步情况;
同步任务执行前:
master-db1:
root@db 14:43: [(none)]> show databases;
+--------------------+
| Database |
+--------------------+
| canal_manager |
| information_schema |
| mysql |
| mytest |
| performance_schema |
| sys |
+--------------------+
6 rows in set (0.04 sec)
root@db 14:43: [(none)]>
root@db 14:43: [(none)]> use mytest;
Database changed
root@db 14:43: [mytest]> CREATE TABLE t1(id INT auto_increment PRIMARY KEY, name VARCHAR(32));
Query OK, 0 rows affected (0.07 sec)
同步任务执行后:
root@db 14:44: [mytest]> INSERT INTO t1(name) VALUES("tom"),("jerry"),("jane");
Query OK, 3 rows affected (0.02 sec)
Records: 3 Duplicates: 0 Warnings: 0
root@db 14:44: [mytest]> select * from t1;
+----+-------+
| id | name |
+----+-------+
| 1 | tom |
| 2 | jerry |
| 3 | jane |
+----+-------+
3 rows in set (0.00 sec)
slave-db2:
同步任务执行前:
root@db 14:42: [mytest]> CREATE TABLE t1(id INT auto_increment PRIMARY KEY, name VARCHAR(32));
Query OK, 0 rows affected (0.07 sec)
root@db 14:43: [mytest]> select * from t1;
Empty set (0.00 sec)
同步任务执行后:
root@db 14:43: [mytest]>
root@db 14:43: [mytest]> select * from t1;
+----+-------+
| id | name |
+----+-------+
| 1 | tom |
| 2 | jerry |
| 3 | jane |
+----+-------+
3 rows in set (0.00 sec)
五、部署 canal-admin
canal-admin 是一款 web 管理工具(可选)
5.1 下载 canal-admin
最好是跟 canal 版本保持一致
wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.admin-1.1.5.tar.gz
mkdir /opt/canal/admin
tar xf canal.admin-1.1.5.tar.gz -C /opt/canal/admin
5.2 配置 canal-admin
cd /opt/canal/admin
vim conf/application.yml
server:
  port: 8089
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
spring.datasource:
  address: 127.0.0.1:3306
  database: canal_manager
  username: canal
  password: IeT6Cohc.
  driver-class-name: com.mysql.jdbc.Driver
  url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false
  hikari:
    maximum-pool-size: 30
    minimum-idle: 1
canal:
  adminUser: admin
  adminPasswd: admin
5.3 初始化 canal-admin
从上面的配置文件我们可以看到需要一个 canal_manager 的数据库,对应的 sql 文件在 conf 目录下
[root@sdb2 ~]# cd /opt/canal/admin/
source conf/canal_manager.sql
GRANT ALL ON canal_manager.* TO 'canal'@'%';
FLUSH PRIVILEGES;
该 sql 会创建数据库和初始化库表结构并初始化一条记录到表中,用于登录
5.4 运行
./bin/startup.sh
默认监听 8089 端口
5.5 访问
http://192.168.100.202:8089/
默认账号密码admin/123456
5.6 web配置
Server 管理 -> 新建 Server
输入服务器IP信息和端口,添加本机的Server,添加完后看到状态为启动状态。
Instance 管理->新建 Instance
输入Instance 名称,选择刚新建的Server,点击载入模板,填写相关信息,最后点击保存。
六、canal 整合 kafka
6.1 部署 kafka
部署 Zookeeper 和 Kafka 略
6.2 配置 canal
修改 canal.properties 配置文件
# tcp, kafka, rocketMQ, rabbitMQ
# 默认为tcp(properties 文件不支持行尾注释,说明须单独成行)
canal.serverMode = kafka
…
##################################################
######### Kafka #############
##################################################
# kafka服务器地址,多个用逗号分隔
kafka.bootstrap.servers = 127.0.0.1:9092
kafka.acks = all
kafka.compression.type = none
kafka.batch.size = 16384
kafka.linger.ms = 1
kafka.max.request.size = 1048576
kafka.buffer.memory = 33554432
kafka.max.in.flight.requests.per.connection = 1
kafka.retries = 0
kafka.kerberos.enable = false
kafka.kerberos.krb5.file = "../conf/kerberos/krb5.conf"
kafka.kerberos.jaas.file = "../conf/kerberos/jaas.conf"
…
6.3 重启 canal 服务
./bin/restart.sh
6.4 配置 adapter
vim conf/application.yml
server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null
canal.conf:
  mode: kafka #tcp kafka rocketMQ rabbitMQ
…
  consumerProperties:
    # canal tcp consumer
    canal.tcp.server.host: 127.0.0.1:11111
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
    # kafka consumer
    kafka.bootstrap.servers: 127.0.0.1:9092 # 修改kafka服务器地址信息,多个服务器用逗号分隔
    kafka.enable.auto.commit: false
    kafka.auto.commit.interval.ms: 1000
    kafka.auto.offset.reset: latest
    kafka.request.timeout.ms: 40000
    kafka.session.timeout.ms: 30000
    kafka.isolation.level: read_committed
    kafka.max.poll.records: 1000
…
6.5 重启 canal-adapter
./bin/restart.sh
七、全量数据同步
方法一:
7.1 修改 instance.properties 配置文件
vim conf/example/instance.properties
# position info
canal.instance.master.address=127.0.0.1:3306
# mysql binlog文件
canal.instance.master.journal.name=mysql-bin.000001
# binlog起始位置
canal.instance.master.position=0
# 获取binlog的起始时间戳
canal.instance.master.timestamp=1638005221
canal.instance.master.gtid=
7.2 删除记录文件 meta.dat
因为这个文件记录 canal 已经消费的位置等信息。
7.3 重启服务
./bin/restart.sh
方法二:
curl http://192.168.100.202:8081/etl/rdb/mysql1/mytest_user.yml -X POST -d "params=2024-05-28 15:00:00"
[root@master-db1 bin]# curl http://192.168.100.202:8081/etl/rdb/mysql1/mytest_user.yml -X POST -d "params=2024-05-28 15:00:00"
{"succeeded":true,"resultMessage":"导入RDB 数据:3 条"}
使用canal同步参考链接:
https://help.aliyun.com/zh/tablestore/use-cases/use-canal-to-synchronize-data#section-frt-eel-58y