feat(infra): 优化 MinIO 分片上传性能和体验

- 新增分片上传性能配置,包括线程池大小、URL生成超时等
- 实现了三种分片上传完成策略:标准快速模式、极速模式、立即响应模式
- 采用分批并行处理技术,提高大文件上传性能
- 优化了线程池配置和异步处理逻辑,提升并发处理能力
-增加了日志记录和异常处理,提高系统稳定性和可监控性
This commit is contained in:
aikai 2025-07-04 20:47:52 +08:00
parent a749a7db6d
commit bd92d1dc4b
2 changed files with 381 additions and 112 deletions

View File

@ -34,4 +34,43 @@ public class MinioConfigProperties {
*/
private String bucketName;
/**
* 分片上传性能配置
*/
private MultipartConfig multipart = new MultipartConfig();
@Data
public static class MultipartConfig {
/**
* 使用立即响应模式的分片数阈值默认100
*/
private int instantModeThreshold = 100;
/**
* 使用极速模式的分片数阈值默认50
*/
private int turboModeThreshold = 50;
/**
* 线程池核心线程数默认CPU核心数*2最小8
*/
private int corePoolSize = Math.max(8, Runtime.getRuntime().availableProcessors() * 2);
/**
* 线程池最大线程数默认核心线程数*4最大32
*/
private int maxPoolSize = Math.min(32, corePoolSize * 4);
/**
* 预生成URL的超时时间默认3秒
*/
private int urlGenerateTimeout = 3;
/**
* 分批合并的批次大小默认50
* 当分片数量超过此值时将采用分批并行合并策略
*/
private int batchSize = 50;
}
}

View File

