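I. How the Nacos registry works
Registration flow
Registration and discovery involve three parties: the service provider (server for short), the service consumer (client), and the registry (Nacos).
Interaction between the server and Nacos: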
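The server sends a service registration request through the interface exposed by the official Nacos Open API. It then sends heartbeats to Nacos at a fixed interval so that Nacos can check its health. For the user, this step can be implemented as a scheduled task created with ScheduledExecutorService. Nacos handles both registration requests and heartbeat tasks asynchronously.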
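As a rough illustration of the scheduled heartbeat described above, the sketch below wraps a ScheduledExecutorService. The class name HeartbeatScheduler, the thread name, and the idea of passing the beat as a Runnable are assumptions made for this example; this is not how the Nacos client itself is written, and the period is whatever the caller chooses.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: runs a heartbeat action periodically on one background thread. */
final class HeartbeatScheduler implements AutoCloseable {

    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "heartbeat-sketch");
                t.setDaemon(true); // do not keep the JVM alive just for heartbeats
                return t;
            });

    /** Runs sendBeat immediately, then again after each period has elapsed. */
    ScheduledFuture<?> start(Runnable sendBeat, long period, TimeUnit unit) {
        // An uncaught exception would cancel the periodic task, so failures are
        // logged and swallowed to let the next beat go out as usual.
        Runnable safeBeat = () -> {
            try {
                sendBeat.run();
            } catch (RuntimeException e) {
                System.err.println("heartbeat failed: " + e.getMessage());
            }
        };
        return executor.scheduleWithFixedDelay(safeBeat, 0, period, unit);
    }

    /** Stops sending heartbeats. */
    @Override
    public void close() {
        executor.shutdownNow();
    }
}
```

In a real provider the Runnable would issue the heartbeat request to Nacos; here it is left to the caller so the scheduling logic stays independent of any HTTP client.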
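Nacos heartbeat mechanism
Nacos and the client interact in a combined push and pull mode. On the pull side, the client runs a scheduled task that queries Nacos every 10s, and Nacos returns the new list if the service list has changed. On the push side, when the service instances change (a server instance registers successfully, or its heartbeat stops and the connection is dropped), Nacos proactively pushes the update to the client over UDP. UDP is fast and does not require a long-lived connection. In a registry scenario there are usually far more clients than servers, so if Nacos had to open a TCP request to thousands of service consumers on every service update, performance would suffer. If a UDP notification is lost, the client still pulls every 10s. Client pull and server push complement each other: updates to server instances reach clients promptly, and the cost of delivering them stays low.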
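One way to picture how push and pull can coexist on the client is a single cache that both paths write to, keeping whichever snapshot is newest. The sketch below is an assumption-laden illustration: the names ServiceListCache, Snapshot, and lastRefTime are chosen for this example, and the actual Nacos client code is organized differently.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical client-side cache fed by both UDP pushes and the periodic pull. */
final class ServiceListCache {

    /** Immutable view of a service's instances plus the server-side update time. */
    static final class Snapshot {
        final long lastRefTime;
        final List<String> hosts;

        Snapshot(long lastRefTime, List<String> hosts) {
            this.lastRefTime = lastRefTime;
            this.hosts = List.copyOf(hosts);
        }
    }

    private final AtomicReference<Snapshot> current =
            new AtomicReference<>(new Snapshot(0L, List.of()));

    /**
     * Called by the push handler and by the pull task alike.
     * Returns true if the incoming snapshot was newer and replaced the cached one.
     */
    boolean offer(Snapshot incoming) {
        while (true) {
            Snapshot existing = current.get();
            if (incoming.lastRefTime <= existing.lastRefTime) {
                return false; // stale or duplicate, e.g. a pull that raced with a push
            }
            if (current.compareAndSet(existing, incoming)) {
                return true;
            }
        }
    }

    List<String> hosts() {
        return current.get().hosts;
    }
}
```

With this shape a lost UDP packet costs at most one pull interval of staleness, and a pull result that arrives after a newer push is simply discarded.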
Analyzing the Nacos service registration source code
Service registration through the official Open API
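For orientation, the Open API registration call is an HTTP POST to /nacos/v1/ns/instance with the instance details as parameters. The sketch below only builds that URL. The class and method names are made up for this example, only the three most basic parameters are included, and sending the request is left to whichever HTTP client is at hand.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical helper that assembles the registration URL for the Open API. */
final class RegisterRequestSketch {

    /** serverAddr is host:port of a Nacos node, for example 127.0.0.1:8848. */
    static URI registerUri(String serverAddr, String serviceName, String ip, int port) {
        // LinkedHashMap keeps the parameters in insertion order for a stable URL.
        Map<String, String> params = new LinkedHashMap<>();
        params.put("serviceName", serviceName);
        params.put("ip", ip);
        params.put("port", String.valueOf(port));

        String query = params.entrySet().stream()
                .map(e -> e.getKey() + "=" + URLEncoder.encode(e.getValue(), StandardCharsets.UTF_8))
                .collect(Collectors.joining("&"));

        return URI.create("http://" + serverAddr + "/nacos/v1/ns/instance?" + query);
    }
}
```

Issuing a POST to the returned URI against a running standalone Nacos should register the instance. The other parameters that appear later in NamingProxy#registerService (weight, ephemeral, metadata, and so on) would be added to the map in the same way.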
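Running the source
The project structure is as follows (the version downloaded is 1.4.5, on a Mac with an M1 chip).
The project is started from the console module: select the main class Nacos and run it.
Problems encountered
1. Entity classes missing from the consistency module
Install the Protobuf plugin in IDEA, then select the consistency module in Maven and run compile.
If the Maven compile fails, add the following to Maven's settings.xml: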
```xml
<settings>
  <activeProfiles>
    <activeProfile>apple-silicon</activeProfile>
  </activeProfiles>
  <profiles>
    <profile>
      <id>apple-silicon</id>
      <properties>
        <os.detected.classifier>osx-x86_64</os.detected.classifier>
      </properties>
    </profile>
  </profiles>
</settings>
```
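Run compile again and the problem is resolved.
Startup error "Unable to start embedded Tomcat": add -Dnacos.standalone=true to the VM options.
Database file: the database script is in the distribution module; use it to create the database.
Then configure the database in application.properties under the console module.
The application now starts successfully and the Nacos console can be opened.
Tracing the source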
```java
@Configuration
@EnableConfigurationProperties
@ConditionalOnNacosDiscoveryEnabled
@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true)
@AutoConfigureAfter({ AutoServiceRegistrationConfiguration.class,
        AutoServiceRegistrationAutoConfiguration.class })
public class NacosDiscoveryAutoConfiguration {

    @Bean
    public NacosServiceRegistry nacosServiceRegistry(
            NacosDiscoveryProperties nacosDiscoveryProperties) {
        return new NacosServiceRegistry(nacosDiscoveryProperties);
    }

    @Bean
    @ConditionalOnBean(AutoServiceRegistrationProperties.class)
    public NacosRegistration nacosRegistration(
            NacosDiscoveryProperties nacosDiscoveryProperties,
            ApplicationContext context) {
        return new NacosRegistration(nacosDiscoveryProperties, context);
    }

    @Bean
    @ConditionalOnBean(AutoServiceRegistrationProperties.class)
    public NacosAutoServiceRegistration nacosAutoServiceRegistration(
            NacosServiceRegistry registry,
            AutoServiceRegistrationProperties autoServiceRegistrationProperties,
            NacosRegistration registration) {
        return new NacosAutoServiceRegistration(registry,
                autoServiceRegistrationProperties, registration);
    }
}
```
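NacosAutoServiceRegistration extends the parent class AbstractAutoServiceRegistration. That class listens for WebServerInitializedEvent and delegates to bind().
Tracing the bind() method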
```java
@Override
@SuppressWarnings("deprecation")
public void onApplicationEvent(WebServerInitializedEvent event) {
    bind(event);
}
```
```java
@Deprecated
public void bind(WebServerInitializedEvent event) {
    ApplicationContext context = event.getApplicationContext();
    if (context instanceof ConfigurableWebServerApplicationContext) {
        if ("management".equals(((ConfigurableWebServerApplicationContext) context)
                .getServerNamespace())) {
            return;
        }
    }
    this.port.compareAndSet(0, event.getWebServer().getPort());
    this.start();
}
```
Tracing start()
```java
public void start() {
    if (!isEnabled()) {
        if (logger.isDebugEnabled()) {
            logger.debug("Discovery Lifecycle disabled. Not starting");
        }
        return;
    }

    if (!this.running.get()) {
        this.context.publishEvent(
                new InstancePreRegisteredEvent(this, getRegistration()));
        register();
        if (shouldRegisterManagement()) {
            registerManagement();
        }
        this.context.publishEvent(
                new InstanceRegisteredEvent<>(this, getConfiguration()));
        this.running.compareAndSet(false, true);
    }
}
```
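The running flag in start() is what keeps registration from happening twice when the lifecycle is triggered more than once. The sketch below shows the same idea in isolation. Note that it is a variant, not a copy: it claims the flag with compareAndSet before registering, whereas the excerpt above checks the flag first and sets it afterwards. The class name OneShotLifecycle is hypothetical.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical guard that lets a registration action run at most once. */
final class OneShotLifecycle {

    private final AtomicBoolean running = new AtomicBoolean(false);
    private final Runnable register;

    OneShotLifecycle(Runnable register) {
        this.register = register;
    }

    /** Returns true only for the call that actually performed the registration. */
    boolean start() {
        // Claim the flag atomically so that concurrent callers cannot both register.
        if (!running.compareAndSet(false, true)) {
            return false;
        }
        try {
            register.run();
            return true;
        } catch (RuntimeException e) {
            running.set(false); // allow a retry if registration failed
            throw e;
        }
    }

    boolean isRunning() {
        return running.get();
    }
}
```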
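Step into the register() method.
The call chain continues to NamingProxy#registerService(), which collects the instance attributes into a parameter map and sends them as a POST request to the instance endpoint.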
```java
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {

    NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}",
            namespaceId, serviceName, instance);

    final Map<String, String> params = new HashMap<String, String>(9);
    params.put(CommonParams.NAMESPACE_ID, namespaceId);
    params.put(CommonParams.SERVICE_NAME, serviceName);
    params.put(CommonParams.GROUP_NAME, groupName);
    params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
    params.put("ip", instance.getIp());
    params.put("port", String.valueOf(instance.getPort()));
    params.put("weight", String.valueOf(instance.getWeight()));
    params.put("enable", String.valueOf(instance.isEnabled()));
    params.put("healthy", String.valueOf(instance.isHealthy()));
    params.put("ephemeral", String.valueOf(instance.isEphemeral()));
    params.put("metadata", JSON.toJSONString(instance.getMetadata()));

    reqAPI(UtilAndComs.NACOS_URL_INSTANCE, params, HttpMethod.POST);
}
```
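On the server side, open the Nacos source and find InstanceController in the controller package of the naming module. This is the endpoint that receives the POST request.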
```java
@RestController
@RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance")
public class InstanceController {

    @CanDistro
    @PostMapping
    @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
    public String register(HttpServletRequest request) throws Exception {

        final String namespaceId = WebUtils
                .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
        final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
        NamingUtils.checkServiceNameFormat(serviceName);

        final Instance instance = parseInstance(request);

        serviceManager.registerInstance(namespaceId, serviceName, instance);
        return "ok";
    }
}
```
As shown, the controller completes the instance registration by calling registerInstance().
ServiceManager implements the RecordListener interface.
```java
@Component
public class ServiceManager implements RecordListener<Service> {

    public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {

        createEmptyService(namespaceId, serviceName, instance.isEphemeral());

        Service service = getService(namespaceId, serviceName);

        if (service == null) {
            throw new NacosException(NacosException.INVALID_PARAM,
                    "service not found, namespace: " + namespaceId + ", service: " + serviceName);
        }

        addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
    }

    public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
            throws NacosException {

        String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);

        Service service = getService(namespaceId, serviceName);

        synchronized (service) {
            List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);

            Instances instances = new Instances();
            instances.setInstanceList(instanceList);

            consistencyService.put(key, instances);
        }
    }
}
```
Following consistencyService.put(key, instances) leads to one of its implementation classes, DistroConsistencyServiceImpl.
```java
@DependsOn("ProtocolManager")
@org.springframework.stereotype.Service("distroConsistencyService")
public class DistroConsistencyServiceImpl implements EphemeralConsistencyService, DistroDataProcessor {

    @Override
    public void put(String key, Record value) throws NacosException {
        onPut(key, value);
        distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE,
                globalConfig.getTaskDispatchPeriod() / 2);
    }

    public void onPut(String key, Record value) {

        if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
            Datum<Instances> datum = new Datum<>();
            datum.value = (Instances) value;
            datum.key = key;
            datum.timestamp.incrementAndGet();
            dataStore.put(key, datum);
        }

        if (!listeners.containsKey(key)) {
            return;
        }

        notifier.addTask(key, DataOperation.CHANGE);
    }

    // Excerpted from the inner Notifier class and shown inline here.
    public void addTask(String datumKey, DataOperation action) {

        // A CHANGE for this key is already queued, so skip the duplicate.
        if (services.containsKey(datumKey) && action == DataOperation.CHANGE) {
            return;
        }
        if (action == DataOperation.CHANGE) {
            services.put(datumKey, StringUtils.EMPTY);
        }
        tasks.offer(Pair.with(datumKey, action));
    }
}
```
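The notifier is a Runnable backed by a blocking queue. Registration tasks are added to the queue and later taken off and executed by a dedicated thread, so accepting a request and carrying it out are separated and the processing is asynchronous.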
```java
@DependsOn("ProtocolManager")
@org.springframework.stereotype.Service("distroConsistencyService")
public class DistroConsistencyServiceImpl implements EphemeralConsistencyService, DistroDataProcessor {

    private volatile Notifier notifier = new Notifier();

    @PostConstruct
    public void init() {
        // Hand the notifier to a dedicated single-thread executor at startup.
        GlobalExecutor.submitDistroNotifyTask(notifier);
    }

    public class Notifier implements Runnable {

        @Override
        public void run() {
            Loggers.DISTRO.info("distro notifier started");

            for (; ; ) {
                try {
                    // Blocks until a task is available.
                    Pair<String, DataOperation> pair = tasks.take();
                    handle(pair);
                } catch (Throwable e) {
                    Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
                }
            }
        }
    }
}

// GlobalExecutor is a separate class; only the relevant members are shown.
public class GlobalExecutor {

    private static final ScheduledExecutorService DISTRO_NOTIFY_EXECUTOR = ExecutorFactory.Managed
            .newSingleScheduledExecutorService(ClassUtils.getCanonicalName(NamingApp.class),
                    new NameThreadFactory("com.alibaba.nacos.naming.distro.notifier"));

    public static void submitDistroNotifyTask(Runnable runnable) {
        DISTRO_NOTIFY_EXECUTOR.submit(runnable);
    }
}
```
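The split between enqueueing a change and handling it can be reproduced with the JDK alone. The following is a simplified sketch of that pattern, not the Nacos implementation: the class name ChangeNotifierSketch is hypothetical, keys are plain strings, and the handling step just records the key, where Nacos would call the registered listeners.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical notifier: producers enqueue keys, one consumer thread handles them. */
final class ChangeNotifierSketch implements Runnable {

    private final Set<String> pending = ConcurrentHashMap.newKeySet();
    private final BlockingQueue<String> tasks = new LinkedBlockingQueue<>();
    private final List<String> handled = new CopyOnWriteArrayList<>();

    /** Producer side: returns at once; a key that is already waiting is not queued again. */
    boolean addTask(String key) {
        if (!pending.add(key)) {
            return false;
        }
        return tasks.offer(key);
    }

    /** Consumer side: blocks on the queue until the thread is interrupted. */
    @Override
    public void run() {
        try {
            while (true) {
                String key = tasks.take();
                pending.remove(key); // later changes to this key may be queued again
                handled.add(key);    // stand-in for notifying listeners
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    List<String> handled() {
        return handled;
    }
}
```

The pending set plays the role of the services map in addTask: a burst of changes to the same key collapses into one queued task, which is safe because the handler reads the latest data when it runs.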
Continue tracing into DistroConsistencyServiceImpl.Notifier#handle
```java
private void handle(Pair<String, DataOperation> pair) {
    try {
        String datumKey = pair.getValue0();
        DataOperation action = pair.getValue1();

        services.remove(datumKey);

        int count = 0;

        ConcurrentLinkedQueue<RecordListener> recordListeners = listeners.get(datumKey);
        if (recordListeners == null) {
            Loggers.DISTRO.info("[DISTRO-WARN] RecordListener not found, key: {}", datumKey);
            return;
        }

        for (RecordListener listener : recordListeners) {

            count++;

            try {
                if (action == DataOperation.CHANGE) {
                    Datum datum = dataStore.get(datumKey);
                    if (datum != null) {
                        listener.onChange(datumKey, datum.value);
                    } else {
                        Loggers.DISTRO.info("[DISTRO-WARN] data not found, key: {}", datumKey);
                    }
                    continue;
                }

                if (action == DataOperation.DELETE) {
                    listener.onDelete(datumKey);
                    continue;
                }
            } catch (Throwable e) {
                Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);
            }
        }

        if (Loggers.DISTRO.isDebugEnabled()) {
            Loggers.DISTRO
                    .debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}",
                            datumKey, count, action.name());
        }
    } catch (Throwable e) {
        Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
    }
}
```
Tracing Service#onChange
```java
@Override
public void onChange(String key, Instances value) throws Exception {

    Loggers.SRV_LOG.info("[NACOS-RAFT] datum is changed, key: {}, value: {}", key, value);

    for (Instance instance : value.getInstanceList()) {

        if (instance == null) {
            throw new RuntimeException("got null instance " + key);
        }

        if (instance.getWeight() > 10000.0D) {
            instance.setWeight(10000.0D);
        }

        if (instance.getWeight() < 0.01D && instance.getWeight() > 0.0D) {
            instance.setWeight(0.01D);
        }
    }

    updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key));

    recalculateChecksum();
}
```
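The two weight checks in onChange amount to a small normalization rule, restated below as a standalone function to make the boundaries easy to see. The thresholds are taken from the excerpt; the class and constant names are made up for this sketch.

```java
/** Hypothetical restatement of the weight bounds applied in Service#onChange. */
final class WeightRules {

    static final double MAX_WEIGHT = 10000.0D;
    static final double MIN_POSITIVE_WEIGHT = 0.01D;

    /** Caps large weights and lifts tiny positive ones; other values pass through unchanged. */
    static double normalize(double weight) {
        if (weight > MAX_WEIGHT) {
            return MAX_WEIGHT;
        }
        if (weight > 0.0D && weight < MIN_POSITIVE_WEIGHT) {
            return MIN_POSITIVE_WEIGHT;
        }
        return weight;
    }
}
```

A weight of exactly 0 is left alone by these checks, so only positive weights are pulled into the range 0.01 to 10000.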
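Service#updateIPs groups the instances by cluster, updates each cluster, and then notifies the push service that the service has changed.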
```java
public void updateIPs(Collection<Instance> instances, boolean ephemeral) {
    Map<String, List<Instance>> ipMap = new HashMap<>(clusterMap.size());
    for (String clusterName : clusterMap.keySet()) {
        ipMap.put(clusterName, new ArrayList<>());
    }

    for (Instance instance : instances) {
        try {
            if (instance == null) {
                Loggers.SRV_LOG.error("[NACOS-DOM] received malformed ip: null");
                continue;
            }

            if (StringUtils.isEmpty(instance.getClusterName())) {
                instance.setClusterName(UtilsAndCommons.DEFAULT_CLUSTER_NAME);
            }

            if (!clusterMap.containsKey(instance.getClusterName())) {
                Loggers.SRV_LOG
                        .warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
                                instance.getClusterName(), instance.toJson());
                Cluster cluster = new Cluster(instance.getClusterName(), this);
                cluster.init();
                getClusterMap().put(instance.getClusterName(), cluster);
            }

            List<Instance> clusterIPs = ipMap.get(instance.getClusterName());
            if (clusterIPs == null) {
                clusterIPs = new LinkedList<>();
                ipMap.put(instance.getClusterName(), clusterIPs);
            }

            clusterIPs.add(instance);
        } catch (Exception e) {
            Loggers.SRV_LOG.error("[NACOS-DOM] failed to process ip: " + instance, e);
        }
    }

    for (Map.Entry<String, List<Instance>> entry : ipMap.entrySet()) {
        List<Instance> entryIPs = entry.getValue();
        clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral);
    }

    setLastModifiedMillis(System.currentTimeMillis());
    getPushService().serviceChanged(this);

    StringBuilder stringBuilder = new StringBuilder();
    for (Instance instance : allIPs()) {
        stringBuilder.append(instance.toIpAddr()).append("_").append(instance.isHealthy()).append(",");
    }

    Loggers.EVT_LOG.info("[IP-UPDATED] namespace: {}, service: {}, ips: {}", getNamespaceId(), getName(),
            stringBuilder.toString());
}
```
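The grouping step at the start of updateIPs is worth isolating, because pre-seeding every known cluster with an empty list is what lets a cluster that lost all its instances be updated to empty. The sketch below mirrors that step with simplified types. ClusterGrouping, Inst, and the "DEFAULT" fallback name are assumptions made for this example.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified version of the per-cluster grouping in Service#updateIPs. */
final class ClusterGrouping {

    static final String DEFAULT_CLUSTER = "DEFAULT"; // assumed fallback cluster name

    /** Minimal stand-in for an instance: a cluster name and an address. */
    static final class Inst {
        final String cluster;
        final String addr;

        Inst(String cluster, String addr) {
            this.cluster = cluster;
            this.addr = addr;
        }
    }

    static Map<String, List<String>> group(Collection<String> knownClusters, List<Inst> instances) {
        Map<String, List<String>> byCluster = new LinkedHashMap<>();
        // Seed every known cluster so that clusters without instances map to an empty list.
        for (String cluster : knownClusters) {
            byCluster.put(cluster, new ArrayList<>());
        }
        for (Inst inst : instances) {
            if (inst == null) {
                continue; // malformed entry, skipped as in the original
            }
            String cluster = (inst.cluster == null || inst.cluster.isEmpty())
                    ? DEFAULT_CLUSTER : inst.cluster;
            // Unknown clusters are created on the fly.
            byCluster.computeIfAbsent(cluster, k -> new ArrayList<>()).add(inst.addr);
        }
        return byCluster;
    }
}
```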
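Summary:
The server sends a registration request to Nacos and keeps a scheduled heartbeat task running. Nacos processes these requests asynchronously through a blocking queue and pushes changes to the client over UDP as they happen. To guard against lost UDP packets, the client also runs a scheduled task that sends a pull request to Nacos every 10s, and Nacos returns the service list when it has changed.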