数据库热迁移(sqlserver pgsql)
在进行数据迁移前 我们要先清楚如何进行迁移对业务影响最小? 如何性能最高的进行迁移数据库? 迁移过程中会出现哪些问题。
如何进行迁移对业务影响最小
同步方式1
数据迁移的过程中,最简单暴力的方式直接导出数据库中所有的sqlserver语句,编写脚本,将千万数据量转换成pgsql语法,这种方式会产生很多的问题
缺点
同步方式2
编写业务进行同步
优点
如何性能最高的进行迁移数据库?
如上所示,我们首先采用第二种同步方式。编写合理的sql语句进行数据的同步,在批量请求数据的时候我们可能会遇到问题就是内存问题,如果数据量请求一旦过多就会导致数据库崩溃,cpu爆满,所以我们编写sql语句时,选择合理的索引。(比如深度分页问题 当你查询的数据靠后时,没有合适的索引就会造成全表扫描,导致数据库宕机)
当插入语句过多数据量庞大时, 造成B+树的深度过高,需要重复的扫描磁盘。所以再插入语句前为数据表建立合适的索引。
迁移过程中同步问题
在sqlserver中,有很种类型的日志,选择适合自己业务场景的日志进行使用,这里我们使用CT日志结合业务进行增量同步。(sql server 中还有cdc日志 记录了详细更改的数据),CT里面记录了原表中修改新增或删除的id 我们集体通过id找到原表中的数据,进行删除或者是修改新增。
开启CT
表开启CT
t_log 表名称
ALTER TABLE t_log
ENABLE CHANGE_TRACKING
查看某张表CT追踪
SELECT * FROM CHANGETABLE(CHANGES t_log, 0) as tbver
查询出来的字段有如下
//表示更改的版本号。每次更改都会递增版本号。
SYS_CHANGE_VERSION
//表示创建更改的版本号。对于插入操作,该字段的值等于 SYS_CHANGE_VERSION 的值;
//对于更新和删除操作,该字段的值表示最初创建更改的版本号
SYS_CHANGE_CREATION_VERSION
//表示更改的操作类型。可能的值包括 'I'(插入)、'U'(更新)和 'D'(删除)。
SYS_CHANGE_OPERATION
//表示发生更改的列的位图。每个位表示对应列是否发生了更改。
//如果对应位为 1,则表示该列发生了更改;如果对应位为 0,则表示该列没有发生更改
SYS_CHANGE_COLUMNS
//更改的上下文信息。该字段的值通常为空.
SYS_CHANGE_CONTEXT
//当前表主键
id
如下编写具体同步业务(数据层涉及内部安全不在此展示)
package biz.yuanbei.father;
import biz.yuanbei.dao.sqlserver.SqlSyncVersionDao;
import biz.yuanbei.framework.common.utils.SyncDataDto;
import biz.yuanbei.redis.dao.RedisDao;
import biz.yuanbei.redis.dao.RedisLockDao;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
import org.springframework.util.CollectionUtils;
import javax.annotation.Resource;
import java.util.List;
@Slf4j
public abstract class DataSyncHelper {
@Autowired
protected SqlSyncVersionDao sqlSyncVersionDao;
@Resource
private NamedParameterJdbcTemplate namedParameterJdbcTemplate;
protected String KEYVALUE;
public abstract void setVariable(String value);
//清除所有的缓存
public abstract void claneCache();
protected abstract T fetchById(int Id);
protected abstract int update(T elem);
protected abstract int removeById(T ele);
protected abstract int getCount(SyncDataDto syncData);
protected abstract List getList(SyncDataDto syncData);
protected abstract List convertTo(List dataList);
protected abstract void StopSyncThenSync();
protected abstract int[] insertBatch(List elems);
protected abstract int[] insertBatchs(List elems);
public void syncStart(SyncDataDto syncData, int pageSize) {
// 获取需要同步的数据总数
int count = getCount(syncData);
if (count