先通过bean方式注入线程池
@Configuration
@EnableAsync//开启异步调用
public class ThreadExecutorConfig {
/**
 * Builds the thread pool and exposes it as the Spring bean named "ThreadPoolConfig".
 *
 * @return the fully configured and initialized executor
 */
@Bean("ThreadPoolConfig")
public ThreadPoolTaskExecutor getMyExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

    // Naming: prefix given to the threads created by this pool.
    executor.setThreadNamePrefix("comsumer");

    // Sizing: core threads, upper bound on threads, and capacity of the task queue.
    executor.setCorePoolSize(50);
    executor.setMaxPoolSize(1000);
    executor.setQueueCapacity(2000);
    // Idle time (seconds) after which surplus threads may be reclaimed.
    // Core-thread timeout is deliberately not enabled: setAllowCoreThreadTimeOut is never called.
    executor.setKeepAliveSeconds(1000);

    // Saturation: CallerRunsPolicy makes the submitting thread run a task the pool cannot accept.
    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

    // Shutdown: let submitted tasks finish, waiting at most 60 seconds for them.
    executor.setWaitForTasksToCompleteOnShutdown(true);
    executor.setAwaitTerminationSeconds(60);

    // Create the underlying pool with the settings above.
    executor.initialize();
    return executor;
}
定义工具类
/**
 * Static access point to the Spring {@link ApplicationContext}.
 *
 * <p>Spring hands the context to this component through {@link #setApplicationContext}; it is
 * then kept in a static field so that code can look beans up through the static methods below.
 * Until that callback has run the field is {@code null}, so the lookup methods fail with a
 * {@link NullPointerException} if they are called earlier.
 */
@Component
public class BootBeanGetter implements ApplicationContextAware {

    /** The captured context; {@code null} until Spring has called {@link #setApplicationContext}. */
    private static ApplicationContext applicationContext;

    /**
     * Stores the given context in the static holder.
     *
     * @param applicationContext the context to expose through the static accessors
     */
    public static void setApplicationContextStatic(final ApplicationContext applicationContext) {
        BootBeanGetter.applicationContext = applicationContext;
    }

    /**
     * @return the stored context, or {@code null} if none has been stored yet
     */
    public static ApplicationContext getApplicationContext() {
        return applicationContext;
    }

    /**
     * Callback from {@link ApplicationContextAware}; forwards the context to the static holder.
     *
     * @param applicationContext the context supplied by Spring
     * @throws BeansException declared by the implemented interface
     */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        setApplicationContextStatic(applicationContext);
    }

    /**
     * Looks a bean up by name.
     *
     * @param name the bean name
     * @return the bean instance, untyped
     */
    public static Object getBean(String name) {
        return getApplicationContext().getBean(name);
    }

    /**
     * Looks a bean up by type.
     *
     * @param requiredType the type the bean must match
     * @param <T> the bean type
     * @return the matching bean instance
     */
    public static <T> T getClassBean(Class<T> requiredType) {
        return getApplicationContext().getBean(requiredType);
    }
}
定义控制线程池参数的接口
// Configuration object whose getMyExecutor() supplies the thread pool used by the endpoints below.
@Autowired
private ThreadExecutorConfig threadExecutorConfig;
// Bean-lookup helper; the code below calls its static getBean / getApplicationContext through this reference.
@Autowired
private BootBeanGetter bootBeanGetter;
// NOTE(review): BootBeanSetter is neither defined nor referenced in the visible code — confirm it is still needed.
@Autowired
private BootBeanSetter bootBeanSetter;
/**
 * Returns the current parameters of the thread pool obtained from
 * {@code threadExecutorConfig.getMyExecutor()}.
 *
 * @return the {@code success1} response wrapping a map with the keys
 *         {@code corePoolSize}, {@code keepAliveSeconds} and {@code maxPoolSize}
 */
