反射+jdbc实现orm框架 泛型加多数据源 实现sqlserver迁移pgsql

2023年 10月 14日 115.2k 0

反射+jdbc实现orm框架 泛型加多数据源 实现sqlserver迁移pgsql,关注同步过程,不在编写代码

在对公司数据库进行热迁移的时候,遇到了很多的问题,针对如下两个大问题分析解决问题。

问题1.需要编写大量类似代码(严重影响编写效率)

在进行数据迁移需要编写大量的类似代码,使用原生jdbc时,编写增删改查业务时, 编写查询的时候,需要指定返回的类型,编写新增修改时 需要指定出每一个字段,就导致需要编写大量的类似代码。为了解决这个问题定义出公共类,写一套orm框架

问题1.解决方案1:

为了避免编写类似的代码,最初的想法是替换一套orm框架,比如jpa ,mybatis-plus ,但是使用别人的orm框架,避免编写大量sql语句,但是会遇到数据源之间切换,可能代码会更加复杂,且别人的orm框架还是要遵守三层规范,虽然基础增删改查不用编写,数据持久层并没有帮我省去,还是要出复制粘贴,对其传输实体类进行修改,一张表进行迁移要写两套dao层,新旧表结构不同。

问题1.解决方案2

根据设计模式,编写一套自己的orm框架,彻底砍掉dao层,定义业务工具类关联dao抽象工具。 不在需要编写任何一行sql语句。 具体代码如下

public class PgsqlDaoBase {

    public  final static ConcurrentHashMap map = new ConcurrentHashMap();
    private String UPDATE="UPDATE";
    private String INSERT="INSERT";

    @Resource(name = SaasBusinessDsConfig.SaasBusinessDbNamedParameterJdbcTemplateName)
    protected NamedParameterJdbcTemplate namedParameterJdbcTemplate;

    protected int[] updateExistingRecords(List existingRecords ,String tableName) {
        Class entityClass = existingRecords.get(0).getClass();
        Field[] fields = entityClass.getDeclaredFields();
        List fieldNames = new ArrayList();
        String sql = "";
        if ( map.get(UPDATE +tableName) ==null ||  map.get(UPDATE+tableName).equals("")){
            for (Field field : fields) {
                fieldNames.add(field.getName());
            }
            StringBuilder sb = new StringBuilder();
            sb.append("UPDATE ").append(tableName).append(" SET ");
            for (String fieldName : fieldNames) {
                sb.append(fieldName).append(" = :").append(fieldName).append(", ");
            }
            sb.delete(sb.length() - 2, sb.length()); // 移除最后一个逗号和空格
            sb.append(" WHERE id = :id and accId = :accId");
            sql = sb.toString();
        }else{
            sql=map.get(UPDATE+tableName);
        }
        SqlParameterSource[] batchParams = SqlParameterSourceUtils.createBatch(existingRecords);
        return namedParameterJdbcTemplate.batchUpdate(sql, batchParams);
    }


    public int [] insertNewRecords(List newRecords,String name) {
        // 获取实体类的表名
        Class entityClass = newRecords.get(0).getClass();
        // 获取实体类的字段信息
        Field[] fields = entityClass.getDeclaredFields();
        List fieldNames = new ArrayList();
        String sql = "";
        if ( map.get(INSERT+name) ==null ||  map.get(INSERT+name).equals("")){

            for (Field field : fields) {
                fieldNames.add(field.getName());
            }
            StringBuilder sb = new StringBuilder();
            sb.append("INSERT INTO ").append(name).append(" (");
            for (String fieldName : fieldNames) {
                sb.append(fieldName).append(", ");
            }
            sb.delete(sb.length() - 2, sb.length());
            sb.append(") VALUES (");
            for (String fieldName : fieldNames) {
                sb.append(":").append(fieldName).append(", ");
            }
            sb.delete(sb.length() - 2, sb.length());
            sb.append(")");
        }else{
            sql=map.get(UPDATE+name);
        }
        // 执行插入操作
        SqlParameterSource[] batchParams = SqlParameterSourceUtils.createBatch(newRecords);
        return namedParameterJdbcTemplate.batchUpdate(sql, batchParams);
    }
    public boolean isRecordExists(Integer id ,String name) {
        String sql = "SELECT COUNT(id) FROM "+name+"  WHERE id = :id";
        Map paramMap = Collections.singletonMap("id", id);
        int count = namedParameterJdbcTemplate.queryForObject(sql, paramMap, Integer.class);
        return count > 0;
    }

