美团动态线程池实现原理

背景:

动态线程池,指的是线程池的参数可以动态修改并生效,比如corePoolSize、maximumPoolSize等。

在工作中,线程池的核心线程数和最大线程数等参数是很难估计和固,如果能在应用过程中动态进行调整,就很有必要了。

前置条件

1.直接基于SpringBoot

2.支持Nacos配置中心配置

核心配置项:

1
2
3
4
dtp:
enable: true
core-pool-size: 10
maximum-pool-size: 100

创建dtp-spring-boot-stater模块和user模块

在dtp-spring-boot-stater引入nacos配置坐标

1
2
3
4
5
<dependency>
<groupId>com.alibaba.boot</groupId>
<artifactId>nacos-config-spring-boot-starter</artifactId>
<version>0.2.12</version>
</dependency>

在user模块引入dtp-spring-boot-stater坐标:

1
2
3
4
5
6
7
8
9
10
11
<dependency>
<groupId>com.can</groupId>
<artifactId>dtp-spring-boot-stater</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<!-- 自定义配置生效-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>

yaml文件配置为:

1
2
3
4
5
6
7
8
9
10
nacos:
config:
bootstrap:
enable: true
server-addr: 127.0.0.1:8848
data-id: dtp.yaml
type: yaml
auto-refresh: true
username: nacos
password: nacos

创建DtpExecutor

创建DtpExecutor对象并添加到Spring容器中。

如果要开启动态线程池,就需要做这一步,并且在创建DtpExecutor对象时,得用配置的参数,并且得支持Nacos,放到spring容器中。

基于SpringBoot的自动配置类

1
2
3
4
5
public class DtpExecutor extends ThreadPoolExecutor {
public DtpExecutor(int corePoolSize, int maximumPoolSize) {
super(corePoolSize, maximumPoolSize, 0, TimeUnit.SECONDS,new ArrayBlockingQueue<>(10));
}
}

新建一个配置类DtpExecutorAutoConfiguration:

1
2
3
4
5
@Configuration
@ConditionalOnProperty(prefix = "dtp",value = "enable",havingValue = "true") // 动态线程池需要配置开启才能使用
public class DtpExecutorAutoConfiguration {

}

表示这是一个配置类,只有在”dtp.enable=true”的时候才会生效,没有这个配置或者为false时不会生效。

然后在DtpExecutorAutoConfiguration中定义DtpExecutor的Bean了

1
2
3
4
 @Bean
public DtpExecutor dtpExecutor(){
return new DtpExecutor();
}

通过Enviroment对象获取配置项:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Configuration
@ConditionalOnProperty(prefix = "dtp",value = "enable",havingValue = "true") // 动态线程池需要配置开启才能使用
public class DtpExecutorAutoConfiguration {
@Autowired
private Environment environment;

@Bean
public DtpExecutor dtpExecutor(DtpProperties dtpProperties){
Integer corePoolSize = Integer.valueOf(environment.getProperty("dtp.core-pool-size"));
Integer maximumPoolSize = Integer.valueOf(environment.getProperty("dtp.maximum-pool-size"));
return new DtpExecutor(dtpProperties.getCorePoolSize(),dtpProperties.getMaximumPoolSize());
}

}

使DtpAutoConfiguration配置生效,需要利用spring.factories

1
org.springframework.boot.autoconfigure.EnableAutoConfiguration=com.can.dpt.DtpExecutorAutoConfiguration

NacosListener

监听nacos配置发生变化,新建一个NacosListener

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class NacosListener implements Listener, InitializingBean {

@NacosInjected //采用注解注入形式@NacosInjected注入可以触发nacosConfigPublishedEvent回调,也就是配置注册到nacos的时候产生的回调,其余方式不会触发
private ConfigService configService;

@Override
public Executor getExecutor() {
return Executors.newFixedThreadPool(1);
}

// 利用Spring的Bean初始化机制,来设置要监听的nacos的dataid
// 暂时写死,最好是拿到程序员所配置的dataid和group
@Override
public void afterPropertiesSet() throws Exception {
configService.addListener("dtp.yaml","DEFAULT_GROUP",this);
}


/**
* 获取yaml配置项,并解析
*/
@Override
public void receiveConfigInfo(String configInfo) {
YamlPropertiesFactoryBean bean = new YamlPropertiesFactoryBean();
bean.setResources(new ByteArrayResource(configInfo.getBytes()));
Properties properties = bean.getObject();
System.out.println(properties );

}
}