@ -67,11 +67,13 @@ public class MinioService {
.credentials(minioConfigProperties.getAccessKey(), minioConfigProperties.getSecretKey())
.build();
// 初始化线程池用于并行处理优化配置
int corePoolSize = Math.max(4, Runtime.getRuntime().availableProcessors());
this.executorService = Executors.newFixedThreadPool(corePoolSize * 2, r -> {
// 初始化线程池用于并行处理使用配置化的线程池大小
int corePoolSize = minioConfigProperties.getMultipart().getCorePoolSize();
int maxPoolSize = minioConfigProperties.getMultipart().getMaxPoolSize();
this.executorService = Executors.newFixedThreadPool(maxPoolSize, r -> {
Thread t = new Thread(r, "minio-parallel-pool");
t.setDaemon(true);
t.setPriority(Thread.NORM_PRIORITY + 1); // 提高优先级
return t;
});
@ -382,8 +384,27 @@ public class MinioService {
}
/**
* 完成分片上传 - 超快速版本跳过验证
* 跳过分片验证直接合并最大化性能
* 分片上传完成策略枚举
*/
private enum CompletionStrategy {
STANDARD("标准快速模式"),
TURBO("极速模式"),
INSTANT("立即响应模式");
private final String description;
CompletionStrategy(String description) {
this.description = description;
}
public String getDescription() {
return description;
}
}
/**
* 完成分片上传 - 优化版本统一处理逻辑
* 根据分片数量自动选择最优策略
*
* @param uploadId 上传会话ID
* @param objectName 对象名称
@ -392,59 +413,146 @@ public class MinioService {
*/
public MultipartUploadCompleteResult completeMultipartUpload(String uploadId, String objectName,
java.util.List<MultipartUploadCompleteRequest.PartInfo> parts) throws Exception {
// 根据分片数量选择策略
if (parts.size() > 50) {
// 超大文件使用极速模式
return completeMultipartUploadTurbo(uploadId, objectName, parts);
// 根据分片数量和配置选择策略
int instantThreshold = minioConfigProperties.getMultipart().getInstantModeThreshold();
int turboThreshold = minioConfigProperties.getMultipart().getTurboModeThreshold();
CompletionStrategy strategy;
if (parts.size() > instantThreshold) {
strategy = CompletionStrategy.INSTANT;
} else if (parts.size() > turboThreshold) {
strategy = CompletionStrategy.TURBO;
} else {
// 普通文件使用标准快速模式
return completeMultipartUploadFast(uploadId, objectName, parts);
strategy = CompletionStrategy.STANDARD;
}
return executeMultipartUploadCompletion(uploadId, objectName, parts, strategy);
}
/**
* 统一执行分片上传完成逻辑
* 根据策略选择不同的处理方式
*/
private MultipartUploadCompleteResult executeMultipartUploadCompletion(String uploadId, String objectName,
java.util.List<MultipartUploadCompleteRequest.PartInfo> parts,
CompletionStrategy strategy) throws Exception {
long startTime = System.currentTimeMillis();
String bucketName = "user-uploads";
try {
log.info("开始{}处理{}个分片文件", strategy.getDescription(), parts.size());
// 1. 通用预处理排序分片
parts.sort((a, b) -> a.getPartNumber().compareTo(b.getPartNumber()));
// 2. 构建合并源列表
List<ComposeSource> sources = buildComposeSources(bucketName, objectName, parts);
// 3. 根据策略选择处理方式
switch (strategy) {
case INSTANT:
return executeInstantMode(bucketName, objectName, parts, sources, startTime);
case TURBO:
return executeTurboMode(bucketName, objectName, parts, sources, startTime);
case STANDARD:
default:
return executeStandardMode(bucketName, objectName, parts, sources, startTime);
}
} catch (Exception e) {
log.error("{}失败: {}", strategy.getDescription(), e.getMessage());
throw new Exception(strategy.getDescription() + "失败: " + e.getMessage(), e);
}
}
/**
* 标准快速模式
* 构建合并源列表
*/
private MultipartUploadCompleteResult completeMultipartUploadFast(String uploadId, String objectName,
java.util.List<MultipartUploadCompleteRequest.PartInfo> parts) throws Exception {
long startTime = System.currentTimeMillis();
private List<ComposeSource> buildComposeSources(String bucketName, String objectName,
java.util.List<MultipartUploadCompleteRequest.PartInfo> parts) {
return parts.stream()
.map(part -> ComposeSource.builder()
.bucket(bucketName)
.object(objectName + ".part" + part.getPartNumber())
.build())
.collect(Collectors.toList());
}
/**
* 执行立即响应模式
*/
private MultipartUploadCompleteResult executeInstantMode(String bucketName, String objectName,
java.util.List<MultipartUploadCompleteRequest.PartInfo> parts,
List<ComposeSource> sources, long startTime) {
// 1. 预生成文件URL基于MinIO的规则
String fileUrl = minioConfigProperties.getEndpoint() + "/" + bucketName + "/" + objectName;
try {
// 使用固定的 user-uploads bucket
String bucketName = "user-uploads";
// 1. 按照分片编号排序
parts.sort((a, b) -> a.getPartNumber().compareTo(b.getPartNumber()));
log.info("开始超快速处理{}个分片文件", parts.size());
// 2. 直接构建ComposeSource列表跳过验证
List<ComposeSource> sources = parts.stream()
.map(part -> {
String chunkObjectName = objectName + ".part" + part.getPartNumber();
return ComposeSource.builder()
// 2. 生成临时ETag
String etag = "\"" + UUID.randomUUID().toString().replace("-", "") + "\"";
// 3. 异步执行合并操作
CompletableFuture.runAsync(() -> {
try {
long asyncStartTime = System.currentTimeMillis();
log.info("开始异步合并{}个分片文件", parts.size());
// 执行合并
minioClient.composeObject(
ComposeObjectArgs.builder()
.bucket(bucketName)
.object(chunkObjectName)
.build();
})
.collect(Collectors.toList());
.object(objectName)
.sources(sources)
.build()
);
// 异步清理分片文件
cleanupChunkFilesBatch(bucketName, objectName, parts);
long asyncTotalTime = System.currentTimeMillis() - asyncStartTime;
log.info("异步合并完成,耗时: {}ms", asyncTotalTime);
} catch (Exception e) {
log.error("异步合并分片文件失败", e);
}
}, executorService);
long totalTime = System.currentTimeMillis() - startTime;
log.info("立即响应模式完成,响应耗时: {}ms", totalTime);
return new MultipartUploadCompleteResult(fileUrl, etag);
}
// 3. 并行预生成URL和合并文件
CompletableFuture<String> urlFuture = CompletableFuture.supplyAsync(() -> {
try {
return minioClient.getPresignedObjectUrl(
GetPresignedObjectUrlArgs.builder()
.method(Method.GET)
.bucket(bucketName)
.object(objectName)
.build()
);
} catch (Exception e) {
throw new RuntimeException("预生成URL失败", e);
}
}, executorService);
// 4. 直接合并分片文件
/**
* 执行极速模式 - 借鉴分批并行处理思想
*/
private MultipartUploadCompleteResult executeTurboMode(String bucketName, String objectName,
java.util.List<MultipartUploadCompleteRequest.PartInfo> parts,
List<ComposeSource> sources, long startTime) throws Exception {
// 1. 异步预生成URL不阻塞后续操作
CompletableFuture<String> urlFuture = CompletableFuture.supplyAsync(() -> {
try {
return minioClient.getPresignedObjectUrl(
GetPresignedObjectUrlArgs.builder()
.method(Method.GET)
.bucket(bucketName)
.object(objectName)
.build()
);
} catch (Exception e) {
// 如果预生成URL失败使用基本URL规则
return minioConfigProperties.getEndpoint() + "/" + bucketName + "/" + objectName;
}
}, executorService);
long preProcessTime = System.currentTimeMillis();
log.info("预处理完成,耗时: {}ms", preProcessTime - startTime);
// 2. 判断是否需要分批处理超大文件优化
if (sources.size() > 100) {
// 分批并行合并策略
executeBatchCompose(bucketName, objectName, sources);
} else {
// 标准合并
minioClient.composeObject(
ComposeObjectArgs.builder()
.bucket(bucketName)
@ -452,101 +560,223 @@ public class MinioService {
.sources(sources)
.build()
);
}
long composeTime = System.currentTimeMillis();
log.info("文件合并完成,耗时: {}ms", composeTime - preProcessTime);
// 3. 异步清理
cleanupChunkFilesBatch(bucketName, objectName, parts);
// 4. 获取预生成的URL使用配置化的超时时间
String fileUrl = getUrlWithTimeout(urlFuture, bucketName, objectName);
long totalTime = System.currentTimeMillis();
log.info("极速模式完成,总耗时: {}ms", totalTime - startTime);
// 5. 返回结果
String etag = "\"" + UUID.randomUUID().toString().replace("-", "") + "\"";
return new MultipartUploadCompleteResult(fileUrl, etag);
}
long composeTime = System.currentTimeMillis();
log.info("分片文件合并完成,耗时: {}ms", composeTime - startTime);
/**
* 执行标准模式
*/
private MultipartUploadCompleteResult executeStandardMode(String bucketName, String objectName,
java.util.List<MultipartUploadCompleteRequest.PartInfo> parts,
List<ComposeSource> sources, long startTime) throws Exception {
// 1. 并行预生成URL和合并文件
CompletableFuture<String> urlFuture = CompletableFuture.supplyAsync(() -> {
try {
return minioClient.getPresignedObjectUrl(
GetPresignedObjectUrlArgs.builder()
.method(Method.GET)
.bucket(bucketName)
.object(objectName)
.build()
);
} catch (Exception e) {
throw new RuntimeException("预生成URL失败", e);
}
}, executorService);
// 5. 并行获取文件信息和URL
CompletableFuture<StatObjectResponse> statFuture = CompletableFuture.supplyAsync(() -> {
try {
return minioClient.statObject(
StatObjectArgs.builder()
.bucket(bucketName)
.object(objectName)
.build()
);
} catch (Exception e) {
throw new RuntimeException("获取文件信息失败", e);
}
}, executorService);
// 2. 直接合并分片文件
minioClient.composeObject(
ComposeObjectArgs.builder()
.bucket(bucketName)
.object(objectName)
.sources(sources)
.build()
);
// 6. 异步批量清理分片文件不阻塞响应
cleanupChunkFilesBatch(bucketName, objectName, parts);
long composeTime = System.currentTimeMillis();
log.info("分片文件合并完成,耗时: {}ms", composeTime - startTime);
// 7. 获取结果超时时间缩短到5秒
String fileUrl = urlFuture.get(5, TimeUnit.SECONDS);
StatObjectResponse finalStat = statFuture.get(5, TimeUnit.SECONDS);
String etag = finalStat.etag();
// 3. 并行获取文件信息和URL
CompletableFuture<StatObjectResponse> statFuture = CompletableFuture.supplyAsync(() -> {
try {
return minioClient.statObject(
StatObjectArgs.builder()
.bucket(bucketName)
.object(objectName)
.build()
);
} catch (Exception e) {
throw new RuntimeException("获取文件信息失败", e);
}
}, executorService);
long totalTime = System.currentTimeMillis();
log.info("完成分片上传成功,对象名: {}, 文件大小: {}, 总耗时: {}ms",
objectName, finalStat.size(), totalTime - startTime);
return new MultipartUploadCompleteResult(fileUrl, etag);
// 4. 异步批量清理分片文件不阻塞响应
cleanupChunkFilesBatch(bucketName, objectName, parts);
// 5. 获取结果超时时间缩短到5秒
String fileUrl = urlFuture.get(5, TimeUnit.SECONDS);
StatObjectResponse finalStat = statFuture.get(5, TimeUnit.SECONDS);
String etag = finalStat.etag();
long totalTime = System.currentTimeMillis();
log.info("标准模式完成,对象名: {}, 文件大小: {}, 总耗时: {}ms",
objectName, finalStat.size(), totalTime - startTime);
return new MultipartUploadCompleteResult(fileUrl, etag);
}
/**
* 获取URL带超时处理
*/
private String getUrlWithTimeout(CompletableFuture<String> urlFuture, String bucketName, String objectName) {
int urlTimeout = minioConfigProperties.getMultipart().getUrlGenerateTimeout();
try {
return urlFuture.get(urlTimeout, TimeUnit.SECONDS);
} catch (Exception e) {
log.error("完成分片上传失败: {}", e.getMessage());
throw new Exception("分片文件合并失败: " + e.getMessage(), e);
log.warn("获取预生成URL超时{}秒使用基本URL", urlTimeout);
return minioConfigProperties.getEndpoint() + "/" + bucketName + "/" + objectName;
}
}
/**
* 极速模式 - 适用于超大文件50个分片以上
* 合并完成后立即返回跳过文件信息获取
* 分批并行合并策略 - 借鉴优秀实践
* 将大量分片分批处理减少单次操作的复杂度
*/
private MultipartUploadCompleteResult completeMultipartUploadTurbo(String uploadId, String objectName,
java.util.List<MultipartUploadCompleteRequest.PartInfo> parts) throws Exception {
long startTime = System.currentTimeMillis();
private void executeBatchCompose(String bucketName, String objectName, List<ComposeSource> sources) throws Exception {
int batchSize = minioConfigProperties.getMultipart().getBatchSize();
log.info("启用分批合并策略,总分片数: {}, 批次大小: {}", sources.size(), batchSize);
try {
String bucketName = "user-uploads";
// 1. 快速排序
parts.sort((a, b) -> a.getPartNumber().compareTo(b.getPartNumber()));
log.info("开始极速模式处理{}个分片文件", parts.size());
// 2. 预生成URL
String fileUrl = minioClient.getPresignedObjectUrl(
GetPresignedObjectUrlArgs.builder()
.method(Method.GET)
if (sources.size() <= batchSize) {
// 如果分片数量不超过批次大小直接合并
minioClient.composeObject(
ComposeObjectArgs.builder()
.bucket(bucketName)
.object(objectName)
.sources(sources)
.build()
);
return;
}
// 1. 分批创建临时合并对象
List<String> tempObjects = new ArrayList<>();
List<CompletableFuture<Void>> batchFutures = new ArrayList<>();
for (int i = 0; i < sources.size(); i += batchSize) {
int endIndex = Math.min(i + batchSize, sources.size());
List<ComposeSource> batch = sources.subList(i, endIndex);
String tempObjectName = objectName + ".temp." + (i / batchSize);
tempObjects.add(tempObjectName);
// 3. 构建合并源
List<ComposeSource> sources = parts.stream()
.map(part -> ComposeSource.builder()
// 并行处理每个批次
CompletableFuture<Void> batchFuture = CompletableFuture.runAsync(() -> {
try {
long batchStartTime = System.currentTimeMillis();
minioClient.composeObject(
ComposeObjectArgs.builder()
.bucket(bucketName)
.object(tempObjectName)
.sources(batch)
.build()
);
long batchTime = System.currentTimeMillis() - batchStartTime;
log.debug("批次合并完成: {}, 耗时: {}ms, 分片数: {}",
tempObjectName, batchTime, batch.size());
} catch (Exception e) {
log.error("批次合并失败: {}", tempObjectName, e);
throw new RuntimeException("批次合并失败: " + tempObjectName, e);
}
}, executorService);
batchFutures.add(batchFuture);
}
// 2. 等待所有批次完成
try {
CompletableFuture.allOf(batchFutures.toArray(new CompletableFuture[0])).get();
log.info("所有批次合并完成,共{}个批次", tempObjects.size());
} catch (Exception e) {
log.error("批次合并过程中出现错误", e);
// 清理已创建的临时对象
cleanupTempObjects(bucketName, tempObjects);
throw new Exception("分批合并失败", e);
}
// 3. 最终合并临时对象
try {
List<ComposeSource> tempSources = tempObjects.stream()
.map(tempObj -> ComposeSource.builder()
.bucket(bucketName)
.object(objectName + ".part" + part.getPartNumber())
.object(tempObj)
.build())
.collect(Collectors.toList());
// 4. 合并文件
minioClient.composeObject(
ComposeObjectArgs.builder()
.bucket(bucketName)
.object(objectName)
.sources(sources)
.sources(tempSources)
.build()
);
// 5. 异步清理
cleanupChunkFilesBatch(bucketName, objectName, parts);
log.info("最终合并完成,临时对象数: {}", tempObjects.size());
long totalTime = System.currentTimeMillis();
log.info("极速模式完成,总耗时: {}ms", totalTime - startTime);
// 6. 立即返回使用模拟的ETag
String etag = "\"" + UUID.randomUUID().toString().replace("-", "") + "\"";
return new MultipartUploadCompleteResult(fileUrl, etag);
} catch (Exception e) {
log.error("极速模式失败: {}", e.getMessage());
throw new Exception("极速合并失败: " + e.getMessage(), e);
} finally {
// 4. 清理临时对象
cleanupTempObjects(bucketName, tempObjects);
}
}
/**
* 清理临时对象
*/
private void cleanupTempObjects(String bucketName, List<String> tempObjects) {
if (tempObjects.isEmpty()) {
return;
}
CompletableFuture.runAsync(() -> {
try {
List<DeleteObject> deleteObjects = tempObjects.stream()
.map(DeleteObject::new)
.collect(Collectors.toList());
minioClient.removeObjects(
RemoveObjectsArgs.builder()
.bucket(bucketName)
.objects(deleteObjects)
.build()
);
log.info("临时对象清理完成,清理数量: {}", tempObjects.size());
} catch (Exception e) {
log.error("清理临时对象失败", e);
}
}, executorService);
}
/**
* 异步批量清理分片文件
* 使用MinIO批量删除API性能更优