    public int deleteBatch(List filteredIds,String name) {
        if (filteredIds == null || filteredIds.isEmpty()) {
            return 0;
        }
        String sql = "delete from "+name+" where id in (:ids)";
        SqlParameterSource params = new MapSqlParameterSource("ids", filteredIds);
        return namedParameterJdbcTemplate.update(sql, params);
    }
}

看到这里大家可能会觉得并没有省去持久层,这里只是定义了一个供所有数据库表增删改查的的工具

public abstract class BaseSyncService extends DataSyncHelper2 {

    protected Boolean isLock = true;

    @Autowired
    protected DiscounDaoBase payRecordDaos;

    @Autowired
    protected PageBaseMiddle payClassDao;
    //1.0 计算总页说
    //2.0 将页进行拆分,获取页中的数据List
    //3.0 将查询到的数据转换未目标对象
    //4.0 先创建数据源
    //4.1 再对插入数据源的数据进行编写sql
    public String NAMES;

    public abstract void setTableNAME();

    public String LOCK_KEY = NAMES + "-task-sync-lock";
    public String DATA_KEY = NAMES + "-data-task-sync";

    public String INCREMENT = NAMES + "-INCREMENT";

    public void unSync() {
        RedisLockDao.unlock(LOCK_KEY);
    }

    public void sync(SyncDataDto args, Class type) {
        syncStart(args, Constants.MAX_PAGE_SIZE, type, NAMES);
    }

    public void taskSync(Class type) {
        log.info("准备同步");
        if (!acquireLock(LOCK_KEY)) {
            log.info("同步失败,请清理缓存");
            return;
        }
        try {
            SyncDataDto syncDataDto = new SyncDataDto();
            long startId = syncDataDto.getStartId();
            long endId = syncDataDto.getEndId();
            // 进行同步操作,使用 startId 和 endId 进行同步
            // 更新同步状态
            String jsonStr = RedisDao.get(DATA_KEY);
            startId = JsonUtils.toObject(jsonStr == null ? "{}" : jsonStr, SyncDataDto.class).getStartId();
            endId = startId + Constants.MAX_PAGE_SIZE;

            Long pClassMaxId = payRecordDaos.getMaxId(NAMES);

            while (startId < pClassMaxId) {

                // 存储同步状态到 Redis
                syncDataDto.setStartId(startId);
                syncDataDto.setEndId(endId);

                sync(syncDataDto, type);

                startId += Constants.MAX_PAGE_SIZE;
                endId = startId + Constants.MAX_PAGE_SIZE;

                syncDataDto.setStartId(startId);
                syncDataDto.setEndId(endId);

                RedisDao.set(DATA_KEY, JsonUtils.toJsonStr(syncDataDto));

                log.info("同步进度 start {}  end {}", startId, endId);
            }
        } catch (Exception ex) {
            log.error(ex.getMessage(), ex);
        } finally {
            RedisLockDao.unlock(LOCK_KEY);
        }
    }

    @Override
    public void setVariable(String value) {
        this.KEYVALUE = this.LOCK_KEY;
    }

    @Override
    public void claneCache() {

        clearCache(LOCK_KEY, DATA_KEY, INCREMENT);
    }

    @Override
    protected V fetchById(int Id) {
        return null;
    }

    @Override
    protected int update(V elem) {
        return 0;
    }

    @Override
    protected int removeById(V ele) {
        return 0;
    }

    @Override
    protected int getCount(SyncDataDto syncData) {
        setVariable(LOCK_KEY);
        return payRecordDaos.counts(syncData, NAMES);
    }

