数据中间件如何与MySQL数据同步?

2023年 8月 15日 60.1k 0

  • 1.引入
  • 2.传统方案介绍
  • 3.监控binlog实现"同步"更新
  • 4.总结

1.引入

先前介绍了ElasticSearch,以及ES配合MySQL的问题,这种方案是让ES上的数据根据MySQL的数据做对照从而形成对应的索引,再将数据通过处理和封装存放在ES当中。(可回顾:技术分析 | 浅析MySQL与ElasticSearch的组合使用)回到生产环境,我们如何保证MySQL中与ES对照的数据发生更新的时候ES也进行更新呢?就以ES为例。

2.传统方案介绍

2.1直接的"同步"更新

第一种方式十分直接,当发生对MySQL数据更新操作时,由服务器对MySQL和ES同时进行更新操作,如图:这种方式实现起来十分“简单粗暴”,容易理解。这种实现方法显然可以解决这个问题,但绝不是最优解,原因如下:

  • 首先,这种方法使得我们进行数据库的数据写入、修改、删除等操作,后面都要跟上ES的同步操作,代码书写也过于冗长,且大大加大了业务的耦合度。
  • 其次,这种方法不能很好解决“同步”的问题,如果在执行对应操作的时候,发生了断电等情况,就有可能导致数据不同步的问题。
  • 最后,为了保证两者的更新要么同时完成要么都不完成,需要开启事务来处理,系统的性能有所降低,同时,在高并发情况下,有可能造成服务的“雪崩”。

2.2异步的"同步"更新

针对前面的方案,可以考虑加入消息队列的中间件来优化,与第一种方法不同的是当发生对MySQL数据更新操作时,服务器会完成MySQL数据的更新,并通过MQ的队列通过设置好的交换机发送更新ES的消息给对应的接收更新消息的队列,然后完成对应ES数据更新的实现。如图:这种方案将直接的更新方式转换为异步的更新方式,性能上显然提高了,同时降低了业务耦合度,也优化了数据“同步”的问题。但是,这种方案会出现MQ的消费者在消费时可能会因为网络等原因导致用户数据有延时。同时,从编码角度上看,每次系统要进行同步的时候都要编写MQ代码,仍然存在业务的耦合,同时系统架构的设计也因为加入新的中间件要重新考虑维护的问题。

3.监控binlog实现"同步"更新

上面两种方案中都存在硬编码问题,同时存在强的业务耦合,以至于实现MySQL数据更新后的数据同步问题的代价要么是植入ES更新代码,要么替换为MQ代码,代码的侵入性太强,且性能降低。因此可以通过监控MySQL的binlog来实现数据的同步。

3.1问题分析

binlog,该日志存在于Server层次中,是使用存储引擎都可以使用的日志模块,binlog是逻辑日志,记录的是这个语句的原始逻辑,比如“给test表id=5这一行的col1字段值加1”。binlog的日志文件是可以追加写入的。“追加写入”是指binlog日志文件写到一定大小后会切换到下一个文件进行写入,可以设置sync_binlog为1,让每次事务的binlog都持久化保存到磁盘中。binlog在ROW模式下会记录每次操作后每行记录的变化。虽然此模式下所占用的空间较大,但此模式可以保持数据的一致性。因此不管SQL是什么,引用了什么函数,他记录的是执行后的效果。

3.2使用Canal来监控binlog

Canal是阿里用Java开发的基于数据库增量的日志解析,是提供增量数据订阅&消费的中间件。目前,Canal主要支持了 MySQLbinlog 解析,解析后可利用 Canal Client 来处理获得的相关数据。

详细可参考:https://github.com/alibaba/canal/wiki

Canal的实现原理基于MySQL主从复制进行设计:

  • Master主库将改变记录到逻辑日志(binary log)中(这些记录叫做逻辑日志事件,binary log events,可以通过show binlog events进行查看);
  • Slave从库将Master主库的binary log events拷贝到它的中继日志(relay log);
  • Slave从库读取从重做中继日志中的事件,将改变反映它自己的数据同步到数据库中。

而Canal就是将自身伪装成一个Slave从库,假装从Master主库复制数据:

  • Canal模拟MySQL Slave的交互协议,伪装自己为MySQL Slave,向MySQL Master发送dump协议;
  • MySQL Master收到dump请求,开始推送binary log给Slave(也就是Canal);
  • Canal解析binary log对象(原始为byte流)。

这种方案的好处是程序中没有代码侵入、没有硬编码。同时,原有系统不需要任何变化对原方案的高耦合进行了业务解耦,不需要关注原来系统的业务逻辑。

对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下:

[mysqld]# 开启 binloglog-bin=mysql-bin # 选择 ROW 模式binlog-format=ROW # 指定开启binlog的数据库,不指定则全部数据库开启binlog-do-db=databasename# 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复server_id=1 

创建canal账户,授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限

[mysqld]CREATE USER canal IDENTIFIED BY 'canal';  GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;FLUSH PRIVILEGES;

下载并启动Canal

