From b91a85392b02929f17a41bb2946cb9983573d391 Mon Sep 17 00:00:00 2001 From: MAC <1162714483@qq.com> Date: Sun, 19 Dec 2021 22:43:48 +0800 Subject: [PATCH] =?UTF-8?q?feat(=E4=BB=BB=E5=8A=A1=E6=89=A7=E8=A1=8C):=20?= =?UTF-8?q?=E5=BC=82=E6=AD=A5=E6=89=A7=E8=A1=8C=E5=88=A0=E9=99=A4=E6=96=87?= =?UTF-8?q?=E4=BB=B6=E4=BB=BB=E5=8A=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 15 --- .../file/advice/BaseAsyncConfigurer.java | 63 ----------- .../file/api/IRecoveryFileService.java | 2 +- .../file/component/AsyncTaskComp.java | 106 ++++++++++++++++++ .../AsyncThreadPoolAutoConfiguration.java | 85 ++++++++++++++ .../AsyncThreadPoolProperties.java} | 20 +++- .../BaseAsyncUncaughtExceptionHandler.java | 2 +- .../controller/RecoveryFileController.java | 37 ++---- .../file/service/FiletransferService.java | 1 - .../file/service/RecoveryFileService.java | 7 +- .../resources/config/application.properties | 22 ++++ 11 files changed, 243 insertions(+), 117 deletions(-) delete mode 100644 src/main/java/com/qiwenshare/file/advice/BaseAsyncConfigurer.java create mode 100644 src/main/java/com/qiwenshare/file/component/AsyncTaskComp.java create mode 100644 src/main/java/com/qiwenshare/file/config/threadpool/AsyncThreadPoolAutoConfiguration.java rename src/main/java/com/qiwenshare/file/{advice/ThreadPoolProperties.java => config/threadpool/AsyncThreadPoolProperties.java} (54%) rename src/main/java/com/qiwenshare/file/{advice => config/threadpool}/BaseAsyncUncaughtExceptionHandler.java (91%) diff --git a/pom.xml b/pom.xml index aee9a9b..94a2e4e 100644 --- a/pom.xml +++ b/pom.xml @@ -109,21 +109,6 @@ spring-boot-starter-data-elasticsearch - - com.inversoft - prime-jwt - 1.3.1 - - - - com.ctrip.framework.apollo - apollo-client - 1.9.0 - - - - - diff --git a/src/main/java/com/qiwenshare/file/advice/BaseAsyncConfigurer.java b/src/main/java/com/qiwenshare/file/advice/BaseAsyncConfigurer.java deleted file mode 100644 index 4d99f4f..0000000 --- a/src/main/java/com/qiwenshare/file/advice/BaseAsyncConfigurer.java +++ /dev/null @@ -1,63 +0,0 @@ -package com.qiwenshare.file.advice; - - -import lombok.extern.slf4j.Slf4j; -import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.context.properties.EnableConfigurationProperties; -import org.springframework.context.annotation.Bean; -import org.springframework.scheduling.annotation.AsyncConfigurer; -import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; -import org.springframework.stereotype.Component; - -import java.util.concurrent.Executor; - -/** - * 异步线程池配置 AsyncConfigurer在applicationContext早期初始化,如果需要依赖于其它的bean,尽可能的将它们声明为lazy - */ -@Slf4j -@Component -@EnableConfigurationProperties(ThreadPoolProperties.class) -public class BaseAsyncConfigurer implements AsyncConfigurer { - - @Autowired - private ThreadPoolProperties threadPoolProperties; - - /** - * 定义线程池 - * @return Executor - */ - @Bean("asyncTaskExecutor") - @Override - public Executor getAsyncExecutor() { - //Java虚拟机可用的处理器数 - int processors = Runtime.getRuntime().availableProcessors(); - //定义线程池 - ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); - //核心线程数 - taskExecutor.setCorePoolSize(threadPoolProperties.getCorePoolSize()); - //线程池最大线程数,默认:40000 - taskExecutor.setMaxPoolSize(threadPoolProperties.getMaxPoolSize()); - //线程池中线程最大空闲时间,默认:60,单位:秒 - taskExecutor.setKeepAliveSeconds(threadPoolProperties.getKeepAliveSeconds()); - - taskExecutor.setQueueCapacity(threadPoolProperties.getQueueCapacity()); - taskExecutor.setThreadNamePrefix(threadPoolProperties.getThreadNamePrefix()); - - //初始化 - taskExecutor.initialize(); - - return taskExecutor; - } - - /** - * 异步方法执行的过程中抛出的异常捕获 - * - * @return AsyncUncaughtExceptionHandler - */ - @Override - public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { - return new BaseAsyncUncaughtExceptionHandler(); - } -} - diff --git a/src/main/java/com/qiwenshare/file/api/IRecoveryFileService.java b/src/main/java/com/qiwenshare/file/api/IRecoveryFileService.java index 0770781..e31cdcd 100644 --- a/src/main/java/com/qiwenshare/file/api/IRecoveryFileService.java +++ b/src/main/java/com/qiwenshare/file/api/IRecoveryFileService.java @@ -8,7 +8,7 @@ import com.qiwenshare.file.vo.file.RecoveryFileListVo; import java.util.List; public interface IRecoveryFileService extends IService { - void deleteRecoveryFile(UserFile userFile); + void deleteUserFileByDeleteBatchNum(String deleteBatchNum); void restorefile(String deleteBatchNum, String filePath, Long sessionUserId); List selectRecoveryFileList(Long userId); } diff --git a/src/main/java/com/qiwenshare/file/component/AsyncTaskComp.java b/src/main/java/com/qiwenshare/file/component/AsyncTaskComp.java new file mode 100644 index 0000000..4efe9c3 --- /dev/null +++ b/src/main/java/com/qiwenshare/file/component/AsyncTaskComp.java @@ -0,0 +1,106 @@ +package com.qiwenshare.file.component; + +import com.alibaba.fastjson.JSON; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.qiwenshare.file.api.IFileService; +import com.qiwenshare.file.api.IFiletransferService; +import com.qiwenshare.file.api.IRecoveryFileService; +import com.qiwenshare.file.api.IUserFileService; +import com.qiwenshare.file.domain.FileBean; +import com.qiwenshare.file.domain.UserFile; +import lombok.extern.slf4j.Slf4j; +import org.springframework.scheduling.annotation.Async; +import org.springframework.scheduling.annotation.AsyncResult; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; +import java.util.List; +import java.util.concurrent.Future; + +/** + * 功能描述:异步任务业务类(@Async也可添加在方法上) + */ +@Slf4j +@Component +@Async("asyncTaskExecutor") +public class AsyncTaskComp { + @Resource + IUserFileService userFileService; + @Resource + IFileService fileService; + @Resource + IRecoveryFileService recoveryFileService; + @Resource + IFiletransferService filetransferService; + + public Future deleteUserFile(Long userFileId) { + + long begin = System.currentTimeMillis(); + UserFile userFile =userFileService.getById(userFileId); + if (userFile.getIsDir() == 1) { + LambdaQueryWrapper userFileLambdaQueryWrapper = new LambdaQueryWrapper<>(); + userFileLambdaQueryWrapper.eq(UserFile::getDeleteBatchNum, userFile.getDeleteBatchNum()); + List list = userFileService.list(userFileLambdaQueryWrapper); + recoveryFileService.deleteUserFileByDeleteBatchNum(userFile.getDeleteBatchNum()); + for (UserFile userFileItem : list) { + + Long filePointCount = fileService.getFilePointCount(userFileItem.getFileId()); + + if (filePointCount != null && filePointCount == 0 && userFileItem.getIsDir() == 0) { + FileBean fileBean = fileService.getById(userFileItem.getFileId()); + try { + filetransferService.deleteFile(fileBean); + fileService.removeById(fileBean.getFileId()); + } catch (Exception e) { + log.error("删除本地文件失败:" + JSON.toJSONString(fileBean)); + } + } + } + } else { + + recoveryFileService.deleteUserFileByDeleteBatchNum(userFile.getDeleteBatchNum()); + Long filePointCount = fileService.getFilePointCount(userFile.getFileId()); + + if (filePointCount != null && filePointCount == 0 && userFile.getIsDir() == 0) { + FileBean fileBean = fileService.getById(userFile.getFileId()); + try { + filetransferService.deleteFile(fileBean); + fileService.removeById(fileBean.getFileId()); + } catch (Exception e) { + log.error("删除本地文件失败:" + JSON.toJSONString(fileBean)); + } + } + } + + long end = System.currentTimeMillis(); + System.out.println("任务 deleteUserFile 耗时="+(end-begin)); + return new AsyncResult("deleteUserFile"); + } + + //获取异步结果 + public Future task4() throws InterruptedException{ + long begin = System.currentTimeMillis(); + Thread.sleep(2000L); + long end = System.currentTimeMillis(); + System.out.println("任务4耗时="+(end-begin)); + return new AsyncResult("任务4"); + } + + + public Future task5() throws InterruptedException{ + long begin = System.currentTimeMillis(); + Thread.sleep(3000L); + long end = System.currentTimeMillis(); + System.out.println("任务5耗时="+(end-begin)); + return new AsyncResult("任务5"); + } + + public Future task6() throws InterruptedException{ + long begin = System.currentTimeMillis(); + Thread.sleep(1000L); + long end = System.currentTimeMillis(); + System.out.println("任务6耗时="+(end-begin)); + return new AsyncResult("任务6"); + } + +} \ No newline at end of file diff --git a/src/main/java/com/qiwenshare/file/config/threadpool/AsyncThreadPoolAutoConfiguration.java b/src/main/java/com/qiwenshare/file/config/threadpool/AsyncThreadPoolAutoConfiguration.java new file mode 100644 index 0000000..bfe8b07 --- /dev/null +++ b/src/main/java/com/qiwenshare/file/config/threadpool/AsyncThreadPoolAutoConfiguration.java @@ -0,0 +1,85 @@ +package com.qiwenshare.file.config.threadpool; + + +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.scheduling.annotation.AsyncConfigurer; +import org.springframework.scheduling.annotation.EnableAsync; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.stereotype.Component; + +import java.util.Objects; +import java.util.concurrent.Executor; +import java.util.concurrent.ThreadPoolExecutor; + +/** + * 异步线程池配置 AsyncConfigurer在applicationContext早期初始化,如果需要依赖于其它的bean,尽可能的将它们声明为lazy + */ +@Slf4j +@EnableAsync +@Component +@EnableConfigurationProperties(AsyncThreadPoolProperties.class) +public class AsyncThreadPoolAutoConfiguration implements AsyncConfigurer { + + @Autowired + private AsyncThreadPoolProperties asyncThreadPoolProperties; + + /** + * 定义线程池 + * 使用{@link java.util.concurrent.LinkedBlockingQueue}(FIFO)队列,是一个用于并发环境下的阻塞队列集合类 + * ThreadPoolTaskExecutor不是完全被IOC容器管理的bean,可以在方法上加上@Bean注解交给容器管理,这样可以将taskExecutor.initialize()方法调用去掉,容器会自动调用 + * + * @return + */ + @Bean("asyncTaskExecutor") + @Override + public Executor getAsyncExecutor() { + //Java虚拟机可用的处理器数 + int processors = Runtime.getRuntime().availableProcessors(); + //定义线程池 + ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); + //核心线程数 + taskExecutor.setCorePoolSize(Objects.nonNull(asyncThreadPoolProperties.getCorePoolSize()) ? asyncThreadPoolProperties.getCorePoolSize() : processors); + //线程池最大线程数,默认:40000 + taskExecutor.setMaxPoolSize(Objects.nonNull(asyncThreadPoolProperties.getMaxPoolSize()) ? asyncThreadPoolProperties.getMaxPoolSize() : 40000); + //线程队列最大线程数,默认:80000 + taskExecutor.setQueueCapacity(Objects.nonNull(asyncThreadPoolProperties.getMaxPoolSize()) ? asyncThreadPoolProperties.getMaxPoolSize() : 80000); + //线程名称前缀 + taskExecutor.setThreadNamePrefix(StringUtils.isNotEmpty(asyncThreadPoolProperties.getThreadNamePrefix()) ? asyncThreadPoolProperties.getThreadNamePrefix() : "Async-ThreadPool-"); + //线程池中线程最大空闲时间,默认:60,单位:秒 + taskExecutor.setKeepAliveSeconds(asyncThreadPoolProperties.getKeepAliveSeconds()); + //核心线程是否允许超时,默认:false + taskExecutor.setAllowCoreThreadTimeOut(asyncThreadPoolProperties.isAllowCoreThreadTimeOut()); + //IOC容器关闭时是否阻塞等待剩余的任务执行完成,默认:false(必须设置setAwaitTerminationSeconds) + taskExecutor.setWaitForTasksToCompleteOnShutdown(asyncThreadPoolProperties.isWaitForTasksToCompleteOnShutdown()); + //阻塞IOC容器关闭的时间,默认:10秒(必须设置setWaitForTasksToCompleteOnShutdown) + taskExecutor.setAwaitTerminationSeconds(asyncThreadPoolProperties.getAwaitTerminationSeconds()); + /** + * 拒绝策略,默认是AbortPolicy + * AbortPolicy:丢弃任务并抛出RejectedExecutionException异常 + * DiscardPolicy:丢弃任务但不抛出异常 + * DiscardOldestPolicy:丢弃最旧的处理程序,然后重试,如果执行器关闭,这时丢弃任务 + * CallerRunsPolicy:执行器执行任务失败,则在策略回调方法中执行任务,如果执行器关闭,这时丢弃任务 + */ + taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy()); + //初始化 + //taskExecutor.initialize(); + + return taskExecutor; + } + + /** + * 异步方法执行的过程中抛出的异常捕获 + * + * @return AsyncUncaughtExceptionHandler + */ + @Override + public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { + return new BaseAsyncUncaughtExceptionHandler(); + } +} + diff --git a/src/main/java/com/qiwenshare/file/advice/ThreadPoolProperties.java b/src/main/java/com/qiwenshare/file/config/threadpool/AsyncThreadPoolProperties.java similarity index 54% rename from src/main/java/com/qiwenshare/file/advice/ThreadPoolProperties.java rename to src/main/java/com/qiwenshare/file/config/threadpool/AsyncThreadPoolProperties.java index 9f05099..b9500dc 100644 --- a/src/main/java/com/qiwenshare/file/advice/ThreadPoolProperties.java +++ b/src/main/java/com/qiwenshare/file/config/threadpool/AsyncThreadPoolProperties.java @@ -1,4 +1,4 @@ -package com.qiwenshare.file.advice; +package com.qiwenshare.file.config.threadpool; import lombok.Data; import org.springframework.boot.context.properties.ConfigurationProperties; @@ -8,7 +8,11 @@ import org.springframework.boot.context.properties.ConfigurationProperties; */ @Data @ConfigurationProperties(prefix = "spring.async-thread-pool") -public class ThreadPoolProperties { +public class AsyncThreadPoolProperties { + /** + * 是否启动异步线程池,默认 false + */ + private boolean enable; /** * 核心线程数,默认:Java虚拟机可用线程数 */ @@ -30,6 +34,18 @@ public class ThreadPoolProperties { * 自定义线程名前缀,默认:Async-ThreadPool- */ private String threadNamePrefix = "async-threadpool-"; + /** + * 核心线程是否允许超时,默认false + */ + private boolean allowCoreThreadTimeOut; + /** + * IOC容器关闭时是否阻塞等待剩余的任务执行完成,默认:false(必须设置setAwaitTerminationSeconds) + */ + private boolean waitForTasksToCompleteOnShutdown; + /** + * 阻塞IOC容器关闭的时间,默认:10秒(必须设置setWaitForTasksToCompleteOnShutdown) + */ + private int awaitTerminationSeconds = 10; } diff --git a/src/main/java/com/qiwenshare/file/advice/BaseAsyncUncaughtExceptionHandler.java b/src/main/java/com/qiwenshare/file/config/threadpool/BaseAsyncUncaughtExceptionHandler.java similarity index 91% rename from src/main/java/com/qiwenshare/file/advice/BaseAsyncUncaughtExceptionHandler.java rename to src/main/java/com/qiwenshare/file/config/threadpool/BaseAsyncUncaughtExceptionHandler.java index e05c5be..43bb9de 100644 --- a/src/main/java/com/qiwenshare/file/advice/BaseAsyncUncaughtExceptionHandler.java +++ b/src/main/java/com/qiwenshare/file/config/threadpool/BaseAsyncUncaughtExceptionHandler.java @@ -1,4 +1,4 @@ -package com.qiwenshare.file.advice; +package com.qiwenshare.file.config.threadpool; import lombok.extern.slf4j.Slf4j; import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler; diff --git a/src/main/java/com/qiwenshare/file/controller/RecoveryFileController.java b/src/main/java/com/qiwenshare/file/controller/RecoveryFileController.java index ade0b92..75afc7b 100644 --- a/src/main/java/com/qiwenshare/file/controller/RecoveryFileController.java +++ b/src/main/java/com/qiwenshare/file/controller/RecoveryFileController.java @@ -5,10 +5,9 @@ import com.qiwenshare.common.anno.MyLog; import com.qiwenshare.common.exception.NotLoginException; import com.qiwenshare.common.result.RestResult; import com.qiwenshare.file.api.*; -import com.qiwenshare.file.domain.FileBean; +import com.qiwenshare.file.component.AsyncTaskComp; import com.qiwenshare.file.domain.RecoveryFile; import com.qiwenshare.file.domain.UserBean; -import com.qiwenshare.file.domain.UserFile; import com.qiwenshare.file.dto.file.DeleteRecoveryFileDTO; import com.qiwenshare.file.dto.recoveryfile.BatchDeleteRecoveryFileDTO; import com.qiwenshare.file.dto.recoveryfile.RestoreFileDTO; @@ -36,6 +35,8 @@ public class RecoveryFileController { IFileService fileService; @Resource IFiletransferService filetransferService; + @Resource + AsyncTaskComp asyncTaskComp; public static final String CURRENT_MODULE = "回收站文件接口"; @@ -51,21 +52,10 @@ public class RecoveryFileController { } RecoveryFile recoveryFile = recoveryFileService.getById(deleteRecoveryFileDTO.getRecoveryFileId()); - UserFile userFile =userFileService.getById(recoveryFile.getUserFileId()); - recoveryFileService.deleteRecoveryFile(userFile); + asyncTaskComp.deleteUserFile(recoveryFile.getUserFileId()); + recoveryFileService.removeById(deleteRecoveryFileDTO.getRecoveryFileId()); - Long filePointCount = fileService.getFilePointCount(userFile.getFileId()); - if (filePointCount != null && filePointCount == 0) { - FileBean fileBean = fileService.getById(userFile.getFileId()); - try { - filetransferService.deleteFile(fileBean); - fileService.removeById(fileBean.getFileId()); - } catch (Exception e) { - log.error("删除本地文件失败:" + JSON.toJSONString(fileBean)); - } - } - return RestResult.success().data("删除成功"); } @@ -80,23 +70,12 @@ public class RecoveryFileController { } List recoveryFileList = JSON.parseArray(batchDeleteRecoveryFileDTO.getRecoveryFileIds(), RecoveryFile.class); for (RecoveryFile recoveryFile : recoveryFileList) { - RecoveryFile recoveryFile1 = recoveryFileService.getById(recoveryFile.getRecoveryFileId()); - UserFile userFile =userFileService.getById(recoveryFile1.getUserFileId()); - recoveryFileService.deleteRecoveryFile(userFile); - recoveryFileService.removeById(recoveryFile.getRecoveryFileId()); + asyncTaskComp.deleteUserFile(recoveryFile1.getUserFileId()); + + recoveryFileService.removeById(recoveryFile1.getRecoveryFileId()); - Long filePointCount = fileService.getFilePointCount(userFile.getFileId()); - if (filePointCount != null && filePointCount == 0) { - FileBean fileBean = fileService.getById(userFile.getFileId()); - try { - filetransferService.deleteFile(fileBean); - fileService.removeById(fileBean.getFileId()); - } catch (Exception e) { - log.error("删除本地文件失败:" + JSON.toJSONString(fileBean)); - } - } } return RestResult.success().data("批量删除成功"); } diff --git a/src/main/java/com/qiwenshare/file/service/FiletransferService.java b/src/main/java/com/qiwenshare/file/service/FiletransferService.java index 431327e..0083c8e 100644 --- a/src/main/java/com/qiwenshare/file/service/FiletransferService.java +++ b/src/main/java/com/qiwenshare/file/service/FiletransferService.java @@ -94,7 +94,6 @@ public class FiletransferService implements IFiletransferService { fileBean.setFileUrl(uploadFileResult.getFileUrl()); fileBean.setFileSize(uploadFileResult.getFileSize()); fileBean.setStorageType(uploadFileResult.getStorageType().getCode()); -// fileBean.setPointCount(1); fileBean.setFileStatus(1); fileBean.setCreateTime(DateUtil.getCurrentTime()); fileBean.setCreateUserId(userId); diff --git a/src/main/java/com/qiwenshare/file/service/RecoveryFileService.java b/src/main/java/com/qiwenshare/file/service/RecoveryFileService.java index 4e40917..777046d 100644 --- a/src/main/java/com/qiwenshare/file/service/RecoveryFileService.java +++ b/src/main/java/com/qiwenshare/file/service/RecoveryFileService.java @@ -37,14 +37,11 @@ public class RecoveryFileService extends ServiceImpl userFileLambdaQueryWrapper = new LambdaQueryWrapper<>(); - userFileLambdaQueryWrapper.eq(UserFile::getDeleteBatchNum, userFile.getDeleteBatchNum()); + userFileLambdaQueryWrapper.eq(UserFile::getDeleteBatchNum, deleteBatchNum); userFileMapper.delete(userFileLambdaQueryWrapper); diff --git a/src/main/resources/config/application.properties b/src/main/resources/config/application.properties index a7cfb5f..23671fb 100644 --- a/src/main/resources/config/application.properties +++ b/src/main/resources/config/application.properties @@ -92,6 +92,28 @@ spring.elasticsearch.rest.username= spring.elasticsearch.rest.password= +#异步线程池 +#异步线程池组件开关,默认false +spring.async-thread-pool.enable=true +#核心线程数,默认:Java虚拟机可用线程数 +spring.async-thread-pool.core-pool-size=8 +#线程池最大线程数,默认:40000 +spring.async-thread-pool.max-pool-size=40000 +#线程队列最大线程数,默认:80000 +spring.async-thread-pool.queue-capacity=80000 +#自定义线程名前缀,默认:Async-ThreadPool- +spring.async-thread-pool.thread-name-prefix=Async-ThreadPool- +#线程池中线程最大空闲时间,默认:60,单位:秒 +spring.async-thread-pool.keep-alive-seconds=60 +#核心线程是否允许超时,默认false +spring.async-thread-pool.allow-core-thread-time-out=false +#IOC容器关闭时是否阻塞等待剩余的任务执行完成,默认:false(必须设置setAwaitTerminationSeconds) +spring.async-thread-pool.wait-for-tasks-to-complete-on-shutdown=false +#阻塞IOC容器关闭的时间,默认:10秒(必须设置setWaitForTasksToCompleteOnShutdown) +spring.async-thread-pool.await-termination-seconds=10 + + + # 当前部署外网IP,用于office预览 deployment.host=192.168.31.158