hbase.mapreduce Description from JavaDoc

From org.apache.hadoop.hbase.mapreduce HBase 0.90.4 API.

Package org.apache.hadoop.hbase.mapreduce Description

Provides HBase MapReduce Input/OutputFormats, a table indexing MapReduce job, and utility

Table of Contents

HBase, MapReduce and the CLASSPATH

MapReduce jobs deployed to a MapReduce cluster do not by default have access to the HBase configuration under $HBASE_CONF_DIR nor to HBase classes. You could add hbase-site.xml to$HADOOP_HOME/conf and add HBase jars to the $HADOOP_HOME/lib and copy these changes across your cluster (or edit conf/hadoop-env.sh and add them to the HADOOP_CLASSPATH variable) but this will pollute your hadoop install with HBase references; its also obnoxious requiring restart of the hadoop cluster before it’ll notice your HBase additions.

As of 0.90.x, HBase will just add its dependency jars to the job configuration; the dependencies just need to be available on the local CLASSPATH. For example, to run the bundled HBaseRowCounter mapreduce job against a table named usertable, type:

$ HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase classpath` ${HADOOP_HOME}/bin/hadoop jar ${HBASE_HOME}/hbase-0.90.0.jar rowcounter usertable

Expand $HBASE_HOME and $HADOOP_HOME in the above appropriately to suit your local environment. The content of HADOOP_CLASSPATH is set to the HBase CLASSPATH via backticking the command${HBASE_HOME}/bin/hbase classpath.

When the above runs, internally, the HBase jar finds its zookeeper and guava, etc., dependencies on the passed HADOOP_CLASSPATH and adds the found jars to the mapreduce job configuration. See the source at TableMapReduceUtil#addDependencyJars(org.apache.hadoop.mapreduce.Job) for how this is done.

The above may not work if you are running your HBase from its build directory; i.e. you’ve done $ mvn test install at ${HBASE_HOME} and you are now trying to use this build in your mapreduce job. If you get

java.lang.RuntimeException: java.lang.ClassNotFoundException: org.apache.hadoop.hbase.mapreduce.RowCounter$RowCounterMapper
...

exception thrown, try doing the following:

$ HADOOP_CLASSPATH=${HBASE_HOME}/target/hbase-0.90.0-SNAPSHOT.jar:`${HBASE_HOME}/bin/hbase classpath` ${HADOOP_HOME}/bin/hadoop jar ${HBASE_HOME}/target/hbase-0.90.0-SNAPSHOT.jar rowcounter usertable

Notice how we preface the backtick invocation setting HADOOP_CLASSPATH with reference to the built HBase jar over in the target directory.

 

Bundled HBase MapReduce Jobs

The HBase jar also serves as a Driver for some bundled mapreduce jobs. To learn about the bundled mapreduce jobs run:

$ ${HADOOP_HOME}/bin/hadoop jar ${HBASE_HOME}/hbase-0.90.0-SNAPSHOT.jar
An example program must be given as the first argument.
Valid program names are:
  copytable: Export a table from local cluster to peer cluster
  completebulkload: Complete a bulk data load.
  export: Write table data to HDFS.
  import: Import data written by Export.
  importtsv: Import data in TSV format.
  rowcounter: Count rows in HBase table

HBase as MapReduce job data source and sink

HBase can be used as a data source, TableInputFormat, and data sink, TableOutputFormat or MultiTableOutputFormat, for MapReduce jobs. Writing MapReduce jobs that read or write HBase, you’ll probably want to subclass TableMapper and/or TableReducer. See the do-nothing pass-through classes IdentityTableMapper and IdentityTableReducer for basic usage. For a more involved example, seeRowCounter or review the org.apache.hadoop.hbase.mapreduce.TestTableMapReduce unit test.

Running mapreduce jobs that have HBase as source or sink, you’ll need to specify source/sink table and column names in your configuration.

Reading from HBase, the TableInputFormat asks HBase for the list of regions and makes a map-per-region or mapred.map.tasks maps, whichever is smaller (If your job only has two maps, up mapred.map.tasks to a number > number of regions). Maps will run on the adjacent TaskTracker if you are running a TaskTracer and RegionServer per node. Writing, it may make sense to avoid the reduce step and write yourself back into HBase from inside your map. You’d do this when your job does not need the sort and collation that mapreduce does on the map emitted data; on insert, HBase ‘sorts’ so there is no point double-sorting (and shuffling data around your mapreduce cluster) unless you need to. If you do not need the reduce, you might just have your map emit counts of records processed just so the framework’s report at the end of your job has meaning or set the number of reduces to zero and use TableOutputFormat. See example code below. If running the reduce step makes sense in your case, its usually better to have lots of reducers so load is spread across the HBase cluster.

There is also a new HBase partitioner that will run as many reducers as currently existing regions. The HRegionPartitioner is suitable when your table is large and your upload is not such that it will greatly alter the number of existing regions when done; otherwise use the default partitioner.

Bulk import writing HFiles directly

If importing into a new table, its possible to by-pass the HBase API and write your content directly to the filesystem properly formatted as HBase data files (HFiles). Your import will run faster, perhaps an order of magnitude faster if not more. For more on how this mechanism works, see Bulk Loads documentation.

Example Code

Sample Row Counter

See RowCounter. This job uses TableInputFormat and does a count of all rows in specified table. You should be able to run it by doing: % ./bin/hadoop jar hbase-X.X.X.jar. This will invoke the hbase MapReduce Driver class. Select ‘rowcounter’ from the choice of jobs offered. This will emit rowcouner ‘usage’. Specify tablename, column to count and output directory. You may need to add the hbase conf directory to $HADOOP_HOME/conf/hadoop-env.sh#HADOOP_CLASSPATH so the rowcounter gets pointed at the right hbase cluster (or, build a new jar with an appropriate hbase-site.xml built into your job jar).

HbaseBook:Chapter 10. Architecture

From The Apache Book.

Chapter 10. Architecture

10.1. Client

The HBase client HTable is responsible for finding RegionServers that are serving the particular row range of interest. It does this by querying the .META.and -ROOT- catalog tables (TODO: Explain). After locating the required region(s), the client directly contacts the RegionServer serving that region (i.e., it does not go through the master) and issues the read or write request. This information is cached in the client so that subsequent requests need not go through the lookup process. Should a region be reassigned either by the master load balancer or because a RegionServer has died, the client will requery the catalog tables to determine the new location of the user region.

Administrative functions are handled through HBaseAdmin

10.1.1. Connections

For connection configuration information, see Section 2.6.4, “Client configuration and dependencies connecting to an HBase cluster”.

HTable instances are not thread-safe. When creating HTable instances, it is advisable to use the same HBaseConfiguration instance. This will ensure sharing of ZooKeeper and socket instances to the RegionServers which is usually what you want. For example, this is preferred:

HBaseConfiguration conf = HBaseConfiguration.create();
HTable table1 = new HTable(conf, "myTable");
HTable table2 = new HTable(conf, "myTable");

as opposed to this:

HBaseConfiguration conf1 = HBaseConfiguration.create();
HTable table1 = new HTable(conf1, "myTable");
HBaseConfiguration conf2 = HBaseConfiguration.create();
HTable table2 = new HTable(conf2, "myTable");

For more information about how connections are handled in the HBase client, see HConnectionManager.

10.1.1.1. Connection Pooling

For applications which require high-end multithreaded access (e.g., web-servers or application servers that may serve many application threads in a single JVM), see HTablePool.

10.1.2. WriteBuffer and Batch Methods

If Section 11.5.4, “HBase Client: AutoFlush” is turned off on HTablePuts are sent to RegionServers when the writebuffer is filled. The writebuffer is 2MB by default. Before an HTable instance is discarded, either close() or flushCommits() should be invoked so Puts will not be lost.

Note: htable.delete(Delete); does not go in the writebuffer! This only applies to Puts.

For additional information on write durability, review the ACID semantics page.

For fine-grained control of batching of Puts or Deletes, see the batch methods on HTable.

10.1.3. Filters

Get and Scan instances can be optionally configured with filters which are applied on the RegionServer.

10.2. Daemons

10.2.1. Master

HMaster is the implementation of the Master Server. The Master server is responsible for monitoring all RegionServer instances in the cluster, and is the interface for all metadata changes.

10.2.1.1. Startup Behavior

If run in a multi-Master environment, all Masters compete to run the cluster. If the active Master loses it’s lease in ZooKeeper (or the Master shuts down), then then the remaining Masters jostle to take over the Master role.

10.2.1.2. Interface

The methods exposed by HMasterInterface are primarily metadata-oriented methods:

  • Table (createTable, modifyTable, removeTable, enable, disable)
  • ColumnFamily (addColumn, modifyColumn, removeColumn)
  • Region (move, assign, unassign)

For example, when the HBaseAdmin method disableTable is invoked, it is serviced by the Master server.

10.2.1.3. Processes

The Master runs several background threads:

  • LoadBalancer periodically reassign regions in the cluster.
  • CatalogJanitor periodically checks and cleans up the .META. table.

 

10.2.2. RegionServer

HRegionServer is the RegionServer implementation. It is responsible for serving and managing regions.

10.2.2.1. Interface

The methods exposed by HRegionRegionInterface contain both data-oriented and region-maintenance methods:

  • Data (get, put, delete, next, etc.)
  • Region (splitRegion, compactRegion, etc.)

For example, when the HBaseAdmin method majorCompact is invoked on a table, the client is actually iterating through all regions for the specified table and requesting a major compaction directly to each region.

10.2.2.2. Processes

The RegionServer runs a variety of background threads:

  • CompactSplitThread checks for splits and handle minor compactions.
  • MajorCompactionChecker checks for major compactions.
  • MemStoreFlusher periodically flushes in-memory writes in the MemStore to StoreFiles.
  • LogRoller periodically checks the RegionServer’s HLog.

10.3. Regions

This chapter is all about Regions.

Note

Regions are comprised of a Store per Column Family.

10.3.1. Region Size

Region size is one of those tricky things, there are a few factors to consider:

  • Regions are the basic element of availability and distribution.
  • HBase scales by having regions across many servers. Thus if you have 2 regions for 16GB data, on a 20 node machine you are a net loss there.
  • High region count has been known to make things slow, this is getting better, but it is probably better to have 700 regions than 3000 for the same amount of data.
  • Low region count prevents parallel scalability as per point #2. This really cant be stressed enough, since a common problem is loading 200MB data into HBase then wondering why your awesome 10 node cluster is mostly idle.
  • There is not much memory footprint difference between 1 region and 10 in terms of indexes, etc, held by the RegionServer.

Its probably best to stick to the default, perhaps going smaller for hot tables (or manually split hot regions to spread the load over the cluster), or go with a 1GB region size if your cell sizes tend to be largish (100k and up).

10.3.2. Region Splits

Splits run unaided on the RegionServer; i.e. the Master does not participate. The RegionServer splits a region, offlines the split region and then adds the daughter regions to META, opens daughters on the parent’s hosting RegionServer and then reports the split to the Master. See Section 2.8.2.7, “Managed Splitting” for how to manually manage splits (and for why you might do this)

10.3.3. Region Load Balancer

Periodically, and when there are not any regions in transition, a load balancer will run and move regions around to balance cluster load. The period at which it runs can be configured.

10.3.4. Store

A Store hosts a MemStore and 0 or more StoreFiles (HFiles). A Store corresponds to a column family for a table for a given region.

10.3.4.1. MemStore

The MemStore holds in-memory modifications to the Store. Modifications are KeyValues. When asked to flush, current memstore is moved to snapshot and is cleared. HBase continues to serve edits out of new memstore and backing snapshot until flusher reports in that the flush succeeded. At this point the snapshot is let go.

10.3.4.2. StoreFile (HFile)

10.3.4.2.1. HFile Format

The hfile file format is based on the SSTable file described in the BigTable [2006] paper and on Hadoop’s tfile (The unit test suite and the compression harness were taken directly from tfile). Schubert Zhang’s blog post on HFile: A Block-Indexed File Format to Store Sorted Key-Value Pairs makes for a thorough introduction to HBase’s hfile. Matteo Bertozzi has also put up a helpful description, HBase I/O: HFile.

10.3.4.2.2. HFile Tool

To view a textualized version of hfile content, you can do use the org.apache.hadoop.hbase.io.hfile.HFile tool. Type the following to see usage:

$ ${HBASE_HOME}/bin/hbase org.apache.hadoop.hbase.io.hfile.HFile 

For example, to view the content of the file hdfs://10.81.47.41:9000/hbase/TEST/1418428042/DSMP/4759508618286845475, type the following:

 $ ${HBASE_HOME}/bin/hbase org.apache.hadoop.hbase.io.hfile.HFile -v -f hdfs://10.81.47.41:9000/hbase/TEST/1418428042/DSMP/4759508618286845475 

If you leave off the option -v to see just a summary on the hfile. See usage for other things to do with the HFile tool.

10.3.4.3. Compaction

There are two types of compactions: minor and major. Minor compactions will usually pick up a couple of the smaller adjacent files and rewrite them as one. Minors do not drop deletes or expired cells, only major compactions do this. Sometimes a minor compaction will pick up all the files in the store and in this case it actually promotes itself to being a major compaction. For a description of how a minor compaction picks files to compact, see the ascii diagram in the Store source code.

After a major compaction runs there will be a single storefile per store, and this will help performance usually. Caution: major compactions rewrite all of the stores data and on a loaded system, this may not be tenable; major compactions will usually have to be done manually on large systems. SeeSection 2.8.2.8, “Managed Compactions”.

10.3.5. Block Cache

The Block Cache contains three levels of block priority to allow for scan-resistance and in-memory ColumnFamilies. A block is added with an in-memory flag if the containing ColumnFamily is defined in-memory, otherwise a block becomes a single access priority. Once a block is accessed again, it changes to multiple access. This is used to prevent scans from thrashing the cache, adding a least-frequently-used element to the eviction algorithm. Blocks from in-memory ColumnFamilies are the last to be evicted.

For more information, see the LruBlockCache source

10.4. Write Ahead Log (WAL)

10.4.1. Purpose

Each RegionServer adds updates (Puts, Deletes) to its write-ahead log (WAL) first, and then to the Section 10.3.4.1, “MemStore” for the affectedSection 10.3.4, “Store”. This ensures that HBase has durable writes. Without WAL, there is the possibility of data loss in the case of a RegionServer failure before each MemStore is flushed and new StoreFiles are written. HLog is the HBase WAL implementation, and there is one HLog instance per RegionServer.

The WAL is in HDFS in /hbase/.logs/ with subdirectories per region.For more general information about the concept of write ahead logs, see the Wikipedia Write-Ahead Log article.

10.4.2. WAL Flushing

TODO (describe).

10.4.3. WAL Splitting

10.4.3.1. How edits are recovered from a crashed RegionServer

When a RegionServer crashes, it will lose its ephemeral lease in ZooKeeper…TODO

10.4.3.2. hbase.hlog.split.skip.errors

When set to true, the default, any error encountered splitting will be logged, the problematic WAL will be moved into the .corrupt directory under the hbaserootdir, and processing will continue. If set to false, the exception will be propagated and the split logged as failed.[19]

10.4.3.3. How EOFExceptions are treated when splitting a crashed RegionServers’ WALs

If we get an EOF while splitting logs, we proceed with the split even when hbase.hlog.split.skip.errors == false. An EOF while reading the last log in the set of files to split is near-guaranteed since the RegionServer likely crashed mid-write of a record. But we’ll continue even if we got an EOF reading other than the last file in the set.[20]