wget https://github.com/alibaba/canal/releases/download/canal-1.1.2/canal.deployer-1.1.2.tar.gzmkdir /tmp/canaltar zxvf canal.deployer-1.1.2.tar.gz  -C /tmp/canal

修改Canal的配置文件

vi conf/example/instance.properties## mysql serverIdcanal.instance.mysql.slaveId = 1234#position info,需要改成自己的数据库信息canal.instance.master.address = 127.0.0.1:3306 canal.instance.master.journal.name = canal.instance.master.position = canal.instance.master.timestamp = #canal.instance.standby.address = #canal.instance.standby.journal.name =#canal.instance.standby.position = #canal.instance.standby.timestamp = #username/password,需要改成自己的数据库信息canal.instance.dbUsername = canal  canal.instance.dbPassword = canalcanal.instance.defaultDatabaseName =canal.instance.connectionCharset = UTF-8#table regexcanal.instance.filter.regex = .*\\..*

Canal操作

# 启动sh bin/startup.sh# 查看server日志vi logs/canal/canal.log</pre># 查看 instance 的日志vi logs/example/example.log# 关闭sh bin/stop.sh

以Java为例,创建测试项目Maven工程,导入应用开发场景:

    <dependencies>        <dependency>            <groupId>com.alibaba.otter</groupId>            <artifactId>canal.client</artifactId>            <version>1.1.2</version>        </dependency>        <dependency>            <groupId>org.apache.kafka</groupId>            <artifactId>kafka-clients</artifactId>            <version>2.4.1</version>        </dependency>    </dependencies>

编写日志监视类CanalClient来从日志中抓取信息,首先,获取canal的连接对象并连接:

//获取 canal 连接对象CanalConnector canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1", 11111),"example", "canal","canal");//连接canalConnector.connect();

指定需要监控的数据库,并根据数据量来获取 Message :

//指定要监控的数据库canalConnector.subscribe("databasename.*");//获取 MessageMessage message = canalConnector.get(100);

接着就可以通过处理 Message 来得到监控信息内容了:

List<CanalEntry.Entry> entries = message.getEntries();    if (entries.size() > 0) {    for (CanalEntry.Entry entry : entries) {        //获取表名        String tableName = entry.getHeader().getTableName();        //Entry 类型        CanalEntry.EntryType entryType = entry.getEntryType();        //判断 entryType 是否为 ROWDATA        if (CanalEntry.EntryType.ROWDATA.equals(entryType)) {            //序列化数据            ByteString storeValue = entry.getStoreValue();            //反序列化            CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);            //获取事件类型            CanalEntry.EventType eventType = rowChange.getEventType();            //获取具体的数据            List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();            //遍历并打印数据            for (CanalEntry.RowData rowData : rowDatasList) {                List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();                JSONObject beforeData = new JSONObject();                for (CanalEntry.Column column : beforeColumnsList) {                    beforeData.put(column.getName(), column.getValue());                }                JSONObject afterData = new JSONObject();                List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();                for (CanalEntry.Column column : afterColumnsList) {                    afterData.put(column.getName(), column.getValue());                }                System.out.println("TableName:" + tableName                        +                        ",EventType:" + eventType +                        ",Before:" + beforeData +                        ",After:" + afterData);            }        }    }}

从代码中可以看出,当系统与Canal建立连接后可以获取Message来监控数据库的操作,Message是一次Canal从MySQL的 bin log 中抓取的信息,一个Message中可以有多个SQL执行的结果,每个SQL执行结果(SQL命令)称为Entry,如图:

Entry中包含 TableName 、 EntryType 和 StoreValue ,其中 StoreValue 包含了数据变化的内容。如下:

要想进行使用还需要进行反序列化操作才可以进行使用,如下:

当然,实际生产环境Canal可以配置MQ模式,配合RocketMQ或者Kafka,canal会把数据发送到MQ的topic中,然后通过消息队列的消费者进行处理。首先需要修改canal.properties文件,这个文件是 canal 的基本通用配置,canal 端口号默认就是 11111,修改 canal 的输出 model,默认 tcp,改为输出到 kafka,instance.properties文件输出得到主题为kafka,可配置集群,再次启动canal就可以启动 Kafka 消费客户端测试,查看消费情况了。

4.总结

本文介绍了三种方式使得中间件的数据与MySQL的数据保存同步,前两种方法在使用性能和设计上都存在较大漏洞,而第三种通过读取MySQL的bin log日志,获取指定表的日志信息来实现数据同步的方法,在编码上看没有代码侵入,业务耦合度低,且原有系统不需要任何变化,但,构建bin log监控系统需要做好规划,不多赘述了。

Enjoy GreatSQL 🙂

相关文章

Oracle如何使用授予和撤销权限的语法和示例
Awesome Project: 探索 MatrixOrigin 云原生分布式数据库
下载丨66页PDF,云和恩墨技术通讯(2024年7月刊)
社区版oceanbase安装
Oracle 导出CSV工具-sqluldr2
ETL数据集成丨快速将MySQL数据迁移至Doris数据库

发布评论