@GetMapping("/getThreadPool")
@Role(value = "admin")
public JSONObject getThreadPool(){
    final ThreadPoolTaskExecutor pool = threadExecutorConfig.getMyExecutor();

    // Copy the three reported settings straight from the executor into the response payload.
    HashMap<String,Object> params = new HashMap<>();
    params.put("corePoolSize", pool.getCorePoolSize());
    params.put("keepAliveSeconds", pool.getKeepAliveSeconds());
    params.put("maxPoolSize", pool.getMaxPoolSize());

    return success1(params);
}
/**
 * Returns the parameters of the thread pool bean registered under the name
 * "ThreadPoolConfig", looked up by name through {@code bootBeanGetter.getBean}.
 *
 * @return the {@code success1} response wrapping a map with the keys
 *         {@code corePoolSize}, {@code keepAliveSeconds} and {@code maxPoolSize}
 */
@GetMapping("/getBean")
@Role(value = "admin")
public JSONObject getBean(){
    // The lookup is untyped, so the result has to be cast to the executor type.
    final Object lookedUp = bootBeanGetter.getBean("ThreadPoolConfig");
    final ThreadPoolTaskExecutor pool = (ThreadPoolTaskExecutor) lookedUp;

    HashMap<String,Object> params = new HashMap<>();
    params.put("corePoolSize", pool.getCorePoolSize());
    params.put("keepAliveSeconds", pool.getKeepAliveSeconds());
    params.put("maxPoolSize", pool.getMaxPoolSize());

    return success1(params);
}
/**
 * Overwrites the pool parameters of the "ThreadPoolConfig" bean by writing its fields
 * through reflection.
 *
 * <p>Every field declared on the bean's type is handed to {@code setFieldData2}, which
 * assigns the ones it recognises when the request carries a value for them.
 *
 * <p>Limitation: this only changes the fields of the bean object. As recorded in the notes
 * accompanying this code, the number of threads actually used by the pool was observed not
 * to change after calling this endpoint.
 *
 * @param vo the requested values; properties left {@code null} are not written
 * @return {@code success1("修改成功")} when every field was processed, otherwise
 *         {@code failure1("修改失败", reason)}
 */
@PostMapping("editThreadPool")
@Role(value = "admin")
public JSONObject editThreadPool(@RequestBody ThreadPoolVo vo){
    ApplicationContext applicationContext = bootBeanGetter.getApplicationContext();
    String beanName = "ThreadPoolConfig";

    // getType is documented to return null when the type cannot be determined; report that
    // as a failed update instead of dereferencing it.
    Class<?> beanType = applicationContext.getType(beanName);
    if (beanType == null) {
        return failure1("修改失败", "cannot determine the type of bean " + beanName);
    }

    // Resolve the bean once; every field is written on this same instance.
    Object bean = applicationContext.getBean(beanName);

    for (Field field : beanType.getDeclaredFields()) {
        try {
            setFieldData2(field, bean, vo);
        } catch (Exception e) {
            // Stop at the first field that cannot be written and surface the reason to the caller.
            e.printStackTrace();
            return failure1("修改失败", e.getMessage());
        }
    }
    return success1("修改成功");
}
/**
 * Writes one reflected field of the thread pool bean from the request object.
 *
 * <p>Only the fields named {@code corePoolSize}, {@code maxPoolSize},
 * {@code keepAliveSeconds} and {@code queueCapacity} are handled, and each is written only
 * when the request carries a non-null value for it. Any other field is left untouched.
 *
 * @param field the field to (possibly) assign
 * @param bean the object whose field is written
 * @param data the request carrying the new values
 * @throws Exception if the reflective access fails
 */
private void setFieldData2(Field field, Object bean, ThreadPoolVo data) throws Exception {
    // The target fields are not public, so access has to be opened before writing.
    field.setAccessible(true);

    // Pick the request value that corresponds to this field, if the field is one we manage.
    final Object replacement;
    switch (field.getName()) {
        case "corePoolSize":
            replacement = data.getCorePoolSize();
            break;
        case "maxPoolSize":
            replacement = data.getMaxPoolSize();
            break;
        case "keepAliveSeconds":
            replacement = data.getKeepAliveSeconds();
            break;
        case "queueCapacity":
            replacement = data.getQueueCapacity();
            break;
        default:
            return;
    }

    // A missing value means "leave this setting as it is".
    if (replacement != null) {
        field.set(bean, replacement);
    }
}
线程池的用法
/**
 * Usage example: submits ten tasks to the pool and blocks until all of them have finished.
 *
 * @throws InterruptedException if the waiting thread is interrupted
 */
