三.HA模式。
前言:在集群模式下,Vert.x框架拥有HA(HighAvailability)能力。通俗的解释是集群中的一个节点跪了,原来运行在失败节点上的Verticle会在其他节点上自动启动。当然,前提是Verticle被设置了HA模式。
原理:
首先,HA的构造方法里传入了ClusterManager ,利用这个类实现node的add/left 监听, 这样集群的每个节点获得其他节点的add/left。每当add事件发生,把节点的信息存到集群贡献的map中,除非集群崩溃,信息都回在。每当left事件发生,存活的节点确定自己是否与失败节点在同一个组中: 如果不是同一个组,那么它将不会作为故障转移节点的候选者。群集中的节点仅故障转移到同一组中的其他节点; 如果是同一个组,那么用失败节点的UUID计算Hashcode,然后用这个Hashcode对集群节点的个数做取模(%)运算,以取模的结果作为索引从集群所有节点的List拿到对应的节点,判断这个计算的出来的节点是不是自身。如果是自身就re-deploy那个失败的节点。其中有个重要的逻辑检查一个“集群最小节点数”quorum的值。(quorum本意:会议法定最小参加人数)。官方说可以避免quorum 丢失后导致的竞争条件,如果不检查就要使用排他锁,而排他锁太棘手,很容易导致死锁。(原文: avoid race conditions resulting in modules being deployed after a quorum has been lost,and without having to resort to exclusive locking which is actually quite tricky here, and prone to deadlock。)
让我来猜测、估计、YY这段话的原意:框架实现了最小quorum台服务器存活才称为集群,才实现HA模式。即 if(aliveServerNum > quorum) { // doHA } 但是这个aliveServerNum 必然分别维护在这N台服务上。如何维护这个aliveServerNum? 一台服务器跪了,需要aliveServerNum -- 。一台服务器加了,需要aliveServerNum++。如果有多台服务器同时加入,代码需要设计为 // get lock { aliveServerNum++ } // release lock 那么,可以想象的场景:S1,S2服务器发生重启,要各自通知到 A1, A2,A3...An 个alive的服务器。 S1对 A1 的aliveServerNum++锁定,请求 A2的锁。 S2对 A2 的aliveServerNum++锁定,请求 A1的锁。与其费工夫维护aliveServerNum,还不如用 checkQuorum()方法和boolean attainedQuorum代替。即
checkQuorum() if(attainedQuorum) { // doHA } 但是,这里我有个疑问是: 如果lock是集群锁才有上面的死锁问题,如果lock是单机锁synchronize,好像能解决问题又不会有死锁?但无论如何,不维护aliveServerNum是更明智的选择。 代码: private final VertxInternal vertx; // vertx private final DeploymentManager deploymentManager; // deploymentManager private final ClusterManager clusterManager; // 接口,重要方法是 join leave nodeListenerprivate final int quorumSize; // 集群最小节点数
private final String group; // 相同group才有HA逻辑 private final JsonObject haInfo; private final Map<String, String> clusterMap; // // 别的属性是单机的,当服务器跪掉就丢失了 // 为了failover时还能取得当时的信息。需要把信息变成 // 集群共享,在需要时用clusterManager.getSyncMap()取得。 haInfo和clusterMap就是用来做这件事 // clusterMap 的存在意义是方便用nodeID查找。private final String nodeID; // 节点ID,唯一性。
private final Queue<Runnable> toDeployOnQuorum = new ConcurrentLinkedQueue<>(); // 如果节点跪了时,集群不在 quorum 状态。 那么就把信息放在这个队列容器中。然后用定时器(vertx.setPeriodic)不停的检测。 等达到attainedQuorum再deploy.private final boolean enabled; // 是否HA的总开关。
private long quorumTimerID; // 见上面 toDeployOnQuorum 。定时器的ID,达到条件后调用vertx.cancelTimer(quorumTimerID);取消
private volatile boolean attainedQuorum; // 见 “原理”
private volatile FailoverCompleteHandler failoverCompleteHandler; // 提供自定义的重启逻辑插入 private volatile boolean failDuringFailover; // For testing: private volatile boolean stopped; // 状态量 private volatile boolean killed; // 状态量 private Consumer<Set<String>> clusterViewChangedHandler;HAManager 的构造方法
clusterManager.nodeListener(new NodeListener() { @Override public void nodeAdded(String nodeID) { HAManager.this.nodeAdded(nodeID); } @Override public void nodeLeft(String leftNodeID) { HAManager.this.nodeLeft(leftNodeID); }});
存在 nodeAdded 和 nodeLeft的监听。clusterManager 是接口,具体还要看由实现类。
private void doDeployVerticle(final String verticleName, DeploymentOptions deploymentOptions, final Handler这个方法一开始看了让人蒙圈,没有搞明白前面的asyncResult怎么就succeeded了。其实,方法前部所有的代码都是为了定义一个wrappedHandler ,定义完了在方法最后一句传到deploymentManager.deployVerticle里。跟踪代码,在若干方法栈后才有对这个wrappedHandler.handle()方法调用:> doneHandler) { final Handler > wrappedHandler = asyncResult -> { if (asyncResult.succeeded()) { // Tell the other nodes of the cluster about the verticle for HA purposes addToHA(asyncResult.result(), verticleName, deploymentOptions); } if (doneHandler != null) { doneHandler.handle(asyncResult); } else if (asyncResult.failed()) { log.error("Failed to deploy verticle", asyncResult.cause()); } }; deploymentManager.deployVerticle(verticleName, deploymentOptions, wrappedHandler);}
completionHandler.handle(result);
所以,额外说句,这个方法的名字其实有问题。一般名为doX的方法,都是处理最底层的X业务,而这里其实是交付给了deploymentManager去做。
private String chooseHashedNode(String group, int hashCode)
这个方法会计算出重新部署的节点,然后每个存活的节点都调用,判断结果是不是节点自己。
String chosen = chooseHashedNode(group, failedNodeID.hashCode());if (chosen != null && chosen.equals(this.nodeID)) {
如果是自己,才继续恢复流程,调用核心业务方法。如果不是自己,没有事情发生。
private void processFailover(JsonObject failedVerticle)
processFailover()使用了 CountDownLatch类 处理线程阻塞和超时处理逻辑。
别的方法都很简单的,看名字就知道内容。