告别数据困扰:百万级数据导入导出的高效架构思路以及代码示例

2023年 10月 8日 32.5k 0

✨这里是第七人格的博客✨小七,欢迎您的到来~✨

🍅系列专栏:【架构思想】🍅

✈️本篇内容: 百万数据导入导出✈️

🍱本篇收录完整代码地址:gitee.com/diqirenge/i…

楔子

在我们的日常开发中,导入导出的需求是经常见到的,特别是针对后台管理系统,那你平时是怎么完成这个需求的呢?接下来不同时空的小七,将会陪着你一步步思考完善这个功能。废话少说,开干!

开发环境

1、JDK 1.8 + MySQL 8.0

2、idea + maven

3、spring-boot + mybatis-plus + easyexcel

测试环境

image-20231007151758118.png

基础框架搭建

首先搭建好我们的项目,读者可以通过分支名称找到当前章节的代码。

分支名称

230928-52javaee.com-Init

仓库地址

gitee.com/diqirenge/i…

分支描述

初始化项目

项目结构

项目结构

代码实现

添加依赖


    org.springframework.boot
    spring-boot-parent
    2.6.5



    
        org.springframework.boot
        spring-boot-starter-web
    

    
        org.springframework.boot
        spring-boot-starter-aop
    

    
        com.baomidou
        mybatis-plus-boot-starter
        3.5.3.1
    

    
        mysql
        mysql-connector-java
    

    
        com.alibaba
        easyexcel
        3.2.0
    

编写mvc3层结构

controller

@RestController
public class SalariesController {

}

domain

public class Salaries {

    private Integer empNo;
    private Integer salary;
    private Date fromDate;
    private Date toDate;

    public Integer getEmpNo() {
        return empNo;
    }

    public void setEmpNo(Integer empNo) {
        this.empNo = empNo;
    }

    public Integer getSalary() {
        return salary;
    }

    public void setSalary(Integer salary) {
        this.salary = salary;
    }

    public Date getFromDate() {
        return fromDate;
    }

    public void setFromDate(Date fromDate) {
        this.fromDate = fromDate;
    }

    public Date getToDate() {
        return toDate;
    }

    public void setToDate(Date toDate) {
        this.toDate = toDate;
    }
}

Mapper

@Mapper
public interface SalariesMapper extends BaseMapper {

}

service

@Service
public class ExportService {


}
@Service
public class ImportService {


}

添加时间统计切面

@Component
@Aspect
public class DurationAspect {

    private static final Log logger = LogFactory.getLog(DurationAspect.class);

    @Around("execution(public void com.run2code.controller.SalariesController.exportExcel*(..))")
    public void exportExcel(ProceedingJoinPoint joinPoint) {
        long startTime = System.nanoTime();
        logger.info("开始导出:" + joinPoint.getSignature().getName());
        try {
            joinPoint.proceed();
        } catch (Throwable e) {
            throw new RuntimeException(e);
        } finally {
            Duration time = Duration.ofNanos(System.nanoTime() - startTime);
            logger.info("导出结束,消耗了:" + time.getSeconds() + "s");
        }

    }

    @Around("execution(public void com.run2code.controller.SalariesController.importExcel*(..))")
    public void importExcel(ProceedingJoinPoint joinPoint) {
        long startTime = System.nanoTime();
        logger.info("开始导入:" + joinPoint.getSignature().getName());
        try {
            joinPoint.proceed();
        } catch (Throwable e) {
            throw new RuntimeException(e);
        } finally {
            Duration time = Duration.ofNanos(System.nanoTime() - startTime);
            logger.info("导入结束,消耗了:" + time.getSeconds() + "s");
        }

    }
}

改造启动类

添加MybatisPlus分页插件

@SpringBootApplication
@EnableTransactionManagement
public class SalariesApplication {

