数据库

Nacos 配置中心源码分析

时间:2010-12-5 17:23:32  作者:IT科技类资讯   来源:数据库  查看:  评论:0
内容摘要:本文主要和大家一起以源码的角度来分析 Nacos 配置中心的配置信息获取,以及配置信息动态同步的过程和原理。环境介绍和使用 环境介绍:Jdk 1.8 nacos-server-1.4.2

本文主要和大家一起以源码的置中角度来分析 Nacos 配置中心的配置信息获取,以及配置信息动态同步的心源析过程和原理。环境介绍和使用 环境介绍:

Jdk 1.8 nacos-server-1.4.2 spring-boot-2.3.5.RELEASE spring-cloud-Hoxton.SR8 spring-cloiud-alibab-2.2.5.RELEASE

如果我们需要使用 Nacos 作为配置中心,码分我们首先需要导入 Nacos Config 的置中依赖信息,如下所示:

<dependency>   <groupId>com.alibaba.cloud</groupId>   <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId> </dependency> 

然后再 bootstartp.yml 文件中配置 Nacos 服务信息。心源析

spring:   cloud:     nacos:       config:         server-addr: 127.0.0.1:8848 

客户端初始化

主要是码分通过 NacosConfigBootstrapConfiguration 类来进行初始化 NacosConfigManager 、NacosPropertySourceLocator

@Configuration(proxyBeanMethods = false) @ConditionalOnProperty(name = "spring.cloud.nacos.config.enabled",置中 matchIfMissing = true) public class NacosConfigBootstrapConfiguration {   @Bean  @ConditionalOnMissingBean  public NacosConfigManager nacosConfigManager(    NacosConfigProperties nacosConfigProperties) {    return new NacosConfigManager(nacosConfigProperties);  }     @Bean  public NacosPropertySourceLocator nacosPropertySourceLocator(    NacosConfigManager nacosConfigManager) {    return new NacosPropertySourceLocator(nacosConfigManager);  }     // ... } 

在 NacosConfigManager 的构造方法中会调用 createConfigService 方法来创建 ConfigService 实例,内部调用工厂方法 ConfigFactory#createConfigService 通过反射实例化一个com.alibaba.nacos.client.config.NacosConfigService 的心源析实例对象。

public static ConfigService createConfigService(Properties properties) throws NacosException {      try {          Class<?码分> driverImplClass = Class.forName("com.alibaba.nacos.client.config.NacosConfigService");         Constructor constructor = driverImplClass.getConstructor(Properties.class);         ConfigService vendorImpl = (ConfigService) constructor.newInstance(properties);         return vendorImpl;     } catch (Throwable e) {          throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e);     } } 

NacosPropertySourceLocator 继承 PropertySourceLocator(PropertySourceLocator接口支持扩展自定义配置加载到 Spring Environment中)通过 locate 加载配置信息。