    @Override
    protected List getList(SyncDataDto syncData, Class type) {
        return payRecordDaos.getListPage(syncData, NAMES, type);
    }

    @Override
    protected List convertTo(List dataList) {
        ArrayList objects = new ArrayList();
        for (V sqlServerEntity : dataList) {
            K entity = convertToMemberEntity(sqlServerEntity);
            if (entity != null) {
                objects.add(entity);
            }
        }
        return objects;
    }

    protected abstract K convertToMemberEntity(V sqlServerEntity);


    //取消增量
    @Override
    public void StopSyncThenSync() {
        isLock = false;
    }

    @Override
    protected int[] insertBatch(List elems, String name) {
        return payClassDao.insertBatch((List) elems, name);
    }

    @Override
    protected int[] insertBatchs(List elems, String name) {
        return payClassDao.insertNewRecords((List) elems, name);
    }

    @Override
    public void SyncThenSync(Class type) {

        if (RedisLockDao.lock(INCREMENT)) {
            try {
                //获取最大的版本号
                while (isLock) {
                    SqlSyncVersionEntity name = sqlSyncVersionDao.getName(INCREMENT);

                    int s = Integer.parseInt(name == null ? "0" : String.valueOf(name.getVersion()));
                    //查询到最大的版本进行保存
                    int maxversion = payRecordDaos.maxVersion(s - 1, NAMES);
                    if (maxversion != s) {
                        List lastChangeVersions = payRecordDaos.getLastChangeVersions(s - 1, NAMES, type);
                        //保存查询到的数据    //一次处理一千条数据
                        //再次进行一次转换
                        List memberEntities = convertTo(lastChangeVersions);
                        payClassDao.insertBatch(memberEntities, NAMES);
                        //删除过的数据永远没有id  只能通过最后一条日志进行删除
                        //删除     新表
                        List filteredIds = payRecordDaos.getDeleteIds(s - 1, NAMES);
                        payClassDao.deleteBatch(filteredIds, NAMES);
                        sqlSyncVersionDao.insertandupdateVersions(INCREMENT, maxversion);
                    }
                    Thread.sleep(5 * 60 * 1000);
                }
            } catch (Exception ex) {
                log.error(ex.getMessage(), ex);
            } finally {
                RedisLockDao.unlock(INCREMENT);
            }
        }
        log.error("正圯同步中.........");
    }
}

@Autowired 注入了一个泛型的的dao 这个dao层遇到不同的实体就会进行随意的切换,这里定义的两个抽象方法,需要自己的去实现 实体之间转换的操作 ,你想要把那个字段值的赋值给哪一个字段 ,填充数据库表的抽象 ,

最终成就:完成工具类后 ,就不是关注迁移代码的编写,而是迁移的过程中。
问题2 性能问题:

在上篇我们通过观察ct日志,进行数据同步的时候,几乎每时每刻都在查看ct中是否有数据,准备往新的数据库中同步,在一直监听ct的过程中原表数据在修改,此时一直监听会进行等待,导致查询ct超时,同步功能异常结束。

解决方式: 减少查询的频率,减少系统的压力。增加稳定性,减少资源消耗。

public List getListPage(SyncDataDto args, String name ,Class type ) {
    MapSqlParameterSource params = new MapSqlParameterSource();
    StringBuilder wheres = builderWheres(args, params);
    StringBuilder builder = new StringBuilder();
    builder.append(" SELECT * FROM( ")
            .append(" SELECT ROW_NUMBER() OVER(ORDER BY id ) as rowNumber, ")
            .append(" * ")
            .append(" FROM  " +name)
            .append(" WHERE 1=1 ")
            .append(wheres)
            .append(" ) t ")
            .append(" WHERE t.rowNumber >= :bgNumber AND t.rowNumber < :edNumber ");

对查询的条件进行优化,尽量分批查询,且一定要保证数据是顺序查询不丢失。

相关文章

JavaScript2024新功能:Object.groupBy、正则表达式v标志
PHP trim 函数对多字节字符的使用和限制
新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
为React 19做准备:WordPress 6.6用户指南
如何删除WordPress中的所有评论

发布评论