diff --git a/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/config/MinioConfigProperties.java b/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/config/MinioConfigProperties.java index 16b70985..9f7762ea 100644 --- a/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/config/MinioConfigProperties.java +++ b/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/config/MinioConfigProperties.java @@ -34,4 +34,43 @@ public class MinioConfigProperties { */ private String bucketName; + /** + * 分片上传性能配置 + */ + private MultipartConfig multipart = new MultipartConfig(); + + @Data + public static class MultipartConfig { + /** + * 使用立即响应模式的分片数阈值(默认100) + */ + private int instantModeThreshold = 100; + + /** + * 使用极速模式的分片数阈值(默认50) + */ + private int turboModeThreshold = 50; + + /** + * 线程池核心线程数(默认:CPU核心数*2,最小8) + */ + private int corePoolSize = Math.max(8, Runtime.getRuntime().availableProcessors() * 2); + + /** + * 线程池最大线程数(默认:核心线程数*4,最大32) + */ + private int maxPoolSize = Math.min(32, corePoolSize * 4); + + /** + * 预生成URL的超时时间(秒,默认3秒) + */ + private int urlGenerateTimeout = 3; + + /** + * 分批合并的批次大小(默认50) + * 当分片数量超过此值时,将采用分批并行合并策略 + */ + private int batchSize = 50; + } + } \ No newline at end of file diff --git a/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/service/minio/MinioService.java b/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/service/minio/MinioService.java index db72e7b0..d55dea6a 100644 --- a/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/service/minio/MinioService.java +++ b/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/service/minio/MinioService.java @@ -67,11 +67,13 @@ public class MinioService { .credentials(minioConfigProperties.getAccessKey(), minioConfigProperties.getSecretKey()) .build(); - // 初始化线程池,用于并行处理(优化配置) - int corePoolSize = Math.max(4, Runtime.getRuntime().availableProcessors()); - this.executorService = Executors.newFixedThreadPool(corePoolSize * 2, r -> { + // 初始化线程池,用于并行处理(使用配置化的线程池大小) + int corePoolSize = minioConfigProperties.getMultipart().getCorePoolSize(); + int maxPoolSize = minioConfigProperties.getMultipart().getMaxPoolSize(); + this.executorService = Executors.newFixedThreadPool(maxPoolSize, r -> { Thread t = new Thread(r, "minio-parallel-pool"); t.setDaemon(true); + t.setPriority(Thread.NORM_PRIORITY + 1); // 提高优先级 return t; }); @@ -382,8 +384,27 @@ public class MinioService { } /** - * 完成分片上传 - 超快速版本(跳过验证) - * 跳过分片验证,直接合并,最大化性能 + * 分片上传完成策略枚举 + */ + private enum CompletionStrategy { + STANDARD("标准快速模式"), + TURBO("极速模式"), + INSTANT("立即响应模式"); + + private final String description; + + CompletionStrategy(String description) { + this.description = description; + } + + public String getDescription() { + return description; + } + } + + /** + * 完成分片上传 - 优化版本(统一处理逻辑) + * 根据分片数量自动选择最优策略 * * @param uploadId 上传会话ID * @param objectName 对象名称 @@ -392,59 +413,146 @@ public class MinioService { */ public MultipartUploadCompleteResult completeMultipartUpload(String uploadId, String objectName, java.util.List parts) throws Exception { - // 根据分片数量选择策略 - if (parts.size() > 50) { - // 超大文件使用极速模式 - return completeMultipartUploadTurbo(uploadId, objectName, parts); + // 根据分片数量和配置选择策略 + int instantThreshold = minioConfigProperties.getMultipart().getInstantModeThreshold(); + int turboThreshold = minioConfigProperties.getMultipart().getTurboModeThreshold(); + + CompletionStrategy strategy; + if (parts.size() > instantThreshold) { + strategy = CompletionStrategy.INSTANT; + } else if (parts.size() > turboThreshold) { + strategy = CompletionStrategy.TURBO; } else { - // 普通文件使用标准快速模式 - return completeMultipartUploadFast(uploadId, objectName, parts); + strategy = CompletionStrategy.STANDARD; + } + + return executeMultipartUploadCompletion(uploadId, objectName, parts, strategy); + } + + /** + * 统一执行分片上传完成逻辑 + * 根据策略选择不同的处理方式 + */ + private MultipartUploadCompleteResult executeMultipartUploadCompletion(String uploadId, String objectName, + java.util.List parts, + CompletionStrategy strategy) throws Exception { + long startTime = System.currentTimeMillis(); + String bucketName = "user-uploads"; + + try { + log.info("开始{}处理{}个分片文件", strategy.getDescription(), parts.size()); + + // 1. 通用预处理:排序分片 + parts.sort((a, b) -> a.getPartNumber().compareTo(b.getPartNumber())); + + // 2. 构建合并源列表 + List sources = buildComposeSources(bucketName, objectName, parts); + + // 3. 根据策略选择处理方式 + switch (strategy) { + case INSTANT: + return executeInstantMode(bucketName, objectName, parts, sources, startTime); + case TURBO: + return executeTurboMode(bucketName, objectName, parts, sources, startTime); + case STANDARD: + default: + return executeStandardMode(bucketName, objectName, parts, sources, startTime); + } + + } catch (Exception e) { + log.error("{}失败: {}", strategy.getDescription(), e.getMessage()); + throw new Exception(strategy.getDescription() + "失败: " + e.getMessage(), e); } } /** - * 标准快速模式 + * 构建合并源列表 */ - private MultipartUploadCompleteResult completeMultipartUploadFast(String uploadId, String objectName, - java.util.List parts) throws Exception { - long startTime = System.currentTimeMillis(); + private List buildComposeSources(String bucketName, String objectName, + java.util.List parts) { + return parts.stream() + .map(part -> ComposeSource.builder() + .bucket(bucketName) + .object(objectName + ".part" + part.getPartNumber()) + .build()) + .collect(Collectors.toList()); + } + + /** + * 执行立即响应模式 + */ + private MultipartUploadCompleteResult executeInstantMode(String bucketName, String objectName, + java.util.List parts, + List sources, long startTime) { + // 1. 预生成文件URL(基于MinIO的规则) + String fileUrl = minioConfigProperties.getEndpoint() + "/" + bucketName + "/" + objectName; - try { - // 使用固定的 user-uploads bucket - String bucketName = "user-uploads"; - - // 1. 按照分片编号排序 - parts.sort((a, b) -> a.getPartNumber().compareTo(b.getPartNumber())); - - log.info("开始超快速处理{}个分片文件", parts.size()); - - // 2. 直接构建ComposeSource列表(跳过验证) - List sources = parts.stream() - .map(part -> { - String chunkObjectName = objectName + ".part" + part.getPartNumber(); - return ComposeSource.builder() + // 2. 生成临时ETag + String etag = "\"" + UUID.randomUUID().toString().replace("-", "") + "\""; + + // 3. 异步执行合并操作 + CompletableFuture.runAsync(() -> { + try { + long asyncStartTime = System.currentTimeMillis(); + log.info("开始异步合并{}个分片文件", parts.size()); + + // 执行合并 + minioClient.composeObject( + ComposeObjectArgs.builder() .bucket(bucketName) - .object(chunkObjectName) - .build(); - }) - .collect(Collectors.toList()); + .object(objectName) + .sources(sources) + .build() + ); + + // 异步清理分片文件 + cleanupChunkFilesBatch(bucketName, objectName, parts); + + long asyncTotalTime = System.currentTimeMillis() - asyncStartTime; + log.info("异步合并完成,耗时: {}ms", asyncTotalTime); + + } catch (Exception e) { + log.error("异步合并分片文件失败", e); + } + }, executorService); + + long totalTime = System.currentTimeMillis() - startTime; + log.info("立即响应模式完成,响应耗时: {}ms", totalTime); + + return new MultipartUploadCompleteResult(fileUrl, etag); + } - // 3. 并行预生成URL和合并文件 - CompletableFuture urlFuture = CompletableFuture.supplyAsync(() -> { - try { - return minioClient.getPresignedObjectUrl( - GetPresignedObjectUrlArgs.builder() - .method(Method.GET) - .bucket(bucketName) - .object(objectName) - .build() - ); - } catch (Exception e) { - throw new RuntimeException("预生成URL失败", e); - } - }, executorService); - - // 4. 直接合并分片文件 + /** + * 执行极速模式 - 借鉴分批并行处理思想 + */ + private MultipartUploadCompleteResult executeTurboMode(String bucketName, String objectName, + java.util.List parts, + List sources, long startTime) throws Exception { + // 1. 异步预生成URL(不阻塞后续操作) + CompletableFuture urlFuture = CompletableFuture.supplyAsync(() -> { + try { + return minioClient.getPresignedObjectUrl( + GetPresignedObjectUrlArgs.builder() + .method(Method.GET) + .bucket(bucketName) + .object(objectName) + .build() + ); + } catch (Exception e) { + // 如果预生成URL失败,使用基本URL规则 + return minioConfigProperties.getEndpoint() + "/" + bucketName + "/" + objectName; + } + }, executorService); + + long preProcessTime = System.currentTimeMillis(); + log.info("预处理完成,耗时: {}ms", preProcessTime - startTime); + + // 2. 判断是否需要分批处理(超大文件优化) + if (sources.size() > 100) { + // 分批并行合并策略 + executeBatchCompose(bucketName, objectName, sources); + } else { + // 标准合并 minioClient.composeObject( ComposeObjectArgs.builder() .bucket(bucketName) @@ -452,101 +560,223 @@ public class MinioService { .sources(sources) .build() ); + } + + long composeTime = System.currentTimeMillis(); + log.info("文件合并完成,耗时: {}ms", composeTime - preProcessTime); + + // 3. 异步清理 + cleanupChunkFilesBatch(bucketName, objectName, parts); + + // 4. 获取预生成的URL(使用配置化的超时时间) + String fileUrl = getUrlWithTimeout(urlFuture, bucketName, objectName); + + long totalTime = System.currentTimeMillis(); + log.info("极速模式完成,总耗时: {}ms", totalTime - startTime); + + // 5. 返回结果 + String etag = "\"" + UUID.randomUUID().toString().replace("-", "") + "\""; + return new MultipartUploadCompleteResult(fileUrl, etag); + } - long composeTime = System.currentTimeMillis(); - log.info("分片文件合并完成,耗时: {}ms", composeTime - startTime); + /** + * 执行标准模式 + */ + private MultipartUploadCompleteResult executeStandardMode(String bucketName, String objectName, + java.util.List parts, + List sources, long startTime) throws Exception { + // 1. 并行预生成URL和合并文件 + CompletableFuture urlFuture = CompletableFuture.supplyAsync(() -> { + try { + return minioClient.getPresignedObjectUrl( + GetPresignedObjectUrlArgs.builder() + .method(Method.GET) + .bucket(bucketName) + .object(objectName) + .build() + ); + } catch (Exception e) { + throw new RuntimeException("预生成URL失败", e); + } + }, executorService); - // 5. 并行获取文件信息和URL - CompletableFuture statFuture = CompletableFuture.supplyAsync(() -> { - try { - return minioClient.statObject( - StatObjectArgs.builder() - .bucket(bucketName) - .object(objectName) - .build() - ); - } catch (Exception e) { - throw new RuntimeException("获取文件信息失败", e); - } - }, executorService); + // 2. 直接合并分片文件 + minioClient.composeObject( + ComposeObjectArgs.builder() + .bucket(bucketName) + .object(objectName) + .sources(sources) + .build() + ); - // 6. 异步批量清理分片文件(不阻塞响应) - cleanupChunkFilesBatch(bucketName, objectName, parts); + long composeTime = System.currentTimeMillis(); + log.info("分片文件合并完成,耗时: {}ms", composeTime - startTime); - // 7. 获取结果(超时时间缩短到5秒) - String fileUrl = urlFuture.get(5, TimeUnit.SECONDS); - StatObjectResponse finalStat = statFuture.get(5, TimeUnit.SECONDS); - String etag = finalStat.etag(); + // 3. 并行获取文件信息和URL + CompletableFuture statFuture = CompletableFuture.supplyAsync(() -> { + try { + return minioClient.statObject( + StatObjectArgs.builder() + .bucket(bucketName) + .object(objectName) + .build() + ); + } catch (Exception e) { + throw new RuntimeException("获取文件信息失败", e); + } + }, executorService); - long totalTime = System.currentTimeMillis(); - log.info("完成分片上传成功,对象名: {}, 文件大小: {}, 总耗时: {}ms", - objectName, finalStat.size(), totalTime - startTime); - - return new MultipartUploadCompleteResult(fileUrl, etag); + // 4. 异步批量清理分片文件(不阻塞响应) + cleanupChunkFilesBatch(bucketName, objectName, parts); + + // 5. 获取结果(超时时间缩短到5秒) + String fileUrl = urlFuture.get(5, TimeUnit.SECONDS); + StatObjectResponse finalStat = statFuture.get(5, TimeUnit.SECONDS); + String etag = finalStat.etag(); + + long totalTime = System.currentTimeMillis(); + log.info("标准模式完成,对象名: {}, 文件大小: {}, 总耗时: {}ms", + objectName, finalStat.size(), totalTime - startTime); + + return new MultipartUploadCompleteResult(fileUrl, etag); + } + + /** + * 获取URL(带超时处理) + */ + private String getUrlWithTimeout(CompletableFuture urlFuture, String bucketName, String objectName) { + int urlTimeout = minioConfigProperties.getMultipart().getUrlGenerateTimeout(); + try { + return urlFuture.get(urlTimeout, TimeUnit.SECONDS); } catch (Exception e) { - log.error("完成分片上传失败: {}", e.getMessage()); - throw new Exception("分片文件合并失败: " + e.getMessage(), e); + log.warn("获取预生成URL超时({}秒),使用基本URL", urlTimeout); + return minioConfigProperties.getEndpoint() + "/" + bucketName + "/" + objectName; } } /** - * 极速模式 - 适用于超大文件(50个分片以上) - * 合并完成后立即返回,跳过文件信息获取 + * 分批并行合并策略 - 借鉴优秀实践 + * 将大量分片分批处理,减少单次操作的复杂度 */ - private MultipartUploadCompleteResult completeMultipartUploadTurbo(String uploadId, String objectName, - java.util.List parts) throws Exception { - long startTime = System.currentTimeMillis(); + private void executeBatchCompose(String bucketName, String objectName, List sources) throws Exception { + int batchSize = minioConfigProperties.getMultipart().getBatchSize(); + log.info("启用分批合并策略,总分片数: {}, 批次大小: {}", sources.size(), batchSize); - try { - String bucketName = "user-uploads"; - - // 1. 快速排序 - parts.sort((a, b) -> a.getPartNumber().compareTo(b.getPartNumber())); - - log.info("开始极速模式处理{}个分片文件", parts.size()); - - // 2. 预生成URL - String fileUrl = minioClient.getPresignedObjectUrl( - GetPresignedObjectUrlArgs.builder() - .method(Method.GET) + if (sources.size() <= batchSize) { + // 如果分片数量不超过批次大小,直接合并 + minioClient.composeObject( + ComposeObjectArgs.builder() .bucket(bucketName) .object(objectName) + .sources(sources) .build() ); + return; + } + + // 1. 分批创建临时合并对象 + List tempObjects = new ArrayList<>(); + List> batchFutures = new ArrayList<>(); + + for (int i = 0; i < sources.size(); i += batchSize) { + int endIndex = Math.min(i + batchSize, sources.size()); + List batch = sources.subList(i, endIndex); + String tempObjectName = objectName + ".temp." + (i / batchSize); + tempObjects.add(tempObjectName); - // 3. 构建合并源 - List sources = parts.stream() - .map(part -> ComposeSource.builder() + // 并行处理每个批次 + CompletableFuture batchFuture = CompletableFuture.runAsync(() -> { + try { + long batchStartTime = System.currentTimeMillis(); + minioClient.composeObject( + ComposeObjectArgs.builder() + .bucket(bucketName) + .object(tempObjectName) + .sources(batch) + .build() + ); + long batchTime = System.currentTimeMillis() - batchStartTime; + log.debug("批次合并完成: {}, 耗时: {}ms, 分片数: {}", + tempObjectName, batchTime, batch.size()); + } catch (Exception e) { + log.error("批次合并失败: {}", tempObjectName, e); + throw new RuntimeException("批次合并失败: " + tempObjectName, e); + } + }, executorService); + + batchFutures.add(batchFuture); + } + + // 2. 等待所有批次完成 + try { + CompletableFuture.allOf(batchFutures.toArray(new CompletableFuture[0])).get(); + log.info("所有批次合并完成,共{}个批次", tempObjects.size()); + } catch (Exception e) { + log.error("批次合并过程中出现错误", e); + // 清理已创建的临时对象 + cleanupTempObjects(bucketName, tempObjects); + throw new Exception("分批合并失败", e); + } + + // 3. 最终合并临时对象 + try { + List tempSources = tempObjects.stream() + .map(tempObj -> ComposeSource.builder() .bucket(bucketName) - .object(objectName + ".part" + part.getPartNumber()) + .object(tempObj) .build()) .collect(Collectors.toList()); - // 4. 合并文件 minioClient.composeObject( ComposeObjectArgs.builder() .bucket(bucketName) .object(objectName) - .sources(sources) + .sources(tempSources) .build() ); - // 5. 异步清理 - cleanupChunkFilesBatch(bucketName, objectName, parts); + log.info("最终合并完成,临时对象数: {}", tempObjects.size()); - long totalTime = System.currentTimeMillis(); - log.info("极速模式完成,总耗时: {}ms", totalTime - startTime); - - // 6. 立即返回(使用模拟的ETag) - String etag = "\"" + UUID.randomUUID().toString().replace("-", "") + "\""; - return new MultipartUploadCompleteResult(fileUrl, etag); - - } catch (Exception e) { - log.error("极速模式失败: {}", e.getMessage()); - throw new Exception("极速合并失败: " + e.getMessage(), e); + } finally { + // 4. 清理临时对象 + cleanupTempObjects(bucketName, tempObjects); } } + /** + * 清理临时对象 + */ + private void cleanupTempObjects(String bucketName, List tempObjects) { + if (tempObjects.isEmpty()) { + return; + } + + CompletableFuture.runAsync(() -> { + try { + List deleteObjects = tempObjects.stream() + .map(DeleteObject::new) + .collect(Collectors.toList()); + + minioClient.removeObjects( + RemoveObjectsArgs.builder() + .bucket(bucketName) + .objects(deleteObjects) + .build() + ); + + log.info("临时对象清理完成,清理数量: {}", tempObjects.size()); + } catch (Exception e) { + log.error("清理临时对象失败", e); + } + }, executorService); + } + + + + + + + /** * 异步批量清理分片文件 * 使用MinIO批量删除API,性能更优