@Override public PropertySource<?置中> locate(Environment env) {   nacosConfigProperties.setEnvironment(env);  ConfigService configService = nacosConfigManager.getConfigService();  if (null == configService) {    log.warn("no instance of config service found, cant load config from nacos");   return null;  }  long timeout = nacosConfigProperties.getTimeout();  nacosPropertySourceBuilder = new NacosPropertySourceBuilder(configService,    timeout);  String name = nacosConfigProperties.getName();  String dataIdPrefix = nacosConfigProperties.getPrefix();  if (StringUtils.isEmpty(dataIdPrefix)) {    dataIdPrefix = name;  }  if (StringUtils.isEmpty(dataIdPrefix)) {    dataIdPrefix = env.getProperty("spring.application.name");  }  CompositePropertySource composite = new CompositePropertySource(    NACOS_PROPERTY_SOURCE_NAME);        // 共享配置  loadSharedConfiguration(composite);  // 拓展配置        loadExtConfiguration(composite);  // 应用配置        loadApplicationConfiguration(composite, dataIdPrefix, nacosConfigProperties, env);  return composite; } 

配置读取过程

配置加载有三个方法 loadSharedConfiguration、loadSharedConfiguration、心源析 loadApplicationConfiguration 以 loadApplicationConfiguration 继续跟进。码分

private void loadApplicationConfiguration(     CompositePropertySource compositePropertySource,置中 String dataIdPrefix,     NacosConfigProperties properties, Environment environment) {      String fileExtension = properties.getFileExtension();     String nacosGroup = properties.getGroup();     // load directly once by default     loadNacosDataIfPresent(compositePropertySource, dataIdPrefix, nacosGroup,                            fileExtension, true);     // load with suffix, which have a higher priority than the default     loadNacosDataIfPresent(compositePropertySource,                            dataIdPrefix + DOT + fileExtension, nacosGroup, fileExtension, true);     // Loaded with profile, which have a higher priority than the suffix     for (String profile : environment.getActiveProfiles()) {          String dataId = dataIdPrefix + SEP1 + profile + DOT + fileExtension;         loadNacosDataIfPresent(compositePropertySource, dataId, nacosGroup,                                fileExtension, true);     } } 

主要通过 loadNacosDataIfPresent 读取配置信息, 其实我们可以通过参数看出,服务器托管主要配置文件包含以下部分:dataId,心源析 group, fileExtension

private void loadNacosDataIfPresent(final CompositePropertySource composite,                                     final String dataId, final String group, String fileExtension,                                     boolean isRefreshable) {      if (null == dataId || dataId.trim().length() < 1) {          return;     }     if (null == group || group.trim().length() < 1) {          return;     }     NacosPropertySource propertySource = this.loadNacosPropertySource(dataId, group,                                                                       fileExtension, isRefreshable);     this.addFirstPropertySource(composite, propertySource, false); } 

然后调用 loadNacosPropertySource 最后一步步的会调用到 NacosConfigService#getConfigInner

private String getConfigInner(String tenant, String dataId, String group, long timeoutMs) throws NacosException {          group = null2defaultGroup(group);         ParamUtils.checkKeyParam(dataId, group);         ConfigResponse cr = new ConfigResponse();         cr.setDataId(dataId);         cr.setTenant(tenant);         cr.setGroup(group);         // 优先使用本地配置         String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant);         if (content != null) {              LOGGER.warn("[{ }] [get-config] get failover ok, dataId={ }, group={ }, tenant={ }, config={ }", agent.getName(),                     dataId, group, tenant, ContentUtils.truncateContent(content));             cr.setContent(content);             configFilterChainManager.doFilter(null, cr);             content = cr.getContent();             return content;         }         try {              // 获取远程配置             String[] ct = worker.getServerConfig(dataId, group, tenant, timeoutMs);             cr.setContent(ct[0]);             configFilterChainManager.doFilter(null, cr);             content = cr.getContent();             return content;         } catch (NacosException ioe) {              if (NacosException.NO_RIGHT == ioe.getErrCode()) {                  throw ioe;             }             LOGGER.warn("[{ }] [get-config] get from server error, dataId={ }, group={ }, tenant={ }, msg={ }",                     agent.getName(), dataId, group, tenant, ioe.toString());         }         LOGGER.warn("[{ }] [get-config] get snapshot ok, dataId={ }, group={ }, tenant={ }, config={ }", agent.getName(),                 dataId, group, tenant, ContentUtils.truncateContent(content));         content = LocalConfigInfoProcessor.getSnapshot(agent.getName(), dataId, group, tenant);         cr.setContent(content);         configFilterChainManager.doFilter(null, cr);         content = cr.getContent();         return content;     } 

加载远程配置

worker.getServerConfig 主要是获取远程配置, ClIentWorker 的码分 getServerConfig 定义如下:

public String[] getServerConfig(String dataId, String group, String tenant, long readTimeout)     throws NacosException {      String[] ct = new String[2];     if (StringUtils.isBlank(group)) {          group = Constants.DEFAULT_GROUP;     }     HttpRestResult<String> result = null;     try {          Map<String, String> params = new HashMap<String, String>(3);         if (StringUtils.isBlank(tenant)) {              params.put("dataId", dataId);             params.put("group", group);         } else {              params.put("dataId", dataId);             params.put("group", group);             params.put("tenant", tenant);         }         result = agent.httpGet(Constants.CONFIG_CONTROLLER_PATH, null, params, agent.getEncode(), readTimeout);     } catch (Exception ex) {          String message = String             .format("[%s] [sub-server] get server config exception, dataId=%s, group=%s, tenant=%s",                     agent.getName(), dataId, group, tenant);         LOGGER.error(message, ex);         throw new NacosException(NacosException.SERVER_ERROR, ex);     }     switch (result.getCode()) {          case HttpURLConnection.HTTP_OK:             LocalConfigInfoProcessor.saveSnapshot(agent.getName(), dataId, group, tenant, result.getData());             ct[0] = result.getData();             if (result.getHeader().getValue(CONFIG_TYPE) != null) {                  ct[1] = result.getHeader().getValue(CONFIG_TYPE);             } else {                  ct[1] = ConfigType.TEXT.getType();             }             return ct;         case HttpURLConnection.HTTP_NOT_FOUND:             LocalConfigInfoProcessor.saveSnapshot(agent.getName(), dataId, group, tenant, null);             return ct;         case HttpURLConnection.HTTP_CONFLICT: {              LOGGER.error(                 "[{ }] [sub-server-error] get server config being modified concurrently, dataId={ }, group={ }, "                 + "tenant={ }", agent.getName(), dataId, group, tenant);             throw new NacosException(NacosException.CONFLICT,                                      "data being modified, dataId=" + dataId + ",group=" + group + ",tenant=" + tenant);         }         case HttpURLConnection.HTTP_FORBIDDEN: {              LOGGER.error("[{ }] [sub-server-error] no right, dataId={ }, group={ }, tenant={ }", agent.getName(),                          dataId, group, tenant);             throw new NacosException(result.getCode(), result.getMessage());         }         default: {              LOGGER.error("[{ }] [sub-server-error]  dataId={ }, group={ }, tenant={ }, code={ }", agent.getName(),                          dataId, group, tenant, result.getCode());             throw new NacosException(result.getCode(),                                      "http error, code=" + result.getCode() + ",dataId=" + dataId + ",group=" + group + ",tenant="                                      + tenant);         }     } } 

agent 默认使用 MetricsHttpAgent 实现类

配置同步过程

Nacos 配置同步过程如下图所示:

客户端请求

客户端初始请求配置完成后,会通过 WorkClient 进行长轮询查询配置, 它的构造方法如下:

public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager,             final Properties properties) {          this.agent = agent;         this.configFilterChainManager = configFilterChainManager;         // Initialize the timeout parameter         init(properties);         // 检查线程池         this.executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {              @Override             public Thread newThread(Runnable r) {                  Thread t = new Thread(r);                 t.setName("com.alibaba.nacos.client.Worker." + agent.getName());                 t.setDaemon(true);                 return t;             }         });         // 长轮询线程         this.executorService = Executors                 .newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {                      @Override                     public Thread newThread(Runnable r) {                          Thread t = new Thread(r);                         t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName());                         t.setDaemon(true);                         return t;                     }                 });         this.executor.scheduleWithFixedDelay(new Runnable() {              @Override             public void run() {                  try {                      checkConfigInfo();                 } catch (Throwable e) {                      LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);                 }             }         }, 1L, 10L, TimeUnit.MILLISECONDS);     } 

这里初始化了两个线程池:

第一个线程池主要是用来初始化做长轮询的; 第二个线程池使用来做检查的,会每间隔 10 秒钟执行一次检查方法 checkConfigInfo

checkConfigInfo

在这个方法里面主要是分配任务,给每个 task 分配一个 taskId , 后面会去检查本地配置和远程配置,最终调用的是 LongPollingRunable 的 run 方法。

public void checkConfigInfo() {      // Dispatch taskes.     int listenerSize = cacheMap.size();     // Round up the longingTaskCount.     int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());     if (longingTaskCount > currentLongingTaskCount) {          for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {              // The task list is no order.So it maybe has issues when changing.             executorService.execute(new LongPollingRunnable(i));         }         currentLongingTaskCount = longingTaskCount;     } } 

LongPollingRunnable

长轮询线程实现,源码下载首先第一步检查本地配置信息,然后通过 dataId 去检查服务端是否有变动的配置信息,如果有就更新下来然后刷新配置。

public void run() {          List<CacheData> cacheDatas = new ArrayList<CacheData>();         List<String> inInitializingCacheList = new ArrayList<String>();         try {              // check failover config             for (CacheData cacheData : cacheMap.values()) {                  if (cacheData.getTaskId() == taskId) {                      cacheDatas.add(cacheData);                     try {                          checkLocalConfig(cacheData);                         if (cacheData.isUseLocalConfigInfo()) {                              // 触发回调                             cacheData.checkListenerMd5();                         }                     } catch (Exception e) {                          LOGGER.error("get local config info error", e);                     }                 }             }             // check server config             List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);             if (!CollectionUtils.isEmpty(changedGroupKeys)) {                  LOGGER.info("get changedGroupKeys:" + changedGroupKeys);             }             for (String groupKey : changedGroupKeys) {                  String[] key = GroupKey.parseKey(groupKey);                 String dataId = key[0];                 String group = key[1];                 String tenant = null;                 if (key.length == 3) {                      tenant = key[2];                 }                 try {                      String[] ct = getServerConfig(dataId, group, tenant, 3000L);                     CacheData cache = cacheMap.get(GroupKey.getKeyTenant(dataId, group, tenant));                     cache.setContent(ct[0]);                     if (null != ct[1]) {                          cache.setType(ct[1]);                     }                     LOGGER.info("[{ }] [data-received] dataId={ }, group={ }, tenant={ }, md5={ }, content={ }, type={ }",                                 agent.getName(), dataId, group, tenant, cache.getMd5(),                                 ContentUtils.truncateContent(ct[0]), ct[1]);                 } catch (NacosException ioe) {                      String message = String                         .format("[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s",                                 agent.getName(), dataId, group, tenant);                     LOGGER.error(message, ioe);                 }             }             for (CacheData cacheData : cacheDatas) {                  if (!cacheData.isInitializing() || inInitializingCacheList                     .contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) {                      cacheData.checkListenerMd5();                     cacheData.setInitializing(false);                 }             }             inInitializingCacheList.clear();             executorService.execute(this);         } catch (Throwable e) {              // If the rotation training task is abnormal, the next execution time of the task will be punished             LOGGER.error("longPolling error : ", e);             executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS);         }     } } 

addTenantListeners

添加监听,这里主要是通过 dataId , group 来获取 cache 本地缓存的配置信息,然后再将 Listener 也传给 cache 统一管理。

public void addTenantListeners(String dataId, String group, List<? extends Listener> listeners)         throws NacosException {      group = null2defaultGroup(group);     String tenant = agent.getTenant();     CacheData cache = addCacheDataIfAbsent(dataId, group, tenant);     for (Listener listener : listeners) {          cache.addListener(listener);     } } 

回调触发

如果 md5 值发生变化过后就会调用 safeNotifyListener 方法然后将配置信息发送给对应的监听器

void checkListenerMd5() {      for (ManagerListenerWrap wrap : listeners) {          if (!md5.equals(wrap.lastCallMd5)) {              safeNotifyListener(dataId, group, content, type, md5, wrap);         }     } } 

服务端响应

当服务端收到请求后,会 hold 住当前请求,如果有变化就返回,如果没有变化就等待超时之前返回无变化。

/**  * The client listens for configuration changes.  */ @PostMapping("/listener") @Secured(action = ActionTypes.READ, parser = ConfigResourceParser.class) public void listener(HttpServletRequest request, HttpServletResponse response)     throws ServletException, IOException {      request.setAttribute("org.apache.catalina.ASYNC_SUPPORTED", true);     String probeModify = request.getParameter("Listening-Configs");     if (StringUtils.isBlank(probeModify)) {          throw new IllegalArgumentException("invalid probeModify");     }     probeModify = URLDecoder.decode(probeModify, Constants.ENCODE);     Map<String, String> clientMd5Map;     try {          clientMd5Map = MD5Util.getClientMd5Map(probeModify);     } catch (Throwable e) {          throw new IllegalArgumentException("invalid probeModify");     }     // do long-polling     inner.doPollingConfig(request, response, clientMd5Map, probeModify.length()); } 

LongPollingService

核心处理类 LongPollingService

/**    * Add LongPollingClient.    *    * @param req              HttpServletRequest.    * @param rsp              HttpServletResponse.    * @param clientMd5Map     clientMd5Map.    * @param probeRequestSize probeRequestSize.    */   public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map<String, String> clientMd5Map,           int probeRequestSize) {        String str = req.getHeader(LongPollingService.LONG_POLLING_HEADER);       String noHangUpFlag = req.getHeader(LongPollingService.LONG_POLLING_NO_HANG_UP_HEADER);       String appName = req.getHeader(RequestUtil.CLIENT_APPNAME_HEADER);       String tag = req.getHeader("Vipserver-Tag");       int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500);       // Add delay time for LoadBalance, and one response is returned 500 ms in advance to avoid client timeout.       long timeout = Math.max(10000, Long.parseLong(str) - delayTime);       if (isFixedPolling()) {            timeout = Math.max(10000, getFixedPollingInterval());           // Do nothing but set fix polling timeout.       } else {            long start = System.currentTimeMillis();           List<String> changedGroups = MD5Util.compareMd5(req, rsp, clientMd5Map);           if (changedGroups.size() > 0) {                generateResponse(req, rsp, changedGroups);               LogUtil.CLIENT_LOG.info("{ }|{ }|{ }|{ }|{ }|{ }|{ }", System.currentTimeMillis() - start, "instant",                       RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize,                       changedGroups.size());               return;           } else if (noHangUpFlag != null && noHangUpFlag.equalsIgnoreCase(TRUE_STR)) {                LogUtil.CLIENT_LOG.info("{ }|{ }|{ }|{ }|{ }|{ }|{ }", System.currentTimeMillis() - start, "nohangup",                       RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize,                       changedGroups.size());               return;           }       }       String ip = RequestUtil.getRemoteIp(req);       // Must be called by http thread, or send response.       final AsyncContext asyncContext = req.startAsync();       // AsyncContext.setTimeout() is incorrect, Control by oneself       asyncContext.setTimeout(0L);       ConfigExecutor.executeLongPolling(               new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag));   } 

参考链接

https://blog.csdn.net/jason_jiahongfei/article/details/108373442 https://www.cnblogs.com/lockedsher/articles/14447700.html

本文转载自微信公众号「运维开发故事」,可以通过以下二维码关注。转载本文请联系运维开发故事公众号。

源码库
copyright © 2025 powered by 益强资讯全景  滇ICP备2023006006号-31sitemap