Practice Exercise 4 (Required): Migrating MySQL Data to an OceanBase Cluster
Review of Previous Exercises
- Exercise 1: Try out OceanBase with Docker
- Exercise 2: Manually deploy an OceanBase cluster
- Exercise 3: Use OBD to deploy a three-replica OceanBase cluster
Objective
The goal of this exercise is to master the basic methods of migrating data from MySQL to OceanBase: mysqldump, DataX, Canal, and so on.
Tasks
Record and share the following:
- (Required) Use mysqldump to sync MySQL table schemas and data to an OceanBase MySQL tenant.
- (Required) Use DataX to configure offline sync of at least one table from MySQL to an OceanBase MySQL tenant.
- (Optional) Use DataX to configure offline sync of at least one table from OceanBase to CSV and from CSV back to OceanBase.
- (Optional) Use Canal to configure incremental sync from MySQL to an OceanBase MySQL tenant.
Implementation
Prepare the test data
unzip -qo tpcc-mysql-master.zip
cd tpcc-mysql-master/src
make
This builds the tpcc_load and tpcc_start binaries.
Load the data into the source (MySQL)
Create the tpcc database
CREATE DATABASE `tpcc` DEFAULT CHARACTER SET utf8;
Import the table definitions
# grep -ci 'create table' create_table.sql
mycli tpcc < ./create_table.sql
mycli tpcc -e "show tables;"
Load the bulk data (tpcc_load)
./tpcc_load -h progs -P 3308 -d tpcc -u root -w 1

## Usage
]# ./tpcc_load -h
*************************************
***  TPCC-mysql Data Loader      ***
*************************************
./tpcc_load: option requires an argument -- 'h'
Usage: tpcc_load -h server_host -P port -d database_name -u mysql_user -p mysql_password -w warehouses -l part -m min_wh -n max_wh
* [part]: 1=ITEMS 2=WAREHOUSE 3=CUSTOMER 4=ORDERS
- Check the table row counts
SELECT TABLE_NAME, DATA_LENGTH, INDEX_LENGTH,
       (DATA_LENGTH + INDEX_LENGTH) AS length,
       TABLE_ROWS,
       CONCAT(ROUND((DATA_LENGTH + INDEX_LENGTH)/1024/1024, 2), 'MB') AS total_size
FROM information_schema.TABLES
WHERE TABLE_SCHEMA = 'tpcc'
ORDER BY length DESC;
Use mysqldump to sync the MySQL table schemas and data to the OceanBase MySQL tenant
Export the table schemas only (on the MySQL source)
mysqldump -h progs -uroot -proot -P3308 -d tpcc --set-gtid-purged=OFF --compact > tpcc_ddl.sql
Check whether the dump file contains special syntax, variables, and so on
grep -Ei "SQL_NOTES|DEFINER|MAX_ROWS" tpcc_ddl.sql
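If the grep finds any of these clauses, they may need to be stripped from the DDL file before importing into OceanBase. A minimal sketch (the MAX_ROWS pattern is only an example; adjust it to whatever the grep actually reports):
# Hypothetical cleanup: drop MAX_ROWS table options from the dump
sed -i 's/ MAX_ROWS=[0-9]*//g' tpcc_ddl.sql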
Export the table data only (on the MySQL source)
mysqldump -h progs -uroot -P3308 --single-transaction --set-gtid-purged=OFF -t tpcc > tpcc_data.sql
Sync the data to the OceanBase MySQL tenant
Connect to the MySQL tenant with obclient and create the tpcc database
obclient -hprogs -uroot@pay_mysql_tat#obce -P2883 -c -A

create database tpcc;
Import the table definitions
obclient -hprogs -uroot@pay_mysql_tat#obce -P2883 -c -A tpcc

MySQL [tpcc]> source /tmp/tpcc_ddl.sql
Import the table data
-- Disable foreign key checks first
MySQL [tpcc]> set global foreign_key_checks=off;
MySQL [tpcc]> show global variables like '%foreign%';
MySQL [tpcc]> source /tmp/tpcc_data.sql
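After the import finishes, you will probably want to restore the original setting; this extra step is not part of the recorded procedure:
MySQL [tpcc]> set global foreign_key_checks=on;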
Check the data volumes on both ends
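No commands were recorded for this check; a minimal sketch is to run the same statements on the MySQL source and on the OceanBase MySQL tenant and compare the results (TABLE_ROWS is only an estimate, so count(*) gives the exact figure per table):
-- Rough per-table row counts on either end
SELECT TABLE_NAME, TABLE_ROWS FROM information_schema.TABLES WHERE TABLE_SCHEMA = 'tpcc';
-- Exact count for an individual table, for example
SELECT COUNT(*) FROM tpcc.new_orders;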
Use DataX to sync table data from MySQL to OceanBase (offline)
Configure offline sync of at least one table from MySQL to the OceanBase MySQL tenant with DataX
Deploy the DataX software
# Download DataX
wget http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz
# Unpack
tar -xf datax.tar.gz
# Remove the hidden files in the DataX plugin directories
find ./datax/plugin -name ".*" | xargs rm -f
cd datax
Create the sync job configuration file
Generate a template file
python ./datax/bin/datax.py -r mysqlreader -w oceanbasev10writer
Edit the sync job file
# Generate the template file
python ./datax/bin/datax.py -r mysqlreader -w oceanbasev10writer > job/my2obce.json
# Edit it according to your environment
vi job/my2obce.json
- The completed job JSON file
{
    "job": {
        "setting": {
            "speed": {
                "channel": 2
            },
            "errorLimit": {
                "record": 10
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",
                        "password": "root",
                        "splitPk": "no_o_id",
                        "column": ["*"],
                        "connection": [
                            {
                                "jdbcUrl": ["jdbc:mysql://progs:3308/tpcc"],
                                "table": ["new_orders"]
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "oceanbasev10writer",
                    "parameter": {
                        "writeMode": "insert",
                        "username": "root",
                        "password": "root",
                        "writerThreadCount": 5,
                        "column": ["*"],
                        "connection": [
                            {
                                "jdbcUrl": "||_dsc_ob10_dsc_||obce:pay_mysql_tat||_dsc_ob10_dsc_||jdbc:mysql://progs:2883/tpcc?useUnicode=true&characterEncoding=utf-8",
                                "table": ["new_orders"]
                            }
                        ]
                    }
                }
            }
        ]
    }
}
Start the sync job
python ./bin/datax.py ./job/my2obce.json
Check and confirm the sync result
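DataX prints a job summary (records read, write failures, elapsed time) at the end of the run. Beyond that, a minimal check is to compare the row count of the synced table on both ends, assuming the new_orders table from the job above:
-- On the MySQL source
select count(*) from tpcc.new_orders;
-- On the OceanBase MySQL tenant (obclient)
select count(*) from tpcc.new_orders;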
Use DataX for offline data sync between CSV files and OceanBase
OceanBase to CSV sync
Edit the job configuration file
- Generate the template
python ${DATAX_HOME}/bin/datax.py -r oceanbasev10reader -w txtfilewriter > ${DATAX_HOME}/job/ob2csv.json
- Adjust the values for your environment
vi ${DATAX_HOME}/job/ob2csv.json
{
    "job": {
        "setting": {
            "speed": {
                "channel": 10
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0.02
            }
        },
        "content": [
            {
                "reader": {
                    "name": "oceanbasev10reader",
                    "parameter": {
                        "where": "",
                        "timeout": 10000,
                        "readBatchSize": 10000,
                        "readByPartition": "true",
                        "column": ["actor_id", "first_name", "last_name", "last_update"],
                        "connection": [
                            {
                                "jdbcUrl": ["||_dsc_ob10_dsc_||obce:dev_mysql_tnt||_dsc_ob10_dsc_||jdbc:oceanbase://progs:2883/devdb?useUnicode=true&characterEncoding=utf-8"],
                                "table": ["actor"]
                            }
                        ],
                        "username": "syncer",
                        "password": "Sy#r3210"
                    }
                },
                "writer": {
                    "name": "txtfilewriter",
                    "parameter": {
                        "path": "/tmp/datax/",
                        "fileName": "actor",
                        "writeMode": "truncate",
                        "dateFormat": "yyyy-MM-dd hh:mm:ss",
                        "charset": "UTF-8",
                        "nullFormat": "\\N",
                        "fileDelimiter": ","
                    }
                }
            }
        ]
    }
}
Start the sync job
python ${DATAX_HOME}/bin/datax.py ${DATAX_HOME}/job/ob2csv.json
Check and confirm
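No check commands were recorded here; one simple way is to list the files DataX wrote under the configured path (txtfilewriter normally appends a random suffix to the fileName prefix) and compare their line count with the table's row count:
ls -l /tmp/datax/
wc -l /tmp/datax/actor*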
CSV to OceanBase sync
Edit the job configuration file
- Generate the template
python ${DATAX_HOME}/bin/datax.py -r txtfilereader -w oceanbasev10writer > ${DATAX_HOME}/job/csv2ob.json
- Adjust the values for your environment
vi ${DATAX_HOME}/job/csv2ob.json
{
    "job": {
        "setting": {
            "speed": {
                "channel": 4
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0.1
            }
        },
        "content": [
            {
                "reader": {
                    "name": "txtfilereader",
                    "parameter": {
                        "path": ["/tmp/datax"],
                        "fileName": "sakila_actor",
                        "encoding": "UTF-8",
                        "column": ["*"],
                        "dateFormat": "yyyy-MM-dd hh:mm:ss",
                        "nullFormat": "\\N",
                        "fieldDelimiter": ","
                    }
                },
                "writer": {
                    "name": "oceanbasev10writer",
                    "parameter": {
                        "obWriteMode": "insert",
                        "column": ["actor_id", "first_name", "last_name", "last_update"],
                        "preSql": ["truncate table actor"],
                        "connection": [
                            {
                                "jdbcUrl": "||_dsc_ob10_dsc_||obce:dev_mysql_tnt||_dsc_ob10_dsc_||jdbc:oceanbase://progs:2883/devdb?useLocalSessionState=true&allowBatch=true&allowMultiQueries=true&rewriteBatchedStatements=true",
                                "table": ["actor"]
                            }
                        ],
                        "username": "syncer",
                        "password": "Sy#r3210",
                        "writerThreadCount": 10,
                        "batchSize": 100,
                        "memstoreThreshold": "0.9"
                    }
                }
            }
        ]
    }
}
Note:
- path: the directory should contain only the files to be imported; remove everything else so stray files do not interfere and cause the job to fail.
- fileName: the file name prefix.
- fieldDelimiter: the column delimiter of the CSV files.
Start the sync job
python ${DATAX_HOME}/bin/datax.py ${DATAX_HOME}/job/csv2ob.json
Check
select * from actor limit 20;
select count(*) cnt from actor;
Use Canal to configure incremental sync from MySQL to the OceanBase MySQL tenant
Deploy the canal-server
Unpack the software
mkdir deployer && tar -xf canal.deployer-for-ob-rc2.tar.gz -C deployer
Edit the configuration files
- Configure the canal-server main configuration file:
vi conf/canal.properties
#################################################
#########       common argument      #############
#################################################
# tcp bind ip
canal.ip = 192.168.10.181
# register ip to zookeeper
canal.register.ip = 192.168.10.181    # IP of the host running the canal-server service
canal.port = 11111                    # port canal-server listens on (TCP mode only; other modes do not listen on this port)
canal.metrics.pull.port = 11112       # port canal-server exposes for metrics pull
# canal instance user/passwd
# canal.user = canal
# canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458

# canal admin config
#canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
#canal.admin.register.auto = true
#canal.admin.register.cluster =
#canal.admin.register.name =

canal.zkServers =    # zookeeper connection info; required for coordination in cluster mode, can be left empty in standalone mode
# flush data to zk
canal.zookeeper.flush.period = 1000    # how often canal flushes data to zookeeper, in milliseconds
canal.withoutNetty = false
# tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ
canal.serverMode = tcp    # canal-server run mode; TCP connects clients directly with no middleware, kafka/mq modes go through a message queue
# flush meta cursor/parse position to file
canal.file.data.dir = ${canal.conf.dir}    # directory where data is stored
canal.file.flush.period = 1000
## memory store RingBuffer size, should be Math.pow(2,n)
canal.instance.memory.buffer.size = 1024
## memory store RingBuffer used memory unit size , default 1kb
canal.instance.memory.buffer.memunit = 1024
## meory store gets mode used MEMSIZE or ITEMSIZE
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.memory.rawEntry = true

## detecing config
# heartbeat detection settings, used when running HA
canal.instance.detecting.enable = false
#canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false

# support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
canal.instance.transaction.size = 1024
# mysql fallback connected to new master should fallback times
canal.instance.fallbackIntervalInSeconds = 60

# network config
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30

# binlog filter config
# binlog filtering settings, controlling which SQL statements are filtered out
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false    # whether to ignore DCL query statements such as grant/create user; default false
canal.instance.filter.query.dml = false    # whether to ignore DML query statements such as insert/update/delete
canal.instance.filter.query.ddl = false    # whether to ignore DDL query statements such as create/alter/drop/rename table, create/drop index
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false
canal.instance.filter.dml.insert = false
canal.instance.filter.dml.update = false
canal.instance.filter.dml.delete = false

# binlog format/image check
# binlog format check; the source should use ROW mode -- other modes do not raise an error, but no data gets synced
canal.instance.binlog.format = ROW,STATEMENT,MIXED
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB

# binlog ddl isolation
canal.instance.get.ddl.isolation = false

# parallel parser config
canal.instance.parser.parallel = true    # parallel parsing; change to false on a single-CPU machine
## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
#canal.instance.parser.parallelThreadSize = 16
## disruptor ringbuffer size, must be power of 2
canal.instance.parser.parallelBufferSize = 256

# table meta tsdb info
canal.instance.tsdb.enable = true
canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername = canal
canal.instance.tsdb.dbPassword = canal
# dump snapshot interval, default 24 hour
canal.instance.tsdb.snapshot.interval = 24
# purge snapshot expire , default 360 hour(15 days)
canal.instance.tsdb.snapshot.expire = 360

#################################################
#########        destinations        #############
#################################################
canal.destinations = example    # name(s) of the instance(s) created by canal-server
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5
# set this value to 'true' means that when binlog pos not found, skip to latest.
# WARN: pls keep 'false' in production env, or if you know what you want.
canal.auto.reset.latest.pos.mode = false

canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml

canal.instance.global.mode = spring
canal.instance.global.lazy = false
canal.instance.global.manager.address = ${canal.admin.manager}
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
#canal.instance.global.spring.xml = classpath:spring/default-instance.xml

##################################################
#########         MQ Properties      #############
##################################################
# aliyun ak/sk , support rds/mq
canal.aliyun.accessKey =
canal.aliyun.secretKey =
canal.aliyun.uid=

canal.mq.flatMessage = true
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
# Set this value to "cloud", if you want open message trace feature in aliyun.
canal.mq.accessChannel = local

canal.mq.database.hash = true
canal.mq.send.thread.size = 30
canal.mq.build.thread.size = 8

##################################################
#########            Kafka           #############
##################################################
kafka.bootstrap.servers = 127.0.0.1:9092
kafka.acks = all
kafka.compression.type = none
kafka.batch.size = 16384
kafka.linger.ms = 1
kafka.max.request.size = 1048576
kafka.buffer.memory = 33554432
kafka.max.in.flight.requests.per.connection = 1
kafka.retries = 0

kafka.kerberos.enable = false
kafka.kerberos.krb5.file = "../conf/kerberos/krb5.conf"
kafka.kerberos.jaas.file = "../conf/kerberos/jaas.conf"

##################################################
#########           RocketMQ         #############
##################################################
rocketmq.producer.group = test
rocketmq.enable.message.trace = false
rocketmq.customized.trace.topic =
rocketmq.namespace =
rocketmq.namesrv.addr = 127.0.0.1:9876
rocketmq.retry.times.when.send.failed = 0
rocketmq.vip.channel.enabled = false
rocketmq.tag =

##################################################
#########           RabbitMQ         #############
##################################################
rabbitmq.host =
rabbitmq.virtual.host =
rabbitmq.exchange =
rabbitmq.username =
rabbitmq.password =
rabbitmq.deliveryMode =

##################################################
#########             Pulsar         #############
##################################################
pulsarmq.serverUrl =
pulsarmq.roleToken =
pulsarmq.topicTenantPrefix =
- If canal.destinations in this file lists more than one instance name, a matching instance directory must be created under ./deployer/conf for each of them; follow the steps below.
- Example:
grep 'canal.destinations' ./deployer/conf/canal.properties
canal.destinations = example, order
- Create the corresponding instance directory
cd ./deployer/conf
cp -a example order
- Configure the instance configuration file (using the example instance as an illustration)
The example instance is shown here; adjust the other instances the same way.
vi conf/example/instance.properties
#################################################
## mysql serverId , v1.0.26+ will autoGen
canal.instance.mysql.slaveId=2000

# enable gtid use true/false
canal.instance.gtidon=false

# position info
canal.instance.master.address=192.168.10.181:3308    # IP and port of the source MySQL whose binlog will be read
canal.instance.master.journal.name=                  # binlog file name to start reading from
canal.instance.master.position=                      # binlog offset to start from
canal.instance.master.timestamp=                     # binlog timestamp to start from
canal.instance.master.gtid=

# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal

#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=

# username/password
canal.instance.dbUsername=syncer       # user for connecting to the source MySQL
canal.instance.dbPassword=Sy#r3210
canal.instance.connectionCharset = UTF-8    # character set
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

# table regex
canal.instance.filter.regex=devdb\\..*
# table black regex
canal.instance.filter.black.regex=mysql\\.slave_.*
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch

# mq config
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
#################################################
Note:
- The inline annotation comments above are only there for documentation; they must be removed in a running environment.
Start the service
./bin/startup.sh
Check the service status
# Check the server logs
tail -100f logs/canal/canal.log
tail -100f logs/canal/canal_stdout.log
# Check the instance log
tail -10f logs/example/example.log
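As an extra quick check (not part of the original notes), you can confirm that canal-server is listening on the ports configured in canal.properties:
ss -lntp | grep -E '11111|11112'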
Deploy the adapter
Unpack the software
mkdir adapter && tar -xf canal.adapter-for-ob-rc2.tar.gz -C adapter
Configure the adapter
- Edit the launcher configuration:
vi conf/application.yml
server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: tcp #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    # canal tcp consumer
    # address and port of the canal-server
    canal.tcp.server.host: 192.168.10.181:11111
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:

  canalAdapters:
  - instance: example # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
      - name: rdb
        key: obmysql
        properties:
          jdbc.driverClassName: com.mysql.jdbc.Driver
          jdbc.url: jdbc:mysql://192.168.10.181:2883/devdb?useUnicode=true
          jdbc.username: syncer@dev_mysql_tnt#obce
          jdbc.password: Sy#r3210
- Edit the RDB mapping file:
conf/rdb/mytest_user.yml
dataSourceKey: defaultDS
destination: example
groupId: g1
outerAdapterKey: obmysql
concurrent: true
dbMapping:
  mirrorDb: true
  database: devdb
- destination: the canal instance name
- outerAdapterKey: the key defined in the application.yml file
Start the service
cd adapter && ./bin/startup.sh
Check
# Check the logs
tail -100f logs/adapter/adapter.log
Verify incremental data sync
Make DML and DDL changes on the upstream MySQL (master)
- DDL test
create table t2(id int primary key, name varchar(32)) partition by hash(id);
- DML test
insert into t2 values(1, 'go'),(2, 'python'),(3, 'C++');
commit;
update t2 set name='C' where id = 3;
commit;
delete from t2 where id = 2;
Check that the data has been synced on the target side
select * from t2;
- adapter log
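Because the logger outerAdapter is configured in application.yml, each applied change should also show up in the adapter log; following it while the test statements run is a simple way to watch the sync happen (same log file as in the check step above):
tail -f logs/adapter/adapter.log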
Appendix
References
- Community edition site > Docs > Learning Center > Getting Started tutorial: 4.2 How to use mysqldump to migrate MySQL tables to OceanBase.
- Community edition site > Blog > Getting Started in Practice 4.2: How to use mysqldump to migrate MySQL tables to OceanBase
- https://open.oceanbase.com/blog/8600178