手把手带你撸zookeeper源码-zookeeper确定好角色后会做什么?

悠悠 2023-03-04 14:25 18阅读 0赞

接上文 手把手带你撸zookeeper源码-zookeeper启动(五)leader选举投票归档-确认当前zk服务的角色

上篇文章主要分析了leader选举的最终阶段,根据zk集群的相互投票之后,进行投票归档,然后判断某个zk获得投票数是否大于集群数量的一半,如果未超过一半,则继续下一轮的投票选举,如果有某个zk有超过一半的选票,则leader确定,然后其他zk服务则为follower或者observer

不知道大家有没有搭建过三台zk的集群,如果你是按照myid从小到大依次启动三台服务器,一般情况下都是第二台服务器是会成为leader的,相信大家看了之前的代码分析应该能知道为什么了吧?

接下来还会有很多篇文章来剖析zookeeper集群之间的数据如何基于2pc的方式来进行数据同步,客户端和服务端的会话如何创建和维护、zookeeper内部的数据结构是如何保存的?如何创建、删除临时节点、持久节点、顺序节点,zookeeper的监听回调通知如何实现的,以及zookeeper的故障恢复是如何做的?东西还有很多,我们一一去剖析,一步一个脚印的学习

今天这篇文章我们来分析一下,既然leader已经选举出来了,接下来会做什么呢?

上篇文章我们分析完了FastLeaderElection#lookForLeader这个方法,它返回了当前一个Vote对象,确定了leader、follower、observer的角色,此方法也就结束了。

  1. if (n == null) {
  2. //如果当前服务器是的sid和投票为leader的sid一样
  3. //则设置peerState状态为LEADING,否则要么为FOLLOWING或者OBSERVING
  4. self.setPeerState((proposedLeader == self.getId()) ?
  5. ServerState.LEADING: learningState());
  6. Vote endVote = new Vote(proposedLeader,
  7. proposedZxid,
  8. logicalclock.get(),
  9. proposedEpoch);
  10. leaveInstance(endVote);
  11. return endVote;
  12. }

我们接着看调用这行代码的地方,在QuorumPeer中的run方法中,把上面返回的Vote对象赋值给currentVote,接着会进行下一轮的循环,此时zk的角色都已经确定了,然后会进入到响应的分支当中

我们一个个看,如果peerState == LEADING,则进入如下分支

  1. case LEADING:
  2. LOG.info("LEADING");
  3. try {
  4. setLeader(makeLeader(logFactory));
  5. leader.lead();
  6. setLeader(null);
  7. } catch (Exception e) {
  8. LOG.warn("Unexpected exception",e);
  9. } finally {
  10. if (leader != null) {
  11. leader.shutdown("Forcing shutdown");
  12. setLeader(null);
  13. }
  14. setPeerState(ServerState.LOOKING);
  15. }
  16. break;

我们今天主要分析一下看看leader角色zookeeper主要做了什么,我们一步步来分析

  1. setLeader(makeLeader(logFactory));

这一行代码主要是创建一个Leader对象,并且把创建出来的leader对象赋值给当前leader变量,下面会调用leader.lead(),这个方法很重要,等会分析,我们看看创建Leader对象都干了啥

  1. protected Leader makeLeader(FileTxnSnapLog logFactory) throws IOException {
  2. return new Leader(this, new LeaderZooKeeperServer(logFactory,
  3. this,new ZooKeeperServer.BasicDataTreeBuilder(), this.zkDb));
  4. }

logFactory这个参数就是解析完zoo.cfg文件之后,把属性封装到了QuorumPeerConfig对象中,这个对象中肯定有我们配置的日志目录,和数据目录,然后根据这两个目录会创建一个FileTxnSnapLog对象传递给QuorumPeer对象,大家可以看一下之前的代码

然后创建了LeaderZookeeperServer对象,在这个方法里面会有一系列的调用链来处理客户端发送过来的请求, 如2PC阶段如何处理的

  1. new ZooKeeperServer.BasicDataTreeBuilder()
  2. static public class BasicDataTreeBuilder implements DataTreeBuilder {
  3. public DataTree build() {
  4. return new DataTree();
  5. }
  6. }

猜想一下,BasicDataTreeBuilder对象提供了一个build()方法,返回了一个DataTree对象,这个DataTree肯定就是zookeeper内纯的数据结构对象,然后会在某个地方会调用这个build方法来创建一个DataTree内纯对象, 然后客户端发送增删改查节点的时候肯定就是操作的这个数据结构,是不是这样的,我们之后再具体分析