在DtpExecutorAutoConfiguration中定义NacosListener为一个Bean:

1
2
3
4
@Bean
public NacosListener nacosListener(){
return new NacosListener();
}

从Spring容器中拿到DtpExecutor对象,利用BeanPostProcessor来存入一个static的map中:

1
2
3
4
5
6
7
8
9
10
11
12
public class DtpUtil {

public static Map<String, DtpExecutor> map = new HashMap<>();

public static void setDtpExecutor(String name,DtpExecutor dtpExecutor){
map.put(name, dtpExecutor);
}

public static DtpExecutor getDtpExecutor(String name){
return map.get(name);
}
}

然后新建一个BeanPostProcessor,把DtpExecutor对象存入DtpUtil中:

1
2
3
4
5
6
7
8
9
public class DtpBeanPostProcessor implements BeanPostProcessor {
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
if (bean instanceof DtpExecutor) {
DtpUtil.setDtpExecutor(beanName,(DtpExecutor ) bean);
}
return bean;
}
}

然后在Naocos中利用DtpUtil获取DtpExecutor对象:

1
2
3
4
5
6
7
8
9
@Override
public void receiveConfigInfo(String configInfo) {
YamlPropertiesFactoryBean bean = new YamlPropertiesFactoryBean();
bean.setResources(new ByteArrayResource(configInfo.getBytes()));
Properties properties = bean.getObject();
DtpExecutor dtpExecutor = DtpUtil.getDtpExecutor(executorProperties.getName());
dtpExecutor.setCorePoolSize(executorProperties.getCorePoolSize());
dtpExecutor.setMaximumPoolSize(executorProperties.getMaximumPoolSize());
}

为了更灵活和多个配置线程池,创建配置类DtpProperties和DtpImportBeanDefinitionRegistrar:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
@ConfigurationProperties("dtp")
public class DtpProperties {


private List<DtpExecutorProperties> executors ;


public static class DtpExecutorProperties{

private String name;
private Integer corePoolSize = 50;
private Integer maximumPoolSize = 500;

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

public Integer getCorePoolSize() {
return corePoolSize;
}

public void setCorePoolSize(Integer corePoolSize) {
this.corePoolSize = corePoolSize;
}

public Integer getMaximumPoolSize() {
return maximumPoolSize;
}

public void setMaximumPoolSize(Integer maximumPoolSize) {
this.maximumPoolSize = maximumPoolSize;
}
}

public List<DtpExecutorProperties> getExecutors() {
return executors;
}

public void setExecutors(List<DtpExecutorProperties> executors) {
this.executors = executors;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class DtpImportBeanDefinitionRegistrar implements ImportBeanDefinitionRegistrar, EnvironmentAware {

@Autowired
private Environment environment;
@Override
public void setEnvironment(Environment environment) {
this.environment = environment;
}

@Override
public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry, BeanNameGenerator importBeanNameGenerator) {


// 读取用户配置 dtpProperties
DtpProperties dtpProperties = new DtpProperties();

Binder binder = Binder.get(environment);
ResolvableType type = ResolvableType.forClass(DtpProperties.class);
Bindable<?> target = Bindable.of(type).withExistingValue(dtpProperties);
binder.bind("dtp",target);

// 配置
for (DtpProperties.DtpExecutorProperties executorProperties : dtpProperties.getExecutors()) {
// 注册bean
AbstractBeanDefinition beanDefinition = BeanDefinitionBuilder.genericBeanDefinition().getBeanDefinition();
beanDefinition.setBeanClass(DtpExecutor.class );
beanDefinition.getConstructorArgumentValues().addGenericArgumentValue(executorProperties.getCorePoolSize());
beanDefinition.getConstructorArgumentValues().addGenericArgumentValue(executorProperties.getMaximumPoolSize());
registry.registerBeanDefinition(executorProperties.getName(),beanDefinition);
}
}
}

然后修改DtpExecutorAutoConfiguration:

1
2
3
4
5
6
7
8
9
10
11
@Configuration
@EnableConfigurationProperties(DtpProperties.class)
@Import({DtpImportBeanDefinitionRegistrar.class, DtpBeanPostProcessor.class})
@ConditionalOnProperty(prefix = "dtp",value = "enable",havingValue = "true") // 动态线程池需要配置开启才能使用
public class DtpExecutorAutoConfiguration {

@Bean
public NacosListener nacosListener(){
return new NacosListener();
}
}

这样就完成了DtpExecutor的参数修改,实现了一个动态线程池。