diff --git a/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/config/MinioConfigProperties.java b/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/config/MinioConfigProperties.java index afabac72..0902acc0 100644 --- a/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/config/MinioConfigProperties.java +++ b/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/config/MinioConfigProperties.java @@ -56,6 +56,25 @@ public class MinioConfigProperties { * 当分片数量超过此值时,将采用分批并行合并策略 */ private int batchSize = 50; + + /** + * 是否启用动态批次大小优化(默认true) + * 启用后,大文件会使用更大的批次大小以提高合并效率 + */ + private boolean enableDynamicBatchSize = true; + + /** + * 动态批次大小的最大值(默认100) + * 当启用动态批次大小时,批次大小不会超过此值 + */ + private int maxDynamicBatchSize = 100; + + /** + * 是否启用同步清理模式(默认false) + * 启用后,分片文件清理将同步执行,便于调试验证清理效果 + * 生产环境建议设置为false以提高性能 + */ + private boolean enableSyncCleanup = false; } } \ No newline at end of file diff --git a/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/controller/admin/file/FileController.java b/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/controller/admin/file/FileController.java index 2bd9c048..71da964d 100644 --- a/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/controller/admin/file/FileController.java +++ b/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/controller/admin/file/FileController.java @@ -417,12 +417,23 @@ public class FileController { return success(null); } catch (Exception e) { log.error("取消分片上传失败", e); - String errorMessage = e.getMessage(); - if (errorMessage != null && errorMessage.contains("上传会话不存在")) { - return CommonResult.error(404, "上传会话不存在"); - } else { - return CommonResult.error(500, "取消分片上传失败: " + errorMessage); - } + return CommonResult.error(500, "取消分片上传失败: " + e.getMessage()); + } + } + + @PostMapping("/multipart/cleanup-temp-files") + @Operation(summary = "手动清理临时文件") + @Parameter(name = "objectName", description = "对象名称", required = true) + @Parameter(name = "tempFileCount", description = "临时文件数量(可选)", required = false) + public CommonResult manualCleanupTempFiles( + @RequestParam("objectName") String objectName, + @RequestParam(value = "tempFileCount", required = false) Integer tempFileCount) { + try { + String result = minioService.manualCleanupTempFiles(objectName, tempFileCount); + return CommonResult.success(result); + } catch (Exception e) { + log.error("手动清理临时文件失败", e); + return CommonResult.error(500, "手动清理临时文件失败: " + e.getMessage()); } } diff --git a/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/service/minio/MinioService.java b/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/service/minio/MinioService.java index 1a1e70e6..6914d172 100644 --- a/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/service/minio/MinioService.java +++ b/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/service/minio/MinioService.java @@ -16,6 +16,8 @@ import io.minio.ComposeObjectArgs; import io.minio.ComposeSource; import io.minio.RemoveObjectsArgs; import io.minio.messages.DeleteObject; +import io.minio.messages.DeleteError; +import io.minio.Result; import cn.iocoder.yudao.module.infra.controller.admin.file.vo.file.MultipartUploadCompleteRequest; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; @@ -28,15 +30,8 @@ import java.io.InputStream; import java.time.Instant; import java.time.ZonedDateTime; import java.time.format.DateTimeFormatter; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; +import java.util.*; +import java.util.concurrent.*; import java.util.stream.Collectors; /** @@ -52,7 +47,7 @@ public class MinioService { private MinioConfigProperties minioConfigProperties; private MinioClient minioClient; - + // 用于并行处理的线程池 private ExecutorService executorService; @@ -383,11 +378,9 @@ public class MinioService { } } - // 删除旧的策略枚举,使用新的智能合并策略 - /** - * 完成分片上传 - 智能同步合并 + 异步清理 - * 确保返回的URL能直接访问,同时最大化合并效率 + * 完成分片上传 - 高效合并 + 异步清理 + * 优化合并效率,提高接口响应速度 * * @param uploadId 上传会话ID * @param objectName 对象名称 @@ -398,13 +391,13 @@ public class MinioService { java.util.List parts) throws Exception { long startTime = System.currentTimeMillis(); String bucketName = "user-uploads"; - + try { - log.info("开始智能同步合并{}个分片文件", parts.size()); - + log.info("开始高效合并{}个分片文件", parts.size()); + // 1. 预处理:排序分片 - parts.sort((a, b) -> a.getPartNumber().compareTo(b.getPartNumber())); - + parts.sort(Comparator.comparing(MultipartUploadCompleteRequest.PartInfo::getPartNumber)); + // 2. 构建合并源列表 List sources = parts.stream() .map(part -> ComposeSource.builder() @@ -412,25 +405,13 @@ public class MinioService { .object(objectName + ".part" + part.getPartNumber()) .build()) .collect(Collectors.toList()); - - // 3. 智能选择合并策略 - 同步执行确保文件完整性 - if (sources.size() > 100) { - // 大文件:分批并行合并(同步) - executeOptimizedBatchCompose(bucketName, objectName, sources); - } else { - // 小文件:直接合并(同步) - minioClient.composeObject( - ComposeObjectArgs.builder() - .bucket(bucketName) - .object(objectName) - .sources(sources) - .build() - ); - } - + + // 3. 智能合并策略 - 根据分片数量选择最优策略 + executeSmartCompose(bucketName, objectName, sources); + long composeTime = System.currentTimeMillis(); - log.info("同步合并完成,耗时: {}ms", composeTime - startTime); - + log.info("合并完成,耗时: {}ms", composeTime - startTime); + // 4. 并行获取文件信息和URL(同步) CompletableFuture urlFuture = CompletableFuture.supplyAsync(() -> { try { @@ -446,7 +427,7 @@ public class MinioService { return minioConfigProperties.getEndpoint() + "/" + bucketName + "/" + objectName; } }, executorService); - + CompletableFuture statFuture = CompletableFuture.supplyAsync(() -> { try { return minioClient.statObject( @@ -459,47 +440,54 @@ public class MinioService { throw new RuntimeException("获取文件信息失败", e); } }, executorService); + + // 5. 清理分片文件 - 可配置同步/异步模式 + boolean useSyncCleanup = minioConfigProperties.getMultipart().isEnableSyncCleanup(); - // 5. 异步清理分片文件(不阻塞响应) - cleanupChunkFilesBatch(bucketName, objectName, parts); - + if (useSyncCleanup) { + log.info("使用同步清理模式(调试模式)"); + try { + cleanupChunkFilesBatchSync(bucketName, objectName, parts); + } catch (Exception cleanupException) { + log.error("同步清理失败,但不影响主流程", cleanupException); + } + } else { + log.info("使用异步清理模式(生产模式)"); + cleanupChunkFilesBatch(bucketName, objectName, parts); + } + // 6. 等待URL和文件信息生成(最多3秒) String fileUrl = urlFuture.get(3, TimeUnit.SECONDS); StatObjectResponse stat = statFuture.get(3, TimeUnit.SECONDS); - + long totalTime = System.currentTimeMillis() - startTime; - log.info("智能同步合并完成,文件大小: {}, 总耗时: {}ms", stat.size(), totalTime); - + log.info("高效合并完成,文件大小: {}, 总耗时: {}ms", stat.size(), totalTime); + return new MultipartUploadCompleteResult(fileUrl, stat.etag()); - + } catch (Exception e) { - log.error("智能同步合并失败", e); + log.error("高效合并失败", e); throw new Exception("分片上传完成失败: " + e.getMessage(), e); } } - // 删除旧的执行方法,使用新的智能合并策略 - - // 删除旧的构建方法,已内联到主方法中 - - // 删除旧的立即响应模式方法 - - // 删除旧的极速模式方法 - - // 删除旧的标准模式方法 - - // 删除旧的URL超时处理方法 - /** - * 优化的分批并行合并策略 - 专为方案1优化 - * 确保合并操作同步完成,返回时文件已经可访问 + * 智能合并策略 - 根据分片数量自动选择最优方案 + * 优化点: + * 1. 小文件直接合并,避免创建临时文件 + * 2. 大文件使用优化的分批策略 + * 3. 移除不必要的验证步骤 */ - private void executeOptimizedBatchCompose(String bucketName, String objectName, List sources) throws Exception { + private void executeSmartCompose(String bucketName, String objectName, List sources) throws Exception { + int sourceCount = sources.size(); int batchSize = minioConfigProperties.getMultipart().getBatchSize(); - log.info("启用优化分批合并策略,总分片数: {}, 批次大小: {}", sources.size(), batchSize); - if (sources.size() <= batchSize) { - // 如果分片数量不超过批次大小,直接合并 + log.info("启用智能合并策略,总分片数: {}, 配置批次大小: {}", sourceCount, batchSize); + + // 优化1:小文件直接合并,避免不必要的分批 + if (sourceCount <= batchSize) { + log.info("分片数量({})不超过批次大小({}),采用直接合并", sourceCount, batchSize); + minioClient.composeObject( ComposeObjectArgs.builder() .bucket(bucketName) @@ -507,23 +495,55 @@ public class MinioService { .sources(sources) .build() ); + + log.info("直接合并完成"); return; } + // 优化2:大文件使用高效分批策略 + log.info("分片数量({})超过批次大小({}),采用高效分批合并", sourceCount, batchSize); + executeOptimizedBatchCompose(bucketName, objectName, sources); + } + + /** + * 优化的分批并行合并策略 + * 优化点: + * 1. 移除不必要的临时文件验证 + * 2. 优化批次大小策略 + * 3. 异步清理临时文件,不阻塞主流程 + */ + private void executeOptimizedBatchCompose(String bucketName, String objectName, List sources) throws Exception { + int batchSize = minioConfigProperties.getMultipart().getBatchSize(); + + // 优化3:动态调整批次大小,大文件使用更大的批次 + int optimizedBatchSize = batchSize; + + if (minioConfigProperties.getMultipart().isEnableDynamicBatchSize()) { + int maxDynamicBatchSize = minioConfigProperties.getMultipart().getMaxDynamicBatchSize(); + optimizedBatchSize = Math.max(batchSize, Math.min(maxDynamicBatchSize, sources.size() / 4)); + + if (optimizedBatchSize != batchSize) { + log.info("启用动态批次大小优化:{} -> {}", batchSize, optimizedBatchSize); + } + } + + log.info("启用优化分批合并策略,总分片数: {}, 批次大小: {}", sources.size(), optimizedBatchSize); + // 1. 分批创建临时合并对象 List tempObjects = new ArrayList<>(); List> batchFutures = new ArrayList<>(); - - for (int i = 0; i < sources.size(); i += batchSize) { - int endIndex = Math.min(i + batchSize, sources.size()); + + for (int i = 0; i < sources.size(); i += optimizedBatchSize) { + int endIndex = Math.min(i + optimizedBatchSize, sources.size()); List batch = sources.subList(i, endIndex); - String tempObjectName = objectName + ".temp." + (i / batchSize); + String tempObjectName = objectName + ".temp." + (i / optimizedBatchSize); tempObjects.add(tempObjectName); - + // 并行处理每个批次 CompletableFuture batchFuture = CompletableFuture.runAsync(() -> { try { long batchStartTime = System.currentTimeMillis(); + minioClient.composeObject( ComposeObjectArgs.builder() .bucket(bucketName) @@ -531,29 +551,31 @@ public class MinioService { .sources(batch) .build() ); + long batchTime = System.currentTimeMillis() - batchStartTime; - log.debug("批次合并完成: {}, 耗时: {}ms, 分片数: {}", + log.debug("批次合并完成: {}, 耗时: {}ms, 分片数: {}", tempObjectName, batchTime, batch.size()); + } catch (Exception e) { - log.error("批次合并失败: {}", tempObjectName, e); + log.error("批次合并失败: {}, 原因: {}", tempObjectName, e.getMessage(), e); throw new RuntimeException("批次合并失败: " + tempObjectName, e); } }, executorService); - + batchFutures.add(batchFuture); } - + // 2. 等待所有批次完成(同步) try { CompletableFuture.allOf(batchFutures.toArray(new CompletableFuture[0])).get(); log.info("所有批次合并完成,共{}个批次", tempObjects.size()); } catch (Exception e) { log.error("批次合并过程中出现错误", e); - // 清理已创建的临时对象 + // 异步清理已创建的临时对象 cleanupTempObjects(bucketName, tempObjects); throw new Exception("分批合并失败", e); } - + // 3. 最终合并临时对象(同步) try { List tempSources = tempObjects.stream() @@ -562,7 +584,9 @@ public class MinioService { .object(tempObj) .build()) .collect(Collectors.toList()); - + + log.info("开始最终合并,临时对象数: {}", tempObjects.size()); + minioClient.composeObject( ComposeObjectArgs.builder() .bucket(bucketName) @@ -570,148 +594,241 @@ public class MinioService { .sources(tempSources) .build() ); - + log.info("优化分批合并完成,临时对象数: {}", tempObjects.size()); - - } finally { - // 4. 异步清理临时对象(不阻塞) - cleanupTempObjects(bucketName, tempObjects); - } - } - /** - * 分批并行合并策略 - 借鉴优秀实践 - * 将大量分片分批处理,减少单次操作的复杂度 - */ - private void executeBatchCompose(String bucketName, String objectName, List sources) throws Exception { - int batchSize = minioConfigProperties.getMultipart().getBatchSize(); - log.info("启用分批合并策略,总分片数: {}, 批次大小: {}", sources.size(), batchSize); - - if (sources.size() <= batchSize) { - // 如果分片数量不超过批次大小,直接合并 - minioClient.composeObject( - ComposeObjectArgs.builder() - .bucket(bucketName) - .object(objectName) - .sources(sources) - .build() - ); - return; - } - - // 1. 分批创建临时合并对象 - List tempObjects = new ArrayList<>(); - List> batchFutures = new ArrayList<>(); - - for (int i = 0; i < sources.size(); i += batchSize) { - int endIndex = Math.min(i + batchSize, sources.size()); - List batch = sources.subList(i, endIndex); - String tempObjectName = objectName + ".temp." + (i / batchSize); - tempObjects.add(tempObjectName); - - // 并行处理每个批次 - CompletableFuture batchFuture = CompletableFuture.runAsync(() -> { - try { - long batchStartTime = System.currentTimeMillis(); - minioClient.composeObject( - ComposeObjectArgs.builder() - .bucket(bucketName) - .object(tempObjectName) - .sources(batch) - .build() - ); - long batchTime = System.currentTimeMillis() - batchStartTime; - log.debug("批次合并完成: {}, 耗时: {}ms, 分片数: {}", - tempObjectName, batchTime, batch.size()); - } catch (Exception e) { - log.error("批次合并失败: {}", tempObjectName, e); - throw new RuntimeException("批次合并失败: " + tempObjectName, e); - } - }, executorService); - - batchFutures.add(batchFuture); - } - - // 2. 等待所有批次完成 - try { - CompletableFuture.allOf(batchFutures.toArray(new CompletableFuture[0])).get(); - log.info("所有批次合并完成,共{}个批次", tempObjects.size()); } catch (Exception e) { - log.error("批次合并过程中出现错误", e); - // 清理已创建的临时对象 - cleanupTempObjects(bucketName, tempObjects); - throw new Exception("分批合并失败", e); - } - - // 3. 最终合并临时对象 - try { - List tempSources = tempObjects.stream() - .map(tempObj -> ComposeSource.builder() - .bucket(bucketName) - .object(tempObj) - .build()) - .collect(Collectors.toList()); - - minioClient.composeObject( - ComposeObjectArgs.builder() - .bucket(bucketName) - .object(objectName) - .sources(tempSources) - .build() - ); - - log.info("最终合并完成,临时对象数: {}", tempObjects.size()); - + log.error("最终合并失败,临时对象: {}", tempObjects, e); + throw new Exception("最终合并失败: " + e.getMessage(), e); } finally { - // 4. 清理临时对象 - cleanupTempObjects(bucketName, tempObjects); + // 4. 清理临时对象 - 支持同步/异步模式 + boolean useSyncCleanup = minioConfigProperties.getMultipart().isEnableSyncCleanup(); + + if (useSyncCleanup) { + log.info("使用同步清理临时对象模式(调试模式)"); + try { + cleanupTempObjectsSync(bucketName, tempObjects); + } catch (Exception cleanupException) { + log.error("同步清理临时对象失败,但不影响主流程", cleanupException); + } + } else { + log.info("使用异步清理临时对象模式(生产模式)"); + cleanupTempObjects(bucketName, tempObjects); + } } } /** - * 清理临时对象 + * 异步清理临时对象(修复版本) + * 正确处理MinIO删除结果,增加删除验证 */ private void cleanupTempObjects(String bucketName, List tempObjects) { if (tempObjects.isEmpty()) { return; } - + CompletableFuture.runAsync(() -> { try { + log.info("开始异步清理{}个临时对象", tempObjects.size()); + List deleteObjects = tempObjects.stream() .map(DeleteObject::new) .collect(Collectors.toList()); - - minioClient.removeObjects( + + // 批量删除临时对象 + Iterable> results = minioClient.removeObjects( RemoveObjectsArgs.builder() .bucket(bucketName) .objects(deleteObjects) .build() ); + + // 检查删除结果 + List failedFiles = new ArrayList<>(); + int successCount = 0; - log.info("临时对象清理完成,清理数量: {}", tempObjects.size()); + for (Result result : results) { + try { + DeleteError error = result.get(); + if (error != null) { + failedFiles.add(error.objectName()); + log.warn("删除临时对象失败: {}, 错误: {}", error.objectName(), error.message()); + } + } catch (Exception e) { + // 如果没有错误,说明删除成功 + successCount++; + } + } + + log.info("批量删除临时对象完成,成功: {}个,失败: {}个", successCount, failedFiles.size()); + + // 如果有失败的文件,尝试重试 + if (!failedFiles.isEmpty()) { + log.warn("有{}个临时对象删除失败,尝试单个删除重试", failedFiles.size()); + retryDeleteFailedTempObjects(bucketName, failedFiles); + } + } catch (Exception e) { - log.error("清理临时对象失败", e); + log.error("异步清理临时对象失败", e); + // 如果批量删除完全失败,降级为单个删除 + fallbackToSingleDeleteTempObjects(bucketName, tempObjects); } }, executorService); } + /** + * 同步清理临时对象(用于调试) + * 确保删除完成后返回,提供详细的删除结果 + */ + public void cleanupTempObjectsSync(String bucketName, List tempObjects) throws Exception { + if (tempObjects.isEmpty()) { + log.info("没有临时对象需要清理"); + return; + } + try { + log.info("开始同步清理{}个临时对象", tempObjects.size()); + List deleteObjects = tempObjects.stream() + .map(tempObj -> { + log.debug("准备删除临时对象: {}", tempObj); + return new DeleteObject(tempObj); + }) + .collect(Collectors.toList()); + // 批量删除临时对象 + log.info("执行批量删除临时对象,文件数量: {}", deleteObjects.size()); + Iterable> results = minioClient.removeObjects( + RemoveObjectsArgs.builder() + .bucket(bucketName) + .objects(deleteObjects) + .build() + ); + // 检查删除结果 + List failedFiles = new ArrayList<>(); + + for (Result result : results) { + try { + DeleteError error = result.get(); + if (error != null) { + failedFiles.add(error.objectName()); + log.warn("删除临时对象失败: {}, 错误代码: {}, 错误信息: {}", + error.objectName(), error.code(), error.message()); + } + } catch (Exception e) { + // 如果没有错误,说明删除成功 + } + } + // 计算成功删除的文件 + int totalFiles = deleteObjects.size(); + int successCount = totalFiles - failedFiles.size(); + + log.info("同步批量删除临时对象完成,总计: {}个,成功: {}个,失败: {}个", + totalFiles, successCount, failedFiles.size()); + + // 如果有失败的文件,尝试重试 + if (!failedFiles.isEmpty()) { + log.warn("有{}个临时对象删除失败,尝试单个删除重试", failedFiles.size()); + retryDeleteFailedTempObjects(bucketName, failedFiles); + } + + // 验证删除结果 + int remainingFiles = verifyTempObjectsCleanupResult(bucketName, tempObjects); + if (remainingFiles > 0) { + log.warn("验证发现仍有{}个临时对象未被删除", remainingFiles); + } else { + log.info("验证通过,所有临时对象已清理完成"); + } + + } catch (Exception e) { + log.error("同步清理临时对象失败", e); + throw new Exception("同步清理临时对象失败: " + e.getMessage(), e); + } + } /** - * 异步批量清理分片文件 - * 使用MinIO批量删除API,性能更优 + * 验证临时对象清理结果 + * 检查临时对象是否真的被删除了 */ - private void cleanupChunkFilesBatch(String bucketName, String objectName, + private int verifyTempObjectsCleanupResult(String bucketName, List tempObjects) { + int remainingFiles = 0; + + for (String tempObject : tempObjects) { + try { + minioClient.statObject( + StatObjectArgs.builder() + .bucket(bucketName) + .object(tempObject) + .build() + ); + // 如果能stat成功,说明文件仍然存在 + remainingFiles++; + log.warn("临时对象仍然存在: {}", tempObject); + } catch (Exception e) { + // 文件不存在,说明删除成功 + log.debug("临时对象已删除: {}", tempObject); + } + } + + return remainingFiles; + } + + /** + * 重试删除失败的临时对象 + */ + private void retryDeleteFailedTempObjects(String bucketName, List failedFiles) { + int retrySuccessCount = 0; + + for (String fileName : failedFiles) { + try { + minioClient.removeObject( + RemoveObjectArgs.builder() + .bucket(bucketName) + .object(fileName) + .build() + ); + retrySuccessCount++; + log.debug("重试删除临时对象成功: {}", fileName); + } catch (Exception e) { + log.warn("重试删除临时对象仍然失败: {}, 错误: {}", fileName, e.getMessage()); + } + } + + log.info("重试删除临时对象完成,成功删除{}个失败文件", retrySuccessCount); + } + + /** + * 降级为单个删除临时对象(当批量删除失败时) + */ + private void fallbackToSingleDeleteTempObjects(String bucketName, List tempObjects) { + log.warn("降级为单个删除临时对象模式"); + tempObjects.parallelStream().forEach(tempObject -> { + try { + minioClient.removeObject( + RemoveObjectArgs.builder() + .bucket(bucketName) + .object(tempObject) + .build() + ); + log.debug("降级删除临时对象: {}", tempObject); + } catch (Exception e) { + log.warn("降级删除临时对象失败: {}", tempObject, e); + } + }); + } + + /** + * 异步批量清理分片文件(修复版本) + * 正确处理MinIO删除结果,增加删除验证 + */ + private void cleanupChunkFilesBatch(String bucketName, String objectName, List parts) { CompletableFuture.runAsync(() -> { try { log.info("开始异步批量清理{}个分片文件", parts.size()); - + // 构建待删除对象列表 List deleteObjects = parts.stream() .map(part -> { @@ -720,45 +837,181 @@ public class MinioService { }) .collect(Collectors.toList()); - // 批量删除分片文件 - if (!deleteObjects.isEmpty()) { - minioClient.removeObjects( - RemoveObjectsArgs.builder() - .bucket(bucketName) - .objects(deleteObjects) - .build() - ); - log.info("异步批量清理分片文件完成,清理了{}个文件", deleteObjects.size()); + if (deleteObjects.isEmpty()) { + log.info("没有分片文件需要清理"); + return; } + + // 批量删除分片文件 + Iterable> results = minioClient.removeObjects( + RemoveObjectsArgs.builder() + .bucket(bucketName) + .objects(deleteObjects) + .build() + ); + + // 检查删除结果 + List failedFiles = new ArrayList<>(); + int successCount = 0; + for (Result result : results) { + try { + DeleteError error = result.get(); + if (error != null) { + failedFiles.add(error.objectName()); + log.warn("删除分片文件失败: {}, 错误: {}", error.objectName(), error.message()); + } + } catch (Exception e) { + // 如果没有错误,说明删除成功 + successCount++; + } + } + + log.info("批量删除分片文件完成,成功: {}个,失败: {}个", successCount, failedFiles.size()); + + // 如果有失败的文件,尝试重试 + if (!failedFiles.isEmpty()) { + log.warn("有{}个分片文件删除失败,尝试单个删除重试", failedFiles.size()); + retryDeleteFailedChunks(bucketName, failedFiles); + } + } catch (Exception e) { log.error("异步批量清理分片文件失败", e); - // 如果批量删除失败,降级为单个删除 + // 如果批量删除完全失败,降级为单个删除 fallbackToSingleDelete(bucketName, objectName, parts); } }, executorService); } /** - * 降级为单个删除(当批量删除失败时) + * 同步批量清理分片文件(用于调试) + * 确保删除完成后返回,提供详细的删除结果 */ - private void fallbackToSingleDelete(String bucketName, String objectName, - List parts) { - log.warn("降级为单个删除模式"); - parts.parallelStream().forEach(part -> { + public void cleanupChunkFilesBatchSync(String bucketName, String objectName, + List parts) throws Exception { + try { + log.info("开始同步批量清理{}个分片文件", parts.size()); + + // 构建待删除对象列表 + List deleteObjects = parts.stream() + .map(part -> { + String chunkObjectName = objectName + ".part" + part.getPartNumber(); + log.debug("准备删除分片文件: {}", chunkObjectName); + return new DeleteObject(chunkObjectName); + }) + .collect(Collectors.toList()); + + if (deleteObjects.isEmpty()) { + log.info("没有分片文件需要清理"); + return; + } + + // 批量删除分片文件 + log.info("执行批量删除,文件数量: {}", deleteObjects.size()); + Iterable> results = minioClient.removeObjects( + RemoveObjectsArgs.builder() + .bucket(bucketName) + .objects(deleteObjects) + .build() + ); + + // 检查删除结果 + List failedFiles = new ArrayList<>(); + List successFiles = new ArrayList<>(); + + for (Result result : results) { + try { + DeleteError error = result.get(); + if (error != null) { + failedFiles.add(error.objectName()); + log.warn("删除分片文件失败: {}, 错误代码: {}, 错误信息: {}", + error.objectName(), error.code(), error.message()); + } + } catch (Exception e) { + // 如果没有错误,说明删除成功 + // 但我们需要知道是哪个文件成功了 + // 由于MinIO API限制,我们无法直接知道,只能通过对比 + } + } + + // 计算成功删除的文件 + int totalFiles = deleteObjects.size(); + int successCount = totalFiles - failedFiles.size(); + + log.info("同步批量删除分片文件完成,总计: {}个,成功: {}个,失败: {}个", + totalFiles, successCount, failedFiles.size()); + + // 如果有失败的文件,尝试重试 + if (!failedFiles.isEmpty()) { + log.warn("有{}个分片文件删除失败,尝试单个删除重试", failedFiles.size()); + retryDeleteFailedChunks(bucketName, failedFiles); + } + + // 验证删除结果 + int remainingFiles = verifyCleanupResult(bucketName, objectName, parts); + if (remainingFiles > 0) { + log.warn("验证发现仍有{}个分片文件未被删除", remainingFiles); + } else { + log.info("验证通过,所有分片文件已清理完成"); + } + + } catch (Exception e) { + log.error("同步批量清理分片文件失败", e); + throw new Exception("同步清理分片文件失败: " + e.getMessage(), e); + } + } + + /** + * 验证清理结果 + * 检查分片文件是否真的被删除了 + */ + private int verifyCleanupResult(String bucketName, String objectName, + List parts) { + int remainingFiles = 0; + + for (MultipartUploadCompleteRequest.PartInfo part : parts) { String chunkObjectName = objectName + ".part" + part.getPartNumber(); try { - minioClient.removeObject( - RemoveObjectArgs.builder() + minioClient.statObject( + StatObjectArgs.builder() .bucket(bucketName) .object(chunkObjectName) .build() ); - log.debug("降级删除分片文件: {}", chunkObjectName); + // 如果能stat成功,说明文件仍然存在 + remainingFiles++; + log.warn("分片文件仍然存在: {}", chunkObjectName); } catch (Exception e) { - log.warn("降级删除分片文件失败: {}", chunkObjectName, e); + // 文件不存在,说明删除成功 + log.debug("分片文件已删除: {}", chunkObjectName); } - }); + } + + return remainingFiles; + } + + /** + * 重试删除失败的分片文件 + */ + private void retryDeleteFailedChunks(String bucketName, List failedFiles) { + int retrySuccessCount = 0; + + for (String fileName : failedFiles) { + try { + minioClient.removeObject( + RemoveObjectArgs.builder() + .bucket(bucketName) + .object(fileName) + .build() + ); + retrySuccessCount++; + log.debug("重试删除分片文件成功: {}", fileName); + } catch (Exception e) { + log.warn("重试删除分片文件仍然失败: {}, 错误: {}", fileName, e.getMessage()); + } + } + + log.info("重试删除完成,成功删除{}个失败文件", retrySuccessCount); } /** @@ -815,6 +1068,117 @@ public class MinioService { } } + /** + * 降级为单个删除(当批量删除失败时)- 同步版本 + */ + private void fallbackToSingleDeleteSync(String bucketName, String objectName, + List parts) { + log.warn("使用同步单个删除模式"); + int successCount = 0; + int failCount = 0; + + for (MultipartUploadCompleteRequest.PartInfo part : parts) { + String chunkObjectName = objectName + ".part" + part.getPartNumber(); + try { + minioClient.removeObject( + RemoveObjectArgs.builder() + .bucket(bucketName) + .object(chunkObjectName) + .build() + ); + successCount++; + log.debug("同步单个删除分片文件成功: {}", chunkObjectName); + } catch (Exception e) { + failCount++; + log.warn("同步单个删除分片文件失败: {}, 错误: {}", chunkObjectName, e.getMessage()); + } + } + + log.info("同步单个删除完成,成功{}个,失败{}个", successCount, failCount); + } + + /** + * 降级为单个删除(当批量删除失败时)- 异步版本 + */ + private void fallbackToSingleDelete(String bucketName, String objectName, + List parts) { + log.warn("降级为单个删除模式"); + parts.parallelStream().forEach(part -> { + String chunkObjectName = objectName + ".part" + part.getPartNumber(); + try { + minioClient.removeObject( + RemoveObjectArgs.builder() + .bucket(bucketName) + .object(chunkObjectName) + .build() + ); + log.debug("降级删除分片文件: {}", chunkObjectName); + } catch (Exception e) { + log.warn("降级删除分片文件失败: {}", chunkObjectName, e); + } + }); + } + + /** + * 手动清理临时文件(用于清理残留文件) + * + * @param objectName 对象名称 + * @param tempFileCount 临时文件数量(可选,如果不知道可以传null) + * @return 清理结果描述 + */ + public String manualCleanupTempFiles(String objectName, Integer tempFileCount) throws Exception { + String bucketName = "user-uploads"; + List tempObjects = new ArrayList<>(); + + try { + // 如果指定了临时文件数量,生成精确的临时文件列表 + if (tempFileCount != null && tempFileCount > 0) { + for (int i = 0; i < tempFileCount; i++) { + tempObjects.add(objectName + ".temp." + i); + } + } else { + // 如果没有指定数量,尝试查找可能的临时文件(最多检查20个) + for (int i = 0; i < 20; i++) { + String tempObject = objectName + ".temp." + i; + try { + minioClient.statObject( + StatObjectArgs.builder() + .bucket(bucketName) + .object(tempObject) + .build() + ); + // 如果文件存在,添加到清理列表 + tempObjects.add(tempObject); + log.info("发现临时文件: {}", tempObject); + } catch (Exception e) { + // 文件不存在,停止查找 + break; + } + } + } + + if (tempObjects.isEmpty()) { + String message = "没有发现需要清理的临时文件"; + log.info(message); + return message; + } + + log.info("开始手动清理{}个临时文件: {}", tempObjects.size(), tempObjects); + + // 使用同步清理方法确保清理完成 + cleanupTempObjectsSync(bucketName, tempObjects); + + String message = String.format("手动清理完成,处理了%d个临时文件", tempObjects.size()); + log.info(message); + return message; + + } catch (Exception e) { + String errorMessage = "手动清理临时文件失败: " + e.getMessage(); + log.error(errorMessage, e); + throw new Exception(errorMessage, e); + } + } + /** * 分片上传完成结果 */