nacos的配置中⼼--服务端
⼀:配置服务端存储模型
1.1:概述
Nacos Config提供了配置管理的功能,它允许⽤户在nacos上配置key-value对,并在客户端订阅需要的配置。当配置发⽣变更时,订阅的客户端会获得通知,随后拉取最新的key-value对。
Config Server为了最⼤程度保证可⽤性采⽤了⼀种三层的存储架构设计,mysql - 本地⽂件 - 内存缓存:
1.2:数据库
Config Server所有的key-value配置信息都最终存储在mysql数据库中,当中包含四张核⼼表(table):
config_info - 存储配置信息,包含id/data_id/group_id/tenant_id/content/md5/gmt_created/gmt_modified/app_name等列。当中data_id + group_id + tenant_id三者唯⼀确定⼀条key-value配置。
config_tags_relation - 存储配置上附加的tag,包含id/tag_name/tag_type/data_id/group_id/tenant_id/nid等列。
config_info_beta - 存储beta环境的特殊配置值,除了config_info表中的列之外新增了beta_ips列。
config_info_tag - 存储某个tag对应的特殊配置值,除了config_info表中的列之外新增了tag_id列。
1.3:本地磁盘
mysql数据库中存储的是最终的配置信息,config rver在启动后会周期性(360min)的从mysql中将所有配置信息dump到本地⽂件系统中。配置会被存储到⼀个特殊的⽬录/{ur.home}/nacos/data/config-data/{groupId}/{dataId}下,每条配置存在⼀个独⽴的⽂件中。
四字匾额大全
config rver中关于配置的读取都是⾛本地⽂件系统中的dump,这种设计⼀⽅⾯提升了系统的可⽤性(防⽌mysql奔溃导致config不可⽤),另⼀⽅⾯极⼤降低了mysql数据库的负载,使得config rver的⽔平扩张变得⾮常容易。
1.4:缓存
当config rver启动时会⼀次性把mysql中存储的所有配置dump到本地⽂件系统中,并设置⼀个定时器周期性(默认6h)做全量dump。config rver也有⼀种quick start模式,允许重⽤⽂件系统中保留的配置数据,做增量dump。
配置信息的写⾸先进⼊到mysql数据库中。mysql插⼊成功之后rver会⽣成⼀个ConfigDataChangeEvent事件,在AsyncNotifyService中将捕获这个事件,对当前每⼀个config rver发起/dataChange调⽤。
/dataChange调⽤在CommunicationController中被处理,通过ConfigService将变动的数据dump到本地⽂件中并更新内存缓存。
配置的读取及订阅都是从内存Cache + 本地⽂件中完成。
⼆、数据存储
2.1:流程图
整个流程分为两⼤部分:
⼊库。插⼊mysql数据库,发起ConfigDataChangeEvent,调⽤所有rver上的/dataChange接⼝。
dump。rver响应/dataChange请求,异步dump数据库配置信息到本地。
2.2:ConfigController.publishConfig
处理config获取/订阅/变更相关的http请求。
2.2.1:⼊⼝
其中进⾏⼀系列的逻辑判断,但是可以看出主要做了两件事:持久化和事件发布。
public Boolean publishConfig(HttpServletRequest request, HttpServletRespon respon,
@RequestParam(value = "dataId") String dataId, @RequestParam(value = "group") String group,
@RequestParam(value = "tenant", required = fal, defaultValue = StringUtils.EMPTY) String tenant,
@RequestParam(value = "content") String content, @RequestParam(value = "tag", required = fal) String tag,
@RequestParam(value = "appName", required = fal) String appName,
@RequestParam(value = "src_ur", required = fal) String srcUr,
@RequestParam(value = "config_tags", required = fal) String configTags,
@RequestParam(value = "desc", required = fal) String desc,
@RequestParam(value = "u", required = fal) String u,
@RequestParam(value = "effect", required = fal) String effect,
@RequestParam(value = "type", required = fal) String type,
@RequestParam(value = "schema", required = fal) String schema) throws NacosException {
.......
//进⾏持久化保存
persistService.inrtOrUpdate(srcIp, srcUr, configInfo, time, configAdvanceInfo, true);
//配置更新事件
ConfigChangePublisher
.notifyConfigChange(new ConfigDataChangeEvent(fal, dataId, group, tenant, Time()));
.......
十月的最后一天}
其中持久化主要交互到数据库,进⾏配置数据的插⼊和历史表的插⼊。
后⾯⼀步,封装成为ConfigDataChangeEvent进⾏事件发布,只需要找到监听位置进⾏跟踪后续逻辑。事件的发布和监听,是nacos⾃⼰写的逻辑,将任务发布即将任务加⼊到某个任务队列,另外有⼀个线程在阻塞监听队列的数据,⼀旦发现队列中有数据,会根据任务类型找到任务的订阅者,由订阅者处理新发布的事件。
2.2.2 ConfigDataChangeEvent监听
监听事件的处理内容:
@Override
public void onEvent(Event event) {
// Generate ConfigDataChangeEvent concurrently
if (event instanceof ConfigDataChangeEvent) {
ConfigDataChangeEvent evt = (ConfigDataChangeEvent) event;
long dumpTs = evt.lastModifiedTs;
String dataId = evt.dataId;
String group = up;
String tenant = ant;
String tag = evt.tag;
Collection<Member> ipList = memberManager.allMembers();
// In fact, any type of queue here can be
//初始化⼀个队列
Queue<NotifySingleTask> queue = new LinkedList<NotifySingleTask>();
for (Member member : ipList) {
/
/遍历nacos集群的成员,并在队列中添加通知任务
queue.add(new NotifySingleTask(dataId, group, tenant, tag, dumpTs, Address(),
evt.isBeta));
}
//开辟新的任务,执⾏队列中的任务
}
}
2.2.3 不同节点同步
private void executeAsyncInvoke() {
while (!queue.isEmpty()) {
/
/弹出队列的数据
NotifySingleTask task = queue.poll();
String targetIp = TargetIP();
//判断是否是集群成员
if (memberManager.hasMember(targetIp)) {
// start the health check and there are ips that are not monitored, put them directly in the notification queue, otherwi notify
boolean unHealthNeedDelay = memberManager.isUnHealth(targetIp);
//判断节点是否为⾮监控节点,如果不健康则延时进⾏调⽤,并根据尝试次数增加延时时长
if (unHealthNeedDelay) {
// target ip is unhealthy, then put it in the notification list
ConfigTraceService.DataId(), Group(), Tenant(), null,
0, task.target);
// get delay time and t fail count to the task
asyncTaskExecute(task);
} el {
//对于健康节点,直接执⾏通知逻辑
HttpGet request = new HttpGet(task.url);
request.tHeader(NotifyService.NOTIFY_HEADER_LAST_MODIFIED,
String.LastModified()));
request.tHeader(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP, SelfIp());
if (task.isBeta) {
request.tHeader("isBeta", "true");
}
//执⾏请求,并带上回调函数
}
}
}
对于⾮监控节点延长延时时间,主要是根据次数增减加时长
private static int getDelayTime(NotifySingleTask task) {
int failCount = FailCount();
int delay = MIN_RETRY_INTERVAL + failCount * failCount * INCREASE_STEPS;
if (failCount <= MAX_COUNT) {
task.tFailCount(failCount + 1);
}
return delay;
}
视频拜年
对于回调函数的内容,⽆⾮就是对http请求成功或者失败的判定,以及根据结果进⾏后续的逻辑处理:
芸豆炒肉
⽆论是请求成功结果状态码不正确还是调⽤失败,异或是取消,都会添加任务到队列中进⾏重试。
@Override
public void completed(HttpRespon respon) {
long delayed = System.currentTimeMillis() - LastModified();
if (StatusLine().getStatusCode() == HttpStatus.SC_OK) {
......
} el {
......
asyncTaskExecute(task);
.......
}
HttpClientUtils.cloQuietly(respon);
}
@Override
public void failed(Exception ex) {我的自传
......
asyncTaskExecute(task);
......
}
@Override
public void cancelled() {
......
asyncTaskExecute(task);
......
}
private NotifySingleTask task;
private CloableHttpAsyncClient httpClient;
}
2.3:ifyConfigInfo
该接⼝,对应与2.2中的通知逻辑,该部分主要进⾏两件事情:
(1):将配置数据更新到缓存和磁盘⽂件之中;
(2):如果有客户端长连接监听配置信息的变化,此时会找到对应key的长连接的请求,并进⾏响应。
2.3.2 接收到请求
if (StringUtils.isNotBlank(isBetaStr) && trueStr.equals(isBetaStr)) {
dumpService.dump(dataId, group, tenant, lastModifiedTs, handleIp, true);
} el {
dumpService.dump(dataId, group, tenant, tag, lastModifiedTs, handleIp);
}
dump的⽅法,根据dataId, group, tenant(命名空间编号),组装成的key。
public void dump(String dataId, String group, String tenant, long lastModified, String handleIp, boolean isBeta) {
String groupKey = Key(dataId, group, tenant);
//添加任务
dumpTaskMgr.addTask(groupKey, new DumpTask(groupKey, lastModified, handleIp, isBeta));
}
//添加任务到tasks中,添加任务时候使⽤lock,⼤概是防⽌在任务添加的时候,执⾏该类的processor⽅法。public void addTask(String type, AbstractTask task) {
this.lock.lock();
try {
AbstractTask oldTask = tasks.put(type, task);
if (null != oldTask) {
<(oldTask);
}
} finally {
this.lock.unlock();
}
该类中有⼀个processor的⽅法,该⽅法是当前类循环执⾏,每次执⾏间隙会休眠:
public void run() {
while (!TaskManager.()) {
try {
Thread.sleep(100);
TaskManager.this.process();
} catch (Throwable e) {
LogUtil.("execute dump process has error : {}", e);
}
}
}
在processor⽅法两阶段事情:
(1):判断当前任务是否还需要继续执⾏,不需要就将任务对队列中去除;
(2):找到TaskProcessor,并调⽤result = processor.Key(), task);⽅法
protected void process() {
for (Map.Entry<String, AbstractTask> entry : Set()) {
AbstractTask task = null;
this.lock.lock();
try {
// Getting task.
//判断当前任务是否还需要继续执⾏,不需要就删除
task = Value();
if (null != task) {
if (!task.shouldProcess()) {
// If current task needn't to process, then it will skip.
continue;
}
// Remove task from task maps.
Key());
}
} finally {
this.lock.unlock();
}
if (null != task) {
// Getting task processor.找到对应的TaskProcessor,此时会使⽤默认this.processor = new DumpProcessor(this);
TaskProcessor processor = (Key());
if (null == processor) {
适应作文// If has no related typpe processor, then it will u default processor.
processor = DefaultTaskProcessor();
}
if (null != processor) {
boolean result = fal;
try {
// Execute the task.
应急演练图片result = processor.Key(), task);
} catch (Throwable t) {
<("task_fail", "处理task失败", t);
}
if (!result) {
// If task is executed failed, the t lastProcessTime.
task.tLastProcessTime(System.currentTimeMillis());
// Add task into task map again.
this.Key(), task);
}
}
}
}
if (tasks.isEmpty()) {
前哨信息港this.lock.lock();
try {
} finally {
this.lock.unlock();
}
}
}
2.3.3 DumpProcessor.process
其中⼀⼤坨代码,只是进⾏两个操作:包装数据和调⽤figDump(build.build())⽅法:
//包装数据
ConfigDumpEvent.ConfigDumpEventBuilder build = ConfigDumpEvent.builder().namespaceId(tenant).dataId(dataId)
.group(group).isBeta(isBeta).tag(tag).lastModifiedTs(lastModified).handleIp(handleIp);
......
//查询数据,查出最新数据
ConfigInfo4Beta cf = persistService.findConfigInfo(dataId, group, tenant);
//如果查出是null的处理
build.betaIps(Objects.isNull(cf) ? null : cf.getBetaIps());
//进⾏进步调⽤
figDump(build.build());
}
2.3.figDump
其中的核⼼代码:
//对⽐md5值之后,保存到缓存和本地⽂件
result = ConfigCacheService.dump(dataId, group, namespaceId, content, lastModified, type);
直接进⼊ConfigCacheService.dump:
开始部分:
尝试获取⼀次写锁,如果获取失败,就会直接返回,不再进⾏数据写⼊,写锁是代码层⾯实现,不深⼊。
final int lockResult = tryWriteLock(groupKey);
asrt (lockResult != 0);
if (lockResult < 0) {
DUMP_LOG.warn("[dump-error] write lock failed. {}", groupKey);
return fal;
}
接下来进⼊主题:
//获取当前内容的md5值
final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE);
if (md5.ContentMd5(groupKey))) {
DUMP_LOG.warn("[dump-ignore] ignore to save cache file. groupKey={}, md5={}, lastModifiedOld={}, "
+ "lastModifiedNew={}", groupKey, md5, LastModifiedTs(groupKey),
lastModifiedTs);
} el if (!PropertyUtil.isDirectRead()) {
//保存到本地磁盘
DiskUtil.saveToDisk(dataId, group, tenant, content);
}
//更新MD5值
updateMd5(groupKey, md5, lastModifiedTs);
其中保存本地磁盘不再细化,即找到⽬标⽂件,将数据写⼊。后⾯updateMd5值:
public static void updateMd5(String groupKey, String md5, long lastModifiedTs) {
CacheItem cache = makeSure(groupKey);
if (cache.md5 == null || !cache.md5.equals(md5)) {
cache.md5 = md5;
cache.lastModifiedTs = lastModifiedTs;
NotifyCenter.publishEvent(new LocalDataChangeEvent(groupKey));
}
}
其中makerSure就是获取缓存数据,上⾯已经叙述缓存中只保存对应的md5值和更新时间,读写锁等信息,不包括具体的配置数据。此时可以看到发布LocalDataChangeEvent事件,监听该事件部分在下⼀个章节进⾏描述。