✅实现百万级数据从Excel导入到数据库的方式

2024年 4月 12日 117.3k 0

高手回答

场景分析

这个案例实际上涉及到多个方面,需要我们系统地分析。让我们首先看看,从Excel中读取百万级数据并将其插入数据库时可能遇到的问题:

  • 内存溢出风险
  • 加载如此庞大的Excel数据可能导致内存溢出,需要注意内存管理。

  • 性能瓶颈
  • 处理百万级数据的读取和插入操作可能很耗时,性能优化至关重要。

  • 异常处理策略
  • 读取和导入过程中会有各种潜在问题,我们需妥善处理各类异常情况。

    内存溢出问题

    处理百万级数据,直接加载到内存中显然不现实。解决之道在于采用流式读取,分批处理数据。

    在技术选型上,选择EasyExcel是明智之举。它专为处理大数据量和复杂Excel文件进行了优化。EasyExcel在解析Excel时,不会将整个文件一次性加载到内存中,而是按行从磁盘逐个读取数据并解析。

    性能问题

    针对百万级数据的处理,单线程显然效率低下。提升性能的关键在于多线程处理。

    多线程应用涉及两个场景:一是多线程读取文件,另一个是多线程实现数据插入。这涉及到生产者-消费者模式,多线程读取并多线程插入,以最大程度提升整体性能。

    在数据插入方面,除了利用多线程,还应当结合数据库的批量插入功能以进一步提升速度。

    错误处理

    在文件读取和数据库写入过程中,可能遇到诸多问题,如数据格式错误、不一致性和重复数据等。

    因此,应分两步处理。首先进行数据检查,在插入操作前检查数据格式等问题,然后在插入过程中处理异常情况。

    处理方式多种多样,可通过事务回滚或记录日志。一般不推荐直接回滚操作,而是自动重试,若尝试多次仍无效,则记录日志,随后重新插入数据。

    此外,在这一过程中,需考虑数据重复问题,可在Excel中设定若干字段为数据库唯一约束。遇到数据冲突时,可覆盖、跳过或报错处理。根据实际业务情况选择合适的处理方式,一般情况下,跳过并记录日志是相对合理的选择。

    解决思路

    所以,总体方案如下:

    利用EasyExcel进行Excel数据读取,因其逐行读取数据而非一次性加载整个文件至内存。为提高并发效率,将百万级数据分布在不同的工作表中,利用线程池和多线程同时读取各个工作表。在读取过程中,借助EasyExcel的ReadListener进行数据处理。

    在处理过程中,并非每条数据都直接操作数据库,以免对数据库造成过大压力。设定一个批次大小,例如每1000条数据,将从Excel中读取的数据临时存储在内存中(可使用List实现)。每读取1000条数据后,执行数据的批量插入操作,可简单地借助mybatis实现批量插入。

    此外,在处理过程中,需要考虑并发问题,因此我们将使用线程安全的队列来存储内存中的临时数据,如ConcurrentLinkedQueue。

    经验证,通过上述方案,读取并插入100万条数据的Excel所需时间约为100秒,不超过2分钟。

    具体实现

    为了提升并发处理能力,我们将百万级数据存储在同一个Excel文件的不同工作表中,然后通过EasyExcel并发地读取这些工作表数据。

    EasyExcel提供了ReadListener接口,允许在每批数据读取后进行自定义处理。我们可以基于这一功能实现文件的分批读取。

    pom依赖

    首先,需要添加以下依赖:

    <dependencies>
        <!-- EasyExcel -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>easyexcel</artifactId>
            <version>latest_version</version>
        </dependency>
    
        <!-- 数据库连接和线程池 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>
    </dependencies>
    

    并发读取多个sheet

    然后实现并发读取多个sheet的代码:

    @Service
    public class ExcelImporterService {
    
        @Autowired
        private MyDataService myDataService;
        
        public void doImport() {
            // Excel文件的路径
            String filePath = "users/paidaxing/workspace/excel/test.xlsx";
    
            // 需要读取的sheet数量
            int numberOfSheets = 20;
    
            // 创建一个固定大小的线程池,大小与sheet数量相同
            ExecutorService executor = Executors.newFixedThreadPool(numberOfSheets);
    
            // 遍历所有sheets
            for (int sheetNo = 0; sheetNo < numberOfSheets; sheetNo++) {
                // 在Java lambda表达式中使用的变量需要是final
                int finalSheetNo = sheetNo;
    
                // 向线程池提交一个任务
                executor.submit(() -> {
                    // 使用EasyExcel读取指定的sheet
                    EasyExcel.read(filePath, MyDataModel.class, new MyDataModelListener(myDataService))
                             .sheet(finalSheetNo) // 指定sheet号
                             .doRead(); // 开始读取操作
                });
            }
    
            // 启动线程池的关闭序列
      executor.shutdown();
    
            // 等待所有任务完成,或者在等待超时前被中断
            try {
                executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
            } catch (InterruptedException e) {
                // 如果等待过程中线程被中断,打印异常信息
                e.printStackTrace();
            }
        }
    }
    

    这段代码通过创建一个固定大小的线程池来并发读取一个包含多个sheets的Excel文件。每个sheet的读取作为一个单独的任务提交给线程池。

    我们在代码中用了一个MyDataModelListener,这个类是ReadListener的一个实现类。当EasyExcel读取每一行数据时,它会自动调用我们传入的这个ReadListener实例的invoke方法。在这个方法中,我们就可以定义如何处理这些数据。

    MyDataModelListener还包含doAfterAllAnalysed方法,这个方法在所有数据都读取完毕后被调用。这里可以执行一些清理工作,或处理剩余的数据。

    ReadListener

    接下来,我们来实现这个我们的ReadListener:

    import com.alibaba.excel.context.AnalysisContext;
    import com.alibaba.excel.read.listener.ReadListener;
    import org.springframework.transaction.annotation.Transactional;
    import java.util.ArrayList;
    import java.util.List;
    
    // 自定义的ReadListener,用于处理从Excel读取的数据
    public class MyDataModelListener implements ReadListener<MyDataModel> {
        // 设置批量处理的数据大小
        private static final int BATCH_SIZE = 1000;
        // 用于暂存读取的数据,直到达到批量大小
        private List<MyDataModel> batch = new ArrayList<>();
    
        
        private MyDataService myDataService;
    
        // 构造函数,注入MyBatis的Mapper
        public MyDataModelListener(MyDataService myDataService) {
            this.myDataService = myDataService;
        }
    
        // 每读取一行数据都会调用此方法
        @Override
        public void invoke(MyDataModel data, AnalysisContext context) {
            //检查数据的合法性及有效性
            if (validateData(data)) {
                //有效数据添加到list中
                batch.add(data);
            } else {
                // 处理无效数据,例如记录日志或跳过
            }
            
            // 当达到批量大小时,处理这批数据
            if (batch.size() >= BATCH_SIZE) {
                processBatch();
            }
        }
    
        
        private boolean validateData(MyDataModel data) {
            // 调用mapper方法来检查数据库中是否已存在该数据
            int count = myDataService.countByColumn1(data.getColumn1());
            // 如果count为0,表示数据不存在,返回true;否则返回false
            if(count == 0){
             return true;
            }
            
            // 在这里实现数据验证逻辑
            return false;
        }
    
    
        // 所有数据读取完成后调用此方法
        @Override
        public void doAfterAllAnalysed(AnalysisContext context) {
            // 如果还有未处理的数据,进行处理
            if (!batch.isEmpty()) {
                processBatch();
            }
        }
    
        // 处理一批数据的方法
        private void processBatch() {
            int retryCount = 0;
            // 重试逻辑
            while (retryCount < 3) {
                try {
                    // 尝试批量插入
                    myDataService.batchInsert(batch);
                    // 清空批量数据,以便下一次批量处理
                    batch.clear();
                    break;
                } catch (Exception e) {
                    // 重试计数增加
                    retryCount++;
                    // 如果重试3次都失败,记录错误日志
                    if (retryCount >= 3) {
                        logError(e, batch);
                    }
    }
    

    通过自定义MyDataModelListener,在读取Excel文件过程中可实现数据处理。每读取一条数据后,将其加入列表,在列表累积达到1000条时,执行一次数据库批量插入操作。若插入失败,则进行重试;若多次尝试仍失败,则记录错误日志。

    批量插入

    这里批量插入,用到了MyBatis的批量插入,代码实现如下:

    import org.apache.ibatis.annotations.Mapper;
    import java.util.List;
    
    @Mapper
    public interface MyDataMapper {
        void batchInsert(List<MyDataModel> dataList);
    
        int countByColumn1(String column1);
    }
    

    mapper.xml文件:

    <insert id="batchInsert" parameterType="list">
        INSERT INTO paidaxing_test_table_name (column1, column2, ...)
        VALUES 
        <foreach collection="list" item="item" index="index" separator=",">
            (#{item.column1}, #{item.column2}, ...)
        </foreach>
    </insert>
    
    <select id="countByColumn1" resultType="int">
        SELECT COUNT(*) FROM your_table WHERE column1 = #{column1}
    </select>
    

    如有问题,欢迎加微信交流:w714771310,备注- 技术交流  。或微信搜索【码上遇见你】。

    免费的Chat GPT可微信搜索【AI贝塔】进行体验,无限使用。

    好了,本章节到此告一段落。希望对你有所帮助,祝学习顺利。

    相关文章

    Oracle如何使用授予和撤销权限的语法和示例
    Awesome Project: 探索 MatrixOrigin 云原生分布式数据库
    下载丨66页PDF,云和恩墨技术通讯(2024年7月刊)
    社区版oceanbase安装
    Oracle 导出CSV工具-sqluldr2
    ETL数据集成丨快速将MySQL数据迁移至Doris数据库

    发布评论