优化 EasyExcel 将 Excel 中海量的数据存入 Mysql

2023年 10月 10日 82.2k 0

优化 EasyExcel 将 Excel 中海量的数据存入 Mysql

正常的Excel数据导入Mysql暂且不讲,本文主要讲的是关于效率的优化。

常用导入方式的缺点

一般我们会一次性读取Excel中的所有数据,循环塞入默认值(比如 create_user 字段等),然后调用 mybatis_plus 中的 saveBatch(List entityList) 方法存入 mysql。如果Excel中的数据量过大会遇到以下几个问题:

  • 数据量过大,内存溢出
  • 全部读取后,循环塞入默认值,再向mysql存入,耗时较大
  • mybatis_plus 中的方法 saveBatch 实际上也是一条一条存入,只不过是在一个Sqlsession 里面一次性提交,数据量过大也很慢。
  • 解决方案

    针对以上缺点,我们从以下几个方面优化

    • Excel数据分片读取(每次读取1000、2048.. 选择一个适合自己机器的数量)
    • 线程池异步处理数据
    • Mybatis-Plus批量存储优化(不用saveBatch, 自己手写批量插入)

    实现过程如下

  • 配置线程池用于异步处理数据
  • package com.puffer.user.config;
    
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.core.task.TaskExecutor;
    import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
    
    /**
     * 线程池配置
     * @author puffer
     * @date 2023-10-08
     */
    @Configuration
    public class ThreadPoolConfig {
        @Bean
        public TaskExecutor taskExecutor() {
    
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    
            // 设置核心线程数
            executor.setCorePoolSize(8);
    
            // 设置最大线程数,当核心线程数满了且队列满了,会创建新的线程,直到达到最大线程数。而当前队列是无界队列,所以不会满,此处设置无效。直接使用默认值,无限大
            //executor.setMaxPoolSize(16);
    
            // 设置队列容量,默认为Integer.MAX_VALUE
            //executor.setQueueCapacity(10000);
    
            // 设置线程活跃时间(秒)
            executor.setKeepAliveSeconds(60);
    
            // 设置默认线程名称
            executor.setThreadNamePrefix("puffer-thread-");
    
            // 设置拒绝策略。无界队列不会出现满的情况,所以不配置存储策略。直接抛出 RejectedExecutionException 异常
            //executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    
            // 等待所有任务结束后再关闭线程池
            executor.setWaitForTasksToCompleteOnShutdown(true);
    
            return executor;
        }
    }
    
  • 配置 EasyPoi 的 ReadListener 用于分片读取数据
  • 
    import cn.hutool.core.collection.CollectionUtil;
    import com.alibaba.excel.context.AnalysisContext;
    import com.alibaba.excel.read.listener.ReadListener;
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.function.Consumer;
    
    /**
     * @author puffer
     * @date 2023-10-08
     */
    @Slf4j
    public class ExcelReadLister implements ReadListener {
    
        /**
         * 单个处理数据量,经测试1000条数据效果较好(最好在自己的电脑上试一下,我电脑是2048比较快)
         */
        public static int BATCH_COUNT = 2048;
    
        /**
         * 数据的临时存储
         */
        private List tempDataList = new ArrayList(BATCH_COUNT);
    
        /**
         * consumer
         */
        private final Consumer consumer;
    
        public ExcelReadLister(Consumer consumer) {
            this.consumer = consumer;
        }
    
        @Override
        public void invoke(T data, AnalysisContext context) {
            tempDataList.add(data);
            if (tempDataList.size() >= BATCH_COUNT) {
                log.info("读取数据量:{}", tempDataList.size());
                consumer.accept(tempDataList);
                tempDataList = new ArrayList(BATCH_COUNT);
            }
        }
    
        /**
         * 所有数据读取完成之后调用
         *
         * @param context
         */
        @Override
        public void doAfterAllAnalysed(AnalysisContext context) {
            if (CollectionUtil.isNotEmpty(tempDataList)) {
                //处理剩余的数据
                log.info("读取数据量:{}", tempDataList.size());
                consumer.accept(tempDataList);
            }
        }
    }
    3. controller层代码
    ``` ```
    package com.puffer.user.controller;
    
    import com.puffer.user.service.UserService;
    import com.puffer.user.vo.ExcelImportResultVo;
    import jakarta.annotation.Resource;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.web.bind.annotation.*;
    import org.springframework.web.multipart.MultipartFile;
    
    import java.net.http.HttpResponse;
    
    /**
     * 用户 Api
     *
     * @author puffer
     * @date 2023-10-06
     */
    @RestController
    @RequestMapping(value = "/users")
    @Slf4j
    public class UserController {
    
        @Resource
        private UserService userService;
    
        @PostMapping(value = "/importUserList")
        public ExcelImportResultVo importUserList(@RequestBody MultipartFile file) {
            long filesize = file.getSize();
            log.info("导入文件大小:{} k", filesize / 1024);
            return userService.importUserList(file);
        }
    
    }
    
  • service层代码
  • @Override
    @Transactional(rollbackFor = Exception.class)
    public ExcelImportResultVo importUserList(MultipartFile file) {
        StopWatch stopWatch = new StopWatch();
        stopWatch.start();
        ExcelImportResultVo resultVo = new ExcelImportResultVo();
        try {
            resultVo = this.readExcelAndSaveAsync(UserDto.class, file, data -> {
                // 导入的账号明文密码加密
                data.setUserPassword(MD5Utils.encode(data.getUserPassword()));
                return data;
            }, userMapper::insertList);
        } catch (Exception e) {
            log.error("导入失败原因:{}", e.getMessage(), e);
        }
    
        stopWatch.stop();
        resultVo.setCost(stopWatch.getLastTaskTimeMillis());
        return resultVo;
    }
    
    /**
     * 读取Excel并保存
     *
     * @param head       要导入的实体类型
     * @param file       文件
     * @param function   操作函数
     * @param dbFunction 数据库操作方法
     * @param         实体
     * @param         方法
     * @return 导入的结果
     * @throws IOException
     * @throws ExecutionException
     * @throws InterruptedException
     */
    private  ExcelImportResultVo readExcelAndSaveAsync(Class head, MultipartFile file, Function function, Function dbFunction) throws IOException, ExecutionException, InterruptedException {
    
        // 导入成功的数据数量
        int successCount = 0;
    
        // 导入失败的数据
        int failCount = 0;
    
        // 存储异步线程的执行结果
        Collection futures = new ArrayList();
    
        EasyExcel.read(file.getInputStream(), head, new ExcelReadLister(dataList -> {
            // 转换DO,并设置数据源id
            List list = dataList.parallelStream().map(function).collect(Collectors.toList());
            // 异步批量插入
            futures.add(saveAsyncBatch(list, dbFunction));
        })).sheet().doRead();
    
        // 等待异步线程执行完毕
        for (Future future : futures) {
            int[] counts = future.get();
            successCount += counts[0];
            failCount += counts[1];
        }
        log.info("存储成功总数据量:{},存储失败总数据量:{}", successCount, failCount);
        return ExcelImportResultVo.builder().successCount(successCount).failCount(failCount).build();
    }
    
    /**
     * 批量插入
     *
     * @param list       要分批处理的数据
     * @param dbFunction 数据库操作的方法
     * @param         数据库实体类
     * @return 返回处理结果
     */
    @Async
    public  Future saveAsyncBatch(List list, Function dbFunction) {
        int size = list.size();
        int[] result = new int[2];
        log.info("saveAsyncBatch当前数据分片大小 size:{}", size);
        try {
            if (dbFunction.apply(list) > 0) {
                result[0] = size;
                log.info("{} 分片存储数据成功,数据量:{}", Thread.currentThread().getName(), size);
            } else {
                result[1] = size;
                log.info("{} 分片存储数据失败:{}", Thread.currentThread().getName(), size);
            }
        } catch (Exception e) {
            result[1] = size;
            log.error("{} 分片存储数据出现异常,{}", Thread.currentThread().getName(), e.getMessage());
        }
    
        return new AsyncResult(result);
    }
    
  • mapper层代码(弃用saveBatch,手写sql insertList语句)
  •     
    int insertList(@Param("userList") List userList);
    
    
        insert into t_user(user_name, user_password, user_birthday)
        values
        
            (#{item.userName}, #{item.userPassword}, #{item.userBirthday})
        
    
    
  • vo
  • package com.puffer.user.vo;
    
    import lombok.*;
    
    /**
    * excel数据导入结果
    * @author puffer
    * @date 2023-10-08
    */
    @Getter
    @Setter
    @NoArgsConstructor
    @AllArgsConstructor
    @Builder
    public class ExcelImportResultVo {
    
       /**
        * 成功数量
        */
       private Integer successCount;
    
       /**
        * 失败数量
        */
       private Integer failCount;
    
       /**
        * 耗费时间 单位:ms
        */
       private Long cost;
    }
    

    针对结果简单测试

    三个字段,7w+条数据,2.7s,以此估算 100w 数据大概 40s
    image.png

    本文借鉴文章(原文写的比我写的清晰很多)

    blog.xgblack.cn/spring-boot…

    相关文章

    JavaScript2024新功能:Object.groupBy、正则表达式v标志
    PHP trim 函数对多字节字符的使用和限制
    新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
    使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
    为React 19做准备:WordPress 6.6用户指南
    如何删除WordPress中的所有评论

    发布评论