最后创建一个Leader对象

  1. Leader(QuorumPeer self,LeaderZooKeeperServer zk) throws IOException {
  2. this.self = self;
  3. this.proposalStats = new ProposalStats();
  4. if (self.getQuorumListenOnAllIPs()) {
  5. ss = new ServerSocket(self.getQuorumAddress().getPort());
  6. } else {
  7. ss = new ServerSocket();
  8. }
  9. ss.setReuseAddress(true);
  10. if (!self.getQuorumListenOnAllIPs()) {
  11. ss.bind(self.getQuorumAddress());
  12. }
  13. this.zk=zk;
  14. }

在我们的配置文件中有server.x=zk01:2888:3888这样的配置,在这段代码中,就是对当前服务器的2888端口进行监听,等待其他follower和当前leader进行连接

创建完Leader对象之后,最终会调用leader.lead()方法

  1. void lead() throws IOException, InterruptedException {
  2. self.end_fle = Time.currentElapsedTime();
  3. //记住leader选举的时间差
  4. long electionTimeTaken = self.end_fle - self.start_fle;
  5. self.setElectionTimeTaken(electionTimeTaken);
  6. LOG.info("LEADING - LEADER ELECTION TOOK - {}", electionTimeTaken);
  7. self.start_fle = 0;
  8. self.end_fle = 0;
  9. //JMX监控
  10. zk.registerJMX(new LeaderBean(this, zk), self.jmxLocalPeerBean);
  11. try {
  12. self.tick.set(0);
  13. //加载硬盘上的数据到内存中
  14. zk.loadData();
  15. leaderStateSummary = new StateSummary(self.getCurrentEpoch(), zk.getLastProcessedZxid());
  16. // Start thread that waits for connection requests from
  17. // new followers.
  18. cnxAcceptor = new LearnerCnxAcceptor();
  19. cnxAcceptor.start();
  20. readyToStart = true;
  21. long epoch = getEpochToPropose(self.getId(), self.getAcceptedEpoch());
  22. zk.setZxid(ZxidUtils.makeZxid(epoch, 0));
  23. synchronized(this){
  24. lastProposed = zk.getZxid();
  25. }
  26. newLeaderProposal.packet = new QuorumPacket(NEWLEADER, zk.getZxid(),
  27. null, null);
  28. if ((newLeaderProposal.packet.getZxid() & 0xffffffffL) != 0) {
  29. LOG.info("NEWLEADER proposal has Zxid of "
  30. + Long.toHexString(newLeaderProposal.packet.getZxid()));
  31. }
  32. waitForEpochAck(self.getId(), leaderStateSummary);
  33. self.setCurrentEpoch(epoch);
  34. try {
  35. waitForNewLeaderAck(self.getId(), zk.getZxid());
  36. } catch (InterruptedException e) {
  37. shutdown("Waiting for a quorum of followers, only synced with sids: [ "
  38. + getSidSetString(newLeaderProposal.ackSet) + " ]");
  39. HashSet<Long> followerSet = new HashSet<Long>();
  40. for (LearnerHandler f : learners)
  41. followerSet.add(f.getSid());
  42. if (self.getQuorumVerifier().containsQuorum(followerSet)) {
  43. LOG.warn("Enough followers present. "
  44. + "Perhaps the initTicks need to be increased.");
  45. }
  46. Thread.sleep(self.tickTime);
  47. self.tick.incrementAndGet();
  48. return;
  49. }
  50. startZkServer();
  51. String initialZxid = System.getProperty("zookeeper.testingonly.initialZxid");
  52. if (initialZxid != null) {
  53. long zxid = Long.parseLong(initialZxid);
  54. zk.setZxid((zk.getZxid() & 0xffffffff00000000L) | zxid);
  55. }
  56. if (!System.getProperty("zookeeper.leaderServes", "yes").equals("no")) {
  57. self.cnxnFactory.setZooKeeperServer(zk);
  58. }
  59. boolean tickSkip = true;
  60. while (true) {
  61. Thread.sleep(self.tickTime / 2);
  62. if (!tickSkip) {
  63. self.tick.incrementAndGet();
  64. }
  65. HashSet<Long> syncedSet = new HashSet<Long>();
  66. // lock on the followers when we use it.
  67. syncedSet.add(self.getId());
  68. for (LearnerHandler f : getLearners()) {
  69. if (f.synced() && f.getLearnerType() == LearnerType.PARTICIPANT) {
  70. syncedSet.add(f.getSid());
  71. }
  72. f.ping();
  73. }
  74. // check leader running status
  75. if (!this.isRunning()) {
  76. shutdown("Unexpected internal error");
  77. return;
  78. }
  79. if (!tickSkip && !self.getQuorumVerifier().containsQuorum(syncedSet)) {
  80. return;
  81. }
  82. tickSkip = !tickSkip;
  83. }
  84. } finally {
  85. zk.unregisterJMX(this);
  86. }
  87. }