private void test() throws InterruptedException {
    final int taskCount = 10;
    ThreadPoolTaskExecutor pool = threadExecutorConfig.getMyExecutor();
    final CountDownLatch pending = new CountDownLatch(taskCount);

    // The task holds no per-submission state, so one instance is submitted repeatedly.
    Runnable job = () -> {
        try {
            System.out.println("执行任务");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Always count down, so the waiting thread is released even if the task fails.
            pending.countDown();
        }
    };

    for (int submitted = 0; submitted < taskCount; submitted++) {
        pool.execute(job);
    }

    // Block until every submitted task has counted down.
    pending.await();
}
TAT 后来测试了一下,发现上面的方式只能修改 bean 的属性,实际使用线程池时线程数量并没有改变。随后发现线程池有一个初始化方法 threadPool.initialize()。
于是改为直接通过 setter 修改线程池的参数,再调用一遍 threadPool.initialize(),这样就能实现动态线程池了。
/**
 * Applies new parameters to the running thread pool.
 *
 * <p>Core size, maximum size and keep-alive are changed through the executor's setters,
 * which Spring documents as modifiable at runtime, so they need no re-initialization.
 * The queue capacity is only read when the underlying pool is built, so the pool is rebuilt
 * only when a new capacity is supplied. The pool it replaces is then shut down; otherwise
 * its threads would stay alive, unused, after every call.
 *
 * <p>NOTE(review): this variant is open to "admin,user" while the reflection-based variant
 * is restricted to "admin" — confirm that ordinary users are meant to resize the pool.
 *
 * @param vo the requested values; properties left {@code null} keep their current setting
 * @return {@code success1("修改成功")}
 * @throws IllegalArgumentException propagated from the pool when the requested sizes are
 *         invalid (for example a core size larger than the maximum size)
 */
@PostMapping("editThreadPool")
@Role(value = "admin,user")
public JSONObject editThreadPool(@RequestBody ThreadPoolVo vo){
    ThreadPoolTaskExecutor myExecutor = threadExecutorConfig.getMyExecutor();
    Integer corePoolSize = vo.getCorePoolSize();
    Integer maxPoolSize = vo.getMaxPoolSize();
    Integer keepAliveSeconds = vo.getKeepAliveSeconds();
    Integer queueCapacity = vo.getQueueCapacity();

    // The pool rejects a core size above its current maximum (and a maximum below its current
    // core size). Raise the maximum before the core size, and lower it after, so that a valid
    // request never passes through an invalid intermediate state.
    boolean raiseMaxFirst = maxPoolSize != null && maxPoolSize > myExecutor.getMaxPoolSize();
    if (raiseMaxFirst) {
        myExecutor.setMaxPoolSize(maxPoolSize);
    }
    if (corePoolSize != null) {
        myExecutor.setCorePoolSize(corePoolSize);
    }
    if (maxPoolSize != null && !raiseMaxFirst) {
        myExecutor.setMaxPoolSize(maxPoolSize);
    }
    if (keepAliveSeconds != null) {
        myExecutor.setKeepAliveSeconds(keepAliveSeconds);
    }

    if (queueCapacity != null) {
        myExecutor.setQueueCapacity(queueCapacity);
        // Swap in a freshly built pool, then retire the old one. shutdown() does not block:
        // tasks already queued on the old pool still run, after which its threads exit.
        java.util.concurrent.ThreadPoolExecutor previous = myExecutor.getThreadPoolExecutor();
        myExecutor.initialize();
        previous.shutdown();
    }
    return success1("修改成功");
}