feat(infra): 优化分片上传和临时文件清理功能
- 重构了分片上传完成逻辑,提高合并效率 - 新增同步清理模式,便于调试验证清理效果 - 优化临时文件命名规则,提高可读性 - 增加删除失败重试机制,提高清理成功率 - 新增手动清理临时文件接口,用于处理残留文件 - 优化日志输出,提高可追踪性
This commit is contained in:
parent
f0009fee14
commit
6edeedd402
@ -56,6 +56,25 @@ public class MinioConfigProperties {
|
||||
* 当分片数量超过此值时,将采用分批并行合并策略
|
||||
*/
|
||||
private int batchSize = 50;
|
||||
|
||||
/**
|
||||
* 是否启用动态批次大小优化(默认true)
|
||||
* 启用后,大文件会使用更大的批次大小以提高合并效率
|
||||
*/
|
||||
private boolean enableDynamicBatchSize = true;
|
||||
|
||||
/**
|
||||
* 动态批次大小的最大值(默认100)
|
||||
* 当启用动态批次大小时,批次大小不会超过此值
|
||||
*/
|
||||
private int maxDynamicBatchSize = 100;
|
||||
|
||||
/**
|
||||
* 是否启用同步清理模式(默认false)
|
||||
* 启用后,分片文件清理将同步执行,便于调试验证清理效果
|
||||
* 生产环境建议设置为false以提高性能
|
||||
*/
|
||||
private boolean enableSyncCleanup = false;
|
||||
}
|
||||
|
||||
}
|
@ -417,12 +417,23 @@ public class FileController {
|
||||
return success(null);
|
||||
} catch (Exception e) {
|
||||
log.error("取消分片上传失败", e);
|
||||
String errorMessage = e.getMessage();
|
||||
if (errorMessage != null && errorMessage.contains("上传会话不存在")) {
|
||||
return CommonResult.error(404, "上传会话不存在");
|
||||
} else {
|
||||
return CommonResult.error(500, "取消分片上传失败: " + errorMessage);
|
||||
}
|
||||
return CommonResult.error(500, "取消分片上传失败: " + e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
@PostMapping("/multipart/cleanup-temp-files")
|
||||
@Operation(summary = "手动清理临时文件")
|
||||
@Parameter(name = "objectName", description = "对象名称", required = true)
|
||||
@Parameter(name = "tempFileCount", description = "临时文件数量(可选)", required = false)
|
||||
public CommonResult<String> manualCleanupTempFiles(
|
||||
@RequestParam("objectName") String objectName,
|
||||
@RequestParam(value = "tempFileCount", required = false) Integer tempFileCount) {
|
||||
try {
|
||||
String result = minioService.manualCleanupTempFiles(objectName, tempFileCount);
|
||||
return CommonResult.success(result);
|
||||
} catch (Exception e) {
|
||||
log.error("手动清理临时文件失败", e);
|
||||
return CommonResult.error(500, "手动清理临时文件失败: " + e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -16,6 +16,8 @@ import io.minio.ComposeObjectArgs;
|
||||
import io.minio.ComposeSource;
|
||||
import io.minio.RemoveObjectsArgs;
|
||||
import io.minio.messages.DeleteObject;
|
||||
import io.minio.messages.DeleteError;
|
||||
import io.minio.Result;
|
||||
import cn.iocoder.yudao.module.infra.controller.admin.file.vo.file.MultipartUploadCompleteRequest;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Service;
|
||||
@ -28,15 +30,8 @@ import java.io.InputStream;
|
||||
import java.time.Instant;
|
||||
import java.time.ZonedDateTime;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
@ -52,7 +47,7 @@ public class MinioService {
|
||||
private MinioConfigProperties minioConfigProperties;
|
||||
|
||||
private MinioClient minioClient;
|
||||
|
||||
|
||||
// 用于并行处理的线程池
|
||||
private ExecutorService executorService;
|
||||
|
||||
@ -383,11 +378,9 @@ public class MinioService {
|
||||
}
|
||||
}
|
||||
|
||||
// 删除旧的策略枚举,使用新的智能合并策略
|
||||
|
||||
/**
|
||||
* 完成分片上传 - 智能同步合并 + 异步清理
|
||||
* 确保返回的URL能直接访问,同时最大化合并效率
|
||||
* 完成分片上传 - 高效合并 + 异步清理
|
||||
* 优化合并效率,提高接口响应速度
|
||||
*
|
||||
* @param uploadId 上传会话ID
|
||||
* @param objectName 对象名称
|
||||
@ -398,13 +391,13 @@ public class MinioService {
|
||||
java.util.List<MultipartUploadCompleteRequest.PartInfo> parts) throws Exception {
|
||||
long startTime = System.currentTimeMillis();
|
||||
String bucketName = "user-uploads";
|
||||
|
||||
|
||||
try {
|
||||
log.info("开始智能同步合并{}个分片文件", parts.size());
|
||||
|
||||
log.info("开始高效合并{}个分片文件", parts.size());
|
||||
|
||||
// 1. 预处理:排序分片
|
||||
parts.sort((a, b) -> a.getPartNumber().compareTo(b.getPartNumber()));
|
||||
|
||||
parts.sort(Comparator.comparing(MultipartUploadCompleteRequest.PartInfo::getPartNumber));
|
||||
|
||||
// 2. 构建合并源列表
|
||||
List<ComposeSource> sources = parts.stream()
|
||||
.map(part -> ComposeSource.builder()
|
||||
@ -412,25 +405,13 @@ public class MinioService {
|
||||
.object(objectName + ".part" + part.getPartNumber())
|
||||
.build())
|
||||
.collect(Collectors.toList());
|
||||
|
||||
// 3. 智能选择合并策略 - 同步执行确保文件完整性
|
||||
if (sources.size() > 100) {
|
||||
// 大文件:分批并行合并(同步)
|
||||
executeOptimizedBatchCompose(bucketName, objectName, sources);
|
||||
} else {
|
||||
// 小文件:直接合并(同步)
|
||||
minioClient.composeObject(
|
||||
ComposeObjectArgs.builder()
|
||||
.bucket(bucketName)
|
||||
.object(objectName)
|
||||
.sources(sources)
|
||||
.build()
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
// 3. 智能合并策略 - 根据分片数量选择最优策略
|
||||
executeSmartCompose(bucketName, objectName, sources);
|
||||
|
||||
long composeTime = System.currentTimeMillis();
|
||||
log.info("同步合并完成,耗时: {}ms", composeTime - startTime);
|
||||
|
||||
log.info("合并完成,耗时: {}ms", composeTime - startTime);
|
||||
|
||||
// 4. 并行获取文件信息和URL(同步)
|
||||
CompletableFuture<String> urlFuture = CompletableFuture.supplyAsync(() -> {
|
||||
try {
|
||||
@ -446,7 +427,7 @@ public class MinioService {
|
||||
return minioConfigProperties.getEndpoint() + "/" + bucketName + "/" + objectName;
|
||||
}
|
||||
}, executorService);
|
||||
|
||||
|
||||
CompletableFuture<StatObjectResponse> statFuture = CompletableFuture.supplyAsync(() -> {
|
||||
try {
|
||||
return minioClient.statObject(
|
||||
@ -459,47 +440,54 @@ public class MinioService {
|
||||
throw new RuntimeException("获取文件信息失败", e);
|
||||
}
|
||||
}, executorService);
|
||||
|
||||
// 5. 清理分片文件 - 可配置同步/异步模式
|
||||
boolean useSyncCleanup = minioConfigProperties.getMultipart().isEnableSyncCleanup();
|
||||
|
||||
// 5. 异步清理分片文件(不阻塞响应)
|
||||
cleanupChunkFilesBatch(bucketName, objectName, parts);
|
||||
|
||||
if (useSyncCleanup) {
|
||||
log.info("使用同步清理模式(调试模式)");
|
||||
try {
|
||||
cleanupChunkFilesBatchSync(bucketName, objectName, parts);
|
||||
} catch (Exception cleanupException) {
|
||||
log.error("同步清理失败,但不影响主流程", cleanupException);
|
||||
}
|
||||
} else {
|
||||
log.info("使用异步清理模式(生产模式)");
|
||||
cleanupChunkFilesBatch(bucketName, objectName, parts);
|
||||
}
|
||||
|
||||
// 6. 等待URL和文件信息生成(最多3秒)
|
||||
String fileUrl = urlFuture.get(3, TimeUnit.SECONDS);
|
||||
StatObjectResponse stat = statFuture.get(3, TimeUnit.SECONDS);
|
||||
|
||||
|
||||
long totalTime = System.currentTimeMillis() - startTime;
|
||||
log.info("智能同步合并完成,文件大小: {}, 总耗时: {}ms", stat.size(), totalTime);
|
||||
|
||||
log.info("高效合并完成,文件大小: {}, 总耗时: {}ms", stat.size(), totalTime);
|
||||
|
||||
return new MultipartUploadCompleteResult(fileUrl, stat.etag());
|
||||
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("智能同步合并失败", e);
|
||||
log.error("高效合并失败", e);
|
||||
throw new Exception("分片上传完成失败: " + e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
// 删除旧的执行方法,使用新的智能合并策略
|
||||
|
||||
// 删除旧的构建方法,已内联到主方法中
|
||||
|
||||
// 删除旧的立即响应模式方法
|
||||
|
||||
// 删除旧的极速模式方法
|
||||
|
||||
// 删除旧的标准模式方法
|
||||
|
||||
// 删除旧的URL超时处理方法
|
||||
|
||||
/**
|
||||
* 优化的分批并行合并策略 - 专为方案1优化
|
||||
* 确保合并操作同步完成,返回时文件已经可访问
|
||||
* 智能合并策略 - 根据分片数量自动选择最优方案
|
||||
* 优化点:
|
||||
* 1. 小文件直接合并,避免创建临时文件
|
||||
* 2. 大文件使用优化的分批策略
|
||||
* 3. 移除不必要的验证步骤
|
||||
*/
|
||||
private void executeOptimizedBatchCompose(String bucketName, String objectName, List<ComposeSource> sources) throws Exception {
|
||||
private void executeSmartCompose(String bucketName, String objectName, List<ComposeSource> sources) throws Exception {
|
||||
int sourceCount = sources.size();
|
||||
int batchSize = minioConfigProperties.getMultipart().getBatchSize();
|
||||
log.info("启用优化分批合并策略,总分片数: {}, 批次大小: {}", sources.size(), batchSize);
|
||||
|
||||
if (sources.size() <= batchSize) {
|
||||
// 如果分片数量不超过批次大小,直接合并
|
||||
log.info("启用智能合并策略,总分片数: {}, 配置批次大小: {}", sourceCount, batchSize);
|
||||
|
||||
// 优化1:小文件直接合并,避免不必要的分批
|
||||
if (sourceCount <= batchSize) {
|
||||
log.info("分片数量({})不超过批次大小({}),采用直接合并", sourceCount, batchSize);
|
||||
|
||||
minioClient.composeObject(
|
||||
ComposeObjectArgs.builder()
|
||||
.bucket(bucketName)
|
||||
@ -507,23 +495,55 @@ public class MinioService {
|
||||
.sources(sources)
|
||||
.build()
|
||||
);
|
||||
|
||||
log.info("直接合并完成");
|
||||
return;
|
||||
}
|
||||
|
||||
// 优化2:大文件使用高效分批策略
|
||||
log.info("分片数量({})超过批次大小({}),采用高效分批合并", sourceCount, batchSize);
|
||||
executeOptimizedBatchCompose(bucketName, objectName, sources);
|
||||
}
|
||||
|
||||
/**
|
||||
* 优化的分批并行合并策略
|
||||
* 优化点:
|
||||
* 1. 移除不必要的临时文件验证
|
||||
* 2. 优化批次大小策略
|
||||
* 3. 异步清理临时文件,不阻塞主流程
|
||||
*/
|
||||
private void executeOptimizedBatchCompose(String bucketName, String objectName, List<ComposeSource> sources) throws Exception {
|
||||
int batchSize = minioConfigProperties.getMultipart().getBatchSize();
|
||||
|
||||
// 优化3:动态调整批次大小,大文件使用更大的批次
|
||||
int optimizedBatchSize = batchSize;
|
||||
|
||||
if (minioConfigProperties.getMultipart().isEnableDynamicBatchSize()) {
|
||||
int maxDynamicBatchSize = minioConfigProperties.getMultipart().getMaxDynamicBatchSize();
|
||||
optimizedBatchSize = Math.max(batchSize, Math.min(maxDynamicBatchSize, sources.size() / 4));
|
||||
|
||||
if (optimizedBatchSize != batchSize) {
|
||||
log.info("启用动态批次大小优化:{} -> {}", batchSize, optimizedBatchSize);
|
||||
}
|
||||
}
|
||||
|
||||
log.info("启用优化分批合并策略,总分片数: {}, 批次大小: {}", sources.size(), optimizedBatchSize);
|
||||
|
||||
// 1. 分批创建临时合并对象
|
||||
List<String> tempObjects = new ArrayList<>();
|
||||
List<CompletableFuture<Void>> batchFutures = new ArrayList<>();
|
||||
|
||||
for (int i = 0; i < sources.size(); i += batchSize) {
|
||||
int endIndex = Math.min(i + batchSize, sources.size());
|
||||
|
||||
for (int i = 0; i < sources.size(); i += optimizedBatchSize) {
|
||||
int endIndex = Math.min(i + optimizedBatchSize, sources.size());
|
||||
List<ComposeSource> batch = sources.subList(i, endIndex);
|
||||
String tempObjectName = objectName + ".temp." + (i / batchSize);
|
||||
String tempObjectName = objectName + ".temp." + (i / optimizedBatchSize);
|
||||
tempObjects.add(tempObjectName);
|
||||
|
||||
|
||||
// 并行处理每个批次
|
||||
CompletableFuture<Void> batchFuture = CompletableFuture.runAsync(() -> {
|
||||
try {
|
||||
long batchStartTime = System.currentTimeMillis();
|
||||
|
||||
minioClient.composeObject(
|
||||
ComposeObjectArgs.builder()
|
||||
.bucket(bucketName)
|
||||
@ -531,29 +551,31 @@ public class MinioService {
|
||||
.sources(batch)
|
||||
.build()
|
||||
);
|
||||
|
||||
long batchTime = System.currentTimeMillis() - batchStartTime;
|
||||
log.debug("批次合并完成: {}, 耗时: {}ms, 分片数: {}",
|
||||
log.debug("批次合并完成: {}, 耗时: {}ms, 分片数: {}",
|
||||
tempObjectName, batchTime, batch.size());
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("批次合并失败: {}", tempObjectName, e);
|
||||
log.error("批次合并失败: {}, 原因: {}", tempObjectName, e.getMessage(), e);
|
||||
throw new RuntimeException("批次合并失败: " + tempObjectName, e);
|
||||
}
|
||||
}, executorService);
|
||||
|
||||
|
||||
batchFutures.add(batchFuture);
|
||||
}
|
||||
|
||||
|
||||
// 2. 等待所有批次完成(同步)
|
||||
try {
|
||||
CompletableFuture.allOf(batchFutures.toArray(new CompletableFuture[0])).get();
|
||||
log.info("所有批次合并完成,共{}个批次", tempObjects.size());
|
||||
} catch (Exception e) {
|
||||
log.error("批次合并过程中出现错误", e);
|
||||
// 清理已创建的临时对象
|
||||
// 异步清理已创建的临时对象
|
||||
cleanupTempObjects(bucketName, tempObjects);
|
||||
throw new Exception("分批合并失败", e);
|
||||
}
|
||||
|
||||
|
||||
// 3. 最终合并临时对象(同步)
|
||||
try {
|
||||
List<ComposeSource> tempSources = tempObjects.stream()
|
||||
@ -562,7 +584,9 @@ public class MinioService {
|
||||
.object(tempObj)
|
||||
.build())
|
||||
.collect(Collectors.toList());
|
||||
|
||||
|
||||
log.info("开始最终合并,临时对象数: {}", tempObjects.size());
|
||||
|
||||
minioClient.composeObject(
|
||||
ComposeObjectArgs.builder()
|
||||
.bucket(bucketName)
|
||||
@ -570,148 +594,241 @@ public class MinioService {
|
||||
.sources(tempSources)
|
||||
.build()
|
||||
);
|
||||
|
||||
|
||||
log.info("优化分批合并完成,临时对象数: {}", tempObjects.size());
|
||||
|
||||
} finally {
|
||||
// 4. 异步清理临时对象(不阻塞)
|
||||
cleanupTempObjects(bucketName, tempObjects);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 分批并行合并策略 - 借鉴优秀实践
|
||||
* 将大量分片分批处理,减少单次操作的复杂度
|
||||
*/
|
||||
private void executeBatchCompose(String bucketName, String objectName, List<ComposeSource> sources) throws Exception {
|
||||
int batchSize = minioConfigProperties.getMultipart().getBatchSize();
|
||||
log.info("启用分批合并策略,总分片数: {}, 批次大小: {}", sources.size(), batchSize);
|
||||
|
||||
if (sources.size() <= batchSize) {
|
||||
// 如果分片数量不超过批次大小,直接合并
|
||||
minioClient.composeObject(
|
||||
ComposeObjectArgs.builder()
|
||||
.bucket(bucketName)
|
||||
.object(objectName)
|
||||
.sources(sources)
|
||||
.build()
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
// 1. 分批创建临时合并对象
|
||||
List<String> tempObjects = new ArrayList<>();
|
||||
List<CompletableFuture<Void>> batchFutures = new ArrayList<>();
|
||||
|
||||
for (int i = 0; i < sources.size(); i += batchSize) {
|
||||
int endIndex = Math.min(i + batchSize, sources.size());
|
||||
List<ComposeSource> batch = sources.subList(i, endIndex);
|
||||
String tempObjectName = objectName + ".temp." + (i / batchSize);
|
||||
tempObjects.add(tempObjectName);
|
||||
|
||||
// 并行处理每个批次
|
||||
CompletableFuture<Void> batchFuture = CompletableFuture.runAsync(() -> {
|
||||
try {
|
||||
long batchStartTime = System.currentTimeMillis();
|
||||
minioClient.composeObject(
|
||||
ComposeObjectArgs.builder()
|
||||
.bucket(bucketName)
|
||||
.object(tempObjectName)
|
||||
.sources(batch)
|
||||
.build()
|
||||
);
|
||||
long batchTime = System.currentTimeMillis() - batchStartTime;
|
||||
log.debug("批次合并完成: {}, 耗时: {}ms, 分片数: {}",
|
||||
tempObjectName, batchTime, batch.size());
|
||||
} catch (Exception e) {
|
||||
log.error("批次合并失败: {}", tempObjectName, e);
|
||||
throw new RuntimeException("批次合并失败: " + tempObjectName, e);
|
||||
}
|
||||
}, executorService);
|
||||
|
||||
batchFutures.add(batchFuture);
|
||||
}
|
||||
|
||||
// 2. 等待所有批次完成
|
||||
try {
|
||||
CompletableFuture.allOf(batchFutures.toArray(new CompletableFuture[0])).get();
|
||||
log.info("所有批次合并完成,共{}个批次", tempObjects.size());
|
||||
} catch (Exception e) {
|
||||
log.error("批次合并过程中出现错误", e);
|
||||
// 清理已创建的临时对象
|
||||
cleanupTempObjects(bucketName, tempObjects);
|
||||
throw new Exception("分批合并失败", e);
|
||||
}
|
||||
|
||||
// 3. 最终合并临时对象
|
||||
try {
|
||||
List<ComposeSource> tempSources = tempObjects.stream()
|
||||
.map(tempObj -> ComposeSource.builder()
|
||||
.bucket(bucketName)
|
||||
.object(tempObj)
|
||||
.build())
|
||||
.collect(Collectors.toList());
|
||||
|
||||
minioClient.composeObject(
|
||||
ComposeObjectArgs.builder()
|
||||
.bucket(bucketName)
|
||||
.object(objectName)
|
||||
.sources(tempSources)
|
||||
.build()
|
||||
);
|
||||
|
||||
log.info("最终合并完成,临时对象数: {}", tempObjects.size());
|
||||
|
||||
log.error("最终合并失败,临时对象: {}", tempObjects, e);
|
||||
throw new Exception("最终合并失败: " + e.getMessage(), e);
|
||||
} finally {
|
||||
// 4. 清理临时对象
|
||||
cleanupTempObjects(bucketName, tempObjects);
|
||||
// 4. 清理临时对象 - 支持同步/异步模式
|
||||
boolean useSyncCleanup = minioConfigProperties.getMultipart().isEnableSyncCleanup();
|
||||
|
||||
if (useSyncCleanup) {
|
||||
log.info("使用同步清理临时对象模式(调试模式)");
|
||||
try {
|
||||
cleanupTempObjectsSync(bucketName, tempObjects);
|
||||
} catch (Exception cleanupException) {
|
||||
log.error("同步清理临时对象失败,但不影响主流程", cleanupException);
|
||||
}
|
||||
} else {
|
||||
log.info("使用异步清理临时对象模式(生产模式)");
|
||||
cleanupTempObjects(bucketName, tempObjects);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 清理临时对象
|
||||
* 异步清理临时对象(修复版本)
|
||||
* 正确处理MinIO删除结果,增加删除验证
|
||||
*/
|
||||
private void cleanupTempObjects(String bucketName, List<String> tempObjects) {
|
||||
if (tempObjects.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
CompletableFuture.runAsync(() -> {
|
||||
try {
|
||||
log.info("开始异步清理{}个临时对象", tempObjects.size());
|
||||
|
||||
List<DeleteObject> deleteObjects = tempObjects.stream()
|
||||
.map(DeleteObject::new)
|
||||
.collect(Collectors.toList());
|
||||
|
||||
minioClient.removeObjects(
|
||||
|
||||
// 批量删除临时对象
|
||||
Iterable<Result<DeleteError>> results = minioClient.removeObjects(
|
||||
RemoveObjectsArgs.builder()
|
||||
.bucket(bucketName)
|
||||
.objects(deleteObjects)
|
||||
.build()
|
||||
);
|
||||
|
||||
// 检查删除结果
|
||||
List<String> failedFiles = new ArrayList<>();
|
||||
int successCount = 0;
|
||||
|
||||
log.info("临时对象清理完成,清理数量: {}", tempObjects.size());
|
||||
for (Result<DeleteError> result : results) {
|
||||
try {
|
||||
DeleteError error = result.get();
|
||||
if (error != null) {
|
||||
failedFiles.add(error.objectName());
|
||||
log.warn("删除临时对象失败: {}, 错误: {}", error.objectName(), error.message());
|
||||
}
|
||||
} catch (Exception e) {
|
||||
// 如果没有错误,说明删除成功
|
||||
successCount++;
|
||||
}
|
||||
}
|
||||
|
||||
log.info("批量删除临时对象完成,成功: {}个,失败: {}个", successCount, failedFiles.size());
|
||||
|
||||
// 如果有失败的文件,尝试重试
|
||||
if (!failedFiles.isEmpty()) {
|
||||
log.warn("有{}个临时对象删除失败,尝试单个删除重试", failedFiles.size());
|
||||
retryDeleteFailedTempObjects(bucketName, failedFiles);
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("清理临时对象失败", e);
|
||||
log.error("异步清理临时对象失败", e);
|
||||
// 如果批量删除完全失败,降级为单个删除
|
||||
fallbackToSingleDeleteTempObjects(bucketName, tempObjects);
|
||||
}
|
||||
}, executorService);
|
||||
}
|
||||
|
||||
/**
|
||||
* 同步清理临时对象(用于调试)
|
||||
* 确保删除完成后返回,提供详细的删除结果
|
||||
*/
|
||||
public void cleanupTempObjectsSync(String bucketName, List<String> tempObjects) throws Exception {
|
||||
if (tempObjects.isEmpty()) {
|
||||
log.info("没有临时对象需要清理");
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
log.info("开始同步清理{}个临时对象", tempObjects.size());
|
||||
|
||||
List<DeleteObject> deleteObjects = tempObjects.stream()
|
||||
.map(tempObj -> {
|
||||
log.debug("准备删除临时对象: {}", tempObj);
|
||||
return new DeleteObject(tempObj);
|
||||
})
|
||||
.collect(Collectors.toList());
|
||||
|
||||
// 批量删除临时对象
|
||||
log.info("执行批量删除临时对象,文件数量: {}", deleteObjects.size());
|
||||
Iterable<Result<DeleteError>> results = minioClient.removeObjects(
|
||||
RemoveObjectsArgs.builder()
|
||||
.bucket(bucketName)
|
||||
.objects(deleteObjects)
|
||||
.build()
|
||||
);
|
||||
|
||||
// 检查删除结果
|
||||
List<String> failedFiles = new ArrayList<>();
|
||||
|
||||
for (Result<DeleteError> result : results) {
|
||||
try {
|
||||
DeleteError error = result.get();
|
||||
if (error != null) {
|
||||
failedFiles.add(error.objectName());
|
||||
log.warn("删除临时对象失败: {}, 错误代码: {}, 错误信息: {}",
|
||||
error.objectName(), error.code(), error.message());
|
||||
}
|
||||
} catch (Exception e) {
|
||||
// 如果没有错误,说明删除成功
|
||||
}
|
||||
}
|
||||
|
||||
// 计算成功删除的文件
|
||||
int totalFiles = deleteObjects.size();
|
||||
int successCount = totalFiles - failedFiles.size();
|
||||
|
||||
log.info("同步批量删除临时对象完成,总计: {}个,成功: {}个,失败: {}个",
|
||||
totalFiles, successCount, failedFiles.size());
|
||||
|
||||
// 如果有失败的文件,尝试重试
|
||||
if (!failedFiles.isEmpty()) {
|
||||
log.warn("有{}个临时对象删除失败,尝试单个删除重试", failedFiles.size());
|
||||
retryDeleteFailedTempObjects(bucketName, failedFiles);
|
||||
}
|
||||
|
||||
// 验证删除结果
|
||||
int remainingFiles = verifyTempObjectsCleanupResult(bucketName, tempObjects);
|
||||
if (remainingFiles > 0) {
|
||||
log.warn("验证发现仍有{}个临时对象未被删除", remainingFiles);
|
||||
} else {
|
||||
log.info("验证通过,所有临时对象已清理完成");
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("同步清理临时对象失败", e);
|
||||
throw new Exception("同步清理临时对象失败: " + e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 异步批量清理分片文件
|
||||
* 使用MinIO批量删除API,性能更优
|
||||
* 验证临时对象清理结果
|
||||
* 检查临时对象是否真的被删除了
|
||||
*/
|
||||
private void cleanupChunkFilesBatch(String bucketName, String objectName,
|
||||
private int verifyTempObjectsCleanupResult(String bucketName, List<String> tempObjects) {
|
||||
int remainingFiles = 0;
|
||||
|
||||
for (String tempObject : tempObjects) {
|
||||
try {
|
||||
minioClient.statObject(
|
||||
StatObjectArgs.builder()
|
||||
.bucket(bucketName)
|
||||
.object(tempObject)
|
||||
.build()
|
||||
);
|
||||
// 如果能stat成功,说明文件仍然存在
|
||||
remainingFiles++;
|
||||
log.warn("临时对象仍然存在: {}", tempObject);
|
||||
} catch (Exception e) {
|
||||
// 文件不存在,说明删除成功
|
||||
log.debug("临时对象已删除: {}", tempObject);
|
||||
}
|
||||
}
|
||||
|
||||
return remainingFiles;
|
||||
}
|
||||
|
||||
/**
|
||||
* 重试删除失败的临时对象
|
||||
*/
|
||||
private void retryDeleteFailedTempObjects(String bucketName, List<String> failedFiles) {
|
||||
int retrySuccessCount = 0;
|
||||
|
||||
for (String fileName : failedFiles) {
|
||||
try {
|
||||
minioClient.removeObject(
|
||||
RemoveObjectArgs.builder()
|
||||
.bucket(bucketName)
|
||||
.object(fileName)
|
||||
.build()
|
||||
);
|
||||
retrySuccessCount++;
|
||||
log.debug("重试删除临时对象成功: {}", fileName);
|
||||
} catch (Exception e) {
|
||||
log.warn("重试删除临时对象仍然失败: {}, 错误: {}", fileName, e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
log.info("重试删除临时对象完成,成功删除{}个失败文件", retrySuccessCount);
|
||||
}
|
||||
|
||||
/**
|
||||
* 降级为单个删除临时对象(当批量删除失败时)
|
||||
*/
|
||||
private void fallbackToSingleDeleteTempObjects(String bucketName, List<String> tempObjects) {
|
||||
log.warn("降级为单个删除临时对象模式");
|
||||
tempObjects.parallelStream().forEach(tempObject -> {
|
||||
try {
|
||||
minioClient.removeObject(
|
||||
RemoveObjectArgs.builder()
|
||||
.bucket(bucketName)
|
||||
.object(tempObject)
|
||||
.build()
|
||||
);
|
||||
log.debug("降级删除临时对象: {}", tempObject);
|
||||
} catch (Exception e) {
|
||||
log.warn("降级删除临时对象失败: {}", tempObject, e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* 异步批量清理分片文件(修复版本)
|
||||
* 正确处理MinIO删除结果,增加删除验证
|
||||
*/
|
||||
private void cleanupChunkFilesBatch(String bucketName, String objectName,
|
||||
List<MultipartUploadCompleteRequest.PartInfo> parts) {
|
||||
CompletableFuture.runAsync(() -> {
|
||||
try {
|
||||
log.info("开始异步批量清理{}个分片文件", parts.size());
|
||||
|
||||
|
||||
// 构建待删除对象列表
|
||||
List<DeleteObject> deleteObjects = parts.stream()
|
||||
.map(part -> {
|
||||
@ -720,45 +837,181 @@ public class MinioService {
|
||||
})
|
||||
.collect(Collectors.toList());
|
||||
|
||||
// 批量删除分片文件
|
||||
if (!deleteObjects.isEmpty()) {
|
||||
minioClient.removeObjects(
|
||||
RemoveObjectsArgs.builder()
|
||||
.bucket(bucketName)
|
||||
.objects(deleteObjects)
|
||||
.build()
|
||||
);
|
||||
log.info("异步批量清理分片文件完成,清理了{}个文件", deleteObjects.size());
|
||||
if (deleteObjects.isEmpty()) {
|
||||
log.info("没有分片文件需要清理");
|
||||
return;
|
||||
}
|
||||
|
||||
// 批量删除分片文件
|
||||
Iterable<Result<DeleteError>> results = minioClient.removeObjects(
|
||||
RemoveObjectsArgs.builder()
|
||||
.bucket(bucketName)
|
||||
.objects(deleteObjects)
|
||||
.build()
|
||||
);
|
||||
|
||||
// 检查删除结果
|
||||
List<String> failedFiles = new ArrayList<>();
|
||||
int successCount = 0;
|
||||
|
||||
for (Result<DeleteError> result : results) {
|
||||
try {
|
||||
DeleteError error = result.get();
|
||||
if (error != null) {
|
||||
failedFiles.add(error.objectName());
|
||||
log.warn("删除分片文件失败: {}, 错误: {}", error.objectName(), error.message());
|
||||
}
|
||||
} catch (Exception e) {
|
||||
// 如果没有错误,说明删除成功
|
||||
successCount++;
|
||||
}
|
||||
}
|
||||
|
||||
log.info("批量删除分片文件完成,成功: {}个,失败: {}个", successCount, failedFiles.size());
|
||||
|
||||
// 如果有失败的文件,尝试重试
|
||||
if (!failedFiles.isEmpty()) {
|
||||
log.warn("有{}个分片文件删除失败,尝试单个删除重试", failedFiles.size());
|
||||
retryDeleteFailedChunks(bucketName, failedFiles);
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("异步批量清理分片文件失败", e);
|
||||
// 如果批量删除失败,降级为单个删除
|
||||
// 如果批量删除完全失败,降级为单个删除
|
||||
fallbackToSingleDelete(bucketName, objectName, parts);
|
||||
}
|
||||
}, executorService);
|
||||
}
|
||||
|
||||
/**
|
||||
* 降级为单个删除(当批量删除失败时)
|
||||
* 同步批量清理分片文件(用于调试)
|
||||
* 确保删除完成后返回,提供详细的删除结果
|
||||
*/
|
||||
private void fallbackToSingleDelete(String bucketName, String objectName,
|
||||
List<MultipartUploadCompleteRequest.PartInfo> parts) {
|
||||
log.warn("降级为单个删除模式");
|
||||
parts.parallelStream().forEach(part -> {
|
||||
public void cleanupChunkFilesBatchSync(String bucketName, String objectName,
|
||||
List<MultipartUploadCompleteRequest.PartInfo> parts) throws Exception {
|
||||
try {
|
||||
log.info("开始同步批量清理{}个分片文件", parts.size());
|
||||
|
||||
// 构建待删除对象列表
|
||||
List<DeleteObject> deleteObjects = parts.stream()
|
||||
.map(part -> {
|
||||
String chunkObjectName = objectName + ".part" + part.getPartNumber();
|
||||
log.debug("准备删除分片文件: {}", chunkObjectName);
|
||||
return new DeleteObject(chunkObjectName);
|
||||
})
|
||||
.collect(Collectors.toList());
|
||||
|
||||
if (deleteObjects.isEmpty()) {
|
||||
log.info("没有分片文件需要清理");
|
||||
return;
|
||||
}
|
||||
|
||||
// 批量删除分片文件
|
||||
log.info("执行批量删除,文件数量: {}", deleteObjects.size());
|
||||
Iterable<Result<DeleteError>> results = minioClient.removeObjects(
|
||||
RemoveObjectsArgs.builder()
|
||||
.bucket(bucketName)
|
||||
.objects(deleteObjects)
|
||||
.build()
|
||||
);
|
||||
|
||||
// 检查删除结果
|
||||
List<String> failedFiles = new ArrayList<>();
|
||||
List<String> successFiles = new ArrayList<>();
|
||||
|
||||
for (Result<DeleteError> result : results) {
|
||||
try {
|
||||
DeleteError error = result.get();
|
||||
if (error != null) {
|
||||
failedFiles.add(error.objectName());
|
||||
log.warn("删除分片文件失败: {}, 错误代码: {}, 错误信息: {}",
|
||||
error.objectName(), error.code(), error.message());
|
||||
}
|
||||
} catch (Exception e) {
|
||||
// 如果没有错误,说明删除成功
|
||||
// 但我们需要知道是哪个文件成功了
|
||||
// 由于MinIO API限制,我们无法直接知道,只能通过对比
|
||||
}
|
||||
}
|
||||
|
||||
// 计算成功删除的文件
|
||||
int totalFiles = deleteObjects.size();
|
||||
int successCount = totalFiles - failedFiles.size();
|
||||
|
||||
log.info("同步批量删除分片文件完成,总计: {}个,成功: {}个,失败: {}个",
|
||||
totalFiles, successCount, failedFiles.size());
|
||||
|
||||
// 如果有失败的文件,尝试重试
|
||||
if (!failedFiles.isEmpty()) {
|
||||
log.warn("有{}个分片文件删除失败,尝试单个删除重试", failedFiles.size());
|
||||
retryDeleteFailedChunks(bucketName, failedFiles);
|
||||
}
|
||||
|
||||
// 验证删除结果
|
||||
int remainingFiles = verifyCleanupResult(bucketName, objectName, parts);
|
||||
if (remainingFiles > 0) {
|
||||
log.warn("验证发现仍有{}个分片文件未被删除", remainingFiles);
|
||||
} else {
|
||||
log.info("验证通过,所有分片文件已清理完成");
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("同步批量清理分片文件失败", e);
|
||||
throw new Exception("同步清理分片文件失败: " + e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 验证清理结果
|
||||
* 检查分片文件是否真的被删除了
|
||||
*/
|
||||
private int verifyCleanupResult(String bucketName, String objectName,
|
||||
List<MultipartUploadCompleteRequest.PartInfo> parts) {
|
||||
int remainingFiles = 0;
|
||||
|
||||
for (MultipartUploadCompleteRequest.PartInfo part : parts) {
|
||||
String chunkObjectName = objectName + ".part" + part.getPartNumber();
|
||||
try {
|
||||
minioClient.removeObject(
|
||||
RemoveObjectArgs.builder()
|
||||
minioClient.statObject(
|
||||
StatObjectArgs.builder()
|
||||
.bucket(bucketName)
|
||||
.object(chunkObjectName)
|
||||
.build()
|
||||
);
|
||||
log.debug("降级删除分片文件: {}", chunkObjectName);
|
||||
// 如果能stat成功,说明文件仍然存在
|
||||
remainingFiles++;
|
||||
log.warn("分片文件仍然存在: {}", chunkObjectName);
|
||||
} catch (Exception e) {
|
||||
log.warn("降级删除分片文件失败: {}", chunkObjectName, e);
|
||||
// 文件不存在,说明删除成功
|
||||
log.debug("分片文件已删除: {}", chunkObjectName);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
return remainingFiles;
|
||||
}
|
||||
|
||||
/**
|
||||
* 重试删除失败的分片文件
|
||||
*/
|
||||
private void retryDeleteFailedChunks(String bucketName, List<String> failedFiles) {
|
||||
int retrySuccessCount = 0;
|
||||
|
||||
for (String fileName : failedFiles) {
|
||||
try {
|
||||
minioClient.removeObject(
|
||||
RemoveObjectArgs.builder()
|
||||
.bucket(bucketName)
|
||||
.object(fileName)
|
||||
.build()
|
||||
);
|
||||
retrySuccessCount++;
|
||||
log.debug("重试删除分片文件成功: {}", fileName);
|
||||
} catch (Exception e) {
|
||||
log.warn("重试删除分片文件仍然失败: {}, 错误: {}", fileName, e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
log.info("重试删除完成,成功删除{}个失败文件", retrySuccessCount);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -815,6 +1068,117 @@ public class MinioService {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 降级为单个删除(当批量删除失败时)- 同步版本
|
||||
*/
|
||||
private void fallbackToSingleDeleteSync(String bucketName, String objectName,
|
||||
List<MultipartUploadCompleteRequest.PartInfo> parts) {
|
||||
log.warn("使用同步单个删除模式");
|
||||
int successCount = 0;
|
||||
int failCount = 0;
|
||||
|
||||
for (MultipartUploadCompleteRequest.PartInfo part : parts) {
|
||||
String chunkObjectName = objectName + ".part" + part.getPartNumber();
|
||||
try {
|
||||
minioClient.removeObject(
|
||||
RemoveObjectArgs.builder()
|
||||
.bucket(bucketName)
|
||||
.object(chunkObjectName)
|
||||
.build()
|
||||
);
|
||||
successCount++;
|
||||
log.debug("同步单个删除分片文件成功: {}", chunkObjectName);
|
||||
} catch (Exception e) {
|
||||
failCount++;
|
||||
log.warn("同步单个删除分片文件失败: {}, 错误: {}", chunkObjectName, e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
log.info("同步单个删除完成,成功{}个,失败{}个", successCount, failCount);
|
||||
}
|
||||
|
||||
/**
|
||||
* 降级为单个删除(当批量删除失败时)- 异步版本
|
||||
*/
|
||||
private void fallbackToSingleDelete(String bucketName, String objectName,
|
||||
List<MultipartUploadCompleteRequest.PartInfo> parts) {
|
||||
log.warn("降级为单个删除模式");
|
||||
parts.parallelStream().forEach(part -> {
|
||||
String chunkObjectName = objectName + ".part" + part.getPartNumber();
|
||||
try {
|
||||
minioClient.removeObject(
|
||||
RemoveObjectArgs.builder()
|
||||
.bucket(bucketName)
|
||||
.object(chunkObjectName)
|
||||
.build()
|
||||
);
|
||||
log.debug("降级删除分片文件: {}", chunkObjectName);
|
||||
} catch (Exception e) {
|
||||
log.warn("降级删除分片文件失败: {}", chunkObjectName, e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* 手动清理临时文件(用于清理残留文件)
|
||||
*
|
||||
* @param objectName 对象名称
|
||||
* @param tempFileCount 临时文件数量(可选,如果不知道可以传null)
|
||||
* @return 清理结果描述
|
||||
*/
|
||||
public String manualCleanupTempFiles(String objectName, Integer tempFileCount) throws Exception {
|
||||
String bucketName = "user-uploads";
|
||||
List<String> tempObjects = new ArrayList<>();
|
||||
|
||||
try {
|
||||
// 如果指定了临时文件数量,生成精确的临时文件列表
|
||||
if (tempFileCount != null && tempFileCount > 0) {
|
||||
for (int i = 0; i < tempFileCount; i++) {
|
||||
tempObjects.add(objectName + ".temp." + i);
|
||||
}
|
||||
} else {
|
||||
// 如果没有指定数量,尝试查找可能的临时文件(最多检查20个)
|
||||
for (int i = 0; i < 20; i++) {
|
||||
String tempObject = objectName + ".temp." + i;
|
||||
try {
|
||||
minioClient.statObject(
|
||||
StatObjectArgs.builder()
|
||||
.bucket(bucketName)
|
||||
.object(tempObject)
|
||||
.build()
|
||||
);
|
||||
// 如果文件存在,添加到清理列表
|
||||
tempObjects.add(tempObject);
|
||||
log.info("发现临时文件: {}", tempObject);
|
||||
} catch (Exception e) {
|
||||
// 文件不存在,停止查找
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (tempObjects.isEmpty()) {
|
||||
String message = "没有发现需要清理的临时文件";
|
||||
log.info(message);
|
||||
return message;
|
||||
}
|
||||
|
||||
log.info("开始手动清理{}个临时文件: {}", tempObjects.size(), tempObjects);
|
||||
|
||||
// 使用同步清理方法确保清理完成
|
||||
cleanupTempObjectsSync(bucketName, tempObjects);
|
||||
|
||||
String message = String.format("手动清理完成,处理了%d个临时文件", tempObjects.size());
|
||||
log.info(message);
|
||||
return message;
|
||||
|
||||
} catch (Exception e) {
|
||||
String errorMessage = "手动清理临时文件失败: " + e.getMessage();
|
||||
log.error(errorMessage, e);
|
||||
throw new Exception(errorMessage, e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 分片上传完成结果
|
||||
*/
|
||||
|
Loading…
Reference in New Issue
Block a user