feat(任务执行): 异步执行删除文件任务

This commit is contained in:
MAC 2021-12-19 22:43:48 +08:00
parent 31240a80c7
commit b91a85392b
11 changed files with 243 additions and 117 deletions

15
pom.xml
View File

@ -109,21 +109,6 @@
<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
<dependency>
<groupId>com.inversoft</groupId>
<artifactId>prime-jwt</artifactId>
<version>1.3.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.ctrip.framework.apollo/apollo-client -->
<dependency>
<groupId>com.ctrip.framework.apollo</groupId>
<artifactId>apollo-client</artifactId>
<version>1.9.0</version>
</dependency>
</dependencies>

View File

@ -1,63 +0,0 @@
package com.qiwenshare.file.advice;
import lombok.extern.slf4j.Slf4j;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import java.util.concurrent.Executor;
/**
* 异步线程池配置 AsyncConfigurer在applicationContext早期初始化如果需要依赖于其它的bean尽可能的将它们声明为lazy
*/
@Slf4j
@Component
@EnableConfigurationProperties(ThreadPoolProperties.class)
public class BaseAsyncConfigurer implements AsyncConfigurer {
@Autowired
private ThreadPoolProperties threadPoolProperties;
/**
* 定义线程池
* @return Executor
*/
@Bean("asyncTaskExecutor")
@Override
public Executor getAsyncExecutor() {
//Java虚拟机可用的处理器数
int processors = Runtime.getRuntime().availableProcessors();
//定义线程池
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
//核心线程数
taskExecutor.setCorePoolSize(threadPoolProperties.getCorePoolSize());
//线程池最大线程数,默认40000
taskExecutor.setMaxPoolSize(threadPoolProperties.getMaxPoolSize());
//线程池中线程最大空闲时间默认60单位
taskExecutor.setKeepAliveSeconds(threadPoolProperties.getKeepAliveSeconds());
taskExecutor.setQueueCapacity(threadPoolProperties.getQueueCapacity());
taskExecutor.setThreadNamePrefix(threadPoolProperties.getThreadNamePrefix());
//初始化
taskExecutor.initialize();
return taskExecutor;
}
/**
* 异步方法执行的过程中抛出的异常捕获
*
* @return AsyncUncaughtExceptionHandler
*/
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return new BaseAsyncUncaughtExceptionHandler();
}
}

View File

@ -8,7 +8,7 @@ import com.qiwenshare.file.vo.file.RecoveryFileListVo;
import java.util.List;
public interface IRecoveryFileService extends IService<RecoveryFile> {
void deleteRecoveryFile(UserFile userFile);
void deleteUserFileByDeleteBatchNum(String deleteBatchNum);
void restorefile(String deleteBatchNum, String filePath, Long sessionUserId);
List<RecoveryFileListVo> selectRecoveryFileList(Long userId);
}

View File

