Nacos, as a configuration center, necessarily needs to ensure high availability of service nodes, so how does Nacos implement clustering?

The following diagram, represents the deployment diagram of Nacos cluster.

nacos cluster

How Nacos clusters work

Nacos is an uncentralized node design in the cluster architecture as a configuration center. Since there are no master-slave nodes and no election mechanism, it is necessary to add virtual IPs (VIPs) in order to enable hot standby.

The data storage of Nacos is divided into two parts

  1. Mysql database storage, where all Nacos nodes share the same copy of data, and the copy mechanism of data is solved by Mysql’s own master-slave scheme, thus ensuring data reliability.
  2. local disk of each node, which stores a full amount of data, with the following path: /data/program/nacos-1/data/config-data/${GROUP} .

In Nacos design, Mysql is a central data repository and the data in Mysql is considered to be absolutely correct. In addition, Nacos writes a copy of the data in Mysql to local disk at startup.

The advantage of this design is that it improves performance. When a client needs to request a configuration item, the server will want Ian to read the corresponding file from disk and return it, which is more efficient than the database.

When the configuration is changed.

  1. Nacos saves the changed configuration to the database and then writes it to a local file.
  2. Then an HTTP request is sent to other nodes in the cluster, and the other nodes receive the event and dump the data just written from Mysql to the local file.

In addition, when NacosServer is started, it will start a timed task to dump the full amount of data to the local file every 6 hours.

Configuration change synchronization portal

When a configuration is modified, deleted, or added, a notifyConfigChange event is issued.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@PostMapping
@Secured(action = ActionTypes.WRITE, parser = ConfigResourceParser.class)
public Boolean publishConfig(HttpServletRequest request, HttpServletResponse response,
        @RequestParam(value = "dataId") String dataId, @RequestParam(value = "group") String group,
        @RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant,
        @RequestParam(value = "content") String content, @RequestParam(value = "tag", required = false) String tag,
        @RequestParam(value = "appName", required = false) String appName,
        @RequestParam(value = "src_user", required = false) String srcUser,
        @RequestParam(value = "config_tags", required = false) String configTags,
        @RequestParam(value = "desc", required = false) String desc,
        @RequestParam(value = "use", required = false) String use,
        @RequestParam(value = "effect", required = false) String effect,
        @RequestParam(value = "type", required = false) String type,
        @RequestParam(value = "schema", required = false) String schema) throws NacosException {
    
   //Omit..
    if (StringUtils.isBlank(betaIps)) {
        if (StringUtils.isBlank(tag)) {
            persistService.insertOrUpdate(srcIp, srcUser, configInfo, time, configAdvanceInfo, true);
            ConfigChangePublisher
                    .notifyConfigChange(new ConfigDataChangeEvent(false, dataId, group, tenant, time.getTime()));
        } else {
            persistService.insertOrUpdateTag(configInfo, tag, srcIp, srcUser, time, true);
            ConfigChangePublisher.notifyConfigChange(
                    new ConfigDataChangeEvent(false, dataId, group, tenant, tag, time.getTime()));
        }
    }//Omit
    return true;
}

AsyncNotifyService

Configure data change events with a dedicated listener, AsyncNotifyService, which will handle synchronization events after data changes.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
@Autowired
public AsyncNotifyService(ServerMemberManager memberManager) {
    this.memberManager = memberManager;
    
    // Register ConfigDataChangeEvent to NotifyCenter.
    NotifyCenter.registerToPublisher(ConfigDataChangeEvent.class, NotifyCenter.ringBufferSize);
    
    // Register A Subscriber to subscribe ConfigDataChangeEvent.
    NotifyCenter.registerSubscriber(new Subscriber() {
        
        @Override
        public void onEvent(Event event) {
            // Generate ConfigDataChangeEvent concurrently
            if (event instanceof ConfigDataChangeEvent) {
                ConfigDataChangeEvent evt = (ConfigDataChangeEvent) event;
                long dumpTs = evt.lastModifiedTs;
                String dataId = evt.dataId;
                String group = evt.group;
                String tenant = evt.tenant;
                String tag = evt.tag;
                Collection<Member> ipList = memberManager.allMembers(); //Get the list of ip's in the cluster
                
                // Build NotifySingleTask and add it to the queue.
                Queue<NotifySingleTask> queue = new LinkedList<NotifySingleTask>();
                for (Member member : ipList) { //Iterate over each node in the cluster
                    queue.add(new NotifySingleTask(dataId, group, tenant, tag, dumpTs, member.getAddress(),
                            evt.isBeta));
                }
                //Asynchronous execution of tasks, AsyncTask
                ConfigExecutor.executeAsyncNotify(new AsyncTask(nacosAsyncRestTemplate, queue));
            }
        }
        
        @Override
        public Class<? extends Event> subscribeType() {
            return ConfigDataChangeEvent.class;
        }
    });
}

AsyncTask

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@Override
public void run() {
    executeAsyncInvoke();
}

