Author: iwinit

[HBase]Region assignment


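Continuing from the previous post: we created table t1 with column family c1, and the HBase root directory (hbase.rootdir) is /new. When an empty table is created, HBase automatically creates one empty region. Following how that region is assigned shows how a region gets created across the HMaster and the region server (rs below). The rough steps are: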

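1. The HMaster draws up the assignment plan: each region goes to exactly one rs, and regions are spread evenly across the rs.

2. Assignment to the different rs proceeds concurrently.

3. The master first creates a node for the region under /hbase/unassigned in ZooKeeper, with state 'offline'.

4. The master sends an RPC to the target rs, asking it to open the region.

5. The master then waits until all regions are assigned; master and rs communicate through the state of the ZooKeeper nodes.

6. The rs receives the request and runs the OpenRegion operation asynchronously.

7. The rs first changes the znode state to 'opening'.

8. The rs opens and initializes the region, mainly creating the region's HDFS directory and initializing its Stores.

9. The rs updates the region's row in the .META. table.

10. The rs changes the znode state to 'opened'.

11. When the master receives 'opened', it considers the region successfully assigned.

12. Once every region has succeeded, the master considers the bulk creation successful.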
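The znode states in steps 3 to 11 form a small state machine. The sketch below models only the open path so it is clear which transitions the rs may make; the names ZnodeState and RegionZnode are hypothetical, not HBase classes, and the real code has more states (closing, splitting, and so on).

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of the znode states on the open path.
enum ZnodeState { OFFLINE, OPENING, OPENED, FAILED_OPEN }

class RegionZnode {
    // Legal transitions on the open path only.
    private static final Map<ZnodeState, Set<ZnodeState>> ALLOWED =
            new EnumMap<>(ZnodeState.class);

    static {
        ALLOWED.put(ZnodeState.OFFLINE, EnumSet.of(ZnodeState.OPENING));
        // OPENING -> OPENING models the rs refreshing ("tickling") the node.
        ALLOWED.put(ZnodeState.OPENING, EnumSet.of(
                ZnodeState.OPENING, ZnodeState.OPENED, ZnodeState.FAILED_OPEN));
        ALLOWED.put(ZnodeState.OPENED, EnumSet.noneOf(ZnodeState.class));
        ALLOWED.put(ZnodeState.FAILED_OPEN, EnumSet.noneOf(ZnodeState.class));
    }

    // Step 3: the master creates the node in the OFFLINE state.
    private ZnodeState state = ZnodeState.OFFLINE;

    ZnodeState state() {
        return state;
    }

    // Applies the transition if it is legal; otherwise returns false
    // and leaves the state unchanged.
    boolean transition(ZnodeState next) {
        if (!ALLOWED.get(state).contains(next)) {
            return false;
        }
        state = next;
        return true;
    }

    public static void main(String[] args) {
        RegionZnode node = new RegionZnode();
        System.out.println(node.transition(ZnodeState.OPENING)); // step 7: true
        System.out.println(node.transition(ZnodeState.OPENED));  // step 10: true
        System.out.println(node.transition(ZnodeState.OPENING)); // illegal: false
        System.out.println(node.state());                        // OPENED
    }
}
```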

[Figure: rough class diagram]

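On the HMaster side, BulkAssigner assigns regions in bulk. By default regions are distributed randomly and evenly, and each assignment is made through an RPC call.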

 

public boolean bulkAssign(boolean sync) throws InterruptedException,
      IOException {
    boolean result = false;
    ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
    builder.setDaemon(true);
    builder.setNameFormat(getThreadNamePrefix() + "-%1$d");
    builder.setUncaughtExceptionHandler(getUncaughtExceptionHandler());
    int threadCount = getThreadCount();
    java.util.concurrent.ExecutorService pool =
      Executors.newFixedThreadPool(threadCount, builder.build());
    try {
      // Submit the tasks: one SingleServerBulkAssigner per target server
      populatePool(pool);
      // How long to wait on empty regions-in-transition.  If we timeout, the
      // RIT monitor should do fixup.
      // Wait for the assignments to finish (only when sync is true)
      if (sync) result = waitUntilDone(getTimeoutOnRIT());
    } finally {
      // We're done with the pool.  It'll exit when its done all in queue.
      pool.shutdown();
    }
    return result;
  }
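The "random, even" default mentioned above can be pictured as shuffling the regions and dealing them out round-robin. This is a minimal illustrative sketch with a hypothetical RandomEvenAssignment class, not HBase's LoadBalancer; it only demonstrates the two properties the text names: each region goes to one server, and per-server counts stay balanced.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

// Illustrative sketch only: shuffle the regions at random, then deal them out
// round-robin. Each region lands on exactly one server and per-server counts
// differ by at most one.
class RandomEvenAssignment {
    static Map<String, List<String>> assign(List<String> regions,
                                            List<String> servers,
                                            long seed) {
        if (servers.isEmpty()) {
            throw new IllegalArgumentException("no region servers");
        }
        List<String> shuffled = new ArrayList<>(regions);
        Collections.shuffle(shuffled, new Random(seed));

        Map<String, List<String>> plan = new LinkedHashMap<>();
        for (String server : servers) {
            plan.put(server, new ArrayList<>());
        }
        for (int i = 0; i < shuffled.size(); i++) {
            plan.get(servers.get(i % servers.size())).add(shuffled.get(i));
        }
        return plan;
    }

    public static void main(String[] args) {
        List<String> regions = Arrays.asList("r1", "r2", "r3", "r4", "r5");
        List<String> servers = Arrays.asList("rs1", "rs2");
        // rs1 gets 3 regions and rs2 gets 2; which ones depends on the seed.
        System.out.println(assign(regions, servers, 42L));
    }
}
```

Shuffling before the round-robin deal keeps the counts balanced while avoiding always sending the first regions of a table to the same server.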

Waiting for the assignments to complete:

 

 

  boolean waitUntilNoRegionsInTransition(final long timeout, Set<HRegionInfo> regions)
  throws InterruptedException {
    // Blocks until there are no regions in transition.
    // Keep waiting while any of the regions being assigned is still in
    // regionsInTransition. The timeout comes from
    // hbase.bulk.assignment.waiton.empty.rit (default: 5 minutes).
    long startTime = System.currentTimeMillis();
    long remaining = timeout;
    boolean stillInTransition = true;
    synchronized (regionsInTransition) {
      while (regionsInTransition.size() > 0 && !this.master.isStopped() &&
          remaining > 0 && stillInTransition) {
        int count = 0;
        for (RegionState rs : regionsInTransition.values()) {
          if (regions.contains(rs.getRegion())) {
            count++;
            break;
          }
        }
        if (count == 0) {
          stillInTransition = false;
          break;
        }
        regionsInTransition.wait(remaining);
        remaining = timeout - (System.currentTimeMillis() - startTime);
      }
    }
    return stillInTransition;
  }
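The method above is the standard Java monitor pattern: wait with a timeout on a shared object, recompute the remaining time after each wake-up, and have the updater call notifyAll(). A minimal self-contained sketch, with a hypothetical RitTracker class standing in for AssignmentManager's regionsInTransition map:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the regionsInTransition map and its monitor.
// Waiters block on the map; whoever removes an entry calls notifyAll()
// so every waiter re-checks its own condition.
class RitTracker {
    private final Map<String, String> regionsInTransition = new HashMap<>();

    void add(String region) {
        synchronized (regionsInTransition) {
            regionsInTransition.put(region, "PENDING_OPEN");
        }
    }

    void regionOnline(String region) {
        synchronized (regionsInTransition) {
            if (regionsInTransition.remove(region) != null) {
                regionsInTransition.notifyAll();
            }
        }
    }

    // Returns true if any of the given regions is still in transition when
    // the timeout expires (same convention as the method above).
    boolean waitUntilNoneInTransition(long timeoutMs, Set<String> regions)
            throws InterruptedException {
        long start = System.currentTimeMillis();
        long remaining = timeoutMs;
        synchronized (regionsInTransition) {
            while (anyInTransition(regions)) {
                if (remaining <= 0) {
                    return true; // timed out; never call wait(0), which waits forever
                }
                // wait() releases the monitor and may wake spuriously, so the loop
                // re-checks the condition and recomputes the remaining time.
                regionsInTransition.wait(remaining);
                remaining = timeoutMs - (System.currentTimeMillis() - start);
            }
            return false;
        }
    }

    // Caller must hold the monitor.
    private boolean anyInTransition(Set<String> regions) {
        for (String region : regions) {
            if (regionsInTransition.containsKey(region)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) throws InterruptedException {
        RitTracker tracker = new RitTracker();
        tracker.add("r1");
        tracker.add("r2");
        // Simulates the OPENED events arriving on another thread.
        Thread events = new Thread(() -> {
            tracker.regionOnline("r1");
            tracker.regionOnline("r2");
        });
        events.start();
        boolean stillInTransition = tracker.waitUntilNoneInTransition(
                5000, new HashSet<>(Arrays.asList("r1", "r2")));
        System.out.println("still in transition: " + stillInTransition); // false
        events.join();
    }
}
```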

AssignmentManager provides assign(final ServerName destination, final List<HRegionInfo> regions), which bulk-assigns a list of regions to one rs:

 

 

void assign(final ServerName destination,
      final List<HRegionInfo> regions) {
    ....
    // Force each region's in-memory state to OFFLINE
    List<RegionState> states = new ArrayList<RegionState>(regions.size());
    synchronized (this.regionsInTransition) {
      for (HRegionInfo region: regions) {
        states.add(forceRegionStateToOffline(region));
      }
    }
    .....
    
    // Presumption is that only this thread will be updating the state at this
    // time; i.e. handlers on backend won't be trying to set it to OPEN, etc.
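    // For each region to assign, create a znode under /hbase/unassigned with
    // initial state OFFLINE. When creation succeeds, its callback calls ZooKeeper
    // exists() to set a watcher; the exists() callback then sets the region
    // state to PENDING_OPEN and increments counter.
    // Every region must succeed.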
    AtomicInteger counter = new AtomicInteger(0);
    CreateUnassignedAsyncCallback cb =
      new CreateUnassignedAsyncCallback(this.watcher, destination, counter);
    for (RegionState state: states) {
      if (!asyncSetOfflineInZooKeeper(state, destination, cb, state)) {
        return;
      }
    }
    // Wait until all unassigned nodes have been put up and watchers set.
    int total = regions.size();
    for (int oldCounter = 0; true;) {
      int count = counter.get();
      if (oldCounter != count) {
        LOG.info(destination.toString() + " unassigned znodes=" + count +
          " of total=" + total);
        oldCounter = count;
      }
      if (count == total) break;
      Threads.sleep(1);
    }
    // Move on to open regions.
    try {
      // Send OPEN RPC. If it fails on a IOE or RemoteException, the
      // TimeoutMonitor will pick up the pieces.
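      // Send the OPEN RPC to the rs. If the rs is not running yet, retry for up
      // to hbase.regionserver.rpc.startup.waittime (60 s by default).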
      long maxWaitTime = System.currentTimeMillis() +
        this.master.getConfiguration().
          getLong("hbase.regionserver.rpc.startup.waittime", 60000);
      while (!this.master.isStopped()) {
        try {
          this.serverManager.sendRegionOpen(destination, regions);
          break;
        } catch (RemoteException e) {
          IOException decodedException = e.unwrapRemoteException();
          if (decodedException instanceof RegionServerStoppedException) {
            LOG.warn("The region server was shut down, ", decodedException);
            // No need to retry, the region server is a goner.
            return;
          } else if (decodedException instanceof ServerNotRunningYetException) {
            // This is the one exception to retry.  For all else we should just fail
            // the startup.
            long now = System.currentTimeMillis();
            if (now > maxWaitTime) throw e;
            LOG.debug("Server is not yet up; waiting up to " +
                (maxWaitTime - now) + "ms", e);
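            Thread.sleep(1000);
            // Go round the loop and retry; without this, control falls through
            // to the throw below and the retry never happens.
            continue;
          }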

          throw decodedException;
        }
      }
    } 
	.......
  }
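The comments above describe a retry policy: retry only while the rs is not yet running, give up on a stopped rs, and stop retrying after a deadline. A minimal sketch of that policy; the exception classes and the OpenCall interface are hypothetical stand-ins for the HBase types.

```java
import java.io.IOException;

// Sketch of the retry policy; all names are hypothetical, not HBase classes.
class OpenRpcRetry {
    static class ServerNotRunningYet extends IOException {}
    static class ServerStopped extends IOException {}

    interface OpenCall {
        void send() throws IOException;
    }

    // Returns true once the RPC succeeds, false if the server has stopped.
    // Other IOExceptions propagate to the caller unchanged.
    static boolean sendWithRetry(OpenCall call, long maxWaitMs, long sleepMs)
            throws IOException, InterruptedException {
        long deadline = System.currentTimeMillis() + maxWaitMs;
        while (true) {
            try {
                call.send();
                return true;
            } catch (ServerStopped e) {
                return false; // the server is gone; retrying is pointless
            } catch (ServerNotRunningYet e) {
                if (System.currentTimeMillis() > deadline) {
                    throw e; // waited long enough
                }
                Thread.sleep(sleepMs); // then go round the loop and retry
            }
        }
    }

    public static void main(String[] args) throws Exception {
        int[] attempts = {0};
        // Fails twice with "not running yet", then succeeds.
        boolean ok = sendWithRetry(() -> {
            if (++attempts[0] < 3) {
                throw new ServerNotRunningYet();
            }
        }, 60_000, 10);
        System.out.println(ok + " after " + attempts[0] + " attempts"); // true after 3 attempts
    }
}
```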

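On the rs side, the RPC entry point is HRegionInterface.openRegions(final List<HRegionInfo> regions). The rs initializes the regions and reports success or failure to the master through the znode state, so the whole process is asynchronous.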
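Because openRegions only kicks off the work, the RPC returns before any region is open. A hypothetical sketch of that shape: AsyncOpener and its callback are illustrative names, with the callback standing in for the znode updates the master watches.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

// Hypothetical sketch of an asynchronous "open regions" handler: the call only
// queues one open task per region and returns at once; each task reports its
// progress through the callback.
class AsyncOpener {
    private final ExecutorService openers = Executors.newFixedThreadPool(2);

    void openRegions(List<String> regions, BiConsumer<String, String> report) {
        for (String region : regions) {
            openers.submit(() -> {
                report.accept(region, "OPENING");
                // ... initialize the region (HDFS dirs, stores) here ...
                report.accept(region, "OPENED");
            });
        }
        // Nothing is necessarily open yet when this method returns.
    }

    void shutdownAndWait() throws InterruptedException {
        openers.shutdown();
        openers.awaitTermination(10, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        Map<String, String> znodes = new ConcurrentHashMap<>();
        AsyncOpener rs = new AsyncOpener();
        rs.openRegions(Arrays.asList("r1", "r2", "r3"), znodes::put);
        rs.shutdownAndWait();
        System.out.println(znodes); // every region ends up OPENED
    }
}
```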

 

User-table regions are opened by OpenRegionHandler, whose process method is:

 

public void process() throws IOException {
    try {
     .....

      // If fails, just return.  Someone stole the region from under us.
      // Calling transitionZookeeperOfflineToOpening initalizes this.version.
      // Move the znode under /hbase/unassigned from OFFLINE to OPENING
      if (!transitionZookeeperOfflineToOpening(encodedName,
          versionOfOfflineNode)) {
        LOG.warn("Region was hijacked? It no longer exists, encodedName=" +
          encodedName);
        return;
      }

      // Open region.  After a successful open, failures in subsequent
      // processing needs to do a close as part of cleanup.
      // Open the region
      region = openRegion();
      if (region == null) {
        tryTransitionToFailedOpen(regionInfo);
        return;
      }
      boolean failed = true;
      // After a successful open, first refresh the znode's timestamp, then update
      // the region's row in .META. (mainly the server and serverstartcode columns)
      if (tickleOpening("post_region_open")) {
        if (updateMeta(region)) {
          failed = false;
        }
      }
      // If the update failed or the server is stopping, close the region and
      // set the znode to FAILED_OPEN
      if (failed || this.server.isStopped() ||
          this.rsServices.isStopping()) {
        cleanupFailedOpen(region);
        tryTransitionToFailedOpen(regionInfo);
        return;
      }
      // Move the znode to OPENED; if that fails, close the region
      if (!transitionToOpened(region)) {
        // If we fail to transition to opened, it's because of one of two cases:
        //    (a) we lost our ZK lease
        // OR (b) someone else opened the region before us
        // In either case, we don't need to transition to FAILED_OPEN state.
        // In case (a), the Master will process us as a dead server. In case
        // (b) the region is already being handled elsewhere anyway.
        cleanupFailedOpen(region);
        return;
      }
      // Successful region open, and add it to OnlineRegions
      // Add the region to the online regions list
      this.rsServices.addToOnlineRegions(region);

      .....
  }
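The "hijacked" check takes the version of the OFFLINE node, which suggests it relies on ZooKeeper's versioned updates: the rs moves the node out of OFFLINE only if it is still at the version it was told about. The model below is hypothetical and in-memory; with a real ensemble, ZooKeeper's setData(path, data, expectedVersion) throws KeeperException.BadVersionException on a mismatch, whereas this model returns false.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical in-memory model of a versioned znode.
class VersionedNode {
    static final class Snapshot {
        final String state;
        final int version;

        Snapshot(String state, int version) {
            this.state = state;
            this.version = version;
        }
    }

    private final AtomicReference<Snapshot> current;

    VersionedNode(String initialState) {
        current = new AtomicReference<>(new Snapshot(initialState, 0));
    }

    Snapshot read() {
        return current.get();
    }

    // Moves to newState only if the node is still in expectedState at
    // expectedVersion; any intervening change makes this fail.
    boolean transition(String expectedState, int expectedVersion, String newState) {
        Snapshot seen = current.get();
        if (!seen.state.equals(expectedState) || seen.version != expectedVersion) {
            return false;
        }
        return current.compareAndSet(seen, new Snapshot(newState, seen.version + 1));
    }

    public static void main(String[] args) {
        VersionedNode node = new VersionedNode("OFFLINE");
        int versionSeenByRs = node.read().version; // the rs is told version 0
        // Meanwhile the master rewrites the node: still OFFLINE, but a new version.
        node.transition("OFFLINE", 0, "OFFLINE");
        boolean ok = node.transition("OFFLINE", versionSeenByRs, "OPENING");
        System.out.println(ok ? "opening" : "Region was hijacked?"); // Region was hijacked?
    }
}
```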

Region initialization:

 

 

private long initializeRegionInternals(final CancelableProgressable reporter,
      MonitoredTask status) throws IOException, UnsupportedEncodingException {
    .....

    // Write HRI to a file in case we need to recover .META.
    status.setStatus("Writing region info on filesystem");
    // Write the .regioninfo file: the serialized HRegionInfo, i.e. the region's metadata
    checkRegioninfoOnFilesystem();

    // Remove temporary data left over from old regions
    status.setStatus("Cleaning up temporary data from old regions");
    // Delete the .tmp directory
    cleanupTmpDir();

    // Load in all the HStores.
    //
    // Context: During replay we want to ensure that we do not lose any data. So, we
    // have to be conservative in how we replay logs. For each store, we calculate
    // the maxSeqId up to which the store was flushed. And, skip the edits which
    // is equal to or lower than maxSeqId for each store.
    // Open each family's store on its own thread; once all stores are loaded,
    // take the maximum seqId and memstoreTS across them
    Map<byte[], Long> maxSeqIdInStores = new TreeMap<byte[], Long>(
        Bytes.BYTES_COMPARATOR);
    long maxSeqId = -1;
    // initialized to -1 so that we pick up MemstoreTS from column families
    long maxMemstoreTS = -1;

    if (this.htableDescriptor != null &&
        !htableDescriptor.getFamilies().isEmpty()) {
      // initialize the thread pool for opening stores in parallel.
      ThreadPoolExecutor storeOpenerThreadPool =
        getStoreOpenAndCloseThreadPool(
          "StoreOpenerThread-" + this.regionInfo.getRegionNameAsString());
      CompletionService<Store> completionService =
        new ExecutorCompletionService<Store>(storeOpenerThreadPool);

      // initialize each store in parallel
      for (final HColumnDescriptor family : htableDescriptor.getFamilies()) {
        status.setStatus("Instantiating store for column family " + family);
        completionService.submit(new Callable<Store>() {
          public Store call() throws IOException {
            return instantiateHStore(tableDir, family);
          }
        });
      }
      try {
        for (int i = 0; i < htableDescriptor.getFamilies().size(); i++) {
          Future<Store> future = completionService.take();
          Store store = future.get();

          this.stores.put(store.getColumnFamilyName().getBytes(), store);
          long storeSeqId = store.getMaxSequenceId();
          maxSeqIdInStores.put(store.getColumnFamilyName().getBytes(),
              storeSeqId);
          if (maxSeqId == -1 || storeSeqId > maxSeqId) {
            maxSeqId = storeSeqId;
          }
          long maxStoreMemstoreTS = store.getMaxMemstoreTS();
          if (maxStoreMemstoreTS > maxMemstoreTS) {
            maxMemstoreTS = maxStoreMemstoreTS;
          }
        }
      ......
    }
    mvcc.initialize(maxMemstoreTS + 1);
    // Recover any edits if available.
    maxSeqId = Math.max(maxSeqId, replayRecoveredEditsIfAny(
        this.regiondir, maxSeqIdInStores, reporter, status));

	.......
 
    this.lastFlushTime = EnvironmentEdgeManager.currentTimeMillis();
    // Use maximum of log sequenceid or that which was found in stores
    // (particularly if no recovered edits, seqid will be -1).
    // The next seqid is one past the maximum
    long nextSeqid = maxSeqId + 1;
    ......
    return nextSeqid;
  }
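The store-loading loop above is a CompletionService fan-out: submit one task per column family, then take() results in completion order. A minimal sketch of the same pattern; StoreStub and the seqIdOnDisk map are hypothetical stand-ins for Store and for what each store reads from its files.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Sketch of the pattern: open each column family in parallel, consume the
// results as they complete, and return max(seqId) + 1.
class ParallelStoreOpen {
    static final class StoreStub {
        final String family;
        final long maxSequenceId;

        StoreStub(String family, long maxSequenceId) {
            this.family = family;
            this.maxSequenceId = maxSequenceId;
        }
    }

    // A family with no entry in seqIdOnDisk is treated as an empty store (-1).
    static long openAll(List<String> families, Map<String, Long> seqIdOnDisk)
            throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, families.size()));
        try {
            CompletionService<StoreStub> completion = new ExecutorCompletionService<>(pool);
            for (String family : families) {
                completion.submit(() -> new StoreStub(family, seqIdOnDisk.getOrDefault(family, -1L)));
            }
            long maxSeqId = -1;
            for (int i = 0; i < families.size(); i++) {
                StoreStub store = completion.take().get(); // whichever finished first
                maxSeqId = Math.max(maxSeqId, store.maxSequenceId);
            }
            return maxSeqId + 1; // next sequence id
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        Map<String, Long> onDisk = new HashMap<>();
        onDisk.put("c1", 10L);
        onDisk.put("c2", 25L);
        System.out.println(openAll(Arrays.asList("c1", "c2"), onDisk)); // 26
    }
}
```

In this model a brand-new region whose stores are all empty yields 0, in line with the -1 starting value in the code above.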

That covers the rs side. The master watches the region state changes made by the rs through ZooKeeper watchers, and AssignmentManager.nodeDataChanged handles those events.

 

 

  public void nodeDataChanged(String path) {
    if(path.startsWith(watcher.assignmentZNode)) {
      try {
        Stat stat = new Stat();
        // Read the new data and re-set the watcher (ZooKeeper watches fire only
        // once), so the next change is delivered as well
        RegionTransitionData data = ZKAssign.getDataAndWatch(watcher, path, stat);
        if (data == null) {
          return;
        }
        handleRegion(data, stat.getVersion());
      } catch (KeeperException e) {
        master.abort("Unexpected ZK exception reading unassigned node data", e);
      }
    }
  }
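ZooKeeper data watches are one-time triggers, which is why nodeDataChanged re-arms the watch in the same call that reads the data. The in-memory model below is hypothetical (OneShotNode and MasterWatcher are illustrative names) and shows that a handler which re-arms sees every change, while a plain watch fires only once.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of one-shot data watches: a watch fires once, on the
// next change, and is then dropped.
class OneShotNode {
    private String data;
    private final List<Runnable> watchers = new ArrayList<>();

    OneShotNode(String data) {
        this.data = data;
    }

    // Returns the current data and leaves a one-shot watch behind.
    synchronized String getDataAndWatch(Runnable watcher) {
        watchers.add(watcher);
        return data;
    }

    void setData(String newData) {
        List<Runnable> toFire;
        synchronized (this) {
            data = newData;
            toFire = new ArrayList<>(watchers);
            watchers.clear(); // one-shot: each watch is consumed by this change
        }
        toFire.forEach(Runnable::run); // notify outside the lock
    }
}

class MasterWatcher implements Runnable {
    private final OneShotNode node;
    final List<String> seen = new ArrayList<>();

    MasterWatcher(OneShotNode node) {
        this.node = node;
    }

    void start() {
        node.getDataAndWatch(this);
    }

    @Override
    public void run() {
        // Read the new data and re-arm the watch in the same call.
        seen.add(node.getDataAndWatch(this));
    }

    public static void main(String[] args) {
        OneShotNode znode = new OneShotNode("OFFLINE");
        MasterWatcher master = new MasterWatcher(znode);
        master.start();
        znode.setData("OPENING");
        znode.setData("OPENED");
        System.out.println(master.seen); // [OPENING, OPENED]
    }
}
```

With real ZooKeeper the handler reads the current data rather than the data that triggered the event, so several quick changes can collapse into one notification; that is why each case in handleRegion works from the state it reads.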

When the rs sets the region state to OPENING:

 

 

case RS_ZK_REGION_OPENING:
          .....
          // Transition to OPENING (or update stamp if already OPENING)
          // Refresh the timestamp
          regionState.update(RegionState.State.OPENING,
              data.getStamp(), data.getOrigin());
          break;

When the rs sets the region state to OPENED:

 

 

case RS_ZK_REGION_OPENED:
          ......
          // Handle OPENED by removing from transition and deleted zk node
          // Set the in-memory state to OPEN
          regionState.update(RegionState.State.OPEN,
              data.getStamp(), data.getOrigin());
          this.executorService.submit(
            new OpenedRegionHandler(master, this, regionState.getRegion(),
              data.getOrigin(), expectedVersion));
          break;

OpenedRegionHandler's main job is to delete the region's znode created earlier under /hbase/unassigned:

  public void process() {
    // Code to defend against case where we get SPLIT before region open
    // processing completes; temporary till we make SPLITs go via zk -- 0.92.
    RegionState regionState = this.assignmentManager.isRegionInTransition(regionInfo);
    boolean openedNodeDeleted = false;
    if (regionState != null
        && regionState.getState().equals(RegionState.State.OPEN)) {
      openedNodeDeleted = deleteOpenedNode(expectedVersion);
      if (!openedNodeDeleted) {
        LOG.error("The znode of region " + regionInfo.getRegionNameAsString()
            + " could not be deleted.");
      }
    } 
	......
  }

Deleting the node triggers another ZooKeeper notification, handled by AssignmentManager.nodeDeleted:

  public void nodeDeleted(final String path) {
    if (path.startsWith(this.watcher.assignmentZNode)) {
      String regionName = ZKAssign.getRegionName(this.master.getZooKeeper(), path);
      RegionState rs = this.regionsInTransition.get(regionName);
      if (rs != null) {
        HRegionInfo regionInfo = rs.getRegion();
        if (rs.isSplit()) {
          LOG.debug("Ephemeral node deleted, regionserver crashed?, " +
            "clearing from RIT; rs=" + rs);
          regionOffline(rs.getRegion());
        } else {
          LOG.debug("The znode of region " + regionInfo.getRegionNameAsString()
              + " has been deleted.");
          if (rs.isOpened()) {
            makeRegionOnline(rs, regionInfo);
          }
        }
      }
    }
  }

Bringing the region online removes it from the regions-in-transition map and updates the servers and regions maps:

  void regionOnline(HRegionInfo regionInfo, ServerName sn) {
    synchronized (this.regionsInTransition) {
      RegionState rs =
        this.regionsInTransition.remove(regionInfo.getEncodedName());
      if (rs != null) {
        this.regionsInTransition.notifyAll();
      }
    }
    synchronized (this.regions) {
      // Add check
      ServerName oldSn = this.regions.get(regionInfo);
      if (oldSn != null && serverManager.isServerOnline(oldSn)) {
        LOG.warn("Overwriting " + regionInfo.getEncodedName() + " on old:"
            + oldSn + " with new:" + sn);
        // remove region from old server
        Set<HRegionInfo> hris = servers.get(oldSn);
        if (hris != null) {
          hris.remove(regionInfo);
        }
      }
      
      if (isServerOnline(sn)) {
        this.regions.put(regionInfo, sn);
        addToServers(sn, regionInfo);
        this.regions.notifyAll();
      } else {
        LOG.info("The server is not in online servers, ServerName=" + 
          sn.getServerName() + ", region=" + regionInfo.getEncodedName());
      }
    }
    // Remove plan if one.
    clearRegionPlan(regionInfo);
    // Add the server to serversInUpdatingTimer
    addToServersInUpdatingTimer(sn);
  }
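regionOnline keeps two views of the same assignment in step: regions (region to server) and servers (server to its set of regions). A hypothetical simplification showing why a move must touch both sides; it omits the online-server check, the regionsInTransition bookkeeping, and the region plans.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical simplification of the two maps regionOnline maintains.
class AssignmentBook {
    final Map<String, String> regions = new HashMap<>();      // region -> server
    final Map<String, Set<String>> servers = new HashMap<>(); // server -> regions

    synchronized void regionOnline(String region, String server) {
        String oldServer = regions.put(region, server);
        if (oldServer != null && !oldServer.equals(server)) {
            // Region moved: drop it from the old server's set.
            Set<String> old = servers.get(oldServer);
            if (old != null) {
                old.remove(region);
            }
        }
        servers.computeIfAbsent(server, s -> new HashSet<>()).add(region);
    }

    public static void main(String[] args) {
        AssignmentBook book = new AssignmentBook();
        book.regionOnline("r1", "rs1");
        book.regionOnline("r1", "rs2"); // reassigned
        System.out.println(book.regions); // {r1=rs2}
        System.out.println(book.servers); // rs1 now empty, rs2 holds r1
    }
}
```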

 

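Summary

Key points of region assignment:

1. Region load balancing: by default, regions are distributed randomly and evenly across the rs.

2. The master creates a region znode under /hbase/unassigned, which master and rs then use to coordinate.

3. The rs initializes the region's files on HDFS, including the .regioninfo file and the column family directories.

4. Once the rs has opened the region it sets the state to 'opened'; the master then treats the assignment as successful, deletes the znode, and adds the region to its online list.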

 
