在前面我已经详细的讲到了一些NameNode的后台工作线程,但是关于PendingReplicationBlocks$PendingReplicationMonitor和FSNamesystem$ReplicationMonitor,它们俩实在是让我很纠结,因为我真的不知道应该先讲哪一个。经过好几天的思量之后。还是决定先讲一讲PendingReplicationMonitor吧!难过

         先来看看PendingReplicationMonitor在NameNode中是干什么的吧,但恰恰是这儿让我头疼的地方!还是先简单的说说ReplicationMonitor,NameNode节点用它来定期的检测数据块Block是否满足设定的副本数量,对于没有达到副本数量的Blocks,NameNode会命令响应的DataNode节点去copy它,使它满足副本数量,DataNode复制Block是需要时间的,同时DataNode在复制Block的时候可能会出现意外的情况而失败,那么NameNode是如何发现DataNode节点复制Block的副本失败的呢?这就需要一种检查机制,也就是PendingReplicationMonitor要干的事情了。好了,让源代码来说话吧。先看看PendingReplicationBlocks类:

pendingReplications:存放正在被数据节点copy副本的Blocks,key是block,value是副本数量

timeOutItems:存放数据节点copy副本超时的Blocks(copy失败,需要重新安排数据节点copy副本)

timerThread:PendingReplicationMonitor监控线程

timeout:数据节点copy副本的超时时间(由配置文件的dfs.replication.pending.timeout.sec项设置(单位秒))

defaultRecheckInterval:一次检测所有正在copy的Block是否超时的间隔时间

      当NameNode节点安排数据节点copy Block的副本时,会把该Block放入pendingReplications中,当然还有copy数量;当数据节点完成Block的接受或者是copy时,会调用远程方法blocksReceive,在blocksReceive方法里面,会删除pendingReplications中与block相关的一些信息。下面来具体看看PendingReplicationMonitor

class PendingReplicationMonitor implements Runnable {

      
    public void run() {
      while (fsRunning) {
        long period = Math.min(defaultRecheckInterval, timeout);
        try {
           //检测正在copy的block副本是否copy失败
          pendingReplicationCheck();
          
          Thread.sleep(period);
        } catch (InterruptedException ie) {
          FSNamesystem.LOG.debug( "PendingReplicationMonitor thread received exception. " + ie);
        }
      }
    }
    /**
     * Iterate through all items and detect timed-out items
     */
    void pendingReplicationCheck() {
      synchronized (pendingReplications) {
        Iterator iter = pendingReplications.entrySet().iterator();
        long now = FSNamesystem.now();
        while (iter.hasNext()) {
          Map.Entry entry = (Map.Entry) iter.next();
          PendingBlockInfo pendingBlock = (PendingBlockInfo) entry.getValue();
          if (now > pendingBlock.getTimeStamp() + timeout) {
            Block block = (Block) entry.getKey();
            synchronized (timedOutItems) {
              timedOutItems.add(block);
            }
            FSNamesystem.LOG.warn("PendingReplicationMonitor timed out block " + block);
            
            iter.remove();
          }
        }
      }
    }
  }
从上面的代码可以看出,PendingReplicationMonitor线程会把copy失败的Blocks放到timedOutItems中,那么对于这些Blocks,NameNode节点是如何处理的呢?实际上他们会被ReplicationMonitor再次借交给其它的数据节点备份,就这么简单。