@ -0,0 +1,106 @@
package com.qiwenshare.file.component;
import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.qiwenshare.file.api.IFileService;
import com.qiwenshare.file.api.IFiletransferService;
import com.qiwenshare.file.api.IRecoveryFileService;
import com.qiwenshare.file.api.IUserFileService;
import com.qiwenshare.file.domain.FileBean;
import com.qiwenshare.file.domain.UserFile;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.List;
import java.util.concurrent.Future;
/**
* 功能描述异步任务业务类@Async也可添加在方法上
*/
@Slf4j
@Component
@Async("asyncTaskExecutor")
public class AsyncTaskComp {
@Resource
IUserFileService userFileService;
@Resource
IFileService fileService;
@Resource
IRecoveryFileService recoveryFileService;
@Resource
IFiletransferService filetransferService;
public Future<String> deleteUserFile(Long userFileId) {
long begin = System.currentTimeMillis();
UserFile userFile =userFileService.getById(userFileId);
if (userFile.getIsDir() == 1) {
LambdaQueryWrapper<UserFile> userFileLambdaQueryWrapper = new LambdaQueryWrapper<>();
userFileLambdaQueryWrapper.eq(UserFile::getDeleteBatchNum, userFile.getDeleteBatchNum());
List<UserFile> list = userFileService.list(userFileLambdaQueryWrapper);
recoveryFileService.deleteUserFileByDeleteBatchNum(userFile.getDeleteBatchNum());
for (UserFile userFileItem : list) {
Long filePointCount = fileService.getFilePointCount(userFileItem.getFileId());
if (filePointCount != null && filePointCount == 0 && userFileItem.getIsDir() == 0) {
FileBean fileBean = fileService.getById(userFileItem.getFileId());
try {
filetransferService.deleteFile(fileBean);
fileService.removeById(fileBean.getFileId());
} catch (Exception e) {
log.error("删除本地文件失败:" + JSON.toJSONString(fileBean));
}
}
}
} else {
recoveryFileService.deleteUserFileByDeleteBatchNum(userFile.getDeleteBatchNum());
Long filePointCount = fileService.getFilePointCount(userFile.getFileId());
if (filePointCount != null && filePointCount == 0 && userFile.getIsDir() == 0) {
FileBean fileBean = fileService.getById(userFile.getFileId());
try {
filetransferService.deleteFile(fileBean);
fileService.removeById(fileBean.getFileId());
} catch (Exception e) {
log.error("删除本地文件失败:" + JSON.toJSONString(fileBean));
}
}
}
long end = System.currentTimeMillis();
System.out.println("任务 deleteUserFile 耗时="+(end-begin));
return new AsyncResult<String>("deleteUserFile");
}
//获取异步结果
public Future<String> task4() throws InterruptedException{
long begin = System.currentTimeMillis();
Thread.sleep(2000L);
long end = System.currentTimeMillis();
System.out.println("任务4耗时="+(end-begin));
return new AsyncResult<String>("任务4");
}
public Future<String> task5() throws InterruptedException{
long begin = System.currentTimeMillis();
Thread.sleep(3000L);
long end = System.currentTimeMillis();
System.out.println("任务5耗时="+(end-begin));
return new AsyncResult<String>("任务5");
}
public Future<String> task6() throws InterruptedException{
long begin = System.currentTimeMillis();
Thread.sleep(1000L);
long end = System.currentTimeMillis();
System.out.println("任务6耗时="+(end-begin));
return new AsyncResult<String>("任务6");
}
}

View File

@ -0,0 +1,85 @@
package com.qiwenshare.file.config.threadpool;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
/**
* 异步线程池配置 AsyncConfigurer在applicationContext早期初始化如果需要依赖于其它的bean尽可能的将它们声明为lazy
*/
@Slf4j
@EnableAsync
@Component
@EnableConfigurationProperties(AsyncThreadPoolProperties.class)
public class AsyncThreadPoolAutoConfiguration implements AsyncConfigurer {
@Autowired
private AsyncThreadPoolProperties asyncThreadPoolProperties;
/**
* 定义线程池
* 使用{@link java.util.concurrent.LinkedBlockingQueue}(FIFO队列是一个用于并发环境下的阻塞队列集合类
* ThreadPoolTaskExecutor不是完全被IOC容器管理的bean,可以在方法上加上@Bean注解交给容器管理,这样可以将taskExecutor.initialize()方法调用去掉容器会自动调用
*
* @return
*/
@Bean("asyncTaskExecutor")
@Override
public Executor getAsyncExecutor() {
//Java虚拟机可用的处理器数
int processors = Runtime.getRuntime().availableProcessors();
//定义线程池
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
//核心线程数
taskExecutor.setCorePoolSize(Objects.nonNull(asyncThreadPoolProperties.getCorePoolSize()) ? asyncThreadPoolProperties.getCorePoolSize() : processors);
//线程池最大线程数,默认40000
taskExecutor.setMaxPoolSize(Objects.nonNull(asyncThreadPoolProperties.getMaxPoolSize()) ? asyncThreadPoolProperties.getMaxPoolSize() : 40000);
//线程队列最大线程数,默认80000
taskExecutor.setQueueCapacity(Objects.nonNull(asyncThreadPoolProperties.getMaxPoolSize()) ? asyncThreadPoolProperties.getMaxPoolSize() : 80000);
//线程名称前缀
taskExecutor.setThreadNamePrefix(StringUtils.isNotEmpty(asyncThreadPoolProperties.getThreadNamePrefix()) ? asyncThreadPoolProperties.getThreadNamePrefix() : "Async-ThreadPool-");
//线程池中线程最大空闲时间默认60单位
taskExecutor.setKeepAliveSeconds(asyncThreadPoolProperties.getKeepAliveSeconds());
//核心线程是否允许超时默认:false
taskExecutor.setAllowCoreThreadTimeOut(asyncThreadPoolProperties.isAllowCoreThreadTimeOut());
//IOC容器关闭时是否阻塞等待剩余的任务执行完成默认:false必须设置setAwaitTerminationSeconds
taskExecutor.setWaitForTasksToCompleteOnShutdown(asyncThreadPoolProperties.isWaitForTasksToCompleteOnShutdown());
//阻塞IOC容器关闭的时间默认10秒必须设置setWaitForTasksToCompleteOnShutdown
taskExecutor.setAwaitTerminationSeconds(asyncThreadPoolProperties.getAwaitTerminationSeconds());
/**
* 拒绝策略默认是AbortPolicy
* AbortPolicy丢弃任务并抛出RejectedExecutionException异常
* DiscardPolicy丢弃任务但不抛出异常
* DiscardOldestPolicy丢弃最旧的处理程序然后重试如果执行器关闭这时丢弃任务
* CallerRunsPolicy执行器执行任务失败则在策略回调方法中执行任务如果执行器关闭这时丢弃任务
*/
taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
//初始化
//taskExecutor.initialize();
return taskExecutor;
}
/**
* 异步方法执行的过程中抛出的异常捕获
*
* @return AsyncUncaughtExceptionHandler
*/
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return new BaseAsyncUncaughtExceptionHandler();
}
}

