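I. How the Nacos registry works

Registration flow

Registration and discovery involve three parties: the service provider (server), the service consumer (client), and the registry (Nacos).

Interaction between the server and Nacos:

The server sends a registration request through the official Nacos Open API and then reports a heartbeat to Nacos at a fixed interval so that Nacos can check its health. A caller implementing this by hand can schedule the heartbeat with a ScheduledExecutorService. Nacos handles both registration requests and heartbeats asynchronously.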
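As a rough illustration of the scheduled heartbeat described above, here is a minimal sketch. The class HeartbeatSketch and its sendBeat() method are hypothetical; sendBeat() only counts calls and stands in for the real request to Nacos, and the short period is chosen purely to keep the demo quick.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical heartbeat sender; not part of the Nacos client. */
class HeartbeatSketch {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "heartbeat-sketch");
                t.setDaemon(true); // do not keep the JVM alive just for heartbeats
                return t;
            });
    private final AtomicInteger beatsSent = new AtomicInteger();

    /** Placeholder for the request that tells the registry "this instance is alive". */
    private void sendBeat() {
        beatsSent.incrementAndGet();
    }

    /** Sends one beat immediately, then one every periodMillis. */
    void start(long periodMillis) {
        scheduler.scheduleAtFixedRate(() -> {
            try {
                sendBeat();
            } catch (RuntimeException e) {
                // Swallow failures: an uncaught exception would cancel all later runs.
            }
        }, 0, periodMillis, TimeUnit.MILLISECONDS);
    }

    void stop() {
        scheduler.shutdownNow();
    }

    int beatsSent() {
        return beatsSent.get();
    }

    /** Runs the heartbeat for durationMillis and returns how many beats were sent. */
    static int runFor(long periodMillis, long durationMillis) {
        HeartbeatSketch heartbeat = new HeartbeatSketch();
        heartbeat.start(periodMillis);
        try {
            Thread.sleep(durationMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        heartbeat.stop();
        return heartbeat.beatsSent();
    }

    public static void main(String[] args) {
        System.out.println("beats sent: " + runFor(50, 300));
    }
}
```

The try/catch inside the scheduled task matters: scheduleAtFixedRate silently stops rescheduling a task once it throws, which for a heartbeat would look to the registry like a dead instance.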

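Nacos heartbeat mechanism

Nacos and the client interact in a combined push-pull mode. On the pull side, the client runs a scheduled task that queries Nacos every 10 s, and Nacos returns the new service list if it has changed. On the push side, when the set of registered instances changes (a server instance registers successfully, or its heartbeat stops and its connection is dropped), Nacos pushes the change to the client over UDP. UDP is cheap here because it needs no persistent connection. In a registry, clients usually far outnumber servers; if Nacos had to open a TCP connection to each of thousands of consumers on every update, performance would suffer. If a UDP notification is lost, the client still pulls on its 10 s schedule. Push and pull complement each other, which keeps instance updates timely while keeping the cost low.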
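The push-pull combination can be sketched as a client-side cache with two writers. Everything in this example is hypothetical (the class name PushPullSketch, the plain-string payload, the in-memory "registry"); it is not the Nacos client implementation, only an illustration of a UDP push path backed by a periodic pull.

```java
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/** Hypothetical client-side cache fed by UDP pushes and a periodic pull. */
class PushPullSketch {
    private final AtomicReference<String> cached = new AtomicReference<>("");
    private final DatagramSocket pushSocket;
    private final ScheduledExecutorService puller = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "pull-sketch");
        t.setDaemon(true);
        return t;
    });

    PushPullSketch() throws IOException {
        // Port 0 lets the OS pick a free port for receiving pushes.
        pushSocket = new DatagramSocket(0, InetAddress.getLoopbackAddress());
    }

    int pushPort() { return pushSocket.getLocalPort(); }

    /** Push path: apply whatever instance list arrives as a datagram. */
    void listenForPushes() {
        Thread t = new Thread(() -> {
            byte[] buf = new byte[1024];
            while (!pushSocket.isClosed()) {
                try {
                    DatagramPacket packet = new DatagramPacket(buf, buf.length);
                    pushSocket.receive(packet);
                    cached.set(new String(packet.getData(), 0, packet.getLength(), StandardCharsets.UTF_8));
                } catch (IOException e) {
                    // Socket closed or receive failed; the pull path covers the gap.
                }
            }
        }, "push-listener-sketch");
        t.setDaemon(true);
        t.start();
    }

    /** Pull path: periodically replace the cache with the registry's current answer. */
    void pullEvery(long periodMillis, Supplier<String> queryRegistry) {
        puller.scheduleWithFixedDelay(() -> cached.set(queryRegistry.get()),
                periodMillis, periodMillis, TimeUnit.MILLISECONDS);
    }

    void close() {
        puller.shutdownNow();
        pushSocket.close();
    }

    /** Polls the cache until it holds the expected value or the timeout expires. */
    boolean awaitCached(String expected, long timeoutMillis) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (System.currentTimeMillis() < deadline) {
            if (expected.equals(cached.get())) return true;
            Thread.sleep(10);
        }
        return expected.equals(cached.get());
    }

    /** Runs both scenarios; returns true if the cache caught up each time. */
    static boolean demo() {
        try {
            AtomicReference<String> registry = new AtomicReference<>("");
            PushPullSketch client = new PushPullSketch();
            client.listenForPushes();

            // Scenario 1: the registry changes and the push datagram arrives.
            registry.set("10.0.0.1:8080");
            byte[] payload = registry.get().getBytes(StandardCharsets.UTF_8);
            try (DatagramSocket sender = new DatagramSocket()) {
                sender.send(new DatagramPacket(payload, payload.length,
                        InetAddress.getLoopbackAddress(), client.pushPort()));
            }
            boolean pushed = client.awaitCached("10.0.0.1:8080", 2000);

            // Scenario 2: the registry changes but no datagram is sent (simulated loss).
            client.pullEvery(100, registry::get);
            registry.set("10.0.0.1:8080,10.0.0.2:8080");
            boolean pulled = client.awaitCached("10.0.0.1:8080,10.0.0.2:8080", 2000);

            client.close();
            return pushed && pulled;
        } catch (IOException | InterruptedException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("cache converged via push and pull: " + demo());
    }
}
```

The pull is started only after the first scenario so that the demo shows each path working on its own; in a real client both would run from the start.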

Analyzing the Nacos service registration source code

Service registration via the official Open API
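For reference, registering an instance through the v1 Open API is a POST to /nacos/v1/ns/instance with at least serviceName, ip, and port as parameters. The sketch below builds such a request with the JDK's java.net.http client (Java 11+). The class name, the server address 127.0.0.1:8848, and the sample service values are assumptions for illustration; the request is only sent when the program is started with the argument "send".

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical helper; builds the registration request by hand instead of using the Nacos client. */
class OpenApiRegisterSketch {

    /** Builds the POST request for the v1 instance-registration endpoint. */
    static HttpRequest buildRegisterRequest(String serverAddr, String serviceName, String ip, int port) {
        Map<String, String> params = new LinkedHashMap<>();
        params.put("serviceName", serviceName);
        params.put("ip", ip);
        params.put("port", String.valueOf(port));

        // URL-encode each value and join the pairs into a query string.
        String query = params.entrySet().stream()
                .map(e -> e.getKey() + "=" + URLEncoder.encode(e.getValue(), StandardCharsets.UTF_8))
                .collect(Collectors.joining("&"));

        return HttpRequest.newBuilder(URI.create("http://" + serverAddr + "/nacos/v1/ns/instance?" + query))
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();
    }

    public static void main(String[] args) throws Exception {
        HttpRequest request = buildRegisterRequest("127.0.0.1:8848", "demo-service", "192.168.1.10", 8080);
        System.out.println(request.method() + " " + request.uri());

        // Only contact a server when explicitly asked to, so the sketch runs without one.
        if (args.length > 0 && "send".equals(args[0])) {
            HttpResponse<String> response = HttpClient.newHttpClient()
                    .send(request, HttpResponse.BodyHandlers.ofString());
            System.out.println(response.statusCode() + " " + response.body());
        }
    }
}
```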

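Running the source

The version used here is 1.4.5, on an M1 Mac.

The project starts from the console module; run the main class Nacos.

Problems encountered

1. Missing entity classes in the consistency module

Install the Protobuf plugin in IDEA, then run Maven compile on the consistency module so that the classes are generated.

If the Maven build fails, add the following to Maven's settings.xml: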

<settings>
    <activeProfiles>
        <activeProfile>apple-silicon</activeProfile>
    </activeProfiles>
    <profiles>
        <profile>
            <id>apple-silicon</id>
            <properties>
                <os.detected.classifier>osx-x86_64</os.detected.classifier>
            </properties>
        </profile>
    </profiles>
</settings>

Run compile again and the problem is resolved.

2. Startup error: Unable to start embedded Tomcat

Add -Dnacos.standalone=true to the VM options.

Database file

Create the database using the schema found under the distribution module.

Then configure the database connection in application.properties under the console module.

The server now starts; open the Nacos console.

Tracing the source code

@Configuration
@EnableConfigurationProperties
@ConditionalOnNacosDiscoveryEnabled
@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true)
@AutoConfigureAfter({ AutoServiceRegistrationConfiguration.class,
        AutoServiceRegistrationAutoConfiguration.class })
public class NacosDiscoveryAutoConfiguration {

    @Bean
    public NacosServiceRegistry nacosServiceRegistry(
            NacosDiscoveryProperties nacosDiscoveryProperties) {
        return new NacosServiceRegistry(nacosDiscoveryProperties);
    }

    @Bean
    @ConditionalOnBean(AutoServiceRegistrationProperties.class)
    public NacosRegistration nacosRegistration(
            NacosDiscoveryProperties nacosDiscoveryProperties,
            ApplicationContext context) {
        return new NacosRegistration(nacosDiscoveryProperties, context);
    }

    @Bean
    @ConditionalOnBean(AutoServiceRegistrationProperties.class)
    public NacosAutoServiceRegistration nacosAutoServiceRegistration(
            NacosServiceRegistry registry,
            AutoServiceRegistrationProperties autoServiceRegistrationProperties,
            NacosRegistration registration) {
        return new NacosAutoServiceRegistration(registry,
                autoServiceRegistrationProperties, registration);
    }
}

NacosAutoServiceRegistration extends AbstractAutoServiceRegistration; open the parent class.

Trace the bind() method, which is called from onApplicationEvent():

@Override
@SuppressWarnings("deprecation")
public void onApplicationEvent(WebServerInitializedEvent event) {
    bind(event);
}

@Deprecated
public void bind(WebServerInitializedEvent event) {
    ApplicationContext context = event.getApplicationContext();
    if (context instanceof ConfigurableWebServerApplicationContext) {
        if ("management".equals(((ConfigurableWebServerApplicationContext) context)
                .getServerNamespace())) {
            return;
        }
    }
    this.port.compareAndSet(0, event.getWebServer().getPort());
    this.start();
}

Follow start():

public void start() {
    if (!isEnabled()) {
        if (logger.isDebugEnabled()) {
            logger.debug("Discovery Lifecycle disabled. Not starting");
        }
        return;
    }

    // only initialize if nonSecurePort is greater than 0 and it isn't already running
    // because of containerPortInitializer below
    if (!this.running.get()) {
        this.context.publishEvent(
                new InstancePreRegisteredEvent(this, getRegistration()));
        register();
        if (shouldRegisterManagement()) {
            registerManagement();
        }
        this.context.publishEvent(
                new InstanceRegisteredEvent<>(this, getConfiguration()));
        this.running.compareAndSet(false, true);
    }

}

Step into register() and follow the calls down to NamingProxy#registerService():

public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {

    NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}",
            namespaceId, serviceName, instance);

    final Map<String, String> params = new HashMap<String, String>(9);
    params.put(CommonParams.NAMESPACE_ID, namespaceId);
    params.put(CommonParams.SERVICE_NAME, serviceName);
    params.put(CommonParams.GROUP_NAME, groupName);
    params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
    params.put("ip", instance.getIp());
    params.put("port", String.valueOf(instance.getPort()));
    params.put("weight", String.valueOf(instance.getWeight()));
    params.put("enable", String.valueOf(instance.isEnabled()));
    params.put("healthy", String.valueOf(instance.isHealthy()));
    params.put("ephemeral", String.valueOf(instance.isEphemeral()));
    params.put("metadata", JSON.toJSONString(instance.getMetadata()));

    reqAPI(UtilAndComs.NACOS_URL_INSTANCE, params, HttpMethod.POST);

}

In the Nacos source, open InstanceController under the controllers package of the naming module:

@RestController
@RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance")
public class InstanceController {
    /**
     * Register new instance.
     *
     * @param request http request
     * @return 'ok' if success
     * @throws Exception any error during register
     */
    @CanDistro
    @PostMapping
    @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
    public String register(HttpServletRequest request) throws Exception {

        final String namespaceId = WebUtils
                .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
        final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
        NamingUtils.checkServiceNameFormat(serviceName);

        final Instance instance = parseInstance(request);
        // Register the service instance
        serviceManager.registerInstance(namespaceId, serviceName, instance);
        return "ok";
    }
}

As shown, the instance is registered through the registerInstance() method.

ServiceManager implements the RecordListener interface:

@Component
public class ServiceManager implements RecordListener<Service> {
    /**
     * Register a service instance.
     */
    public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {

        createEmptyService(namespaceId, serviceName, instance.isEphemeral());

        Service service = getService(namespaceId, serviceName);

        if (service == null) {
            throw new NacosException(NacosException.INVALID_PARAM,
                    "service not found, namespace: " + namespaceId + ", service: " + serviceName);
        }

        addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
    }

    /**
     * Add instances to a service.
     */
    public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
            throws NacosException {

        String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);

        Service service = getService(namespaceId, serviceName);

        synchronized (service) {
            List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);

            Instances instances = new Instances();
            instances.setInstanceList(instanceList);
            // Store the service instances
            consistencyService.put(key, instances);
        }
    }
}

Step into consistencyService.put(key, instances). One implementation is DistroConsistencyServiceImpl, which handles ephemeral instances:

@DependsOn("ProtocolManager")
@org.springframework.stereotype.Service("distroConsistencyService")
public class DistroConsistencyServiceImpl implements EphemeralConsistencyService, DistroDataProcessor {
    @Override
    public void put(String key, Record value) throws NacosException {
        onPut(key, value);
        distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE,
                globalConfig.getTaskDispatchPeriod() / 2);
    }

    public void onPut(String key, Record value) {

        if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
            Datum<Instances> datum = new Datum<>();
            datum.value = (Instances) value;
            datum.key = key;
            datum.timestamp.incrementAndGet();
            dataStore.put(key, datum);
        }

        if (!listeners.containsKey(key)) {
            return;
        }
        // The notifier holds a blocking queue; the registration task is added to it here
        notifier.addTask(key, DataOperation.CHANGE);
    }

    // Notifier#addTask (Notifier is an inner class of DistroConsistencyServiceImpl)
    public void addTask(String datumKey, DataOperation action) {

        if (services.containsKey(datumKey) && action == DataOperation.CHANGE) {
            return;
        }
        if (action == DataOperation.CHANGE) {
            services.put(datumKey, StringUtils.EMPTY);
        }
        // Put the request into the blocking queue
        tasks.offer(Pair.with(datumKey, action));
    }
}

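The notifier holds a blocking queue. Registration tasks are added to the queue and executed later by a separate thread, so accepting a request and processing it are decoupled and handled asynchronously.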

@DependsOn("ProtocolManager")
@org.springframework.stereotype.Service("distroConsistencyService")
public class DistroConsistencyServiceImpl implements EphemeralConsistencyService, DistroDataProcessor {

    // Create the notifier
    private volatile Notifier notifier = new Notifier();

    // Once this bean has been constructed, submit the notifier to the thread pool
    @PostConstruct
    public void init() {
        GlobalExecutor.submitDistroNotifyTask(notifier);
    }

    public class Notifier implements Runnable {
        @Override
        public void run() {
            Loggers.DISTRO.info("distro notifier started");

            for (; ; ) {
                try {
                    // Take the next task, blocking until one is available
                    Pair<String, DataOperation> pair = tasks.take();
                    // Handle the task
                    handle(pair);
                } catch (Throwable e) {
                    Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
                }
            }
        }
    }
}

// Excerpt from GlobalExecutor, which is a separate class
public class GlobalExecutor {
    private static final ScheduledExecutorService DISTRO_NOTIFY_EXECUTOR = ExecutorFactory.Managed
            .newSingleScheduledExecutorService(ClassUtils.getCanonicalName(NamingApp.class),
                    new NameThreadFactory("com.alibaba.nacos.naming.distro.notifier"));

    // Submit the task to the thread pool
    public static void submitDistroNotifyTask(Runnable runnable) {
        DISTRO_NOTIFY_EXECUTOR.submit(runnable);
    }
}
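The hand-off between addTask() and run() can be reproduced in isolation. The following is a simplified sketch, not the Nacos class: NotifierSketch, its pending map, and the handled list are hypothetical names, and the "handling" step only records the key. It shows the two ideas at work, namely that producers return immediately after enqueueing, and that a key already waiting in the queue is not enqueued a second time.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical, simplified analogue of a queue-backed notifier. */
class NotifierSketch implements Runnable {
    private final BlockingQueue<String> tasks = new LinkedBlockingQueue<>();
    private final ConcurrentHashMap<String, Boolean> pending = new ConcurrentHashMap<>();
    private final List<String> handled = Collections.synchronizedList(new ArrayList<>());

    /** Producer side: returns at once; a key that is still pending is not queued twice. */
    boolean addTask(String key) {
        if (pending.putIfAbsent(key, Boolean.TRUE) != null) {
            return false;
        }
        return tasks.offer(key);
    }

    /** Consumer side: blocks on the queue and handles one key at a time. */
    @Override
    public void run() {
        try {
            while (true) {
                String key = tasks.take();
                pending.remove(key);   // the key may be queued again from now on
                handled.add(key);      // stand-in for notifying listeners
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // exit the loop on shutdown
        }
    }

    /** Queues three tasks (one duplicate), starts the consumer, and returns what it handled. */
    static List<String> demo() {
        NotifierSketch notifier = new NotifierSketch();
        notifier.addTask("serviceA");   // queued
        notifier.addTask("serviceA");   // dropped: serviceA is still pending
        notifier.addTask("serviceB");   // queued

        Thread consumer = new Thread(notifier, "notifier-sketch");
        consumer.setDaemon(true);
        consumer.start();
        try {
            long deadline = System.currentTimeMillis() + 2000;
            while (notifier.handled.size() < 2 && System.currentTimeMillis() < deadline) {
                Thread.sleep(10);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        consumer.interrupt();
        return new ArrayList<>(notifier.handled);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [serviceA, serviceB]
    }
}
```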

Continue into DistroConsistencyServiceImpl.Notifier#handle:

private void handle(Pair<String, DataOperation> pair) {
    try {
        String datumKey = pair.getValue0();
        DataOperation action = pair.getValue1();
        // Remove the key from the pending-task cache
        services.remove(datumKey);

        int count = 0;

        ConcurrentLinkedQueue<RecordListener> recordListeners = listeners.get(datumKey);
        if (recordListeners == null) {
            Loggers.DISTRO.info("[DISTRO-WARN] RecordListener not found, key: {}", datumKey);
            return;
        }

        for (RecordListener listener : recordListeners) {

            count++;

            try {
                // Notify listeners that the data has changed
                if (action == DataOperation.CHANGE) {
                    Datum datum = dataStore.get(datumKey);
                    if (datum != null) {
                        listener.onChange(datumKey, datum.value);
                    } else {
                        Loggers.DISTRO.info("[DISTRO-WARN] data not found, key: {}", datumKey);
                    }
                    continue;
                }
                // Notify listeners that the data has been deleted
                if (action == DataOperation.DELETE) {
                    listener.onDelete(datumKey);
                    continue;
                }
            } catch (Throwable e) {
                Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);
            }
        }

        if (Loggers.DISTRO.isDebugEnabled()) {
            Loggers.DISTRO
                    .debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}",
                            datumKey, count, action.name());
        }
    } catch (Throwable e) {
        Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
    }
}

Trace Service#onChange:

@Override
public void onChange(String key, Instances value) throws Exception {

    Loggers.SRV_LOG.info("[NACOS-RAFT] datum is changed, key: {}, value: {}", key, value);

    for (Instance instance : value.getInstanceList()) {

        if (instance == null) {
            // Reject this abnormal instance list:
            throw new RuntimeException("got null instance " + key);
        }

        // Clamp the weight into the allowed range
        if (instance.getWeight() > 10000.0D) {
            instance.setWeight(10000.0D);
        }

        if (instance.getWeight() < 0.01D && instance.getWeight() > 0.0D) {
            instance.setWeight(0.01D);
        }
    }
    // Update the instances. The core logic is in updateIPs: the first argument is the
    // instance collection, the second indicates whether the instances are ephemeral.
    updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key));

    recalculateChecksum();
}

Service#updateIPs

public void updateIPs(Collection<Instance> instances, boolean ephemeral) {
    Map<String, List<Instance>> ipMap = new HashMap<>(clusterMap.size());
    for (String clusterName : clusterMap.keySet()) {
        ipMap.put(clusterName, new ArrayList<>());
    }

    for (Instance instance : instances) {
        try {
            if (instance == null) {
                Loggers.SRV_LOG.error("[NACOS-DOM] received malformed ip: null");
                continue;
            }

            if (StringUtils.isEmpty(instance.getClusterName())) {
                // Fall back to the default cluster when no cluster name is set
                instance.setClusterName(UtilsAndCommons.DEFAULT_CLUSTER_NAME);
            }

            // Create the cluster if it does not exist yet
            if (!clusterMap.containsKey(instance.getClusterName())) {
                Loggers.SRV_LOG
                        .warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
                                instance.getClusterName(), instance.toJson());
                Cluster cluster = new Cluster(instance.getClusterName(), this);
                cluster.init();
                getClusterMap().put(instance.getClusterName(), cluster);
            }

            List<Instance> clusterIPs = ipMap.get(instance.getClusterName());
            if (clusterIPs == null) {
                clusterIPs = new LinkedList<>();
                ipMap.put(instance.getClusterName(), clusterIPs);
            }

            clusterIPs.add(instance);
        } catch (Exception e) {
            Loggers.SRV_LOG.error("[NACOS-DOM] failed to process ip: " + instance, e);
        }
    }

    for (Map.Entry<String, List<Instance>> entry : ipMap.entrySet()) {
        //make every ip mine
        List<Instance> entryIPs = entry.getValue();
        clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral);
    }

    setLastModifiedMillis(System.currentTimeMillis());
    getPushService().serviceChanged(this);
    StringBuilder stringBuilder = new StringBuilder();

    for (Instance instance : allIPs()) {
        stringBuilder.append(instance.toIpAddr()).append("_").append(instance.isHealthy()).append(",");
    }

    Loggers.EVT_LOG.info("[IP-UPDATED] namespace: {}, service: {}, ips: {}", getNamespaceId(), getName(),
            stringBuilder.toString());

}
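The core of updateIPs is a group-by: instances are bucketed by cluster name, with a default name substituted when none is set. A stripped-down sketch of just that step follows. The Inst class, the group() method, and the literal "DEFAULT" are assumptions made for illustration; the real method additionally pre-creates an empty bucket for every known cluster, so that a cluster which lost all its instances is updated to an empty list, and then triggers the push to clients.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of bucketing instances by cluster name. */
class GroupByClusterSketch {
    // Assumed default cluster name for this sketch.
    static final String DEFAULT_CLUSTER = "DEFAULT";

    /** Minimal stand-in for an instance: an address plus an optional cluster name. */
    static final class Inst {
        final String addr;
        final String cluster;

        Inst(String addr, String cluster) {
            this.addr = addr;
            this.cluster = cluster;
        }
    }

    /** Groups instance addresses by cluster, skipping nulls and defaulting blank names. */
    static Map<String, List<String>> group(List<Inst> instances) {
        Map<String, List<String>> byCluster = new LinkedHashMap<>();
        for (Inst inst : instances) {
            if (inst == null) {
                continue; // malformed entry: ignore it rather than fail the whole update
            }
            String cluster = (inst.cluster == null || inst.cluster.isEmpty())
                    ? DEFAULT_CLUSTER
                    : inst.cluster;
            byCluster.computeIfAbsent(cluster, k -> new ArrayList<>()).add(inst.addr);
        }
        return byCluster;
    }

    public static void main(String[] args) {
        List<Inst> instances = Arrays.asList(
                new Inst("10.0.0.1:8080", null),
                new Inst("10.0.0.2:8080", "east"),
                null,
                new Inst("10.0.0.3:8080", ""));
        // prints {DEFAULT=[10.0.0.1:8080, 10.0.0.3:8080], east=[10.0.0.2:8080]}
        System.out.println(group(instances));
    }
}
```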

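Summary:

The server sends a registration request to Nacos and keeps a scheduled heartbeat task running. Nacos processes these requests asynchronously through a blocking queue and pushes changes to clients over UDP as they happen. To guard against lost UDP packets, each client also runs a scheduled task that pulls from Nacos every 10 s, and Nacos returns the new service list when it has changed.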