    @Bean
    public MybatisPlusInterceptor mybatisPlusInterceptor() {
        MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();
        // 添加分页拦截器
        interceptor.addInnerInterceptor(new PaginationInnerInterceptor(DbType.MYSQL));
        return interceptor;
    }

    public static void main(String[] args) {
        SpringApplication.run(SalariesApplication.class);
    }
}

添加配置文件

spring:
  servlet:
    multipart:
      max-request-size: 30MB
      max-file-size: 1024MB
  datasource:
    username: root
    password: 123456
    url: jdbc:mysql://127.0.0.1:3306/employees?characterEncoding=utf-8&useSSL=false&serverTimezone=Asia/Shanghai
  main:
    allow-circular-references: true

执行sql脚本

SET NAMES utf8mb4;
SET
FOREIGN_KEY_CHECKS = 0;

-- ----------------------------
-- Table structure for salaries
-- ----------------------------
DROP TABLE IF EXISTS `salaries`;
CREATE TABLE `salaries`
(
    `emp_no`    int(0) NOT NULL COMMENT '编号',
    `salary`    int(0) NULL DEFAULT NULL COMMENT '薪资',
    `from_date` datetime(0) NULL DEFAULT NULL COMMENT '开始日期',
    `to_date`   datetime(0) NULL DEFAULT NULL COMMENT '结束日期'
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Dynamic;

测试

执行启动类SalariesApplication,输出以下代码表示基础框架搭建完成

Tomcat started on port(s): 8080 (http) with context path ''

导入功能实现

刚入职的小七

刚入职的小七,第一次接到需要,超级兴奋,看到项目中使用的是easyexcel,查询easyexcel的官方文档,知道了要对easyexcel读取的数据进行处理,只需要实现ReadListener接口,重写它的invoke和doAfterAllAnalysed方法就行了,那还等什么呢?开干!

分支名称

230928-52javaee.com-SingleThreadSingleInsert

仓库地址

gitee.com/diqirenge/i…

分支描述

一个简单的导入例子,一次性加载所有数据,单线程处理数据,单条插入

代码实现

添加薪资服务

@Service
public class SalariesService extends ServiceImpl implements IService {

}

添加薪资导入监听器

@Component
public class SalariesListener implements ReadListener{

    @Resource
    private SalariesService salariesService;

    private static final Log logger = LogFactory.getLog(SalariesListener.class);

    /**
     * 处理数据的回调
     * @param data
     * @param context
     */
    @Override
    @Transactional(rollbackFor = Exception.class)
    public void invoke(Salaries data, AnalysisContext context) {
        saveOne(data);
    }

    public void saveOne(Salaries data){
        salariesService.save(data);
    }

    /**
     * 数据处理完的回调
     * @param context
     */
    @Override
    @Transactional(rollbackFor = Exception.class)
    public void doAfterAllAnalysed(AnalysisContext context) {
        logger.info("所有数据处理完毕");
    }
}

修改导入服务,增加导入方法

@Service
public class ImportService {
    @Resource
    private SalariesListener salariesListener;
    public void importExcel(MultipartFile file) throws IOException {
        EasyExcel.read(file.getInputStream(), Salaries.class, salariesListener).doReadAll();
    }

}

测试

1、启动项目

2、打开postman录入以下信息

image-20231007121543813.png

3、选择src/main/resources/xlsx下的52javaee_1000.xlsx文件

4、请求接口

小结

完成需求。1000条数据,5秒钟导入完毕(#^.^#)。

工作一年的小七

工作一年的小七,看着导入代码中一条一条的插入数据库,不经感叹:”真他娘的是个人才,我看看是谁写的“。结果第七人格几个大字跃然纸上,悄悄地,

这一坨必须要改掉o( ̄︶ ̄)o那还等什么?开干!

分支名称

230928-52javaee.com-SingleThreadBatchInsert

仓库地址

gitee.com/diqirenge/i…

分支描述

一次性加载所有数据,单线程处理数据,批量插入

代码实现

修改SalariesListener

    /**
     * 作为类的属性,我们需要他是线程安全的
     */
    private CopyOnWriteArrayList salariesList = new CopyOnWriteArrayList();

    private static final int batchSize = 1000;

    /**
     * 处理数据的回调
     *
     * @param data
     * @param context
     */
    @Override
    @Transactional(rollbackFor = Exception.class)
    public void invoke(Salaries data, AnalysisContext context) {
        salariesList.add(data);
        if (salariesList.size() >= batchSize) {
            saveBatch();
        }
    }

    public void saveBatch() {
        if (!salariesList.isEmpty()) {
            // 批量保存
            salariesService.saveBatch(salariesList, salariesList.size());
            // 保存完了,需要清空这个集合,方便下次使用
            salariesList.clear();
        }
    }

    /**
     * 数据处理完的回调
     *
     * @param context
     */
    @Override
    @Transactional(rollbackFor = Exception.class)
    public void doAfterAllAnalysed(AnalysisContext context) {
        if (!salariesList.isEmpty()) {
            salariesService.saveBatch(salariesList);
        }
        logger.info("所有数据处理完毕");
    }

修改yml配置

spring:
  servlet:
    multipart:
      max-request-size: 30MB
      max-file-size: 1024MB
  datasource:
    username: root
    password: 123456
    #`rewriteBatchedStatements=true` 是 MySQL 数据库连接配置中的一个参数,用于控制是否启用批量语句重写。
    #在默认情况下,MySQL 的批量语句处理方式是逐条执行 SQL 语句。然而,这可能会导致性能问题,特别是在处理大量数据时。因此,MySQL 提供了一种可以提高性能的批量处理方式,即批量语句重写。
    #当 `rewriteBatchedStatements=true` 时,MySQL 会自动将多个 SQL 语句合并成一条语句,以提高执行效率。这可以显著提高数据库操作的性能,特别是在处理大量数据时。
    url: jdbc:mysql://127.0.0.1:3306/employees?rewriteBatchedStatements=true&characterEncoding=utf-8&useSSL=false&serverTimezone=Asia/Shanghai
  main:
    allow-circular-references: true

测试

1、启动项目

2、打开postman录入以下信息

image-20231007121543813.png

3、选择src/main/resources/xlsx下的52javaee_1000.xlsx文件

4、请求接口

小结

通过将单条插入,改为批量插入,减少了网络I/O开销,提升了执行效率。

注:新增数据库连接参数 rewriteBatchedStatements=true,是为了实现真正的批量插入,否则mybatis-plus提交到数据库的sql语句是不会被合并成批量插入的,也就是说就算你用了saveBatch方法,最终还是单条插入。

工作三年的小七

小七工作三年了,今天又被运营吐槽,导入太慢,正好最近加深学习了多线程的知识,仔细一想如果可以分批处理sheet页,那不就可以使用多线程了吗?速度不是杠杆的?那还等什么,开干!

分支名称

230928-52javaee.com-MultiThreadSheet_BatchInsert

仓库地址

gitee.com/diqirenge/i…

分支描述

多线程处理sheet页,单线程处理数据,批量插入

代码实现

修改ImportService,并发读取sheet页进行处理

private final ExecutorService executorService = Executors.newFixedThreadPool(20);

public void importExcel(MultipartFile file) throws IOException {
    // 开20个线程分别处理20个sheet
    List tasks = new ArrayList();
    List readSheets = EasyExcelFactory.read(file.getInputStream()).build().excelExecutor().sheetList();
    int size = readSheets.size();
    for (int i = 0; i  {
            EasyExcel.read(file.getInputStream(), Salaries.class, salariesListener)
                    .sheet(num).doRead();
            return null;
        });
    }

    try {
        executorService.invokeAll(tasks);
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
}

修改SalariesListener

    /**
     * 作为类的属性,我们需要他是线程安全的
     * 并且是针对20个sheet页,有20个线程,所以需要使用ThreadLocal,保证每个线程的salariesList都是独立的
     */
    private ThreadLocal salariesList = ThreadLocal.withInitial(ArrayList::new);

    private static final int batchSize = 1000;

    /**
     * 处理数据的回调
     *
     * @param data
     * @param context
     */
    @Override
    @Transactional(rollbackFor = Exception.class)
    public void invoke(Salaries data, AnalysisContext context) {
        salariesList.get().add(data);
        if (salariesList.get().size() >= batchSize) {
            saveBatch();
        }
    }

    public void saveBatch() {
        if (!salariesList.get().isEmpty()) {
            // 批量保存
            salariesService.saveBatch(salariesList.get(), salariesList.get().size());
            // 保存完了,需要清空这个集合,方便下次使用
            salariesList.get().clear();
        }
    }

测试

1、启动项目

2、打开postman录入以下信息

image-20231007121543813.png

3、选择src/main/resources/xlsx下的52javaee_5w.xlsx文件

4、请求接口

小结

执行结果

多线程处理sheet页,单线程处理数据,批量插入

导入结束,消耗了:122s

一个一个处理sheet页,是串行的,处理完一个sheet页才能接着处理下一个;而现在我们使用多线程,那么同一时间我们可以处理多个sheet页。最终我们终于可以尝试导入5w条数据了。

工作五年的小七

小七工作五年了,老板的公司越做越大,现在运营爸爸需要导入100w数据,天啊,100w啊,1后面那么多0......嘿嘿,想想都兴奋呢,又有时间优化代码了,既然读sheet页是用的多线程,那么我们批量插入的时候是不是也可以使用多线程呢?那还等什么?开干!

分支名称

230928-52javaee.com-MultiThreadSheet_MultiThreadBatchInsert

仓库地址

gitee.com/diqirenge/i…

分支描述

多线程处理sheet页,多线程处理数据,批量插入

代码实现

修改SalariesListener

private final ExecutorService executorService = Executors.newFixedThreadPool(20);

public void saveBatch() {
    if (!salariesList.get().isEmpty()) {
        // 先从副本中copy一份数据出来
        ArrayList salaries = (ArrayList) salariesList.get().clone();
        // 并发执行
        executorService.execute(new SaveTask(salaries, salariesService));
        // 执行完了之后需要清空数据,方便下次使用
        salariesList.get().clear();
    }
}

static class SaveTask implements Runnable {

    private List salariesList;
    private SalariesService salariesService;

    public SaveTask(List salariesList, SalariesService salariesService) {
        this.salariesList = salariesList;
        this.salariesService = salariesService;
    }

    @Override
    public void run() {
        salariesService.saveBatch(salariesList);
    }
}

测试

1、启动项目

2、打开postman录入以下信息

image-20231007121543813.png

3、选择src/main/resources/xlsx下的52javaee_100w.xlsx文件

4、请求接口

小结

经过分析,我们知道,批量插入也可以使用多线程处理,这种将串行变为并行处理的逻辑,是我们优化代码的一大帮手。这一次100w数据,30秒就解决了,运营爸爸很满意。

工作七年的小七

为了更简单方便的使用并发编程,我们将SalariesListener做成了单例的并且交给了Spring管理,还使用了一些线程安全的类,为了进一步提升效率,我们是不是可以考虑不把SalariesListener交给Spring管理,如果每一个都是new出来的新对象,那么也可以不再考虑共享变量的问题了~那还等什么呢?开干!

分支名称

230928-52javaee.com-Import_NoThreadSafety

仓库地址

gitee.com/diqirenge/i…

分支描述

多线程处理sheet页,多线程处理数据,批量插入,换一种思路解决并发安全问题

代码实现

修改SalariesListener

public class SalariesListener implements ReadListener {

    private SalariesService salariesService;

    /**
     * 通过构造方法注入的方式,我们得到的SalariesService其实就是Spring的代理对象
     *
     * @param salariesService
     */
    public SalariesListener(SalariesService salariesService) {
        this.salariesService = salariesService;
    }

    private static final Log logger = LogFactory.getLog(SalariesListener.class);

    /**
     * 因为每一个SalariesListener都是我们自己new出来的,所以不存在并发安全问题,替换成ArrayList即可
     */
    ArrayList salariesList = new ArrayList();

    private final ExecutorService executorService = Executors.newFixedThreadPool(20);

    private static final int batchSize = 10000;

    /**
     * 处理数据的回调
     *
     * @param data
     * @param context
     */
    @Override
    @Transactional(rollbackFor = Exception.class)
    public void invoke(Salaries data, AnalysisContext context) {
        salariesList.add(data);
        if (salariesList.size() >= batchSize) {
            saveBatch();
        }
    }

    public void saveBatch() {
        if (!salariesList.isEmpty()) {
            // 先从副本中copy一份数据出来
            ArrayList salaries = (ArrayList) salariesList.clone();
            // 并发执行
            executorService.execute(new SaveTask(salaries, salariesService));
            // 执行完了之后需要清空数据,方便下次使用
            salariesList.clear();
        }
    }

    static class SaveTask implements Runnable {

        private List salariesList;
        private SalariesService salariesService;

        public SaveTask(List salariesList, SalariesService salariesService) {
            this.salariesList = salariesList;
            this.salariesService = salariesService;
        }

        @Override
        public void run() {
            salariesService.saveBatch(salariesList);
        }
    }

    /**
     * 数据处理完的回调
     *
     * @param context
     */
    @Override
    @Transactional(rollbackFor = Exception.class)
    public void doAfterAllAnalysed(AnalysisContext context) {
        if (!salariesList.isEmpty()) {
            salariesService.saveBatch(salariesList);
        }
        logger.info("所有数据处理完毕");
    }
}

修改ImportService

public class ImportService {
    @Resource
    private SalariesService salariesService;

    private final ExecutorService executorService = Executors.newFixedThreadPool(20);

    public void importExcel(MultipartFile file) throws IOException {
        // 开20个线程分别处理20个sheet
        List tasks = new ArrayList();
        List readSheets = EasyExcelFactory.read(file.getInputStream()).build().excelExecutor().sheetList();
        int size = readSheets.size();
        for (int i = 0; i  {
                EasyExcel.read(file.getInputStream(), Salaries.class, new SalariesListener(salariesService))
                        .sheet(num).doRead();
                return null;
            });
        }

        try {
            executorService.invokeAll(tasks);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

}

测试

1、启动项目

2、打开postman录入以下信息

image-20231007121543813.png

3、选择src/main/resources/xlsx下的52javaee_100w.xlsx文件

4、请求接口

小结

以空间换时间,通过每次new SalariesListener的方式,解决了线程安全问题,避免了共享变量的使用。100w数据导入,进一步优化到11秒。

导出功能实现

针对导出功能,其实本质上和导入是一样的,就是能够使用多线程的地方就尝试使用多线程。那么导出有几个可以优化的点呢?我们列出以下思考过程

1、全量导出,一个sheet页

2、单线程分页查询,单线程写

3、多线程分页查询,单线程写

4、多线程分页查询,多线程写

因为easyExcel 不支持多线程写(github.com/alibaba/eas…,所以我们采用第三种方式进行实现。那还等什么呢?开干!

分支名称

231007-52javaee.com-Export

仓库地址

gitee.com/diqirenge/i…

分支描述

多线程分页查询,单线程写

代码实现

修改导出服务类ExportService

@Service
public class ExportService {
    public static final String CONTENT_TYPE = "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet";

    @Resource
    private SalariesMapper salariesMapper;


    public void exportExcel(HttpServletResponse response) throws IOException, InterruptedException {
        // 设置响应头
        setResponseHeader(response);

        // 多线程分页查询
        Map pageMap = getIntegerPageMap();

        // 导出
        doExport(response, pageMap);
    }

    private static void setResponseHeader(HttpServletResponse response) {
        response.setContentType(CONTENT_TYPE);
        response.setCharacterEncoding(StandardCharsets.UTF_8.name());
        response.setHeader("Content-disposition", "attachment;filename*=utf-8''" + "52javaee.com_export_100w.xlsx");
    }

    private static void doExport(HttpServletResponse response, Map pageMap) throws IOException {
        // 导出,导出为什么不使用多线程呢?因为easyExcel 没有提供这样的能力
        // https://github.com/alibaba/easyexcel/issues/1040
        try (ExcelWriter excelWriter = EasyExcelFactory.write(response.getOutputStream(), Salaries.class).build()) {
            for (Map.Entry entry : pageMap.entrySet()) {
                Integer num = entry.getKey();
                Page salariesPage = entry.getValue();
                WriteSheet writeSheet = EasyExcelFactory.writerSheet(num, "sheet-" + num).build();
                excelWriter.write(salariesPage.getRecords(), writeSheet);
            }
        }
    }

    private Map getIntegerPageMap() throws InterruptedException {
        Long count = salariesMapper.selectCount(null);

        Integer pages = 20;
        Long size = count / pages;

        ExecutorService executorService = Executors.newFixedThreadPool(pages);
        CountDownLatch countDownLatch = new CountDownLatch(pages);

        // 这里直接放到内存中了,生产的话需要评估数据量的大小会不会造成内存溢出
        Map pageMap = new HashMap();
        for (int i = 0; i  {
                Page page = new Page();
                page.setCurrent(finalI + 1);
                page.setSize(size);
                Page selectPage = salariesMapper.selectPage(page, null);

                pageMap.put(finalI, selectPage);
                countDownLatch.countDown();
            });
        }

        countDownLatch.await();

        executorService.shutdown();
        return pageMap;
    }

}

修改SalariesController,增加导出服务

/**
 * 导出服务
 */
@Resource
private ExportService exportService;
/**
 * 导出excel
 *
 * @param response
 * @throws IOException
 * @throws InterruptedException
 */
@GetMapping("export")
public void exportExcel(HttpServletResponse response) throws IOException, InterruptedException {
    exportService.exportExcel(response);
}

测试

1、启动项目

2、浏览器输入 http://localhost:8080/export

小结

经过大脑思考的代码,一百万数据导出,仅耗时21s

导出结束,消耗了:21s

总结

本文提供了百万级数据导入/导出的高效架构思路,从0到1实现了代码的架构演进。其关键思路总结如下:

1、减少网络I/O开销

2、找到能使用并发编程的点

3、减少共享变量的使用

本文只提供一个技术思路,因个人能力有限,如有错误和不足,望请指正,谢谢~

附录

  • easyExcel 不支持多线程写 github.com/alibaba/eas…
  • 本文代码完整地址 master分支 gitee.com/diqirenge/i…
  • 一次性加载所有数据,单线程处理数据,单条插入 gitee.com/diqirenge/i…
  • 一次性加载所有数据,单线程处理数据,批量插入 gitee.com/diqirenge/i…
  • 多线程处理sheet页,单线程处理数据,批量插入 gitee.com/diqirenge/i…
  • 多线程处理sheet页,多线程处理数据,批量插入 gitee.com/diqirenge/i…
  • 多线程处理sheet页,多线程处理数据,批量插入,换一种思路解决并发安全问题 gitee.com/diqirenge/i…
  • 多线程分页查询,单线程写 gitee.com/diqirenge/i…
  • 相关文章

    JavaScript2024新功能:Object.groupBy、正则表达式v标志
    PHP trim 函数对多字节字符的使用和限制
    新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
    使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
    为React 19做准备:WordPress 6.6用户指南
    如何删除WordPress中的所有评论

    发布评论