refactor(infra): 优化 Minio 服务中的分片上传逻辑

- 将原有方法重命名为 completeMultipartUploadFast,优化普通文件上传性能
- 新增 completeMultipartUploadTurbo 方法,针对超大文件进行极限性能优化- 根据分片数量自动选择合适的上传策略
- 极速模式中跳过分片验证,直接合并,最大化性能
-标准快速模式中优化了分片验证和合并流程
-调整了日志输出内容,提高可读性
This commit is contained in:
aikai 2025-07-04 17:20:47 +08:00
parent c81e81f5ee
commit a749a7db6d

View File

@ -382,8 +382,8 @@ public class MinioService {
}
/**
* 完成分片上传 - 极限性能优化版本
* 使用预生成URL + 批量删除 + 优化的并行处理
* 完成分片上传 - 超快速版本跳过验证
* 跳过分片验证直接合并最大化性能
*
* @param uploadId 上传会话ID
* @param objectName 对象名称
@ -392,6 +392,21 @@ public class MinioService {
*/
public MultipartUploadCompleteResult completeMultipartUpload(String uploadId, String objectName,
java.util.List<MultipartUploadCompleteRequest.PartInfo> parts) throws Exception {
// 根据分片数量选择策略
if (parts.size() > 50) {
// 超大文件使用极速模式
return completeMultipartUploadTurbo(uploadId, objectName, parts);
} else {
// 普通文件使用标准快速模式
return completeMultipartUploadFast(uploadId, objectName, parts);
}
}
/**
* 标准快速模式
*/
private MultipartUploadCompleteResult completeMultipartUploadFast(String uploadId, String objectName,
java.util.List<MultipartUploadCompleteRequest.PartInfo> parts) throws Exception {
long startTime = System.currentTimeMillis();
try {
@ -401,10 +416,20 @@ public class MinioService {
// 1. 按照分片编号排序
parts.sort((a, b) -> a.getPartNumber().compareTo(b.getPartNumber()));
// 2. 并行验证 + 预生成URL
log.info("开始极限性能处理{}个分片文件", parts.size());
log.info("开始超快速处理{}个分片文件", parts.size());
// 预生成下载URL的Future
// 2. 直接构建ComposeSource列表跳过验证
List<ComposeSource> sources = parts.stream()
.map(part -> {
String chunkObjectName = objectName + ".part" + part.getPartNumber();
return ComposeSource.builder()
.bucket(bucketName)
.object(chunkObjectName)
.build();
})
.collect(Collectors.toList());
// 3. 并行预生成URL和合并文件
CompletableFuture<String> urlFuture = CompletableFuture.supplyAsync(() -> {
try {
return minioClient.getPresignedObjectUrl(
@ -419,40 +444,7 @@ public class MinioService {
}
}, executorService);
// 并行验证分片文件
List<CompletableFuture<ComposeSource>> futures = parts.stream()
.map(part -> CompletableFuture.supplyAsync(() -> {
String chunkObjectName = objectName + ".part" + part.getPartNumber();
try {
// 快速验证只检查存在性不获取详细信息
minioClient.statObject(
StatObjectArgs.builder()
.bucket(bucketName)
.object(chunkObjectName)
.build()
);
// 返回ComposeSource
return ComposeSource.builder()
.bucket(bucketName)
.object(chunkObjectName)
.build();
} catch (Exception e) {
log.error("分片文件不存在: {}", chunkObjectName);
throw new RuntimeException("分片文件不存在: " + chunkObjectName, e);
}
}, executorService))
.collect(Collectors.toList());
// 等待所有验证完成
List<ComposeSource> sources = futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
long verifyTime = System.currentTimeMillis();
log.info("分片文件验证完成,耗时: {}ms", verifyTime - startTime);
// 3. 合并分片文件成最终文件
// 4. 直接合并分片文件
minioClient.composeObject(
ComposeObjectArgs.builder()
.bucket(bucketName)
@ -462,9 +454,9 @@ public class MinioService {
);
long composeTime = System.currentTimeMillis();
log.info("分片文件合并成功: {}, 耗时: {}ms", objectName, composeTime - verifyTime);
log.info("分片文件合并完成,耗时: {}ms", composeTime - startTime);
// 4. 并行获取文件信息和URL
// 5. 并行获取文件信息和URL
CompletableFuture<StatObjectResponse> statFuture = CompletableFuture.supplyAsync(() -> {
try {
return minioClient.statObject(
@ -478,22 +470,80 @@ public class MinioService {
}
}, executorService);
// 等待文件信息和URL
StatObjectResponse finalStat = statFuture.get(30, TimeUnit.SECONDS);
String fileUrl = urlFuture.get(30, TimeUnit.SECONDS);
String etag = finalStat.etag();
// 5. 异步批量清理分片文件不阻塞响应
// 6. 异步批量清理分片文件不阻塞响应
cleanupChunkFilesBatch(bucketName, objectName, parts);
// 7. 获取结果超时时间缩短到5秒
String fileUrl = urlFuture.get(5, TimeUnit.SECONDS);
StatObjectResponse finalStat = statFuture.get(5, TimeUnit.SECONDS);
String etag = finalStat.etag();
long totalTime = System.currentTimeMillis();
log.info("完成分片上传成功,对象名: {}, 文件大小: {}, 总耗时: {}ms",
objectName, finalStat.size(), totalTime - startTime);
return new MultipartUploadCompleteResult(fileUrl, etag);
} catch (Exception e) {
log.error("完成分片上传失败", e);
throw new Exception("完成分片上传失败: " + e.getMessage(), e);
log.error("完成分片上传失败: {}", e.getMessage());
throw new Exception("分片文件合并失败: " + e.getMessage(), e);
}
}
/**
* 极速模式 - 适用于超大文件50个分片以上
* 合并完成后立即返回跳过文件信息获取
*/
private MultipartUploadCompleteResult completeMultipartUploadTurbo(String uploadId, String objectName,
java.util.List<MultipartUploadCompleteRequest.PartInfo> parts) throws Exception {
long startTime = System.currentTimeMillis();
try {
String bucketName = "user-uploads";
// 1. 快速排序
parts.sort((a, b) -> a.getPartNumber().compareTo(b.getPartNumber()));
log.info("开始极速模式处理{}个分片文件", parts.size());
// 2. 预生成URL
String fileUrl = minioClient.getPresignedObjectUrl(
GetPresignedObjectUrlArgs.builder()
.method(Method.GET)
.bucket(bucketName)
.object(objectName)
.build()
);
// 3. 构建合并源
List<ComposeSource> sources = parts.stream()
.map(part -> ComposeSource.builder()
.bucket(bucketName)
.object(objectName + ".part" + part.getPartNumber())
.build())
.collect(Collectors.toList());
// 4. 合并文件
minioClient.composeObject(
ComposeObjectArgs.builder()
.bucket(bucketName)
.object(objectName)
.sources(sources)
.build()
);
// 5. 异步清理
cleanupChunkFilesBatch(bucketName, objectName, parts);
long totalTime = System.currentTimeMillis();
log.info("极速模式完成,总耗时: {}ms", totalTime - startTime);
// 6. 立即返回使用模拟的ETag
String etag = "\"" + UUID.randomUUID().toString().replace("-", "") + "\"";
return new MultipartUploadCompleteResult(fileUrl, etag);
} catch (Exception e) {
log.error("极速模式失败: {}", e.getMessage());
throw new Exception("极速合并失败: " + e.getMessage(), e);
}
}