先分析第一个关键点

  1. cnxAcceptor = new LearnerCnxAcceptor();
  2. cnxAcceptor.start();

LeanerCnxAcceptor是一个线程,这里开启一个线程,用来监听其他follower来和当前leader进行连接的,我们看一下LeanerCnxAcceptor.run方法

  1. @Override
  2. public void run() {
  3. try {
  4. while (!stop) {
  5. try{
  6. Socket s = ss.accept();
  7. s.setSoTimeout(self.tickTime * self.initLimit);
  8. s.setTcpNoDelay(nodelay);
  9. BufferedInputStream is = new BufferedInputStream(
  10. s.getInputStream());
  11. LearnerHandler fh = new LearnerHandler(s, is, Leader.this);
  12. fh.start();
  13. } catch (SocketException e) {
  14. if (stop) {
  15. stop = true;
  16. } else {
  17. throw e;
  18. }
  19. } catch (SaslException e){
  20. LOG.error("Exception while connecting to quorum learner", e);
  21. }
  22. }
  23. } catch (Exception e) {
  24. LOG.warn("Exception while accepting follower", e);
  25. }
  26. }

这里通过ServerSocket.accept()进行阻塞等待其他follower进行连接,如果有连接进来之后则会创建一个LearnerHandler对象,它也是一个线程,然后把socket交给LearnerHandler,由它来处理和连接进来的socket进行通信处理,这里也是用的阻塞bio,每当有一个follower或者observer进来连接时,都会创建一个单独线程去处理连接以及发送的数据

我们可以进入到LearnerHandler.run方法中去看看,现在有某个follower连接进来了,然后其对应的socket交给了LearnerHandler,接下来就是读取socket中的数据

先截取一部分代码看一下

  1. leader.addLearnerHandler(this);
  2. tickOfNextAckDeadline = leader.self.tick.get()
  3. + leader.self.initLimit + leader.self.syncLimit;
  4. ia = BinaryInputArchive.getArchive(bufferedInput);
  5. bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
  6. oa = BinaryOutputArchive.getArchive(bufferedOutput);
  7. QuorumPacket qp = new QuorumPacket();
  8. ia.readRecord(qp, "packet");//读取follower发送过来的注册数据包
  9. if(qp.getType() != Leader.FOLLOWERINFO && qp.getType() != Leader.OBSERVERINFO){
  10. LOG.error("First packet " + qp.toString()
  11. + " is not FOLLOWERINFO or OBSERVERINFO!");
  12. return;
  13. }

这块代码中用到了jute序列化协议,从输入流中读取数据,然后对数据进行反序列化,Jute序列化协议是zookeeper内部封装的一个序列化协议,然后通过特定的格式进行数据传输,从而解决传输过程当中可能出现的粘包拆包问题

OK,做个总结,今天大概梳理了一下,当前zk如果是leader时,会创建一个Leader对象,在创建Leader对象时会创建一个LeaderZookeeperServer,这个对象里面封装了一系列的RequestProcessor调用链,还有一个BasicDataTreeBuilder,其中的build方法返回一个DataTree对象。最后创建Leader对象时会创建ServerSocket然后监听端口等待其他Follower或者Observer来进行连接。所有的连接都会交给一个线程LearnerHandler进行处理,然后读取follower发送过来的数据,通过jute进行序列化反序列化

每篇文章不用太长,大家把一个点学会学透,每天进步一点点

下篇文章先讲一下Jute序列化协议是如何进行序列化反序列化的,以及它的格式是什么样的,怎么解决粘包拆包的问题的

发表评论

表情:
评论列表 (有 0 条评论,18人围观)

还没有评论,来说两句吧...

相关阅读