private void executeAsyncInvoke() {
    while (!queue.isEmpty()) {//Iterate over the data in the queue until it is empty
        NotifySingleTask task = queue.poll(); //Get task
        String targetIp = task.getTargetIP(); //Get target ip
        
        if (memberManager.hasMember(targetIp)) { //If the list of ip's in the cluster contains the target ip
            // start the health check and there are ips that are not monitored, put them directly in the notification queue, otherwise notify
            //Determine the health status of the target ip
            boolean unHealthNeedDelay = memberManager.isUnHealth(targetIp); //
            if (unHealthNeedDelay) { //If the target service is non-healthy, it continues to be added to the queue and deferred for further execution.
                // target ip is unhealthy, then put it in the notification list
                ConfigTraceService.logNotifyEvent(task.getDataId(), task.getGroup(), task.getTenant(), null,
                        task.getLastModified(), InetUtils.getSelfIP(), ConfigTraceService.NOTIFY_EVENT_UNHEALTH,
                        0, task.target);
                // get delay time and set fail count to the task
                asyncTaskExecute(task);
            } else {
                //Build header
                Header header = Header.newInstance();
                header.addParam(NotifyService.NOTIFY_HEADER_LAST_MODIFIED, String.valueOf(task.getLastModified()));
                header.addParam(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP, InetUtils.getSelfIP());
                if (task.isBeta) {
                    header.addParam("isBeta", "true");
                }
                AuthHeaderUtil.addIdentityToHeader(header);
                //Initiate a remote call via restTemplate, and if the call is successful, execute the AsyncNotifyCallBack callback method
                restTemplate.get(task.url, header, Query.EMPTY, String.class, new AsyncNotifyCallBack(task));
            }
        }
    }
}

The target node receives the request

The request address for data synchronization is: task.url=http://192.168.8.16:8848/nacos/v1/cs/communication/dataChange?dataId=log.yaml&group=DEFAULT_GROUP

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
@GetMapping("/dataChange")
public Boolean notifyConfigInfo(HttpServletRequest request, @RequestParam("dataId") String dataId,
        @RequestParam("group") String group,
        @RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant,
        @RequestParam(value = "tag", required = false) String tag) {
    dataId = dataId.trim();
    group = group.trim();
    String lastModified = request.getHeader(NotifyService.NOTIFY_HEADER_LAST_MODIFIED);
    long lastModifiedTs = StringUtils.isEmpty(lastModified) ? -1 : Long.parseLong(lastModified);
    String handleIp = request.getHeader(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP);
    String isBetaStr = request.getHeader("isBeta");
    if (StringUtils.isNotBlank(isBetaStr) && trueStr.equals(isBetaStr)) {
        dumpService.dump(dataId, group, tenant, lastModifiedTs, handleIp, true);
    } else {
        //
        dumpService.dump(dataId, group, tenant, tag, lastModifiedTs, handleIp);
    }
    return true;
}

The dumpService.dump is used to implement the configuration update with the following code

The current task will be added to DumpTaskMgr to manage it.

1
2
3
4
5
6
7
public void dump(String dataId, String group, String tenant, String tag, long lastModified, String handleIp,
        boolean isBeta) {
    String groupKey = GroupKey2.getKey(dataId, group, tenant);
    String taskKey = String.join("+", dataId, group, tenant, String.valueOf(isBeta), tag);
    dumpTaskMgr.addTask(taskKey, new DumpTask(groupKey, tag, lastModified, handleIp, isBeta));
    DUMP_LOG.info("[dump-task] add task. groupKey={}, taskKey={}", groupKey, taskKey);
}

TaskManager.addTask, call the parent class first to finish adding the task.

1
2
3
4
5
@Override
public void addTask(Object key, AbstractDelayTask newTask) {
    super.addTask(key, newTask);
    MetricsMonitor.getDumpTaskMonitor().set(tasks.size());
}

In this scenario design, it is usually done using the producer-consumer pattern, so it is not hard to guess here that the task will be saved to a queue and then there will be another thread to execute it.

NacosDelayTaskExecuteEngine

The parent class of TaskManager is NacosDelayTaskExecuteEngine,

This class has a member property protected final ConcurrentHashMap<Object, AbstractDelayTask> tasks; that holds the deferred task type AbstractDelayTask.

In the constructor of this class, a delayed task is initialized, where the specific task is ProcessRunnable.

1
2
3
4
5
6
7
public NacosDelayTaskExecuteEngine(String name, int initCapacity, Logger logger, long processInterval) {
    super(logger);
    tasks = new ConcurrentHashMap<Object, AbstractDelayTask>(initCapacity);
    processingExecutor = ExecutorFactory.newSingleScheduledExecutorService(new NameThreadFactory(name));
    processingExecutor
            .scheduleWithFixedDelay(new ProcessRunnable(), processInterval, processInterval, TimeUnit.MILLISECONDS);
}

ProcessRunnable

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
private class ProcessRunnable implements Runnable {
    
    @Override
    public void run() {
        try {
            processTasks();
        } catch (Throwable e) {
            getEngineLog().error(e.toString(), e);
        }
    }
}

processTasks

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
protected void processTasks() {
    //Get all tasks
    Collection<Object> keys = getAllTaskKeys();
    for (Object taskKey : keys) {
        AbstractDelayTask task = removeTask(taskKey);
        if (null == task) {
            continue;
        }
        //Get the task processor, here the DumpProcessor is returned
        NacosTaskProcessor processor = getProcessor(taskKey);
        if (null == processor) {
            getEngineLog().error("processor not found for task, so discarded. " + task);
            continue;
        }
        try {
            // ReAdd task if process failed
            if (!processor.process(task)) {
                retryFailedTask(taskKey, task);
            }
        } catch (Throwable e) {
            getEngineLog().error("Nacos task execute error : " + e.toString(), e);
            retryFailedTask(taskKey, task);
        }
    }
}

DumpProcessor.process

Retrieves the latest data from the database and then updates the local cache and disk.