博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Vert.x系列(四)-- HAManager源码分析
阅读量:7183 次
发布时间:2019-06-29

本文共 4374 字,大约阅读时间需要 14 分钟。

hot3.png

三.HA模式。

前言:在集群模式下,Vert.x框架拥有HA(HighAvailability)能力。通俗的解释是集群中的一个节点跪了,原来运行在失败节点上的Verticle会在其他节点上自动启动。当然,前提是Verticle被设置了HA模式。

原理:

首先,HA的构造方法里传入了ClusterManager ,利用这个类实现node的add/left 监听,
这样集群的每个节点获得其他节点的add/left。每当add事件发生,把节点的信息存到集群贡献的map中,除非集群崩溃,信息都回在。每当left事件发生,存活的节点确定自己是否与失败节点在同一个组中:
如果不是同一个组,那么它将不会作为故障转移节点的候选者。群集中的节点仅故障转移到同一组中的其他节点;
如果是同一个组,那么用失败节点的UUID计算Hashcode,然后用这个Hashcode对集群节点的个数做取模(%)运算,以取模的结果作为索引从集群所有节点的List拿到对应的节点,判断这个计算的出来的节点是不是自身。如果是自身就re-deploy那个失败的节点。

其中有个重要的逻辑检查一个“集群最小节点数”quorum的值。(quorum本意:会议法定最小参加人数)。官方说可以避免quorum 丢失后导致的竞争条件,如果不检查就要使用排他锁,而排他锁太棘手,很容易导致死锁。(原文: avoid race conditions resulting in modules being deployed after a quorum has been lost,and without having to resort to exclusive locking which is actually quite tricky here, and prone to deadlock。)

