✨这里是第七人格的博客✨小七,欢迎您的到来~✨
🍅系列专栏:【架构思想】🍅
✈️本篇内容: 百万数据导入导出✈️
🍱本篇收录完整代码地址:gitee.com/diqirenge/i…
楔子
在我们的日常开发中,导入导出的需求是经常见到的,特别是针对后台管理系统,那你平时是怎么完成这个需求的呢?接下来不同时空的小七,将会陪着你一步步思考完善这个功能。废话少说,开干!
开发环境
1、JDK 1.8 + MySQL 8.0
2、idea + maven
3、spring-boot + mybatis-plus + easyexcel
测试环境
基础框架搭建
首先搭建好我们的项目,读者可以通过分支名称找到当前章节的代码。
分支名称
230928-52javaee.com-Init
仓库地址
gitee.com/diqirenge/i…
分支描述
初始化项目
项目结构
代码实现
添加依赖
org.springframework.boot
spring-boot-parent
2.6.5
org.springframework.boot
spring-boot-starter-web
org.springframework.boot
spring-boot-starter-aop
com.baomidou
mybatis-plus-boot-starter
3.5.3.1
mysql
mysql-connector-java
com.alibaba
easyexcel
3.2.0
编写mvc3层结构
controller
@RestController
public class SalariesController {
}
domain
public class Salaries {
private Integer empNo;
private Integer salary;
private Date fromDate;
private Date toDate;
public Integer getEmpNo() {
return empNo;
}
public void setEmpNo(Integer empNo) {
this.empNo = empNo;
}
public Integer getSalary() {
return salary;
}
public void setSalary(Integer salary) {
this.salary = salary;
}
public Date getFromDate() {
return fromDate;
}
public void setFromDate(Date fromDate) {
this.fromDate = fromDate;
}
public Date getToDate() {
return toDate;
}
public void setToDate(Date toDate) {
this.toDate = toDate;
}
}
Mapper
@Mapper
public interface SalariesMapper extends BaseMapper {
}
service
@Service
public class ExportService {
}
@Service
public class ImportService {
}
添加时间统计切面
@Component
@Aspect
public class DurationAspect {
private static final Log logger = LogFactory.getLog(DurationAspect.class);
@Around("execution(public void com.run2code.controller.SalariesController.exportExcel*(..))")
public void exportExcel(ProceedingJoinPoint joinPoint) {
long startTime = System.nanoTime();
logger.info("开始导出:" + joinPoint.getSignature().getName());
try {
joinPoint.proceed();
} catch (Throwable e) {
throw new RuntimeException(e);
} finally {
Duration time = Duration.ofNanos(System.nanoTime() - startTime);
logger.info("导出结束,消耗了:" + time.getSeconds() + "s");
}
}
@Around("execution(public void com.run2code.controller.SalariesController.importExcel*(..))")
public void importExcel(ProceedingJoinPoint joinPoint) {
long startTime = System.nanoTime();
logger.info("开始导入:" + joinPoint.getSignature().getName());
try {
joinPoint.proceed();
} catch (Throwable e) {
throw new RuntimeException(e);
} finally {
Duration time = Duration.ofNanos(System.nanoTime() - startTime);
logger.info("导入结束,消耗了:" + time.getSeconds() + "s");
}
}
}
改造启动类
添加MybatisPlus分页插件
@SpringBootApplication
@EnableTransactionManagement
public class SalariesApplication {
@Bean
public MybatisPlusInterceptor mybatisPlusInterceptor() {
MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();
// 添加分页拦截器
interceptor.addInnerInterceptor(new PaginationInnerInterceptor(DbType.MYSQL));
return interceptor;
}
public static void main(String[] args) {
SpringApplication.run(SalariesApplication.class);
}
}
添加配置文件
spring:
servlet:
multipart:
max-request-size: 30MB
max-file-size: 1024MB
datasource:
username: root
password: 123456
url: jdbc:mysql://127.0.0.1:3306/employees?characterEncoding=utf-8&useSSL=false&serverTimezone=Asia/Shanghai
main:
allow-circular-references: true
执行sql脚本
SET NAMES utf8mb4;
SET
FOREIGN_KEY_CHECKS = 0;
-- ----------------------------
-- Table structure for salaries
-- ----------------------------
DROP TABLE IF EXISTS `salaries`;
CREATE TABLE `salaries`
(
`emp_no` int(0) NOT NULL COMMENT '编号',
`salary` int(0) NULL DEFAULT NULL COMMENT '薪资',
`from_date` datetime(0) NULL DEFAULT NULL COMMENT '开始日期',
`to_date` datetime(0) NULL DEFAULT NULL COMMENT '结束日期'
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Dynamic;
测试
执行启动类SalariesApplication,输出以下代码表示基础框架搭建完成
Tomcat started on port(s): 8080 (http) with context path ''
导入功能实现
刚入职的小七
刚入职的小七,第一次接到需要,超级兴奋,看到项目中使用的是easyexcel,查询easyexcel的官方文档,知道了要对easyexcel读取的数据进行处理,只需要实现ReadListener接口,重写它的invoke和doAfterAllAnalysed方法就行了,那还等什么呢?开干!
分支名称
230928-52javaee.com-SingleThreadSingleInsert
仓库地址
gitee.com/diqirenge/i…
分支描述
一个简单的导入例子,一次性加载所有数据,单线程处理数据,单条插入
代码实现
添加薪资服务
@Service
public class SalariesService extends ServiceImpl implements IService {
}
添加薪资导入监听器
@Component
public class SalariesListener implements ReadListener{
@Resource
private SalariesService salariesService;
private static final Log logger = LogFactory.getLog(SalariesListener.class);
/**
* 处理数据的回调
* @param data
* @param context
*/
@Override
@Transactional(rollbackFor = Exception.class)
public void invoke(Salaries data, AnalysisContext context) {
saveOne(data);
}
public void saveOne(Salaries data){
salariesService.save(data);
}
/**
* 数据处理完的回调
* @param context
*/
@Override
@Transactional(rollbackFor = Exception.class)
public void doAfterAllAnalysed(AnalysisContext context) {
logger.info("所有数据处理完毕");
}
}
修改导入服务,增加导入方法
@Service
public class ImportService {
@Resource
private SalariesListener salariesListener;
public void importExcel(MultipartFile file) throws IOException {
EasyExcel.read(file.getInputStream(), Salaries.class, salariesListener).doReadAll();
}
}
测试
1、启动项目
2、打开postman录入以下信息
3、选择src/main/resources/xlsx下的52javaee_1000.xlsx文件
4、请求接口
小结
完成需求。1000条数据,5秒钟导入完毕(#^.^#)。
工作一年的小七
工作一年的小七,看着导入代码中一条一条的插入数据库,不经感叹:”真他娘的是个人才,我看看是谁写的“。结果第七人格几个大字跃然纸上,悄悄地,
这一坨必须要改掉o( ̄︶ ̄)o那还等什么?开干!
分支名称
230928-52javaee.com-SingleThreadBatchInsert
仓库地址
gitee.com/diqirenge/i…
分支描述
一次性加载所有数据,单线程处理数据,批量插入
代码实现
修改SalariesListener
/**
* 作为类的属性,我们需要他是线程安全的
*/
private CopyOnWriteArrayList salariesList = new CopyOnWriteArrayList();
private static final int batchSize = 1000;
/**
* 处理数据的回调
*
* @param data
* @param context
*/
@Override
@Transactional(rollbackFor = Exception.class)
public void invoke(Salaries data, AnalysisContext context) {
salariesList.add(data);
if (salariesList.size() >= batchSize) {
saveBatch();
}
}
public void saveBatch() {
if (!salariesList.isEmpty()) {
// 批量保存
salariesService.saveBatch(salariesList, salariesList.size());
// 保存完了,需要清空这个集合,方便下次使用
salariesList.clear();
}
}
/**
* 数据处理完的回调
*
* @param context
*/
@Override
@Transactional(rollbackFor = Exception.class)
public void doAfterAllAnalysed(AnalysisContext context) {
if (!salariesList.isEmpty()) {
salariesService.saveBatch(salariesList);
}
logger.info("所有数据处理完毕");
}
修改yml配置
spring:
servlet:
multipart:
max-request-size: 30MB
max-file-size: 1024MB
datasource:
username: root
password: 123456
#`rewriteBatchedStatements=true` 是 MySQL 数据库连接配置中的一个参数,用于控制是否启用批量语句重写。
#在默认情况下,MySQL 的批量语句处理方式是逐条执行 SQL 语句。然而,这可能会导致性能问题,特别是在处理大量数据时。因此,MySQL 提供了一种可以提高性能的批量处理方式,即批量语句重写。
#当 `rewriteBatchedStatements=true` 时,MySQL 会自动将多个 SQL 语句合并成一条语句,以提高执行效率。这可以显著提高数据库操作的性能,特别是在处理大量数据时。
url: jdbc:mysql://127.0.0.1:3306/employees?rewriteBatchedStatements=true&characterEncoding=utf-8&useSSL=false&serverTimezone=Asia/Shanghai
main:
allow-circular-references: true
测试
1、启动项目
2、打开postman录入以下信息
3、选择src/main/resources/xlsx下的52javaee_1000.xlsx文件
4、请求接口
小结
通过将单条插入,改为批量插入,减少了网络I/O开销,提升了执行效率。
注:新增数据库连接参数 rewriteBatchedStatements=true,是为了实现真正的批量插入,否则mybatis-plus提交到数据库的sql语句是不会被合并成批量插入的,也就是说就算你用了saveBatch方法,最终还是单条插入。
工作三年的小七
小七工作三年了,今天又被运营吐槽,导入太慢,正好最近加深学习了多线程的知识,仔细一想如果可以分批处理sheet页,那不就可以使用多线程了吗?速度不是杠杆的?那还等什么,开干!
分支名称
230928-52javaee.com-MultiThreadSheet_BatchInsert
仓库地址
gitee.com/diqirenge/i…
分支描述
多线程处理sheet页,单线程处理数据,批量插入
代码实现
修改ImportService,并发读取sheet页进行处理
private final ExecutorService executorService = Executors.newFixedThreadPool(20);
public void importExcel(MultipartFile file) throws IOException {
// 开20个线程分别处理20个sheet
List tasks = new ArrayList();
List readSheets = EasyExcelFactory.read(file.getInputStream()).build().excelExecutor().sheetList();
int size = readSheets.size();
for (int i = 0; i {
EasyExcel.read(file.getInputStream(), Salaries.class, salariesListener)
.sheet(num).doRead();
return null;
});
}
try {
executorService.invokeAll(tasks);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
修改SalariesListener
/**
* 作为类的属性,我们需要他是线程安全的
* 并且是针对20个sheet页,有20个线程,所以需要使用ThreadLocal,保证每个线程的salariesList都是独立的
*/
private ThreadLocal salariesList = ThreadLocal.withInitial(ArrayList::new);
private static final int batchSize = 1000;
/**
* 处理数据的回调
*
* @param data
* @param context
*/
@Override
@Transactional(rollbackFor = Exception.class)
public void invoke(Salaries data, AnalysisContext context) {
salariesList.get().add(data);
if (salariesList.get().size() >= batchSize) {
saveBatch();
}
}
public void saveBatch() {
if (!salariesList.get().isEmpty()) {
// 批量保存
salariesService.saveBatch(salariesList.get(), salariesList.get().size());
// 保存完了,需要清空这个集合,方便下次使用
salariesList.get().clear();
}
}
测试
1、启动项目
2、打开postman录入以下信息
3、选择src/main/resources/xlsx下的52javaee_5w.xlsx文件
4、请求接口
小结
执行结果
多线程处理sheet页,单线程处理数据,批量插入
导入结束,消耗了:122s
一个一个处理sheet页,是串行的,处理完一个sheet页才能接着处理下一个;而现在我们使用多线程,那么同一时间我们可以处理多个sheet页。最终我们终于可以尝试导入5w条数据了。
工作五年的小七
小七工作五年了,老板的公司越做越大,现在运营爸爸需要导入100w数据,天啊,100w啊,1后面那么多0......嘿嘿,想想都兴奋呢,又有时间优化代码了,既然读sheet页是用的多线程,那么我们批量插入的时候是不是也可以使用多线程呢?那还等什么?开干!
分支名称
230928-52javaee.com-MultiThreadSheet_MultiThreadBatchInsert
仓库地址
gitee.com/diqirenge/i…
分支描述
多线程处理sheet页,多线程处理数据,批量插入
代码实现
修改SalariesListener
private final ExecutorService executorService = Executors.newFixedThreadPool(20);
public void saveBatch() {
if (!salariesList.get().isEmpty()) {
// 先从副本中copy一份数据出来
ArrayList salaries = (ArrayList) salariesList.get().clone();
// 并发执行
executorService.execute(new SaveTask(salaries, salariesService));
// 执行完了之后需要清空数据,方便下次使用
salariesList.get().clear();
}
}
static class SaveTask implements Runnable {
private List salariesList;
private SalariesService salariesService;
public SaveTask(List salariesList, SalariesService salariesService) {
this.salariesList = salariesList;
this.salariesService = salariesService;
}
@Override
public void run() {
salariesService.saveBatch(salariesList);
}
}
测试
1、启动项目
2、打开postman录入以下信息
3、选择src/main/resources/xlsx下的52javaee_100w.xlsx文件
4、请求接口
小结
经过分析,我们知道,批量插入也可以使用多线程处理,这种将串行变为并行处理的逻辑,是我们优化代码的一大帮手。这一次100w数据,30秒就解决了,运营爸爸很满意。
工作七年的小七
为了更简单方便的使用并发编程,我们将SalariesListener做成了单例的并且交给了Spring管理,还使用了一些线程安全的类,为了进一步提升效率,我们是不是可以考虑不把SalariesListener交给Spring管理,如果每一个都是new出来的新对象,那么也可以不再考虑共享变量的问题了~那还等什么呢?开干!
分支名称
230928-52javaee.com-Import_NoThreadSafety
仓库地址
gitee.com/diqirenge/i…
分支描述
多线程处理sheet页,多线程处理数据,批量插入,换一种思路解决并发安全问题
代码实现
修改SalariesListener
public class SalariesListener implements ReadListener {
private SalariesService salariesService;
/**
* 通过构造方法注入的方式,我们得到的SalariesService其实就是Spring的代理对象
*
* @param salariesService
*/
public SalariesListener(SalariesService salariesService) {
this.salariesService = salariesService;
}
private static final Log logger = LogFactory.getLog(SalariesListener.class);
/**
* 因为每一个SalariesListener都是我们自己new出来的,所以不存在并发安全问题,替换成ArrayList即可
*/
ArrayList salariesList = new ArrayList();
private final ExecutorService executorService = Executors.newFixedThreadPool(20);
private static final int batchSize = 10000;
/**
* 处理数据的回调
*
* @param data
* @param context
*/
@Override
@Transactional(rollbackFor = Exception.class)
public void invoke(Salaries data, AnalysisContext context) {
salariesList.add(data);
if (salariesList.size() >= batchSize) {
saveBatch();
}
}
public void saveBatch() {
if (!salariesList.isEmpty()) {
// 先从副本中copy一份数据出来
ArrayList salaries = (ArrayList) salariesList.clone();
// 并发执行
executorService.execute(new SaveTask(salaries, salariesService));
// 执行完了之后需要清空数据,方便下次使用
salariesList.clear();
}
}
static class SaveTask implements Runnable {
private List salariesList;
private SalariesService salariesService;
public SaveTask(List salariesList, SalariesService salariesService) {
this.salariesList = salariesList;
this.salariesService = salariesService;
}
@Override
public void run() {
salariesService.saveBatch(salariesList);
}
}
/**
* 数据处理完的回调
*
* @param context
*/
@Override
@Transactional(rollbackFor = Exception.class)
public void doAfterAllAnalysed(AnalysisContext context) {
if (!salariesList.isEmpty()) {
salariesService.saveBatch(salariesList);
}
logger.info("所有数据处理完毕");
}
}
修改ImportService
public class ImportService {
@Resource
private SalariesService salariesService;
private final ExecutorService executorService = Executors.newFixedThreadPool(20);
public void importExcel(MultipartFile file) throws IOException {
// 开20个线程分别处理20个sheet
List tasks = new ArrayList();
List readSheets = EasyExcelFactory.read(file.getInputStream()).build().excelExecutor().sheetList();
int size = readSheets.size();
for (int i = 0; i {
EasyExcel.read(file.getInputStream(), Salaries.class, new SalariesListener(salariesService))
.sheet(num).doRead();
return null;
});
}
try {
executorService.invokeAll(tasks);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
测试
1、启动项目
2、打开postman录入以下信息
3、选择src/main/resources/xlsx下的52javaee_100w.xlsx文件
4、请求接口
小结
以空间换时间,通过每次new SalariesListener的方式,解决了线程安全问题,避免了共享变量的使用。100w数据导入,进一步优化到11秒。
导出功能实现
针对导出功能,其实本质上和导入是一样的,就是能够使用多线程的地方就尝试使用多线程。那么导出有几个可以优化的点呢?我们列出以下思考过程
1、全量导出,一个sheet页
2、单线程分页查询,单线程写
3、多线程分页查询,单线程写
4、多线程分页查询,多线程写
因为easyExcel 不支持多线程写(github.com/alibaba/eas…,所以我们采用第三种方式进行实现。那还等什么呢?开干!
分支名称
231007-52javaee.com-Export
仓库地址
gitee.com/diqirenge/i…
分支描述
多线程分页查询,单线程写
代码实现
修改导出服务类ExportService
@Service
public class ExportService {
public static final String CONTENT_TYPE = "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet";
@Resource
private SalariesMapper salariesMapper;
public void exportExcel(HttpServletResponse response) throws IOException, InterruptedException {
// 设置响应头
setResponseHeader(response);
// 多线程分页查询
Map pageMap = getIntegerPageMap();
// 导出
doExport(response, pageMap);
}
private static void setResponseHeader(HttpServletResponse response) {
response.setContentType(CONTENT_TYPE);
response.setCharacterEncoding(StandardCharsets.UTF_8.name());
response.setHeader("Content-disposition", "attachment;filename*=utf-8''" + "52javaee.com_export_100w.xlsx");
}
private static void doExport(HttpServletResponse response, Map pageMap) throws IOException {
// 导出,导出为什么不使用多线程呢?因为easyExcel 没有提供这样的能力
// https://github.com/alibaba/easyexcel/issues/1040
try (ExcelWriter excelWriter = EasyExcelFactory.write(response.getOutputStream(), Salaries.class).build()) {
for (Map.Entry entry : pageMap.entrySet()) {
Integer num = entry.getKey();
Page salariesPage = entry.getValue();
WriteSheet writeSheet = EasyExcelFactory.writerSheet(num, "sheet-" + num).build();
excelWriter.write(salariesPage.getRecords(), writeSheet);
}
}
}
private Map getIntegerPageMap() throws InterruptedException {
Long count = salariesMapper.selectCount(null);
Integer pages = 20;
Long size = count / pages;
ExecutorService executorService = Executors.newFixedThreadPool(pages);
CountDownLatch countDownLatch = new CountDownLatch(pages);
// 这里直接放到内存中了,生产的话需要评估数据量的大小会不会造成内存溢出
Map pageMap = new HashMap();
for (int i = 0; i {
Page page = new Page();
page.setCurrent(finalI + 1);
page.setSize(size);
Page selectPage = salariesMapper.selectPage(page, null);
pageMap.put(finalI, selectPage);
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
return pageMap;
}
}
修改SalariesController,增加导出服务
/**
* 导出服务
*/
@Resource
private ExportService exportService;
/**
* 导出excel
*
* @param response
* @throws IOException
* @throws InterruptedException
*/
@GetMapping("export")
public void exportExcel(HttpServletResponse response) throws IOException, InterruptedException {
exportService.exportExcel(response);
}
测试
1、启动项目
2、浏览器输入 http://localhost:8080/export
小结
经过大脑思考的代码,一百万数据导出,仅耗时21s
导出结束,消耗了:21s
总结
本文提供了百万级数据导入/导出的高效架构思路,从0到1实现了代码的架构演进。其关键思路总结如下:
1、减少网络I/O开销
2、找到能使用并发编程的点
3、减少共享变量的使用
本文只提供一个技术思路,因个人能力有限,如有错误和不足,望请指正,谢谢~
附录