优化 EasyExcel 将 Excel 中海量的数据存入 Mysql
正常的Excel数据导入Mysql暂且不讲,本文主要讲的是关于效率的优化。
常用导入方式的缺点
一般我们会一次性读取Excel中的所有数据,循环塞入默认值(比如 create_user 字段等),然后调用 mybatis_plus 中的 saveBatch(List entityList) 方法存入 mysql。如果Excel中的数据量过大会遇到以下几个问题:
解决方案
针对以上缺点,我们从以下几个方面优化
- Excel数据分片读取(每次读取1000、2048.. 选择一个适合自己机器的数量)
- 线程池异步处理数据
- Mybatis-Plus批量存储优化(不用saveBatch, 自己手写批量插入)
实现过程如下
package com.puffer.user.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
/**
* 线程池配置
* @author puffer
* @date 2023-10-08
*/
@Configuration
public class ThreadPoolConfig {
@Bean
public TaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 设置核心线程数
executor.setCorePoolSize(8);
// 设置最大线程数,当核心线程数满了且队列满了,会创建新的线程,直到达到最大线程数。而当前队列是无界队列,所以不会满,此处设置无效。直接使用默认值,无限大
//executor.setMaxPoolSize(16);
// 设置队列容量,默认为Integer.MAX_VALUE
//executor.setQueueCapacity(10000);
// 设置线程活跃时间(秒)
executor.setKeepAliveSeconds(60);
// 设置默认线程名称
executor.setThreadNamePrefix("puffer-thread-");
// 设置拒绝策略。无界队列不会出现满的情况,所以不配置存储策略。直接抛出 RejectedExecutionException 异常
//executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 等待所有任务结束后再关闭线程池
executor.setWaitForTasksToCompleteOnShutdown(true);
return executor;
}
}
import cn.hutool.core.collection.CollectionUtil;
import com.alibaba.excel.context.AnalysisContext;
import com.alibaba.excel.read.listener.ReadListener;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
/**
* @author puffer
* @date 2023-10-08
*/
@Slf4j
public class ExcelReadLister implements ReadListener {
/**
* 单个处理数据量,经测试1000条数据效果较好(最好在自己的电脑上试一下,我电脑是2048比较快)
*/
public static int BATCH_COUNT = 2048;
/**
* 数据的临时存储
*/
private List tempDataList = new ArrayList(BATCH_COUNT);
/**
* consumer
*/
private final Consumer consumer;
public ExcelReadLister(Consumer consumer) {
this.consumer = consumer;
}
@Override
public void invoke(T data, AnalysisContext context) {
tempDataList.add(data);
if (tempDataList.size() >= BATCH_COUNT) {
log.info("读取数据量:{}", tempDataList.size());
consumer.accept(tempDataList);
tempDataList = new ArrayList(BATCH_COUNT);
}
}
/**
* 所有数据读取完成之后调用
*
* @param context
*/
@Override
public void doAfterAllAnalysed(AnalysisContext context) {
if (CollectionUtil.isNotEmpty(tempDataList)) {
//处理剩余的数据
log.info("读取数据量:{}", tempDataList.size());
consumer.accept(tempDataList);
}
}
}
3. controller层代码
``` ```
package com.puffer.user.controller;
import com.puffer.user.service.UserService;
import com.puffer.user.vo.ExcelImportResultVo;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.multipart.MultipartFile;
import java.net.http.HttpResponse;
/**
* 用户 Api
*
* @author puffer
* @date 2023-10-06
*/
@RestController
@RequestMapping(value = "/users")
@Slf4j
public class UserController {
@Resource
private UserService userService;
@PostMapping(value = "/importUserList")
public ExcelImportResultVo importUserList(@RequestBody MultipartFile file) {
long filesize = file.getSize();
log.info("导入文件大小:{} k", filesize / 1024);
return userService.importUserList(file);
}
}
@Override
@Transactional(rollbackFor = Exception.class)
public ExcelImportResultVo importUserList(MultipartFile file) {
StopWatch stopWatch = new StopWatch();
stopWatch.start();
ExcelImportResultVo resultVo = new ExcelImportResultVo();
try {
resultVo = this.readExcelAndSaveAsync(UserDto.class, file, data -> {
// 导入的账号明文密码加密
data.setUserPassword(MD5Utils.encode(data.getUserPassword()));
return data;
}, userMapper::insertList);
} catch (Exception e) {
log.error("导入失败原因:{}", e.getMessage(), e);
}
stopWatch.stop();
resultVo.setCost(stopWatch.getLastTaskTimeMillis());
return resultVo;
}
/**
* 读取Excel并保存
*
* @param head 要导入的实体类型
* @param file 文件
* @param function 操作函数
* @param dbFunction 数据库操作方法
* @param 实体
* @param 方法
* @return 导入的结果
* @throws IOException
* @throws ExecutionException
* @throws InterruptedException
*/
private ExcelImportResultVo readExcelAndSaveAsync(Class head, MultipartFile file, Function function, Function dbFunction) throws IOException, ExecutionException, InterruptedException {
// 导入成功的数据数量
int successCount = 0;
// 导入失败的数据
int failCount = 0;
// 存储异步线程的执行结果
Collection futures = new ArrayList();
EasyExcel.read(file.getInputStream(), head, new ExcelReadLister(dataList -> {
// 转换DO,并设置数据源id
List list = dataList.parallelStream().map(function).collect(Collectors.toList());
// 异步批量插入
futures.add(saveAsyncBatch(list, dbFunction));
})).sheet().doRead();
// 等待异步线程执行完毕
for (Future future : futures) {
int[] counts = future.get();
successCount += counts[0];
failCount += counts[1];
}
log.info("存储成功总数据量:{},存储失败总数据量:{}", successCount, failCount);
return ExcelImportResultVo.builder().successCount(successCount).failCount(failCount).build();
}
/**
* 批量插入
*
* @param list 要分批处理的数据
* @param dbFunction 数据库操作的方法
* @param 数据库实体类
* @return 返回处理结果
*/
@Async
public Future saveAsyncBatch(List list, Function dbFunction) {
int size = list.size();
int[] result = new int[2];
log.info("saveAsyncBatch当前数据分片大小 size:{}", size);
try {
if (dbFunction.apply(list) > 0) {
result[0] = size;
log.info("{} 分片存储数据成功,数据量:{}", Thread.currentThread().getName(), size);
} else {
result[1] = size;
log.info("{} 分片存储数据失败:{}", Thread.currentThread().getName(), size);
}
} catch (Exception e) {
result[1] = size;
log.error("{} 分片存储数据出现异常,{}", Thread.currentThread().getName(), e.getMessage());
}
return new AsyncResult(result);
}
int insertList(@Param("userList") List userList);
insert into t_user(user_name, user_password, user_birthday)
values
(#{item.userName}, #{item.userPassword}, #{item.userBirthday})
package com.puffer.user.vo;
import lombok.*;
/**
* excel数据导入结果
* @author puffer
* @date 2023-10-08
*/
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class ExcelImportResultVo {
/**
* 成功数量
*/
private Integer successCount;
/**
* 失败数量
*/
private Integer failCount;
/**
* 耗费时间 单位:ms
*/
private Long cost;
}
针对结果简单测试
三个字段,7w+条数据,2.7s,以此估算 100w 数据大概 40s
本文借鉴文章(原文写的比我写的清晰很多)
blog.xgblack.cn/spring-boot…