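How Meituan's Dynamic Thread Pool Works
Background:
A dynamic thread pool is a thread pool whose parameters, such as corePoolSize and maximumPoolSize, can be modified at runtime and take effect without a restart.
In practice, the core and maximum thread counts are hard to estimate up front and hard to fix at a single value, so being able to adjust them while the application is running is very useful.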
Prerequisites
1. Built directly on Spring Boot
2. Supports configuration through the Nacos config center
Core configuration items:
    dtp:
      enable: true
      core-pool-size: 10
      maximum-pool-size: 100
Create the dtp-spring-boot-stater module and the user module
Add the Nacos config dependency to dtp-spring-boot-stater:
    <dependency>
        <groupId>com.alibaba.boot</groupId>
        <artifactId>nacos-config-spring-boot-starter</artifactId>
        <version>0.2.12</version>
    </dependency>
Add the dtp-spring-boot-stater dependency to the user module:
    <dependency>
        <groupId>com.can</groupId>
        <artifactId>dtp-spring-boot-stater</artifactId>
        <version>1.0-SNAPSHOT</version>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-configuration-processor</artifactId>
        <optional>true</optional>
    </dependency>
The YAML file is configured as follows:
    nacos:
      config:
        bootstrap:
          enable: true
        server-addr: 127.0.0.1:8848
        data-id: dtp.yaml
        type: yaml
        auto-refresh: true
        username: nacos
        password: nacos
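Creating DtpExecutor
Create a DtpExecutor object and add it to the Spring container.
This step is required to enable the dynamic thread pool: the DtpExecutor must be created from the configured parameters, must support Nacos, and must be registered in the Spring container.
This is done through a Spring Boot auto-configuration class. DtpExecutor itself simply extends ThreadPoolExecutor: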
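    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    public class DtpExecutor extends ThreadPoolExecutor {
        public DtpExecutor(int corePoolSize, int maximumPoolSize) {
            // Keep-alive 0s and a bounded queue of 10 are fixed; only the two sizes are configurable
            super(corePoolSize, maximumPoolSize, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
        }
    }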
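Extending ThreadPoolExecutor is enough because the JDK class already exposes setCorePoolSize and setMaximumPoolSize, and both act on a live pool. A minimal plain-JDK sketch of that behaviour follows; the class name RuntimeResizeDemo and the sizes are illustrative assumptions, not part of the original project.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class RuntimeResizeDemo {

    /** Builds a pool with the same fixed keep-alive and queue as the DtpExecutor constructor. */
    static ThreadPoolExecutor newPool(int corePoolSize, int maximumPoolSize) {
        return new ThreadPoolExecutor(corePoolSize, maximumPoolSize,
                0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = newPool(2, 4);

        // The pool is already constructed; the setters change it in place.
        // The maximum is raised first so that core <= maximum holds at every step.
        pool.setMaximumPoolSize(8);
        pool.setCorePoolSize(6);

        System.out.println(pool.getCorePoolSize() + "/" + pool.getMaximumPoolSize()); // prints 6/8
        pool.shutdown();
    }
}
```

Everything else in this article is plumbing to call these two setters when the Nacos configuration changes.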
Create a new configuration class, DtpExecutorAutoConfiguration:
    @Configuration
    @ConditionalOnProperty(prefix = "dtp", value = "enable", havingValue = "true")
    public class DtpExecutorAutoConfiguration {

    }
The annotations mark this as a configuration class that takes effect only when dtp.enable=true; if the property is missing or set to false, the class is not loaded.
Next, define the DtpExecutor bean in DtpExecutorAutoConfiguration:
    @Bean
    public DtpExecutor dtpExecutor() {
        // Sizes are hard-coded here only so the bean compiles; the next step reads them from configuration
        return new DtpExecutor(10, 100);
    }
Read the configuration items through the Environment object:
    @Configuration
    @ConditionalOnProperty(prefix = "dtp", value = "enable", havingValue = "true")
    public class DtpExecutorAutoConfiguration {

        @Autowired
        private Environment environment;

        @Bean
        public DtpExecutor dtpExecutor() {
            // getRequiredProperty throws if the key is missing instead of returning null
            int corePoolSize = environment.getRequiredProperty("dtp.core-pool-size", Integer.class);
            int maximumPoolSize = environment.getRequiredProperty("dtp.maximum-pool-size", Integer.class);
            return new DtpExecutor(corePoolSize, maximumPoolSize);
        }
    }
To make DtpExecutorAutoConfiguration take effect, register it in META-INF/spring.factories:
    org.springframework.boot.autoconfigure.EnableAutoConfiguration=com.can.dpt.DtpExecutorAutoConfiguration
NacosListener
To listen for changes to the Nacos configuration, create a NacosListener:
    public class NacosListener implements Listener, InitializingBean {

        @NacosInjected
        private ConfigService configService;

        // Reuse one callback thread instead of creating a new pool on every getExecutor() call
        private final Executor executor = Executors.newFixedThreadPool(1);

        @Override
        public Executor getExecutor() {
            return executor;
        }

        @Override
        public void afterPropertiesSet() throws Exception {
            // Subscribe to the data-id configured above, in the default group
            configService.addListener("dtp.yaml", "DEFAULT_GROUP", this);
        }

        @Override
        public void receiveConfigInfo(String configInfo) {
            // Flatten the pushed YAML text into java.util.Properties
            YamlPropertiesFactoryBean bean = new YamlPropertiesFactoryBean();
            bean.setResources(new ByteArrayResource(configInfo.getBytes(StandardCharsets.UTF_8)));
            Properties properties = bean.getObject();
            System.out.println(properties);
        }
    }
Define NacosListener as a bean in DtpExecutorAutoConfiguration:
    @Bean
    public NacosListener nacosListener() {
        return new NacosListener();
    }
The listener needs access to the DtpExecutor objects in the Spring container. Use a BeanPostProcessor to store them in a static map:
    public class DtpUtil {

        // ConcurrentHashMap because the map is written during startup and read on the Nacos listener thread
        private static final Map<String, DtpExecutor> map = new ConcurrentHashMap<>();

        public static void setDtpExecutor(String name, DtpExecutor dtpExecutor) {
            map.put(name, dtpExecutor);
        }

        public static DtpExecutor getDtpExecutor(String name) {
            return map.get(name);
        }
    }
Then create a BeanPostProcessor that stores each DtpExecutor object in DtpUtil:
    public class DtpBeanPostProcessor implements BeanPostProcessor {

        @Override
        public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
            // Register every DtpExecutor under its bean name so the listener can look it up later
            if (bean instanceof DtpExecutor) {
                DtpUtil.setDtpExecutor(beanName, (DtpExecutor) bean);
            }
            return bean;
        }
    }
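Then, in NacosListener, use DtpUtil to look up each DtpExecutor and apply the new values:
    @Override
    public void receiveConfigInfo(String configInfo) {
        YamlPropertiesFactoryBean bean = new YamlPropertiesFactoryBean();
        bean.setResources(new ByteArrayResource(configInfo.getBytes(StandardCharsets.UTF_8)));
        Properties properties = bean.getObject();

        // Bind the flattened properties to DtpProperties (the configuration class defined in the next step)
        DtpProperties dtpProperties = new DtpProperties();
        new Binder(new MapConfigurationPropertySource(properties)).bind("dtp", Bindable.ofInstance(dtpProperties));

        for (DtpProperties.DtpExecutorProperties executorProperties : dtpProperties.getExecutors()) {
            DtpExecutor dtpExecutor = DtpUtil.getDtpExecutor(executorProperties.getName());
            if (dtpExecutor == null) continue; // no pool registered under this name
            dtpExecutor.setCorePoolSize(executorProperties.getCorePoolSize());
            dtpExecutor.setMaximumPoolSize(executorProperties.getMaximumPoolSize());
        }
    }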
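One detail worth handling when both values change at once: ThreadPoolExecutor.setMaximumPoolSize rejects a maximum below the current core size, and on recent JDKs setCorePoolSize also rejects a core size above the current maximum. Calling the two setters in a fixed order can therefore throw IllegalArgumentException partway through an update. The sketch below shows one way to pick the order; PoolResizer and resize are hypothetical names, not part of the original project.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class PoolResizer {

    /** Applies both sizes in an order that keeps core <= maximum after every setter call. */
    static void resize(ThreadPoolExecutor pool, int core, int max) {
        if (core < 0 || max <= 0 || core > max) {
            throw new IllegalArgumentException("invalid sizes: core=" + core + ", max=" + max);
        }
        if (max >= pool.getMaximumPoolSize()) {
            // Maximum grows or stays: widen the upper bound first, then move core.
            pool.setMaximumPoolSize(max);
            pool.setCorePoolSize(core);
        } else {
            // Maximum shrinks: lower core first so it never exceeds the new maximum.
            pool.setCorePoolSize(core);
            pool.setMaximumPoolSize(max);
        }
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
        resize(pool, 6, 8); // grow both
        resize(pool, 1, 2); // shrink both
        System.out.println(pool.getCorePoolSize() + "/" + pool.getMaximumPoolSize()); // prints 1/2
        pool.shutdown();
    }
}
```

Validating the pair before touching the pool also means a bad configuration push leaves the running pool unchanged.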
To make this more flexible and to support configuring multiple thread pools, create the configuration class DtpProperties and a DtpImportBeanDefinitionRegistrar:
    @ConfigurationProperties("dtp")
    public class DtpProperties {

        // Bound from dtp.executors[n]; empty by default so callers can iterate safely
        private List<DtpExecutorProperties> executors = new ArrayList<>();

        public static class DtpExecutorProperties {

            // The name doubles as the Spring bean name of the pool
            private String name;
            private Integer corePoolSize = 50;
            private Integer maximumPoolSize = 500;

            public String getName() { return name; }

            public void setName(String name) { this.name = name; }

            public Integer getCorePoolSize() { return corePoolSize; }

            public void setCorePoolSize(Integer corePoolSize) { this.corePoolSize = corePoolSize; }

            public Integer getMaximumPoolSize() { return maximumPoolSize; }

            public void setMaximumPoolSize(Integer maximumPoolSize) { this.maximumPoolSize = maximumPoolSize; }
        }

        public List<DtpExecutorProperties> getExecutors() { return executors; }

        public void setExecutors(List<DtpExecutorProperties> executors) { this.executors = executors; }
    }
    public class DtpImportBeanDefinitionRegistrar implements ImportBeanDefinitionRegistrar, EnvironmentAware {

        // Supplied through EnvironmentAware; field injection is not applied to a registrar
        private Environment environment;

        @Override
        public void setEnvironment(Environment environment) {
            this.environment = environment;
        }

        @Override
        public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata,
                                            BeanDefinitionRegistry registry,
                                            BeanNameGenerator importBeanNameGenerator) {
            // Bind the dtp.* properties by hand, since no beans exist yet at this stage
            DtpProperties dtpProperties = new DtpProperties();
            Binder.get(environment).bind("dtp", Bindable.ofInstance(dtpProperties));

            // Register one DtpExecutor bean definition per configured executor
            for (DtpProperties.DtpExecutorProperties executorProperties : dtpProperties.getExecutors()) {
                AbstractBeanDefinition beanDefinition = BeanDefinitionBuilder.genericBeanDefinition().getBeanDefinition();
                beanDefinition.setBeanClass(DtpExecutor.class);
                beanDefinition.getConstructorArgumentValues().addGenericArgumentValue(executorProperties.getCorePoolSize());
                beanDefinition.getConstructorArgumentValues().addGenericArgumentValue(executorProperties.getMaximumPoolSize());
                registry.registerBeanDefinition(executorProperties.getName(), beanDefinition);
            }
        }
    }
Then modify DtpExecutorAutoConfiguration:
    @Configuration
    @EnableConfigurationProperties(DtpProperties.class)
    @Import({DtpImportBeanDefinitionRegistrar.class, DtpBeanPostProcessor.class})
    @ConditionalOnProperty(prefix = "dtp", value = "enable", havingValue = "true")
    public class DtpExecutorAutoConfiguration {

        @Bean
        public NacosListener nacosListener() {
            return new NacosListener();
        }
    }
With this in place, the DtpExecutor parameters can be changed at runtime, which gives us a working dynamic thread pool.
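To see the refresh path without Spring or Nacos, the following plain-JDK sketch replays it end to end. It assumes the pushed YAML has already been flattened into Properties with indexed keys such as dtp.executors[0].name, which is how YamlPropertiesFactoryBean represents list entries. RefreshSketch, POOLS, refresh and the pool name orderPool are hypothetical stand-ins for DtpUtil and the listener callback.

```java
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class RefreshSketch {

    /** Stand-in for DtpUtil: pools looked up by bean name. */
    static final Map<String, ThreadPoolExecutor> POOLS = new ConcurrentHashMap<>();

    /** Applies every dtp.executors[i] entry to the pool of the same name; returns how many pools were updated. */
    static int refresh(Properties props) {
        int updated = 0;
        for (int i = 0; ; i++) {
            String prefix = "dtp.executors[" + i + "].";
            String name = props.getProperty(prefix + "name");
            if (name == null) {
                break; // no more list entries
            }
            ThreadPoolExecutor pool = POOLS.get(name);
            if (pool == null) {
                continue; // unknown pool name: skip it rather than fail the whole refresh
            }
            // A missing key falls back to the pool's current value.
            int core = Integer.parseInt(props.getProperty(
                    prefix + "core-pool-size", String.valueOf(pool.getCorePoolSize())));
            int max = Integer.parseInt(props.getProperty(
                    prefix + "maximum-pool-size", String.valueOf(pool.getMaximumPoolSize())));
            if (max >= pool.getMaximumPoolSize()) {
                pool.setMaximumPoolSize(max); // growing: raise the ceiling first
                pool.setCorePoolSize(core);
            } else {
                pool.setCorePoolSize(core);   // shrinking: lower core first
                pool.setMaximumPoolSize(max);
            }
            updated++;
        }
        return updated;
    }

    public static void main(String[] args) {
        POOLS.put("orderPool", new ThreadPoolExecutor(
                2, 4, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10)));

        Properties pushed = new Properties();
        pushed.setProperty("dtp.executors[0].name", "orderPool");
        pushed.setProperty("dtp.executors[0].core-pool-size", "6");
        pushed.setProperty("dtp.executors[0].maximum-pool-size", "8");

        System.out.println("updated pools: " + refresh(pushed));
        POOLS.values().forEach(ThreadPoolExecutor::shutdown);
    }
}
```

In the real starter the Binder does the key parsing and DtpUtil holds the pools; the sketch only makes the data flow from a configuration push to the setter calls visible.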