如何实现一个简单的数据同步组件

2023年 7月 19日 44.9k 0

目前关于数据同步的组件开源社区有很多,如:Flink CDC, DataX, seaTunel,Kattle 等等,大体上可以分为两种:基于日志的和基于 JDBC 的。这些同步组件在进行整库同步或者 schema 差异性不大的情况下通过可视化界面或者配置文件映射的方式可以直接达到我们库-库的同步诉求,但是对于一些定制化较大的场景处理起来还比较麻烦。为了满足这样的一个场景,笔者写了一个小的同步组件。

PS: 我们的业务场景比较特殊,源的种类比较多,有 Oracle、Mysql、文件以及 API 接口等。另外一点是需要同步的数据量不是很大,引入额外的数据同步组件对于我们来说会有额外的运维成本和学习成本,因此基于上述两个点,决定自己写一个小的组件。

业务框架

image.png

两个基本需求

  • 支持双写,即外部源经过数据同步组件,一方面是根据自定义的标准化模型转换成需要的专题数据;另一方面是需要将原始数据原封不动的重新落到我们自己的库中;
  • 能够支持将专题库数据通过逆向转换,写成不同的原始库的数据格式

组件模型

以 Mysql 为例,同步组件的主要模块如下:

image.png

  • 1、绿色背景是业务插件,每个业务都会对应一个业务插件,编写同步器 Syncer 和 转换器 Convertor
  • 2、点虚线是一条单独的通道,用于将目标库数据转换成不同的原始库数据
  • 3、借助了一些队列实现了简单的生产者消费者模型,队列作为一个缓冲区,以便于后续实现对于读取原始库和写目标库的控制。

项目模块划分如下:

image.png

  • api-web 对外提供 api 服务,能够发起数据同步任务、取消数据同步任务、暂停数据同步任务和取消数据同步任务等接口
  • common 是一些公共的工具、模型、枚举。
  • connectors 连接器的具体实现,比如 MysqlConnector 就是基于 JDBC API 实现对原始库的数据读。
  • core 核心包,主要包括一些接口的定义和逻辑处理的标准化流程
  • executors 执行器层,包括任务管理、线程池资源管理等
  • plugins 具体业务插件,主要使用 Syncer 同步器和 Convertor 模型转换器

核心问题

针对数据同步组件,解决的核心问题可以抽象成如下模型:A_DB -> A -> B -> B_DB;即将 A 数据库中的数据读取出来之后转换成 A class instance,然后将 A class instance 转换成 B class instance,再将 B class instance 写到 B 数据库。

解决 A_DB 到 A

从 A_DB -> A 或者 B -> B_DB 这个过程,就是我们所熟知的 ORM 解决的问题;不管是 hibernate、mybatis 还是 SpringBoot JPA 都是围绕着这个问题展开的。

在本篇的组件中,因没有引入 ORM,所以将数据库行映射成一个 java 对象也需要自己实现。DataX 中是通过配置文件来描述的,在本篇中没有才采用这种描述方式,而是通过语言耦合性更高的注解的方式来实现的(由业务属性决定);

如下是一个描述具体业务的 Java 对象的定义,@Table 注解用来描述 JmltModel 和哪个表是关联的, @Colum 注解用来描述属性是和哪个字段关联的.

@Data
// @Table 注解用来描述 JmltModel 和哪个表是关联 的
@Table(name = "user_info") 
public class JmltModel implements Serializable {
    // @Colum 注解用来描述属性是和哪个字段关联的
    @Colum(name = "id")
    private Long id;
    @Colum(name = "email")
    private String email;
    @Colum(name = "name")
    private String name;
    @Colum(name = "create_time")
    private Date create_time;
}

有了这个描述关系,即可以在 runtime 时通过泛型 + 反射来实现 A_DB -> A 过程的模板设计。

MysqlConnector 的实现来进行说明,下面抽取了 MysqlConnector 组件中的部分代码(做了一些删减);下面这段代码中有 1-6 6 的步骤,这部分属于生产端,即从原始 Mysql 表中分页读取数据,并将读取到的数据映射成实际的对象,再通过业务定义的 convertor 转换成目标的对象,最后丢到队列中去等待消费

// 1、originClass 是原始库对象,这里通过反射获取 Table 注解,从而拿到表名
Table table = (Table) originClass.getDeclaredAnnotation(Table.class);
String tableName = table.name();

// 2、计算所有的条数,然后按照分页的方式进行 fetch
SqlTemplate sqlTemplate = new SqlTemplate(this.originDataSource);
int totalCount = sqlTemplate.count();
RowBounds rowBounds = new RowBounds(totalCount);
int totalPage = rowBounds.getTotalPage();
// 3、这里是按分页批量拉取
for (int i = 1; i Map -> Java Object
,通过 ResultSet 的 getMetaData 可以取到所有的列名(K)和值(V),并将其存储到 Map 中,代码如下:

/**
* 将 resultSet 转成 Map
*
* @param resultSet
* @return
* @throws SQLException
*/
private Map resultSetToMap(ResultSet resultSet) throws Exception {
Map resultMap = new HashMap();
// 获取 ResultSet 的元数据
int columnCount = resultSet.getMetaData().getColumnCount();
// 遍历每一列,将列名和值存储到 Map 中
for (int i = 1; i B 的逻辑完成之后,就会将 B 的 List 丢到 Disruptor 的 ringBuffer 中等待消费。消费逻辑如下:

public void onEvent(RowObjectEvent rowObjectEvent, long sequence, boolean endOfBatch) {
// targetResult
List targetResult = (List) rowObjectEvent.getRowObject();
// 将 T 写到 目标库
Connection tc = null;
PreparedStatement pstm = null;
try {
// 下面即为获取连接,创建 prepareStatement 和执行
tc = this.targetDataSource.getConnection();
SqlTemplate sqlTemplate = new SqlTemplate(this.targetDataSource);
sqlTemplate.setObj(targetResult.get(0));
String sql = sqlTemplate.createBaseSql();
pstm = tc.prepareStatement(sql);
for (T item : targetResult) {
Object[] objects = sqlTemplate.createInsertSql(item);
for (int i = 1; i A -> B -> DB
这样的思路给出了每一步实现的代码,有兴趣的同学可以尝试自己实现一个数据同步组件。如果问题也欢迎交流。

相关文章

Oracle如何使用授予和撤销权限的语法和示例
Awesome Project: 探索 MatrixOrigin 云原生分布式数据库
下载丨66页PDF,云和恩墨技术通讯(2024年7月刊)
社区版oceanbase安装
Oracle 导出CSV工具-sqluldr2
ETL数据集成丨快速将MySQL数据迁移至Doris数据库

发布评论