Nacos注册中心的篇学主要流程基本上撸完了,下面开始撸配置中心。配户端本文从示例入手走查了客户端的置管初始化流程,Listener的理客流程注册逻辑和执行逻辑。
示例
通过示例构建ConfigService、配户端注册了Listener分析其流程Client初始化概览
支持多种获取server地址方式(本地、置管endpoint) 支持多种namespace设置(本地、理客流程阿里云) 支持超时时间、篇学重试时间等参数设置 支持用户名和密码验证 长轮询会从BlockingQueue中获取元素,配户端队列有元素立即执行executeConfigListen,置管队列无元素阻塞5秒钟执行executeConfigListen()Listener注册逻辑
client添加Listener后会在cacheMap中缓存CacheData cacheMap中key由「dataId+group+tenant」拼接而成 每个CacheData会绑定注册的理客流程Listener列表 每个CacheData会绑定taskId,3000个不同的篇学CacheData对应一个taskId 设置isSyncWithServer=false表示 cache md5 data不是来自server同步 BlockingQueue中添加new Object() 供长轮询判断立即执行使用配置变更执行逻辑
执行逻辑由executeConfigListen方法实现 当CacheData从Server同步后,会校验md5是配户端否变更了,变更则回调注册的置管Listener完成通知 注册Listener后会构建与server的RPC通道rpcClient 向server发起变更查询请求configChangeListenRequest Server端通过比较缓存的md5值,返回client变更的key列表 Client通过变更的key列表向server发起配置查询请求ConfigQueryRequest 获取变更内容,并回调注册的Listener完成通知 回调注册的Listener是通过线程池异步执行Runnble Job实现的备注: 示例中构建了ConfigService,高防服务器注入Listener接受server配置变更通知。
NacosConfigService构造方法
public NacosConfigService(Properties properties) throws NacosException { ValidatorUtils.checkInitParam(properties); // 注解@1 initNamespace(properties); // 注解@2 ServerListManager serverListManager = new ServerListManager(properties); // 注解@3 serverListManager.start(); // 注解@4 this.worker = new ClientWorker(this.configFilterChainManager, serverListManager, properties); // 将被废弃HttpAgent,先忽略 // will be deleted in 2.0 later versions agent = new ServerHttpAgent(serverListManager); }注解@1 设置namespace可以通过properties.setProperty(PropertyKeyConst.NAMESPACE),代码中会兼容阿里云环境,在此忽略,默认为空。
注解@2 初始化namespace、server地址等信息
注解@3 启动主要用于endpoint方式定时获取server地址,当本地传入isFixed=true
注解@4 clientWorker初始化
ClientWorker初始化
public ClientWorker(final ConfigFilterChainManager configFilterChainManager, ServerListManager serverListManager, final Properties properties) throws NacosException { this.configFilterChainManager = configFilterChainManager; // 注解@5 init(properties); // 注解@6 agent = new ConfigRpcTransportClient(properties, serverListManager); // 调度线程池,「处理器核数」 ScheduledExecutorService executorService = Executors .newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setName("com.alibaba.nacos.client.Worker"); t.setDaemon(true); return t; } }); agent.setExecutor(executorService); // 注解@7 agent.start(); }注解@5 初始化超时时间、重试时间等
private void init(Properties properties) { // 超时时间,默认30秒 timeout = Math.max(ConvertUtils.toInt(properties.getProperty(PropertyKeyConst.CONFIG_LONG_POLL_TIMEOUT), Constants.CONFIG_LONG_POLL_TIMEOUT), Constants.MIN_CONFIG_LONG_POLL_TIMEOUT); // 重试时间,默认2秒 taskPenaltyTime = ConvertUtils .toInt(properties.getProperty(PropertyKeyConst.CONFIG_RETRY_TIME), Constants.CONFIG_RETRY_TIME); // 开启配置删除同步,默认false this.enableRemoteSyncConfig = Boolean .parseBoolean(properties.getProperty(PropertyKeyConst.ENABLE_REMOTE_SYNC_CONFIG)); }注解@6 gRPC config agent初始化
public ConfigTransportClient(Properties properties, ServerListManager serverListManager) { // 默认编码UTF-8 String encodeTmp = properties.getProperty(PropertyKeyConst.ENCODE); if (StringUtils.isBlank(encodeTmp)) { this.encode = Constants.ENCODE; } else { this.encode = encodeTmp.trim(); } // namespace租户,默认空 this.tenant = properties.getProperty(PropertyKeyConst.NAMESPACE); this.serverListManager = serverListManager; // 用户名和密码验证 this.securityProxy = new SecurityProxy(properties, ConfigHttpClientManager.getInstance().getNacosRestTemplate()); }注解@7 gRPC agent启动
public void start() throws NacosException { // 简单用户名和密码验证 if (securityProxy.isEnabled()) { securityProxy.login(serverListManager.getServerUrls()); this.executor.scheduleWithFixedDelay(new Runnable() { @Override public void run() { securityProxy.login(serverListManager.getServerUrls()); } }, 0, this.securityInfoRefreshIntervalMills, TimeUnit.MILLISECONDS); } startInternal(); } @Override public void startInternal() throws NacosException { executor.schedule(new Runnable() { @Override public void run() { while (true) { // 一直运行 try { // 最长等待5秒 listenExecutebell.poll(5L, TimeUnit.SECONDS); executeConfigListen(); } catch (Exception e) { LOGGER.error("[ rpc listen execute ] [rpc listen] exception", e); } } } }, 0L, TimeUnit.MILLISECONDS); }小结: 线程会一直运行,从BlockingQueue中获取元素。队里不为空,获取后立即执行executeConfigListen();队列为空等待5秒后执行
executeConfigListen()。
注解@8 构建缓存数据CacheData并放入cacheMap中,缓存的key为 「dataId+group+tenant」例如:test+DEFAULT_GROUP。
每个CacheData会绑定对应的taskId,每3000个CacheData对应一个taskId。其实从后面的代码中可以看出,每个taskId会对应一个gRPC Client。
public CacheData addCacheDataIfAbsent(String dataId, String group, String tenant) throws NacosException { // 从缓存中获取 CacheData cache = getCache(dataId, group, tenant); if (null != cache) { return cache; } // 构造缓存key以+连接,test+DEFAULT_GROUP String key = GroupKey.getKeyTenant(dataId, group, tenant); synchronized (cacheMap) { CacheData cacheFromMap = getCache(dataId, group, tenant); // multiple listeners on the same dataid+group and race condition,so // double check again // other listener thread beat me to set to cacheMap if (null != cacheFromMap) { // 再检查一遍 cache = cacheFromMap; // reset so that server not hang this check cache.setInitializing(true); // 缓存正在初始化 } else { // 构造缓存数据对象 cache = new CacheData(configFilterChainManager, agent.getName(), dataId, group, tenant); // 初始值taskId=0,云服务器注意此处每3000个CacheData共用一个taskId int taskId = cacheMap.get().size() / (int) ParamUtil.getPerTaskConfigSize(); cache.setTaskId(taskId); // fix issue # 1317 if (enableRemoteSyncConfig) { // 默认false String[] ct = getServerConfig(dataId, group, tenant, 3000L, false); cache.setContent(ct[0]); } } Map<String, CacheData> copy = new HashMap<String, CacheData>(this.cacheMap.get()); // key = test+DEFAULT_GROUP copy.put(key, cache); // cacheMap = { test+DEFAULT_GROUP=CacheData [test, DEFAULT_GROUP]} cacheMap.set(copy); } LOGGER.info("[{ }] [subscribe] { }", agent.getName(), key); MetricsMonitor.getListenConfigCountMonitor().set(cacheMap.get().size()); return cache; } 属性 含义 name ConfigTransportClient名称,config_rpc_client configFilterChainManager filter拦截链条,可以执行一些列拦截器 dataId dataId group group名称,默认为DEFAULT_GROUP tenant 租户名称 listeners 添加的Listener列表,线程安全CopyOnWriteArrayList content 启动时会从本地文件读入,默认为null md5 content的md5字符串小结:添加监听器逻辑如下:构建CacheData,并缓存在cacheMap中,key是由「dataId+group+tenant」组成;每个CacheData会绑定了Listener列表,也绑定了taskId,3000个不同的CacheData对应一个taskId,对应一个gRPC通道实例;设置isSyncWithServer=false表示 cache md5 data不是来自server同步,BlockingQueue中添加new Object() 供前面提到的长轮询判断使用。
上文中提到一个线程一直在轮询,轮询执行executeConfigListen方法,这个方法比较关键。
public void executeConfigListen() { Map<String/*taskId*/, List<CacheData>> listenCachesMap = new HashMap<String, List<CacheData>>(16); Map<String, List<CacheData>> removeListenCachesMap = new HashMap<String, List<CacheData>>(16); long now = System.currentTimeMillis(); // 超过5分钟 boolean needAllSync = now - lastAllSyncTime >= ALL_SYNC_INTERNAL; for (CacheData cache : cacheMap.get().values()) { synchronized (cache) { // --------注解@9开始-------- if (cache.isSyncWithServer()) { cache.checkListenerMd5(); // 内容有变更通知Listener执行 if (!needAllSync) { // 不超过5分钟则不再全局校验 continue; } } // --------注解@9结束-------- if (!CollectionUtils.isEmpty(cache.getListeners())) { // 有添加Listeners // get listen config 默认 false if (!cache.isUseLocalConfigInfo()) { List<CacheData> cacheDatas = listenCachesMap.get(String.valueOf(cache.getTaskId())); if (cacheDatas == null) { cacheDatas = new LinkedList<CacheData>(); listenCachesMap.put(String.valueOf(cache.getTaskId()), cacheDatas); } // CacheData [test, DEFAULT_GROUP] cacheDatas.add(cache); } } else if (CollectionUtils.isEmpty(cache.getListeners())) { // 没有添加Listeners if (!cache.isUseLocalConfigInfo()) { List<CacheData> cacheDatas = removeListenCachesMap.get(String.valueOf(cache.getTaskId())); if (cacheDatas == null) { cacheDatas = new LinkedList<CacheData>(); removeListenCachesMap.put(String.valueOf(cache.getTaskId()), cacheDatas); } cacheDatas.add(cache); } } } } boolean hasChangedKeys = false; //-------------------注解@10开始--------------------------------- if (!listenCachesMap.isEmpty()) { // 有Listeners for (Map.Entry<String, List<CacheData>> entry : listenCachesMap.entrySet()) { String taskId = entry.getKey(); List<CacheData> listenCaches = entry.getValue(); ConfigBatchListenRequest configChangeListenRequest = buildConfigRequest(listenCaches); configChangeListenRequest.setListen(true); try { // 注解@10.1 每个taskId构建rpcClient RpcClient rpcClient = ensureRpcClient(taskId); // 注解@10.2 ConfigChangeBatchListenResponse configChangeBatchListenResponse = (ConfigChangeBatchListenResponse) requestProxy( rpcClient, configChangeListenRequest); if (configChangeBatchListenResponse != null && configChangeBatchListenResponse.isSuccess()) { Set<String> changeKeys = new HashSet<String>(); // handle changed keys,notify listener // 有变化的configContext if (!CollectionUtils.isEmpty(configChangeBatchListenResponse.getChangedConfigs())) { hasChangedKeys = true; for (ConfigChangeBatchListenResponse.ConfigContext changeConfig : configChangeBatchListenResponse.getChangedConfigs()) { String changeKey = GroupKey .getKeyTenant(changeConfig.getDataId(), changeConfig.getGroup(), changeConfig.getTenant()); changeKeys.add(changeKey); boolean isInitializing = cacheMap.get().get(changeKey).isInitializing(); // 注解@10.3 回调Listener refreshContentAndCheck(changeKey, !isInitializing); } } //handler content configs for (CacheData cacheData : listenCaches) { String groupKey = GroupKey .getKeyTenant(cacheData.dataId, cacheData.group, cacheData.getTenant()); if (!changeKeys.contains(groupKey)) { // 注解@10.4 //sync:cache data md5 = server md5 && cache data md5 = all listeners md5. synchronized (cacheData) { if (!cacheData.getListeners().isEmpty()) { cacheData.setSyncWithServer(true); continue; } } } cacheData.setInitializing(false); } } } catch (Exception e) { LOGGER.error("Async listen config change error ", e); try { Thread.sleep(50L); } catch (InterruptedException interruptedException) { //ignore } } } } //-------------------注解@10结束--------------------------------- if (!removeListenCachesMap.isEmpty()) { for (Map.Entry<String, List<CacheData>> entry : removeListenCachesMap.entrySet()) { String taskId = entry.getKey(); List<CacheData> removeListenCaches = entry.getValue(); ConfigBatchListenRequest configChangeListenRequest = buildConfigRequest(removeListenCaches); configChangeListenRequest.setListen(false); try { // 向server发送Listener取消订阅请求ConfigBatchListenRequest#listen为false RpcClient rpcClient = ensureRpcClient(taskId); boolean removeSuccess = unListenConfigChange(rpcClient, configChangeListenRequest); if (removeSuccess) { for (CacheData cacheData : removeListenCaches) { synchronized (cacheData) { if (cacheData.getListeners().isEmpty()) { // 移除本地缓存 ClientWorker.this .removeCache(cacheData.dataId, cacheData.group, cacheData.tenant); } } } } } catch (Exception e) { LOGGER.error("async remove listen config change error ", e); } try { Thread.sleep(50L); } catch (InterruptedException interruptedException) { //ignore } } } if (needAllSync) { lastAllSyncTime = now; } //If has changed keys,notify re sync md5. if (hasChangedKeys) { // key有变化触发下一轮轮询 notifyListenConfig(); } }注解@9 isSyncWithServer初始为false,在下文代码中校验结束后会设置为true,表示md5 cache data同步来自server。如果为true会校验Md5.
void checkListenerMd5() { for (ManagerListenerWrap wrap : listeners) { if (!md5.equals(wrap.lastCallMd5)) { // 注解@9.1 safeNotifyListener(dataId, group, content, type, md5, wrap); } } }注解@9.1 配置内容有变更时,回调到我们示例中注册的Listener中。
private void safeNotifyListener(final String dataId, final String group, final String content, final String type, final String md5, final ManagerListenerWrap listenerWrap) { final Listener listener = listenerWrap.listener; if (listenerWrap.inNotifying) { // ... return; } Runnable job = new Runnable() { @Override public void run() { long start = System.currentTimeMillis(); ClassLoader myClassLoader = Thread.currentThread().getContextClassLoader(); ClassLoader appClassLoader = listener.getClass().getClassLoader(); try { if (listener instanceof AbstractSharedListener) { AbstractSharedListener adapter = (AbstractSharedListener) listener; adapter.fillContext(dataId, group); // ... } Thread.currentThread().setContextClassLoader(appClassLoader); ConfigResponse cr = new ConfigResponse(); cr.setDataId(dataId); cr.setGroup(group); cr.setContent(content); // filter拦截继续过滤 configFilterChainManager.doFilter(null, cr); String contentTmp = cr.getContent(); listenerWrap.inNotifying = true; // 注解@9.2 listener.receiveConfigInfo(contentTmp); // compare lastContent and content if (listener instanceof AbstractConfigChangeListener) { Map data = ConfigChangeHandler.getInstance() .parseChangeData(listenerWrap.lastContent, content, type); ConfigChangeEvent event = new ConfigChangeEvent(data); // 回调变更事件方法 ((AbstractConfigChangeListener) listener).receiveConfigChange(event); listenerWrap.lastContent = content; } listenerWrap.lastCallMd5 = md5; // .. } catch (NacosException ex) { // ... } catch (Throwable t) { // ... } finally { listenerWrap.inNotifying = false; Thread.currentThread().setContextClassLoader(myClassLoader); } } }; final long startNotify = System.currentTimeMillis(); try { // 注解@9.3 if (null != listener.getExecutor()) { listener.getExecutor().execute(job); } else { try { INTERNAL_NOTIFIER.submit(job); // 默认线程池执行,为5个线程 } catch (RejectedExecutionException rejectedExecutionException) { // ... job.run(); } catch (Throwable throwable) { // ... job.run(); } } } catch (Throwable t) { // ... } final long finishNotify = System.currentTimeMillis(); // ... }注解@9.2 回调注册Listener的receiveConfigInfo方法或者receiveConfigChange逻辑
注解@9.3 优先使用我们示例中注册提供的线程池执行job,网站模板如果没有设置使用默认线程池「INTERNAL_NOTIFIER」,默认5个线程
备注: 当CacheData从server同步后,会校验md5是否变更了,当变更时会回调到我们注册的Listener完成通知。通知任务被封装成Runnable任务,执行线程池可以自定义,默认为5个线程。
注解@10.1 每个taskId构建rpcClient,例如:taskId= config-0-c70e0314-4770-43f5-add4-f258a4083fd7;结合上下文每3000个CacheData对应一个rpcClient。
注解@10.2 向server发起configChangeListenRequest,server端由ConfigChangeBatchListenRequestHandler处理,还是比较md5
是否变更了,变更后server端返回变更的key列表。
注解@10.3 当server返回变更key列表时执行refreshContentAndCheck方法。
private void refreshContentAndCheck(CacheData cacheData, boolean notify) { try { // 注解@10.3.1 String[] ct = getServerConfig(cacheData.dataId, cacheData.group, cacheData.tenant, 3000L, notify); cacheData.setContent(ct[0]); if (null != ct[1]) { cacheData.setType(ct[1]); } if (notify) { // 记录日志 // ... } // 注解@10.3.2 cacheData.checkListenerMd5(); } catch (Exception e) { //... } }注解@10.3.1 向server发起ConfigQueryRequest,查询配置内容
注解@10.3.2 回调注册的Listener逻辑见 注解@9
注解@10.4 key没有变化的,内容由server同步,设置SyncWithServer=true,下一轮逻辑会由 注解@9 部分执行
备注: 从整个注解@10 注册Listener后,会构建与server的RPC通道rpcClient;向server发起变更查询请求configChangeListenRequest,server端通过比较缓存的md5值,返回client变更的key列表;client通过变更的key列表向server发起配置查询请求ConfigQueryRequest,获取变更内容,并回调我们注册的Listener。
本文转载自微信公众号「瓜农老梁」,可以通过以下二维码关注。转载本文请联系瓜农老梁公众号。