refactor(infra): 重构 MinIO 分片上传完成逻辑
- 删除了原有的三种完成策略(立即响应、极速、标准),改为使用智能同步合并策略 - 新增 executeOptimizedBatchCompose 方法,用于优化分批并行合并 - 重构 completeMultipartUpload 方法,采用新的智能同步合并策略 - 优化了日志输出,增加了更多详细信息 - 移除了未使用的配置项和枚举类
This commit is contained in:
parent
bd92d1dc4b
commit
90b1d55c25
@ -41,16 +41,6 @@ public class MinioConfigProperties {
|
||||
|
||||
@Data
|
||||
public static class MultipartConfig {
|
||||
/**
|
||||
* 使用立即响应模式的分片数阈值(默认100)
|
||||
*/
|
||||
private int instantModeThreshold = 100;
|
||||
|
||||
/**
|
||||
* 使用极速模式的分片数阈值(默认50)
|
||||
*/
|
||||
private int turboModeThreshold = 50;
|
||||
|
||||
/**
|
||||
* 线程池核心线程数(默认:CPU核心数*2,最小8)
|
||||
*/
|
||||
@ -61,11 +51,6 @@ public class MinioConfigProperties {
|
||||
*/
|
||||
private int maxPoolSize = Math.min(32, corePoolSize * 4);
|
||||
|
||||
/**
|
||||
* 预生成URL的超时时间(秒,默认3秒)
|
||||
*/
|
||||
private int urlGenerateTimeout = 3;
|
||||
|
||||
/**
|
||||
* 分批合并的批次大小(默认50)
|
||||
* 当分片数量超过此值时,将采用分批并行合并策略
|
||||
|
@ -383,28 +383,11 @@ public class MinioService {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 分片上传完成策略枚举
|
||||
*/
|
||||
private enum CompletionStrategy {
|
||||
STANDARD("标准快速模式"),
|
||||
TURBO("极速模式"),
|
||||
INSTANT("立即响应模式");
|
||||
|
||||
private final String description;
|
||||
|
||||
CompletionStrategy(String description) {
|
||||
this.description = description;
|
||||
}
|
||||
|
||||
public String getDescription() {
|
||||
return description;
|
||||
}
|
||||
}
|
||||
// 删除旧的策略枚举,使用新的智能合并策略
|
||||
|
||||
/**
|
||||
* 完成分片上传 - 优化版本(统一处理逻辑)
|
||||
* 根据分片数量自动选择最优策略
|
||||
* 完成分片上传 - 智能同步合并 + 异步清理
|
||||
* 确保返回的URL能直接访问,同时最大化合并效率
|
||||
*
|
||||
* @param uploadId 上传会话ID
|
||||
* @param objectName 对象名称
|
||||
@ -413,90 +396,29 @@ public class MinioService {
|
||||
*/
|
||||
public MultipartUploadCompleteResult completeMultipartUpload(String uploadId, String objectName,
|
||||
java.util.List<MultipartUploadCompleteRequest.PartInfo> parts) throws Exception {
|
||||
// 根据分片数量和配置选择策略
|
||||
int instantThreshold = minioConfigProperties.getMultipart().getInstantModeThreshold();
|
||||
int turboThreshold = minioConfigProperties.getMultipart().getTurboModeThreshold();
|
||||
|
||||
CompletionStrategy strategy;
|
||||
if (parts.size() > instantThreshold) {
|
||||
strategy = CompletionStrategy.INSTANT;
|
||||
} else if (parts.size() > turboThreshold) {
|
||||
strategy = CompletionStrategy.TURBO;
|
||||
} else {
|
||||
strategy = CompletionStrategy.STANDARD;
|
||||
}
|
||||
|
||||
return executeMultipartUploadCompletion(uploadId, objectName, parts, strategy);
|
||||
}
|
||||
|
||||
/**
|
||||
* 统一执行分片上传完成逻辑
|
||||
* 根据策略选择不同的处理方式
|
||||
*/
|
||||
private MultipartUploadCompleteResult executeMultipartUploadCompletion(String uploadId, String objectName,
|
||||
java.util.List<MultipartUploadCompleteRequest.PartInfo> parts,
|
||||
CompletionStrategy strategy) throws Exception {
|
||||
long startTime = System.currentTimeMillis();
|
||||
String bucketName = "user-uploads";
|
||||
|
||||
try {
|
||||
log.info("开始{}处理{}个分片文件", strategy.getDescription(), parts.size());
|
||||
log.info("开始智能同步合并{}个分片文件", parts.size());
|
||||
|
||||
// 1. 通用预处理:排序分片
|
||||
// 1. 预处理:排序分片
|
||||
parts.sort((a, b) -> a.getPartNumber().compareTo(b.getPartNumber()));
|
||||
|
||||
// 2. 构建合并源列表
|
||||
List<ComposeSource> sources = buildComposeSources(bucketName, objectName, parts);
|
||||
List<ComposeSource> sources = parts.stream()
|
||||
.map(part -> ComposeSource.builder()
|
||||
.bucket(bucketName)
|
||||
.object(objectName + ".part" + part.getPartNumber())
|
||||
.build())
|
||||
.collect(Collectors.toList());
|
||||
|
||||
// 3. 根据策略选择处理方式
|
||||
switch (strategy) {
|
||||
case INSTANT:
|
||||
return executeInstantMode(bucketName, objectName, parts, sources, startTime);
|
||||
case TURBO:
|
||||
return executeTurboMode(bucketName, objectName, parts, sources, startTime);
|
||||
case STANDARD:
|
||||
default:
|
||||
return executeStandardMode(bucketName, objectName, parts, sources, startTime);
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("{}失败: {}", strategy.getDescription(), e.getMessage());
|
||||
throw new Exception(strategy.getDescription() + "失败: " + e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 构建合并源列表
|
||||
*/
|
||||
private List<ComposeSource> buildComposeSources(String bucketName, String objectName,
|
||||
java.util.List<MultipartUploadCompleteRequest.PartInfo> parts) {
|
||||
return parts.stream()
|
||||
.map(part -> ComposeSource.builder()
|
||||
.bucket(bucketName)
|
||||
.object(objectName + ".part" + part.getPartNumber())
|
||||
.build())
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
/**
|
||||
* 执行立即响应模式
|
||||
*/
|
||||
private MultipartUploadCompleteResult executeInstantMode(String bucketName, String objectName,
|
||||
java.util.List<MultipartUploadCompleteRequest.PartInfo> parts,
|
||||
List<ComposeSource> sources, long startTime) {
|
||||
// 1. 预生成文件URL(基于MinIO的规则)
|
||||
String fileUrl = minioConfigProperties.getEndpoint() + "/" + bucketName + "/" + objectName;
|
||||
|
||||
// 2. 生成临时ETag
|
||||
String etag = "\"" + UUID.randomUUID().toString().replace("-", "") + "\"";
|
||||
|
||||
// 3. 异步执行合并操作
|
||||
CompletableFuture.runAsync(() -> {
|
||||
try {
|
||||
long asyncStartTime = System.currentTimeMillis();
|
||||
log.info("开始异步合并{}个分片文件", parts.size());
|
||||
|
||||
// 执行合并
|
||||
// 3. 智能选择合并策略 - 同步执行确保文件完整性
|
||||
if (sources.size() > 100) {
|
||||
// 大文件:分批并行合并(同步)
|
||||
executeOptimizedBatchCompose(bucketName, objectName, sources);
|
||||
} else {
|
||||
// 小文件:直接合并(同步)
|
||||
minioClient.composeObject(
|
||||
ComposeObjectArgs.builder()
|
||||
.bucket(bucketName)
|
||||
@ -504,55 +426,80 @@ public class MinioService {
|
||||
.sources(sources)
|
||||
.build()
|
||||
);
|
||||
|
||||
// 异步清理分片文件
|
||||
cleanupChunkFilesBatch(bucketName, objectName, parts);
|
||||
|
||||
long asyncTotalTime = System.currentTimeMillis() - asyncStartTime;
|
||||
log.info("异步合并完成,耗时: {}ms", asyncTotalTime);
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("异步合并分片文件失败", e);
|
||||
}
|
||||
}, executorService);
|
||||
|
||||
long totalTime = System.currentTimeMillis() - startTime;
|
||||
log.info("立即响应模式完成,响应耗时: {}ms", totalTime);
|
||||
|
||||
return new MultipartUploadCompleteResult(fileUrl, etag);
|
||||
|
||||
long composeTime = System.currentTimeMillis();
|
||||
log.info("同步合并完成,耗时: {}ms", composeTime - startTime);
|
||||
|
||||
// 4. 并行获取文件信息和URL(同步)
|
||||
CompletableFuture<String> urlFuture = CompletableFuture.supplyAsync(() -> {
|
||||
try {
|
||||
return minioClient.getPresignedObjectUrl(
|
||||
GetPresignedObjectUrlArgs.builder()
|
||||
.method(Method.GET)
|
||||
.bucket(bucketName)
|
||||
.object(objectName)
|
||||
.build()
|
||||
);
|
||||
} catch (Exception e) {
|
||||
log.warn("生成预签名URL失败,使用基本URL", e);
|
||||
return minioConfigProperties.getEndpoint() + "/" + bucketName + "/" + objectName;
|
||||
}
|
||||
}, executorService);
|
||||
|
||||
CompletableFuture<StatObjectResponse> statFuture = CompletableFuture.supplyAsync(() -> {
|
||||
try {
|
||||
return minioClient.statObject(
|
||||
StatObjectArgs.builder()
|
||||
.bucket(bucketName)
|
||||
.object(objectName)
|
||||
.build()
|
||||
);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException("获取文件信息失败", e);
|
||||
}
|
||||
}, executorService);
|
||||
|
||||
// 5. 异步清理分片文件(不阻塞响应)
|
||||
cleanupChunkFilesBatch(bucketName, objectName, parts);
|
||||
|
||||
// 6. 等待URL和文件信息生成(最多3秒)
|
||||
String fileUrl = urlFuture.get(3, TimeUnit.SECONDS);
|
||||
StatObjectResponse stat = statFuture.get(3, TimeUnit.SECONDS);
|
||||
|
||||
long totalTime = System.currentTimeMillis() - startTime;
|
||||
log.info("智能同步合并完成,文件大小: {}, 总耗时: {}ms", stat.size(), totalTime);
|
||||
|
||||
return new MultipartUploadCompleteResult(fileUrl, stat.etag());
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("智能同步合并失败", e);
|
||||
throw new Exception("分片上传完成失败: " + e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
// 删除旧的执行方法,使用新的智能合并策略
|
||||
|
||||
// 删除旧的构建方法,已内联到主方法中
|
||||
|
||||
// 删除旧的立即响应模式方法
|
||||
|
||||
// 删除旧的极速模式方法
|
||||
|
||||
// 删除旧的标准模式方法
|
||||
|
||||
// 删除旧的URL超时处理方法
|
||||
|
||||
/**
|
||||
* 执行极速模式 - 借鉴分批并行处理思想
|
||||
* 优化的分批并行合并策略 - 专为方案1优化
|
||||
* 确保合并操作同步完成,返回时文件已经可访问
|
||||
*/
|
||||
private MultipartUploadCompleteResult executeTurboMode(String bucketName, String objectName,
|
||||
java.util.List<MultipartUploadCompleteRequest.PartInfo> parts,
|
||||
List<ComposeSource> sources, long startTime) throws Exception {
|
||||
// 1. 异步预生成URL(不阻塞后续操作)
|
||||
CompletableFuture<String> urlFuture = CompletableFuture.supplyAsync(() -> {
|
||||
try {
|
||||
return minioClient.getPresignedObjectUrl(
|
||||
GetPresignedObjectUrlArgs.builder()
|
||||
.method(Method.GET)
|
||||
.bucket(bucketName)
|
||||
.object(objectName)
|
||||
.build()
|
||||
);
|
||||
} catch (Exception e) {
|
||||
// 如果预生成URL失败,使用基本URL规则
|
||||
return minioConfigProperties.getEndpoint() + "/" + bucketName + "/" + objectName;
|
||||
}
|
||||
}, executorService);
|
||||
private void executeOptimizedBatchCompose(String bucketName, String objectName, List<ComposeSource> sources) throws Exception {
|
||||
int batchSize = minioConfigProperties.getMultipart().getBatchSize();
|
||||
log.info("启用优化分批合并策略,总分片数: {}, 批次大小: {}", sources.size(), batchSize);
|
||||
|
||||
long preProcessTime = System.currentTimeMillis();
|
||||
log.info("预处理完成,耗时: {}ms", preProcessTime - startTime);
|
||||
|
||||
// 2. 判断是否需要分批处理(超大文件优化)
|
||||
if (sources.size() > 100) {
|
||||
// 分批并行合并策略
|
||||
executeBatchCompose(bucketName, objectName, sources);
|
||||
} else {
|
||||
// 标准合并
|
||||
if (sources.size() <= batchSize) {
|
||||
// 如果分片数量不超过批次大小,直接合并
|
||||
minioClient.composeObject(
|
||||
ComposeObjectArgs.builder()
|
||||
.bucket(bucketName)
|
||||
@ -560,97 +507,75 @@ public class MinioService {
|
||||
.sources(sources)
|
||||
.build()
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
// 1. 分批创建临时合并对象
|
||||
List<String> tempObjects = new ArrayList<>();
|
||||
List<CompletableFuture<Void>> batchFutures = new ArrayList<>();
|
||||
|
||||
for (int i = 0; i < sources.size(); i += batchSize) {
|
||||
int endIndex = Math.min(i + batchSize, sources.size());
|
||||
List<ComposeSource> batch = sources.subList(i, endIndex);
|
||||
String tempObjectName = objectName + ".temp." + (i / batchSize);
|
||||
tempObjects.add(tempObjectName);
|
||||
|
||||
// 并行处理每个批次
|
||||
CompletableFuture<Void> batchFuture = CompletableFuture.runAsync(() -> {
|
||||
try {
|
||||
long batchStartTime = System.currentTimeMillis();
|
||||
minioClient.composeObject(
|
||||
ComposeObjectArgs.builder()
|
||||
.bucket(bucketName)
|
||||
.object(tempObjectName)
|
||||
.sources(batch)
|
||||
.build()
|
||||
);
|
||||
long batchTime = System.currentTimeMillis() - batchStartTime;
|
||||
log.debug("批次合并完成: {}, 耗时: {}ms, 分片数: {}",
|
||||
tempObjectName, batchTime, batch.size());
|
||||
} catch (Exception e) {
|
||||
log.error("批次合并失败: {}", tempObjectName, e);
|
||||
throw new RuntimeException("批次合并失败: " + tempObjectName, e);
|
||||
}
|
||||
}, executorService);
|
||||
|
||||
batchFutures.add(batchFuture);
|
||||
}
|
||||
|
||||
long composeTime = System.currentTimeMillis();
|
||||
log.info("文件合并完成,耗时: {}ms", composeTime - preProcessTime);
|
||||
|
||||
// 3. 异步清理
|
||||
cleanupChunkFilesBatch(bucketName, objectName, parts);
|
||||
|
||||
// 4. 获取预生成的URL(使用配置化的超时时间)
|
||||
String fileUrl = getUrlWithTimeout(urlFuture, bucketName, objectName);
|
||||
|
||||
long totalTime = System.currentTimeMillis();
|
||||
log.info("极速模式完成,总耗时: {}ms", totalTime - startTime);
|
||||
|
||||
// 5. 返回结果
|
||||
String etag = "\"" + UUID.randomUUID().toString().replace("-", "") + "\"";
|
||||
return new MultipartUploadCompleteResult(fileUrl, etag);
|
||||
}
|
||||
|
||||
/**
|
||||
* 执行标准模式
|
||||
*/
|
||||
private MultipartUploadCompleteResult executeStandardMode(String bucketName, String objectName,
|
||||
java.util.List<MultipartUploadCompleteRequest.PartInfo> parts,
|
||||
List<ComposeSource> sources, long startTime) throws Exception {
|
||||
// 1. 并行预生成URL和合并文件
|
||||
CompletableFuture<String> urlFuture = CompletableFuture.supplyAsync(() -> {
|
||||
try {
|
||||
return minioClient.getPresignedObjectUrl(
|
||||
GetPresignedObjectUrlArgs.builder()
|
||||
.method(Method.GET)
|
||||
.bucket(bucketName)
|
||||
.object(objectName)
|
||||
.build()
|
||||
);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException("预生成URL失败", e);
|
||||
}
|
||||
}, executorService);
|
||||
|
||||
// 2. 直接合并分片文件
|
||||
minioClient.composeObject(
|
||||
ComposeObjectArgs.builder()
|
||||
.bucket(bucketName)
|
||||
.object(objectName)
|
||||
.sources(sources)
|
||||
.build()
|
||||
);
|
||||
|
||||
long composeTime = System.currentTimeMillis();
|
||||
log.info("分片文件合并完成,耗时: {}ms", composeTime - startTime);
|
||||
|
||||
// 3. 并行获取文件信息和URL
|
||||
CompletableFuture<StatObjectResponse> statFuture = CompletableFuture.supplyAsync(() -> {
|
||||
try {
|
||||
return minioClient.statObject(
|
||||
StatObjectArgs.builder()
|
||||
.bucket(bucketName)
|
||||
.object(objectName)
|
||||
.build()
|
||||
);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException("获取文件信息失败", e);
|
||||
}
|
||||
}, executorService);
|
||||
|
||||
// 4. 异步批量清理分片文件(不阻塞响应)
|
||||
cleanupChunkFilesBatch(bucketName, objectName, parts);
|
||||
|
||||
// 5. 获取结果(超时时间缩短到5秒)
|
||||
String fileUrl = urlFuture.get(5, TimeUnit.SECONDS);
|
||||
StatObjectResponse finalStat = statFuture.get(5, TimeUnit.SECONDS);
|
||||
String etag = finalStat.etag();
|
||||
|
||||
long totalTime = System.currentTimeMillis();
|
||||
log.info("标准模式完成,对象名: {}, 文件大小: {}, 总耗时: {}ms",
|
||||
objectName, finalStat.size(), totalTime - startTime);
|
||||
|
||||
return new MultipartUploadCompleteResult(fileUrl, etag);
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取URL(带超时处理)
|
||||
*/
|
||||
private String getUrlWithTimeout(CompletableFuture<String> urlFuture, String bucketName, String objectName) {
|
||||
int urlTimeout = minioConfigProperties.getMultipart().getUrlGenerateTimeout();
|
||||
// 2. 等待所有批次完成(同步)
|
||||
try {
|
||||
return urlFuture.get(urlTimeout, TimeUnit.SECONDS);
|
||||
CompletableFuture.allOf(batchFutures.toArray(new CompletableFuture[0])).get();
|
||||
log.info("所有批次合并完成,共{}个批次", tempObjects.size());
|
||||
} catch (Exception e) {
|
||||
log.warn("获取预生成URL超时({}秒),使用基本URL", urlTimeout);
|
||||
return minioConfigProperties.getEndpoint() + "/" + bucketName + "/" + objectName;
|
||||
log.error("批次合并过程中出现错误", e);
|
||||
// 清理已创建的临时对象
|
||||
cleanupTempObjects(bucketName, tempObjects);
|
||||
throw new Exception("分批合并失败", e);
|
||||
}
|
||||
|
||||
// 3. 最终合并临时对象(同步)
|
||||
try {
|
||||
List<ComposeSource> tempSources = tempObjects.stream()
|
||||
.map(tempObj -> ComposeSource.builder()
|
||||
.bucket(bucketName)
|
||||
.object(tempObj)
|
||||
.build())
|
||||
.collect(Collectors.toList());
|
||||
|
||||
minioClient.composeObject(
|
||||
ComposeObjectArgs.builder()
|
||||
.bucket(bucketName)
|
||||
.object(objectName)
|
||||
.sources(tempSources)
|
||||
.build()
|
||||
);
|
||||
|
||||
log.info("优化分批合并完成,临时对象数: {}", tempObjects.size());
|
||||
|
||||
} finally {
|
||||
// 4. 异步清理临时对象(不阻塞)
|
||||
cleanupTempObjects(bucketName, tempObjects);
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user