A look at nacos ServiceManager's UpdatedServiceProcessor

This article examines the UpdatedServiceProcessor in nacos' ServiceManager.

ServiceManager.init

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/core/ServiceManager.java

@Component
@DependsOn("nacosApplicationContext")
public class ServiceManager implements RecordListener<Service> {

    /**
     * Map<namespace, Map<group::serviceName, Service>>
     */
    private Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();

    private LinkedBlockingDeque<ServiceKey> toBeUpdatedServicesQueue = new LinkedBlockingDeque<>(1024 * 1024);

    private Synchronizer synchronizer = new ServiceStatusSynchronizer();

    private final Lock lock = new ReentrantLock();

    @Resource(name = "consistencyDelegate")
    private ConsistencyService consistencyService;

    @Autowired
    private SwitchDomain switchDomain;

    @Autowired
    private DistroMapper distroMapper;

    @Autowired
    private ServerListManager serverListManager;

    @Autowired
    private PushService pushService;

    private final Object putServiceLock = new Object();

    @PostConstruct
    public void init() {

        UtilsAndCommons.SERVICE_SYNCHRONIZATION_EXECUTOR.schedule(new ServiceReporter(), 60000, TimeUnit.MILLISECONDS);

        UtilsAndCommons.SERVICE_UPDATE_EXECUTOR.submit(new UpdatedServiceProcessor());

        try {
            Loggers.SRV_LOG.info("listen for service meta change");
            consistencyService.listen(KeyBuilder.SERVICE_META_KEY_PREFIX, this);
        } catch (NacosException e) {
            Loggers.SRV_LOG.error("listen for service meta change failed!");
        }
    }

    //......
}
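  • ServiceManager's init method schedules a ServiceReporter to run after 60000 ms, submits an UpdatedServiceProcessor task to UtilsAndCommons.SERVICE_UPDATE_EXECUTOR, and registers itself as a listener for service meta changes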
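
The difference between the two executor calls in init can be illustrated with plain JDK executors. This is only a minimal sketch: InitSketch, runBoth and the two local executors are hypothetical names standing in for the nacos executors, and the delay is a parameter so the demo does not wait a full minute.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; not part of nacos.
class InitSketch {

    /** Returns true if both the delayed task and the submitted task ran. */
    static boolean runBoth(long delayMillis) {
        ScheduledExecutorService syncExecutor = Executors.newSingleThreadScheduledExecutor();
        ExecutorService updateExecutor = Executors.newSingleThreadExecutor();
        CountDownLatch delayed = new CountDownLatch(1);
        CountDownLatch immediate = new CountDownLatch(1);
        try {
            // One-shot task that fires once after the delay (init passes 60000 ms).
            syncExecutor.schedule(() -> delayed.countDown(), delayMillis, TimeUnit.MILLISECONDS);
            // Task that starts as soon as a worker thread is free.
            updateExecutor.submit(() -> immediate.countDown());
            return immediate.await(5, TimeUnit.SECONDS) && delayed.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            syncExecutor.shutdownNow();
            updateExecutor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(runBoth(100));
    }
}
```

schedule runs its task once after the delay, while submit hands the task to a worker right away, which is why the long-running UpdatedServiceProcessor loop goes through submit.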

UpdatedServiceProcessor

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/core/ServiceManager.java

private class UpdatedServiceProcessor implements Runnable {
    //get changed service from other server asynchronously
    @Override
    public void run() {
        ServiceKey serviceKey = null;

        try {
            while (true) {
                try {
                    serviceKey = toBeUpdatedServicesQueue.take();
                } catch (Exception e) {
                    Loggers.EVT_LOG.error("[UPDATE-DOMAIN] Exception while taking item from LinkedBlockingDeque.");
                }

                if (serviceKey == null) {
                    continue;
                }
                GlobalExecutor.submitServiceUpdate(new ServiceUpdater(serviceKey));
            }
        } catch (Exception e) {
            Loggers.EVT_LOG.error("[UPDATE-DOMAIN] Exception while update service: {}", serviceKey, e);
        }
    }
}
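  • UpdatedServiceProcessor implements the Runnable interface; its run method loops forever, blocking on toBeUpdatedServicesQueue.take() for the next ServiceKey and then submitting a ServiceUpdater for it through GlobalExecutor.submitServiceUpdate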
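
The take-then-submit shape of this loop can be sketched with JDK classes alone. The names below (QueueDispatcherSketch, dispatch, the String keys) are hypothetical and only illustrate the pattern. Unlike the nacos loop, this sketch stops when interrupted so that the demo can terminate.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; not part of nacos.
class QueueDispatcherSketch {

    /**
     * A dispatcher thread takes keys from the queue and hands each one to a
     * worker pool. Waits until `expected` keys were processed (or 5 seconds
     * passed) and returns the processed keys.
     */
    static List<String> dispatch(BlockingQueue<String> queue, int expected) {
        ExecutorService dispatcher = Executors.newSingleThreadExecutor();
        ExecutorService workers = Executors.newFixedThreadPool(2);
        List<String> processed = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(expected);

        dispatcher.submit(() -> {
            try {
                while (true) {
                    String key = queue.take(); // blocks until an item is available
                    workers.submit(() -> {
                        processed.add(key);
                        done.countDown();
                    });
                }
            } catch (InterruptedException e) {
                // shutdownNow() interrupts take() and ends the loop
                Thread.currentThread().interrupt();
            }
        });

        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            dispatcher.shutdownNow();
            workers.shutdown();
        }
        return processed;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingDeque<>(1024);
        queue.offer("public##serviceA");
        queue.offer("public##serviceB");
        System.out.println(dispatch(queue, 2));
    }
}
```

Keeping the dispatcher on its own thread means a slow update only occupies a worker, and the queue keeps draining.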

ServiceUpdater

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/core/ServiceManager.java

private class ServiceUpdater implements Runnable {

    String namespaceId;
    String serviceName;
    String serverIP;

    public ServiceUpdater(ServiceKey serviceKey) {
        this.namespaceId = serviceKey.getNamespaceId();
        this.serviceName = serviceKey.getServiceName();
        this.serverIP = serviceKey.getServerIP();
    }

    @Override
    public void run() {
        try {
            updatedHealthStatus(namespaceId, serviceName, serverIP);
        } catch (Exception e) {
            Loggers.SRV_LOG.warn("[DOMAIN-UPDATER] Exception while update service: {} from {}, error: {}",
                serviceName, serverIP, e);
        }
    }
}
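  • ServiceUpdater implements the Runnable interface; its run method calls updatedHealthStatus with the namespaceId, serviceName and serverIP taken from the ServiceKey, and logs a warning if that call throws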

ServiceManager.updatedHealthStatus

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/core/ServiceManager.java

@Component
@DependsOn("nacosApplicationContext")
public class ServiceManager implements RecordListener<Service> {

    /**
     * Map<namespace, Map<group::serviceName, Service>>
     */
    private Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();

    private LinkedBlockingDeque<ServiceKey> toBeUpdatedServicesQueue = new LinkedBlockingDeque<>(1024 * 1024);

    private Synchronizer synchronizer = new ServiceStatusSynchronizer();

    private final Lock lock = new ReentrantLock();

    @Resource(name = "consistencyDelegate")
    private ConsistencyService consistencyService;

    @Autowired
    private SwitchDomain switchDomain;

    @Autowired
    private DistroMapper distroMapper;

    @Autowired
    private ServerListManager serverListManager;

    @Autowired
    private PushService pushService;

    private final Object putServiceLock = new Object();

    //......

    public void updatedHealthStatus(String namespaceId, String serviceName, String serverIP) {
        Message msg = synchronizer.get(serverIP, UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
        JSONObject serviceJson = JSON.parseObject(msg.getData());

        JSONArray ipList = serviceJson.getJSONArray("ips");
        Map<String, String> ipsMap = new HashMap<>(ipList.size());
        for (int i = 0; i < ipList.size(); i++) {

            String ip = ipList.getString(i);
            String[] strings = ip.split("_");
            ipsMap.put(strings[0], strings[1]);
        }

        Service service = getService(namespaceId, serviceName);

        if (service == null) {
            return;
        }

        boolean changed = false;

        List<Instance> instances = service.allIPs();
        for (Instance instance : instances) {

            boolean valid = Boolean.parseBoolean(ipsMap.get(instance.toIPAddr()));
            if (valid != instance.isHealthy()) {
                changed = true;
                instance.setHealthy(valid);
                Loggers.EVT_LOG.info("{} {SYNC} IP-{} : {}@{}{}",
                    serviceName, (instance.isHealthy() ? "ENABLED" : "DISABLED"),
                    instance.getIp(), instance.getPort(), instance.getClusterName());
            }
        }

        if (changed) {
            pushService.serviceChanged(service);
        }

        StringBuilder stringBuilder = new StringBuilder();
        List<Instance> allIps = service.allIPs();
        for (Instance instance : allIps) {
            stringBuilder.append(instance.toIPAddr()).append("_").append(instance.isHealthy()).append(",");
        }

        if (changed && Loggers.EVT_LOG.isDebugEnabled()) {
            Loggers.EVT_LOG.debug("[HEALTH-STATUS-UPDATED] namespace: {}, service: {}, ips: {}",
                service.getNamespaceId(), service.getName(), stringBuilder.toString());
        }

    }

    //......
}
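  • updatedHealthStatus fetches a msg from the synchronizer and builds ipsMap by splitting each entry of the "ips" array on "_" into an address and a health flag. It then obtains the instances through service.allIPs() and looks up each instance's valid status in ipsMap (an instance missing from ipsMap is treated as false, because Boolean.parseBoolean(null) returns false). If valid differs from the instance's isHealthy(), it sets changed and updates the instance's healthy flag. When changed is true it publishes the change through pushService.serviceChanged(service), and if debug logging is enabled it also logs the resulting status of all instances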
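
The parse-and-compare step can be tried in isolation. The sketch below is an assumption-laden simplification: HealthSyncSketch and Node are hypothetical stand-ins for ServiceManager and Instance, the entries are assumed to look like "ip:port_flag", and fastjson and the push service are left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical demo class; not part of nacos.
class HealthSyncSketch {

    // Simplified stand-in for Instance: only what the comparison needs.
    static final class Node {
        final String addr; // assumed "ip:port" form
        boolean healthy;

        Node(String addr, boolean healthy) {
            this.addr = addr;
            this.healthy = healthy;
        }
    }

    /** Splits entries such as "10.0.0.1:8080_true" into address -> flag text. */
    static Map<String, String> parse(List<String> ips) {
        Map<String, String> ipsMap = new HashMap<>(ips.size());
        for (String ip : ips) {
            String[] parts = ip.split("_");
            ipsMap.put(parts[0], parts[1]);
        }
        return ipsMap;
    }

    /** Applies the remote flags to the local nodes; returns true if any node changed. */
    static boolean apply(Map<String, String> ipsMap, List<Node> nodes) {
        boolean changed = false;
        for (Node node : nodes) {
            // A missing address yields null, which parseBoolean maps to false.
            boolean valid = Boolean.parseBoolean(ipsMap.get(node.addr));
            if (valid != node.healthy) {
                node.healthy = valid;
                changed = true;
            }
        }
        return changed;
    }

    public static void main(String[] args) {
        Map<String, String> remote = parse(Arrays.asList("10.0.0.1:8080_true", "10.0.0.2:8080_false"));
        List<Node> local = new ArrayList<>();
        local.add(new Node("10.0.0.1:8080", false)); // remote says healthy
        local.add(new Node("10.0.0.3:8080", true));  // absent from the remote list
        System.out.println(apply(remote, local));
    }
}
```

In this sketch a node that the remote list does not mention is flipped to unhealthy, which mirrors the parseBoolean(null) behaviour of the original loop.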

Summary

  • ServiceManager's init method submits an UpdatedServiceProcessor task to UtilsAndCommons.SERVICE_UPDATE_EXECUTOR
  • UpdatedServiceProcessor implements the Runnable interface; its run method loops forever, taking elements from toBeUpdatedServicesQueue and submitting a ServiceUpdater for each through GlobalExecutor.submitServiceUpdate
  • ServiceUpdater implements the Runnable interface; its run method calls updatedHealthStatus

doc

  • ServiceManager

This article is a contributed post and does not represent my own position. If you repost it, please credit the source: http://www.souzhinan.com/kj/208014.html