View File

@ -1,4 +1,4 @@
package com.qiwenshare.file.advice;
package com.qiwenshare.file.config.threadpool;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
@ -8,7 +8,11 @@ import org.springframework.boot.context.properties.ConfigurationProperties;
*/
@Data
@ConfigurationProperties(prefix = "spring.async-thread-pool")
public class ThreadPoolProperties {
public class AsyncThreadPoolProperties {
/**
* 是否启动异步线程池默认 false
*/
private boolean enable;
/**
* 核心线程数,默认Java虚拟机可用线程数
*/
@ -30,6 +34,18 @@ public class ThreadPoolProperties {
* 自定义线程名前缀默认Async-ThreadPool-
*/
private String threadNamePrefix = "async-threadpool-";
/**
* 核心线程是否允许超时默认false
*/
private boolean allowCoreThreadTimeOut;
/**
* IOC容器关闭时是否阻塞等待剩余的任务执行完成默认:false必须设置setAwaitTerminationSeconds
*/
private boolean waitForTasksToCompleteOnShutdown;
/**
* 阻塞IOC容器关闭的时间默认10秒必须设置setWaitForTasksToCompleteOnShutdown
*/
private int awaitTerminationSeconds = 10;
}

View File

@ -1,4 +1,4 @@
package com.qiwenshare.file.advice;
package com.qiwenshare.file.config.threadpool;
import lombok.extern.slf4j.Slf4j;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;

View File

@ -5,10 +5,9 @@ import com.qiwenshare.common.anno.MyLog;
import com.qiwenshare.common.exception.NotLoginException;
import com.qiwenshare.common.result.RestResult;
import com.qiwenshare.file.api.*;
import com.qiwenshare.file.domain.FileBean;
import com.qiwenshare.file.component.AsyncTaskComp;
import com.qiwenshare.file.domain.RecoveryFile;
import com.qiwenshare.file.domain.UserBean;
import com.qiwenshare.file.domain.UserFile;
import com.qiwenshare.file.dto.file.DeleteRecoveryFileDTO;
import com.qiwenshare.file.dto.recoveryfile.BatchDeleteRecoveryFileDTO;
import com.qiwenshare.file.dto.recoveryfile.RestoreFileDTO;
@ -36,6 +35,8 @@ public class RecoveryFileController {
IFileService fileService;
@Resource
IFiletransferService filetransferService;
@Resource
AsyncTaskComp asyncTaskComp;
public static final String CURRENT_MODULE = "回收站文件接口";
@ -51,21 +52,10 @@ public class RecoveryFileController {
}
RecoveryFile recoveryFile = recoveryFileService.getById(deleteRecoveryFileDTO.getRecoveryFileId());
UserFile userFile =userFileService.getById(recoveryFile.getUserFileId());
recoveryFileService.deleteRecoveryFile(userFile);
asyncTaskComp.deleteUserFile(recoveryFile.getUserFileId());
recoveryFileService.removeById(deleteRecoveryFileDTO.getRecoveryFileId());
Long filePointCount = fileService.getFilePointCount(userFile.getFileId());
if (filePointCount != null && filePointCount == 0) {
FileBean fileBean = fileService.getById(userFile.getFileId());
try {
filetransferService.deleteFile(fileBean);
fileService.removeById(fileBean.getFileId());
} catch (Exception e) {
log.error("删除本地文件失败:" + JSON.toJSONString(fileBean));
}
}
return RestResult.success().data("删除成功");
}
@ -80,23 +70,12 @@ public class RecoveryFileController {
}
List<RecoveryFile> recoveryFileList = JSON.parseArray(batchDeleteRecoveryFileDTO.getRecoveryFileIds(), RecoveryFile.class);
for (RecoveryFile recoveryFile : recoveryFileList) {
RecoveryFile recoveryFile1 = recoveryFileService.getById(recoveryFile.getRecoveryFileId());
UserFile userFile =userFileService.getById(recoveryFile1.getUserFileId());
recoveryFileService.deleteRecoveryFile(userFile);
recoveryFileService.removeById(recoveryFile.getRecoveryFileId());
asyncTaskComp.deleteUserFile(recoveryFile1.getUserFileId());
recoveryFileService.removeById(recoveryFile1.getRecoveryFileId());
Long filePointCount = fileService.getFilePointCount(userFile.getFileId());
if (filePointCount != null && filePointCount == 0) {
FileBean fileBean = fileService.getById(userFile.getFileId());
try {
filetransferService.deleteFile(fileBean);
fileService.removeById(fileBean.getFileId());
} catch (Exception e) {
log.error("删除本地文件失败:" + JSON.toJSONString(fileBean));
}
}
}
return RestResult.success().data("批量删除成功");
}

View File

@ -94,7 +94,6 @@ public class FiletransferService implements IFiletransferService {
fileBean.setFileUrl(uploadFileResult.getFileUrl());
fileBean.setFileSize(uploadFileResult.getFileSize());
fileBean.setStorageType(uploadFileResult.getStorageType().getCode());
// fileBean.setPointCount(1);
fileBean.setFileStatus(1);
fileBean.setCreateTime(DateUtil.getCurrentTime());
fileBean.setCreateUserId(userId);

View File

@ -37,14 +37,11 @@ public class RecoveryFileService extends ServiceImpl<RecoveryFileMapper, Recove
public static Executor executor = Executors.newFixedThreadPool(20);
@Override
public void deleteRecoveryFile(UserFile userFile) {
if (userFile == null) {
return ;
public void deleteUserFileByDeleteBatchNum(String deleteBatchNum) {
}
LambdaQueryWrapper<UserFile> userFileLambdaQueryWrapper = new LambdaQueryWrapper<>();
userFileLambdaQueryWrapper.eq(UserFile::getDeleteBatchNum, userFile.getDeleteBatchNum());
userFileLambdaQueryWrapper.eq(UserFile::getDeleteBatchNum, deleteBatchNum);
userFileMapper.delete(userFileLambdaQueryWrapper);

View File

@ -92,6 +92,28 @@ spring.elasticsearch.rest.username=
spring.elasticsearch.rest.password=
#异步线程池
#异步线程池组件开关默认false
spring.async-thread-pool.enable=true
#核心线程数,默认Java虚拟机可用线程数
spring.async-thread-pool.core-pool-size=8
#线程池最大线程数,默认40000
spring.async-thread-pool.max-pool-size=40000
#线程队列最大线程数,默认80000
spring.async-thread-pool.queue-capacity=80000
#自定义线程名前缀默认Async-ThreadPool-
spring.async-thread-pool.thread-name-prefix=Async-ThreadPool-
#线程池中线程最大空闲时间默认60单位
spring.async-thread-pool.keep-alive-seconds=60
#核心线程是否允许超时默认false
spring.async-thread-pool.allow-core-thread-time-out=false
#IOC容器关闭时是否阻塞等待剩余的任务执行完成默认:false必须设置setAwaitTerminationSeconds
spring.async-thread-pool.wait-for-tasks-to-complete-on-shutdown=false
#阻塞IOC容器关闭的时间默认10秒必须设置setWaitForTasksToCompleteOnShutdown
spring.async-thread-pool.await-termination-seconds=10
# 当前部署外网IP用于office预览
deployment.host=192.168.31.158