diff --git a/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/config/MinioConfigProperties.java b/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/config/MinioConfigProperties.java index 9f7762ea..afabac72 100644 --- a/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/config/MinioConfigProperties.java +++ b/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/config/MinioConfigProperties.java @@ -41,16 +41,6 @@ public class MinioConfigProperties { @Data public static class MultipartConfig { - /** - * 使用立即响应模式的分片数阈值(默认100) - */ - private int instantModeThreshold = 100; - - /** - * 使用极速模式的分片数阈值(默认50) - */ - private int turboModeThreshold = 50; - /** * 线程池核心线程数(默认:CPU核心数*2,最小8) */ @@ -61,11 +51,6 @@ public class MinioConfigProperties { */ private int maxPoolSize = Math.min(32, corePoolSize * 4); - /** - * 预生成URL的超时时间(秒,默认3秒) - */ - private int urlGenerateTimeout = 3; - /** * 分批合并的批次大小(默认50) * 当分片数量超过此值时,将采用分批并行合并策略 diff --git a/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/service/minio/MinioService.java b/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/service/minio/MinioService.java index d55dea6a..1a1e70e6 100644 --- a/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/service/minio/MinioService.java +++ b/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/service/minio/MinioService.java @@ -383,28 +383,11 @@ public class MinioService { } } - /** - * 分片上传完成策略枚举 - */ - private enum CompletionStrategy { - STANDARD("标准快速模式"), - TURBO("极速模式"), - INSTANT("立即响应模式"); - - private final String description; - - CompletionStrategy(String description) { - this.description = description; - } - - public String getDescription() { - return description; - } - } + // 删除旧的策略枚举,使用新的智能合并策略 /** - * 完成分片上传 - 优化版本(统一处理逻辑) - * 根据分片数量自动选择最优策略 + * 完成分片上传 - 智能同步合并 + 异步清理 + * 确保返回的URL能直接访问,同时最大化合并效率 * * @param uploadId 上传会话ID * @param objectName 对象名称 @@ -413,90 +396,29 @@ public class MinioService { */ public MultipartUploadCompleteResult completeMultipartUpload(String uploadId, String objectName, java.util.List parts) throws Exception { - // 根据分片数量和配置选择策略 - int instantThreshold = minioConfigProperties.getMultipart().getInstantModeThreshold(); - int turboThreshold = minioConfigProperties.getMultipart().getTurboModeThreshold(); - - CompletionStrategy strategy; - if (parts.size() > instantThreshold) { - strategy = CompletionStrategy.INSTANT; - } else if (parts.size() > turboThreshold) { - strategy = CompletionStrategy.TURBO; - } else { - strategy = CompletionStrategy.STANDARD; - } - - return executeMultipartUploadCompletion(uploadId, objectName, parts, strategy); - } - - /** - * 统一执行分片上传完成逻辑 - * 根据策略选择不同的处理方式 - */ - private MultipartUploadCompleteResult executeMultipartUploadCompletion(String uploadId, String objectName, - java.util.List parts, - CompletionStrategy strategy) throws Exception { long startTime = System.currentTimeMillis(); String bucketName = "user-uploads"; try { - log.info("开始{}处理{}个分片文件", strategy.getDescription(), parts.size()); + log.info("开始智能同步合并{}个分片文件", parts.size()); - // 1. 通用预处理:排序分片 + // 1. 预处理:排序分片 parts.sort((a, b) -> a.getPartNumber().compareTo(b.getPartNumber())); // 2. 构建合并源列表 - List sources = buildComposeSources(bucketName, objectName, parts); + List sources = parts.stream() + .map(part -> ComposeSource.builder() + .bucket(bucketName) + .object(objectName + ".part" + part.getPartNumber()) + .build()) + .collect(Collectors.toList()); - // 3. 根据策略选择处理方式 - switch (strategy) { - case INSTANT: - return executeInstantMode(bucketName, objectName, parts, sources, startTime); - case TURBO: - return executeTurboMode(bucketName, objectName, parts, sources, startTime); - case STANDARD: - default: - return executeStandardMode(bucketName, objectName, parts, sources, startTime); - } - - } catch (Exception e) { - log.error("{}失败: {}", strategy.getDescription(), e.getMessage()); - throw new Exception(strategy.getDescription() + "失败: " + e.getMessage(), e); - } - } - - /** - * 构建合并源列表 - */ - private List buildComposeSources(String bucketName, String objectName, - java.util.List parts) { - return parts.stream() - .map(part -> ComposeSource.builder() - .bucket(bucketName) - .object(objectName + ".part" + part.getPartNumber()) - .build()) - .collect(Collectors.toList()); - } - - /** - * 执行立即响应模式 - */ - private MultipartUploadCompleteResult executeInstantMode(String bucketName, String objectName, - java.util.List parts, - List sources, long startTime) { - // 1. 预生成文件URL(基于MinIO的规则) - String fileUrl = minioConfigProperties.getEndpoint() + "/" + bucketName + "/" + objectName; - - // 2. 生成临时ETag - String etag = "\"" + UUID.randomUUID().toString().replace("-", "") + "\""; - - // 3. 异步执行合并操作 - CompletableFuture.runAsync(() -> { - try { - long asyncStartTime = System.currentTimeMillis(); - log.info("开始异步合并{}个分片文件", parts.size()); - - // 执行合并 + // 3. 智能选择合并策略 - 同步执行确保文件完整性 + if (sources.size() > 100) { + // 大文件:分批并行合并(同步) + executeOptimizedBatchCompose(bucketName, objectName, sources); + } else { + // 小文件:直接合并(同步) minioClient.composeObject( ComposeObjectArgs.builder() .bucket(bucketName) @@ -504,55 +426,80 @@ public class MinioService { .sources(sources) .build() ); - - // 异步清理分片文件 - cleanupChunkFilesBatch(bucketName, objectName, parts); - - long asyncTotalTime = System.currentTimeMillis() - asyncStartTime; - log.info("异步合并完成,耗时: {}ms", asyncTotalTime); - - } catch (Exception e) { - log.error("异步合并分片文件失败", e); } - }, executorService); - - long totalTime = System.currentTimeMillis() - startTime; - log.info("立即响应模式完成,响应耗时: {}ms", totalTime); - - return new MultipartUploadCompleteResult(fileUrl, etag); + + long composeTime = System.currentTimeMillis(); + log.info("同步合并完成,耗时: {}ms", composeTime - startTime); + + // 4. 并行获取文件信息和URL(同步) + CompletableFuture urlFuture = CompletableFuture.supplyAsync(() -> { + try { + return minioClient.getPresignedObjectUrl( + GetPresignedObjectUrlArgs.builder() + .method(Method.GET) + .bucket(bucketName) + .object(objectName) + .build() + ); + } catch (Exception e) { + log.warn("生成预签名URL失败,使用基本URL", e); + return minioConfigProperties.getEndpoint() + "/" + bucketName + "/" + objectName; + } + }, executorService); + + CompletableFuture statFuture = CompletableFuture.supplyAsync(() -> { + try { + return minioClient.statObject( + StatObjectArgs.builder() + .bucket(bucketName) + .object(objectName) + .build() + ); + } catch (Exception e) { + throw new RuntimeException("获取文件信息失败", e); + } + }, executorService); + + // 5. 异步清理分片文件(不阻塞响应) + cleanupChunkFilesBatch(bucketName, objectName, parts); + + // 6. 等待URL和文件信息生成(最多3秒) + String fileUrl = urlFuture.get(3, TimeUnit.SECONDS); + StatObjectResponse stat = statFuture.get(3, TimeUnit.SECONDS); + + long totalTime = System.currentTimeMillis() - startTime; + log.info("智能同步合并完成,文件大小: {}, 总耗时: {}ms", stat.size(), totalTime); + + return new MultipartUploadCompleteResult(fileUrl, stat.etag()); + + } catch (Exception e) { + log.error("智能同步合并失败", e); + throw new Exception("分片上传完成失败: " + e.getMessage(), e); + } } + // 删除旧的执行方法,使用新的智能合并策略 + + // 删除旧的构建方法,已内联到主方法中 + + // 删除旧的立即响应模式方法 + + // 删除旧的极速模式方法 + + // 删除旧的标准模式方法 + + // 删除旧的URL超时处理方法 + /** - * 执行极速模式 - 借鉴分批并行处理思想 + * 优化的分批并行合并策略 - 专为方案1优化 + * 确保合并操作同步完成,返回时文件已经可访问 */ - private MultipartUploadCompleteResult executeTurboMode(String bucketName, String objectName, - java.util.List parts, - List sources, long startTime) throws Exception { - // 1. 异步预生成URL(不阻塞后续操作) - CompletableFuture urlFuture = CompletableFuture.supplyAsync(() -> { - try { - return minioClient.getPresignedObjectUrl( - GetPresignedObjectUrlArgs.builder() - .method(Method.GET) - .bucket(bucketName) - .object(objectName) - .build() - ); - } catch (Exception e) { - // 如果预生成URL失败,使用基本URL规则 - return minioConfigProperties.getEndpoint() + "/" + bucketName + "/" + objectName; - } - }, executorService); + private void executeOptimizedBatchCompose(String bucketName, String objectName, List sources) throws Exception { + int batchSize = minioConfigProperties.getMultipart().getBatchSize(); + log.info("启用优化分批合并策略,总分片数: {}, 批次大小: {}", sources.size(), batchSize); - long preProcessTime = System.currentTimeMillis(); - log.info("预处理完成,耗时: {}ms", preProcessTime - startTime); - - // 2. 判断是否需要分批处理(超大文件优化) - if (sources.size() > 100) { - // 分批并行合并策略 - executeBatchCompose(bucketName, objectName, sources); - } else { - // 标准合并 + if (sources.size() <= batchSize) { + // 如果分片数量不超过批次大小,直接合并 minioClient.composeObject( ComposeObjectArgs.builder() .bucket(bucketName) @@ -560,97 +507,75 @@ public class MinioService { .sources(sources) .build() ); + return; + } + + // 1. 分批创建临时合并对象 + List tempObjects = new ArrayList<>(); + List> batchFutures = new ArrayList<>(); + + for (int i = 0; i < sources.size(); i += batchSize) { + int endIndex = Math.min(i + batchSize, sources.size()); + List batch = sources.subList(i, endIndex); + String tempObjectName = objectName + ".temp." + (i / batchSize); + tempObjects.add(tempObjectName); + + // 并行处理每个批次 + CompletableFuture batchFuture = CompletableFuture.runAsync(() -> { + try { + long batchStartTime = System.currentTimeMillis(); + minioClient.composeObject( + ComposeObjectArgs.builder() + .bucket(bucketName) + .object(tempObjectName) + .sources(batch) + .build() + ); + long batchTime = System.currentTimeMillis() - batchStartTime; + log.debug("批次合并完成: {}, 耗时: {}ms, 分片数: {}", + tempObjectName, batchTime, batch.size()); + } catch (Exception e) { + log.error("批次合并失败: {}", tempObjectName, e); + throw new RuntimeException("批次合并失败: " + tempObjectName, e); + } + }, executorService); + + batchFutures.add(batchFuture); } - long composeTime = System.currentTimeMillis(); - log.info("文件合并完成,耗时: {}ms", composeTime - preProcessTime); - - // 3. 异步清理 - cleanupChunkFilesBatch(bucketName, objectName, parts); - - // 4. 获取预生成的URL(使用配置化的超时时间) - String fileUrl = getUrlWithTimeout(urlFuture, bucketName, objectName); - - long totalTime = System.currentTimeMillis(); - log.info("极速模式完成,总耗时: {}ms", totalTime - startTime); - - // 5. 返回结果 - String etag = "\"" + UUID.randomUUID().toString().replace("-", "") + "\""; - return new MultipartUploadCompleteResult(fileUrl, etag); - } - - /** - * 执行标准模式 - */ - private MultipartUploadCompleteResult executeStandardMode(String bucketName, String objectName, - java.util.List parts, - List sources, long startTime) throws Exception { - // 1. 并行预生成URL和合并文件 - CompletableFuture urlFuture = CompletableFuture.supplyAsync(() -> { - try { - return minioClient.getPresignedObjectUrl( - GetPresignedObjectUrlArgs.builder() - .method(Method.GET) - .bucket(bucketName) - .object(objectName) - .build() - ); - } catch (Exception e) { - throw new RuntimeException("预生成URL失败", e); - } - }, executorService); - - // 2. 直接合并分片文件 - minioClient.composeObject( - ComposeObjectArgs.builder() - .bucket(bucketName) - .object(objectName) - .sources(sources) - .build() - ); - - long composeTime = System.currentTimeMillis(); - log.info("分片文件合并完成,耗时: {}ms", composeTime - startTime); - - // 3. 并行获取文件信息和URL - CompletableFuture statFuture = CompletableFuture.supplyAsync(() -> { - try { - return minioClient.statObject( - StatObjectArgs.builder() - .bucket(bucketName) - .object(objectName) - .build() - ); - } catch (Exception e) { - throw new RuntimeException("获取文件信息失败", e); - } - }, executorService); - - // 4. 异步批量清理分片文件(不阻塞响应) - cleanupChunkFilesBatch(bucketName, objectName, parts); - - // 5. 获取结果(超时时间缩短到5秒) - String fileUrl = urlFuture.get(5, TimeUnit.SECONDS); - StatObjectResponse finalStat = statFuture.get(5, TimeUnit.SECONDS); - String etag = finalStat.etag(); - - long totalTime = System.currentTimeMillis(); - log.info("标准模式完成,对象名: {}, 文件大小: {}, 总耗时: {}ms", - objectName, finalStat.size(), totalTime - startTime); - - return new MultipartUploadCompleteResult(fileUrl, etag); - } - - /** - * 获取URL(带超时处理) - */ - private String getUrlWithTimeout(CompletableFuture urlFuture, String bucketName, String objectName) { - int urlTimeout = minioConfigProperties.getMultipart().getUrlGenerateTimeout(); + // 2. 等待所有批次完成(同步) try { - return urlFuture.get(urlTimeout, TimeUnit.SECONDS); + CompletableFuture.allOf(batchFutures.toArray(new CompletableFuture[0])).get(); + log.info("所有批次合并完成,共{}个批次", tempObjects.size()); } catch (Exception e) { - log.warn("获取预生成URL超时({}秒),使用基本URL", urlTimeout); - return minioConfigProperties.getEndpoint() + "/" + bucketName + "/" + objectName; + log.error("批次合并过程中出现错误", e); + // 清理已创建的临时对象 + cleanupTempObjects(bucketName, tempObjects); + throw new Exception("分批合并失败", e); + } + + // 3. 最终合并临时对象(同步) + try { + List tempSources = tempObjects.stream() + .map(tempObj -> ComposeSource.builder() + .bucket(bucketName) + .object(tempObj) + .build()) + .collect(Collectors.toList()); + + minioClient.composeObject( + ComposeObjectArgs.builder() + .bucket(bucketName) + .object(objectName) + .sources(tempSources) + .build() + ); + + log.info("优化分批合并完成,临时对象数: {}", tempObjects.size()); + + } finally { + // 4. 异步清理临时对象(不阻塞) + cleanupTempObjects(bucketName, tempObjects); } }