让我来猜测、估计、YY这段话的原意:框架实现了最小quorum台服务器存活才称为集群,才实现HA模式。即 if(aliveServerNum > quorum) {
//  doHA
}
但是这个aliveServerNum 必然分别维护在这N台服务上。如何维护这个aliveServerNum?
一台服务器跪了,需要aliveServerNum -- 。一台服务器加了,需要aliveServerNum++。如果有多台服务器同时加入,代码需要设计为
// get lock 
{
 aliveServerNum++
// release lock
那么,可以想象的场景:S1,S2服务器发生重启,要各自通知到
A1, A2,A3...An 个alive的服务器。
S1对 A1 的aliveServerNum++锁定,请求 A2的锁。
S2对 A2 的aliveServerNum++锁定,请求 A1的锁。 

与其费工夫维护aliveServerNum,还不如用 checkQuorum()方法和boolean attainedQuorum代替。即

checkQuorum()
if(attainedQuorum) {
//  doHA
}
但是,这里我有个疑问是:
如果lock是集群锁才有上面的死锁问题,如果lock是单机锁synchronize,好像能解决问题又不会有死锁?但无论如何,不维护aliveServerNum是更明智的选择。

代码:
private final VertxInternal vertx;   // vertx
private final DeploymentManager deploymentManager;  // deploymentManager
private final ClusterManager clusterManager;  //  接口,重要方法是 join leave nodeListener

private final int quorumSize;          // 集群最小节点数

private final String group;           // 相同group才有HA逻辑
private final JsonObject haInfo;                       
private final Map<String, String> clusterMap; //
// 别的属性是单机的,当服务器跪掉就丢失了
// 为了failover时还能取得当时的信息。需要把信息变成
// 集群共享,在需要时用clusterManager.getSyncMap()取得。 haInfo和clusterMap就是用来做这件事
// clusterMap 的存在意义是方便用nodeID查找。

private final String nodeID;      // 节点ID,唯一性。

private final Queue<Runnable> toDeployOnQuorum = new ConcurrentLinkedQueue<>();
// 如果节点跪了时,集群不在 quorum 状态。 那么就把信息放在这个队列容器中。然后用定时器(vertx.setPeriodic)不停的检测。 等达到attainedQuorum再deploy.

private final boolean enabled;  // 是否HA的总开关。

private long quorumTimerID;  //  见上面 toDeployOnQuorum 。定时器的ID,达到条件后调用vertx.cancelTimer(quorumTimerID);取消

private volatile boolean attainedQuorum; // 见 “原理”

private volatile FailoverCompleteHandler failoverCompleteHandler; // 提供自定义的重启逻辑插入
private volatile boolean failDuringFailover; // For testing:
private volatile boolean stopped; // 状态量
private volatile boolean killed; // 状态量
private Consumer<Set<String>> clusterViewChangedHandler;

HAManager 的构造方法

clusterManager.nodeListener(new NodeListener() {  @Override  public void nodeAdded(String nodeID) {    HAManager.this.nodeAdded(nodeID);  }  @Override  public void nodeLeft(String leftNodeID) {    HAManager.this.nodeLeft(leftNodeID);  }});

存在 nodeAdded 和 nodeLeft的监听。clusterManager 是接口,具体还要看由实现类。

private void doDeployVerticle(final String verticleName, DeploymentOptions deploymentOptions,                              final Handler
> doneHandler) { final Handler
> wrappedHandler = asyncResult -> { if (asyncResult.succeeded()) { // Tell the other nodes of the cluster about the verticle for HA purposes addToHA(asyncResult.result(), verticleName, deploymentOptions); } if (doneHandler != null) { doneHandler.handle(asyncResult); } else if (asyncResult.failed()) { log.error("Failed to deploy verticle", asyncResult.cause()); } }; deploymentManager.deployVerticle(verticleName, deploymentOptions, wrappedHandler);}

这个方法一开始看了让人蒙圈,没有搞明白前面的asyncResult怎么就succeeded了。其实,方法前部所有的代码都是为了定义一个wrappedHandler ,定义完了在方法最后一句传到deploymentManager.deployVerticle里。跟踪代码,在若干方法栈后才有对这个wrappedHandler.handle()方法调用:

completionHandler.handle(result);

所以,额外说句,这个方法的名字其实有问题。一般名为doX的方法,都是处理最底层的X业务,而这里其实是交付给了deploymentManager去做。

private String chooseHashedNode(String group, int hashCode)

这个方法会计算出重新部署的节点,然后每个存活的节点都调用,判断结果是不是节点自己。

String chosen = chooseHashedNode(group, failedNodeID.hashCode());if (chosen != null && chosen.equals(this.nodeID)) {

如果是自己,才继续恢复流程,调用核心业务方法。如果不是自己,没有事情发生。

private void processFailover(JsonObject failedVerticle)

processFailover()使用了 CountDownLatch类 处理线程阻塞和超时处理逻辑。

别的方法都很简单的,看名字就知道内容。

转载于:https://my.oschina.net/u/2382040/blog/3026728

你可能感兴趣的文章
photoshop快捷键大全
查看>>
Android -- EventBus使用
查看>>
利用gulp搭建本地服务器,并能模拟ajax
查看>>
Java一些八卦集合类
查看>>
linux进程地址空间--vma的基本操作【转】
查看>>
【转】SQLite3的各个函数(全)
查看>>
基于DotNet构件技术的企业级敏捷软件开发平台 AgileEAS.NET - 插件运行容器
查看>>
LintCode: Minimum Path Sum
查看>>
LintCode: Maximum Subarray
查看>>
大数据在金融和贸易中的作用
查看>>
开发者必读 移动端页面优化的10个好方法
查看>>
Nest 为何刚宣布开放 API,就能吸引到重量级盟友?
查看>>
JavaScript之this指针深入详解
查看>>
服务器运行过程中如何进行维护
查看>>
Web前端知识杂乱 如何分清主次和学习优先级?
查看>>
数据驱动的迷思
查看>>
软件工程师欲发动DDoS攻击白宫网站 抗议特朗普就任总统
查看>>
在中关村问小米
查看>>
Duolingo推出聊天机器人功能帮助你学习外语
查看>>
《企业级ios应用开发实战》一3.3 MVC模式
查看>>