Excel大数据量导入优化实战
📊 优化成果:将Excel大数据量导入接口性能从 300秒 优化到 8秒
在企业级应用中,Excel大数据量导入是一个常见但充满挑战的需求。看似简单的功能,实际上涉及内存管理、并发处理、事务控制、数据库优化等多个技术难点。本文将分享我在实际项目中的优化实践,从最初的性能瓶颈到最终的高效解决方案。
🎯 核心挑战
在处理Excel大数据量导入时,主要面临以下挑战:
- 内存溢出(OOM):大量数据加载到内存可能导致JVM崩溃
- 执行速度慢:单线程处理大量数据效率低下
- 事务一致性:多线程环境下的事务管理复杂
- 数据库性能:大批量数据写入的数据库瓶颈
🔧 解决方案详解
1. 内存优化:分批处理防止OOM
问题分析:如果将10万条Excel数据一次性加载到内存,极易导致内存溢出。
解决方案:使用 EasyExcel 进行分批处理
/**
* 分批读取Excel数据,避免内存溢出
* 将10万条数据分成每批1万条处理
*/
@Component
public class ExcelBatchProcessor {
private static final int BATCH_SIZE = 10000;
public void processExcelFile(String filePath) {
EasyExcel.read(filePath, YourDataModel.class, new ExcelDataListener())
.sheet()
.headRowNumber(1)
.doRead();
}
@Component
public class ExcelDataListener extends AnalysisEventListener<YourDataModel> {
private List<YourDataModel> dataList = new ArrayList<>();
@Override
public void invoke(YourDataModel data, AnalysisContext context) {
dataList.add(data);
// 达到批处理大小时进行处理
if (dataList.size() >= BATCH_SIZE) {
processBatch(dataList);
dataList.clear();
}
}
@Override
public void doAfterAllAnalysed(AnalysisContext context) {
// 处理最后一批数据
if (!dataList.isEmpty()) {
processBatch(dataList);
}
}
private void processBatch(List<YourDataModel> batch) {
// 业务逻辑处理:数据验证、格式转换等
validateAndTransform(batch);
// 多线程插入数据库
parallelUpdateBatch(batch);
}
}
}
2. 多线程优化:提升处理速度
核心思路:基于CPU核心数动态分配线程,并行处理数据插入
/**
* 多线程批量更新数据
* 支持事务一致性控制
*/
public void parallelUpdateBatch(List<PauseAndReuseUpdateDto> list) throws InterruptedException {
// 事务回滚控制器
DataRollBack dataRollBack = new DataRollBack(false);
// 主线程等待闭锁
CountDownLatch mainCountDownLatch = new CountDownLatch(1);
// 动态计算线程数(CPU核心数 * 2)
Integer threadNum = Runtime.getRuntime().availableProcessors() * 2;
// 数据分片:根据线程数平均分配任务
List<List<PauseAndReuseUpdateDto>> dataChunks = ListUtil.averageAssign(list, threadNum);
// 子线程执行结果收集
List<Boolean> executionResults = Collections.synchronizedList(new ArrayList<>());
// 过滤空列表
dataChunks = dataChunks.stream()
.filter(chunk -> !ObjectUtils.isEmpty(chunk))
.collect(Collectors.toList());
// 子线程计数器
CountDownLatch childCountDownLatch = new CountDownLatch(dataChunks.size());
// 🚀 启动多线程任务
for (List<PauseAndReuseUpdateDto> dataChunk : dataChunks) {
CompletableFuture.runAsync(() -> {
parallelUpdate(dataChunk, mainCountDownLatch, childCountDownLatch,
executionResults, dataRollBack);
}, threadPoolExecutor);
}
// 等待所有子线程完成
childCountDownLatch.await();
// 📊 检查执行结果
boolean hasFailure = executionResults.stream().anyMatch(result -> !result);
if (hasFailure) {
log.warn("=== 多线程插入执行失败,准备回滚 ===");
dataRollBack.setIsRollBack(true);
}
// 🔓 释放所有子线程,开始事务提交/回滚
mainCountDownLatch.countDown();
if (dataRollBack.getIsRollBack()) {
log.error("=== 主线程触发全局回滚 ===");
throw new SchedulingException("数据处理失败,已回滚所有操作");
}
log.info("=== ✅ 多线程数据插入成功完成 ===");
}
3. 事务控制:多线程环境下的一致性保证
核心难点:多线程环境下保证数据一致性,要么全部成功,要么全部回滚
/**
* 子线程数据处理逻辑
* 实现分布式事务控制
*/
public void parallelUpdate(List<PauseAndReuseUpdateDto> dataChunk,
CountDownLatch mainCountDownLatch,
CountDownLatch childCountDownLatch,
List<Boolean> executionResults,
DataRollBack dataRollBack) {
TransactionStatus transactionStatus = null;
String threadName = "MyTx" + Thread.currentThread().getName();
try {
// 🔄 创建独立事务
DefaultTransactionDefinition definition = new DefaultTransactionDefinition();
definition.setName(threadName);
definition.setIsolationLevel(TransactionDefinition.ISOLATION_READ_COMMITTED);
transactionStatus = transactionManager.getTransaction(definition);
// 📝 执行数据处理逻辑
for (PauseAndReuseUpdateDto dto : dataChunk) {
String carNo = dto.getKcProductionPlanEntity().getCarNo();
log.debug("子线程: {}, 处理车号: {}", threadName, carNo);
// 主表更新
this.saveOrUpdate(dto.getKcProductionPlanEntity());
// 关联表批量更新
updateRelatedData(dto, threadName, carNo);
}
// ✅ 标记当前线程执行成功
executionResults.add(Boolean.TRUE);
} catch (TransactionException e) {
log.error("子线程: {} 事务创建失败", threadName, e);
executionResults.add(Boolean.FALSE);
} catch (Exception e) {
log.error("子线程: {} 数据处理异常", threadName, e);
if (transactionStatus != null) {
transactionManager.rollback(transactionStatus);
}
executionResults.add(Boolean.FALSE);
} finally {
// 🔄 释放子线程计数
childCountDownLatch.countDown();
}
try {
// ⏳ 等待主线程协调决策
mainCountDownLatch.await();
// 🎯 根据全局状态决定提交或回滚
if (dataRollBack.getIsRollBack()) {
transactionManager.rollback(transactionStatus);
log.warn("子线程: {} 已回滚", threadName);
} else {
transactionManager.commit(transactionStatus);
log.info("子线程: {} 已提交", threadName);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("子线程: {} 等待中断", threadName, e);
}
}
/**
* 更新关联数据
*/
private void updateRelatedData(PauseAndReuseUpdateDto dto, String threadName, String carNo) {
// 更新生产计划数据
List<KcPlanProductionDataEntity> planDataList = dto.getKcPlanDataList();
if (ObjectUtil.isNotEmpty(planDataList)) {
boolean success = kcPlanProductionDataService.updateBatchById(planDataList, 3000);
if (!success) {
log.warn("更新岗位供件时间失败, 线程: {}, 车号: {}", threadName, carNo);
throw new RuntimeException("更新岗位供件时间失败");
}
}
// 更新领料单数据
List<PiGetMaterialsEntity> materialsList = dto.getPiGetList();
if (ObjectUtil.isNotEmpty(materialsList)) {
boolean success = piGetMaterialsService.updateBatchById(materialsList, 3000);
if (!success) {
log.warn("更新领料单失败, 线程: {}, 车号: {}", threadName, carNo);
throw new RuntimeException("更新领料单失败");
}
}
}
4. 数据库优化:MySQL参数调优
经过多线程优化后,瓶颈转移到了数据库写入。通过调整以下MySQL参数显著提升性能:
🎯 关键参数调优
| 参数名 | 推荐值 | 作用说明 |
|---|---|---|
innodb_buffer_pool_size |
系统内存的70-80% | 增加InnoDB缓冲池,减少磁盘I/O |
innodb_log_file_size |
256MB-1GB | 增大日志文件,减少日志切换频率 |
innodb_log_buffer_size |
16MB-64MB | 优化日志缓冲区大小 |
bulk_insert_buffer_size |
64MB-256MB | 优化批量插入性能 |
max_allowed_packet |
64MB-1GB | 支持大数据包传输 |
📋 具体配置示例
-- InnoDB 核心优化
SET GLOBAL innodb_buffer_pool_size = 2147483648; -- 2GB
SET GLOBAL innodb_log_file_size = 268435456; -- 256MB
SET GLOBAL innodb_log_buffer_size = 67108864; -- 64MB
-- 批量插入优化
SET GLOBAL bulk_insert_buffer_size = 67108864; -- 64MB
SET GLOBAL max_allowed_packet = 1073741824; -- 1GB
-- 事务优化
SET SESSION autocommit = 0; -- 关闭自动提交,手动控制事务
SET SESSION transaction_isolation = 'READ-COMMITTED'; -- 降低隔离级别
-- MyISAM 引擎优化(如果使用)
SET GLOBAL key_buffer_size = 268435456; -- 256MB
SET GLOBAL myisam_sort_buffer_size = 134217728; -- 128MB
⚠️ 生产环境问题与解决方案
5.1 并发导入OOM问题
现象:多用户同时导入Excel时仍然出现内存溢出
原因分析:
- 单个导入已经分批处理,但多个并发导入累积内存占用过大
- JVM堆内存设置不足以支撑高并发场景
解决方案:
/**
* 使用分布式锁限制并发导入数量
*/
@Component
public class ExcelImportLockManager {
@Autowired
private RedisTemplate<String, String> redisTemplate;
private static final String IMPORT_LOCK_KEY = "excel:import:lock";
private static final int MAX_CONCURRENT_IMPORTS = 3; // 最大并发导入数
private static final int LOCK_TIMEOUT = 30; // 锁超时时间(分钟)
/**
* 获取导入许可
*/
public boolean acquireImportPermit(String userId) {
String lockKey = IMPORT_LOCK_KEY + ":" + userId;
// 检查当前并发数
Set<String> currentLocks = redisTemplate.keys(IMPORT_LOCK_KEY + ":*");
if (currentLocks != null && currentLocks.size() >= MAX_CONCURRENT_IMPORTS) {
log.warn("当前导入并发数已达上限: {}", MAX_CONCURRENT_IMPORTS);
return false;
}
// 获取锁
Boolean acquired = redisTemplate.opsForValue()
.setIfAbsent(lockKey, "importing", Duration.ofMinutes(LOCK_TIMEOUT));
return Boolean.TRUE.equals(acquired);
}
/**
* 释放导入许可
*/
public void releaseImportPermit(String userId) {
String lockKey = IMPORT_LOCK_KEY + ":" + userId;
redisTemplate.delete(lockKey);
}
}
配置优化:
# JVM 堆内存调优
-Xms4g -Xmx8g # 根据服务器配置调整
-XX:+UseG1GC # 使用G1垃圾回收器
-XX:MaxGCPauseMillis=200 # 控制GC停顿时间
5.2 事务提交超时问题
现象:数据量大时出现事务提交超时,线程一直阻塞
根本原因:线程池配置不当,核心线程数过少导致任务堆积
解决方案:
/**
* 线程池配置优化
*/
@Configuration
public class ThreadPoolConfig {
@Bean("excelProcessorThreadPool")
public ThreadPoolExecutor excelProcessorThreadPool() {
int corePoolSize = Runtime.getRuntime().availableProcessors() * 2;
int maxPoolSize = corePoolSize * 2;
long keepAliveTime = 60L;
return new ThreadPoolExecutor(
corePoolSize, // 核心线程数
maxPoolSize, // 最大线程数
keepAliveTime, // 线程存活时间
TimeUnit.SECONDS, // 时间单位
new ArrayBlockingQueue<>(100), // 工作队列
new ThreadFactoryBuilder()
.setNameFormat("excel-processor-%d")
.setDaemon(false)
.build(),
new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略
);
}
}
关键改进点:
- 动态线程数计算:基于CPU核心数而非固定值
- 合理队列大小:避免任务无限堆积
- 拒绝策略优化:使用调用者线程执行,保证任务不丢失
📊 性能优化效果
| 优化阶段 | 处理方式 | 10万条数据耗时 | 内存占用 | 并发支持 |
|---|---|---|---|---|
| 初始版本 | 单线程全量加载 | ~300秒 | >2GB | 1个用户 |
| 分批优化 | EasyExcel分批 | ~180秒 | ~200MB | 1个用户 |
| 多线程优化 | 并行处理 | ~45秒 | ~300MB | 1个用户 |
| 数据库优化 | MySQL调优 | ~15秒 | ~300MB | 1个用户 |
| 最终版本 | 全链路优化 | ~8秒 | ~400MB | 3个用户 |
🎉 最终成果:性能提升 37.5倍,支持并发导入
🏆 最佳实践总结
✅ 核心优化策略
- 内存管理
- 使用流式读取,避免全量加载
- 合理设置批处理大小(推荐1万条/批)
- 及时释放临时对象,减少GC压力
- 并发控制
- 基于CPU核心数动态计算线程数
- 使用
CountDownLatch协调多线程 - 实现分布式锁限制并发数量
- 事务管理
- 每个线程独立事务,最后统一决策
- 使用适当的事务隔离级别
- 合理设置事务超时时间
- 数据库优化
- 调整InnoDB缓冲池大小
- 优化批量插入参数
- 使用合适的索引策略
🚀 性能监控指标
/**
* 性能监控埋点
*/
@Component
public class PerformanceMonitor {
private final MeterRegistry meterRegistry;
public void recordImportMetrics(String operation, long duration, int recordCount) {
// 记录处理耗时
Timer.Sample sample = Timer.start(meterRegistry);
sample.stop(Timer.builder("excel.import.duration")
.tag("operation", operation)
.register(meterRegistry));
// 记录处理数量
meterRegistry.counter("excel.import.records", "operation", operation)
.increment(recordCount);
// 记录处理速度(条/秒)
double rps = recordCount * 1000.0 / duration;
meterRegistry.gauge("excel.import.rps", "operation", operation, rps);
}
}
🔮 后续优化方向
- 异步化改造:使用MQ实现完全异步处理
- 分库分表:支持更大数据量的水平扩展
- 缓存优化:引入Redis缓存热点数据
- 文件分片:支持超大Excel文件的分片上传
- 实时进度:WebSocket实时反馈处理进度
通过以上优化方案,我们成功将Excel大数据量导入的性能提升了37.5倍,同时保证了数据一致性和系统稳定性。这套方案在生产环境中稳定运行,为用户提供了流畅的数据导入体验。
💡 关键启示:性能优化是一个系统工程,需要从内存、并发、事务、数据库等多个维度综合考虑,单点优化往往效果有限。