Hadoop MapReduce Connector to Teradata EDW

Hadoop MapReduce Connector to Teradata EDW | Teradata Developer Exchange.

Hadoop MapReduce programmers often find that it is more convenient and productive to have direct access from their MapReduce programs to data stored in a RDBMS such as Teradata Enterprise Data Warehouse (EDW) because:

  1. There is no need to export relational data into a flat file.
  2. There is no need to upload the file into the Hadoop Distributed File System (HDFS).
  3. There is no need to change and rerun the scripts/commands in the first two steps when they need to use different tables/columns in their MapReduce programs.

1. Introduction

Connecting a program written in Java or another language to a database through JDBC or ODBC is well understood by most programmers. But when dealing with Hadoop and MPP databases, two factors outside the domain of connectors become crucial: scale and balance. Hadoop and MPP databases often run on dozens or hundreds of server nodes, consuming tens or hundreds of terabytes per execution. For a shared-nothing architecture to perform at maximum throughput, the processing workload and data must be partitioned evenly across execution threads. Otherwise, one server will have an inordinate amount of work compared to the others, lengthening the total elapsed time. Consequently, a Hadoop connector must support parallel efficiency.

In this document we first describe how MapReduce programs (a.k.a. mappers) can have parallel access to the Teradata EDW data using the TeradataDBInputFormat approach, discussed in Section 2. In Section 3, we provide a complete example with accompanying source code which can be directly used to access the EDW without any changes required in the data warehouse or Hadoop. The TeradataDBInputFormat class can be directly used by programmers without any changes for many applications. Overall, readers can get the following out of this article:

  1. Architecturally how MapReduce programs get direct and parallel access to the Teradata EDW.
  2. How TeradataDBInputFormat class can be used without changes to Hadoop or the EDW. Step-by-step installation and deployment is included in the example.
  3. How programmers can extend the TeradataDBInputFormat approach for specific application needs.

2. The TeradataDBInputFormat approach

A common approach for a MapReduce program to access relational data is to first use the DBMS export utility to pass SQL answer sets to a local file and then load the local file to Hadoop.

However, there are several use cases where the export and load into HDFS is inconvenient for the programmer. Recognizing the need to access relational data in MapReduce programs, the Apache.org open source project for Hadoop provides the DBInputFormat class library.  The DBInputFormat and DBOutputFormat Java class libraries allow MapReduce programs to send SQL queries through the standard JDBC interface to the EDW in parallel. Teradata provides a version of DBInputFormat [3] that will be part of the Cloudera Distribution for Hadoop. Note that Cloudera has a good explanation of DBInputFormat on their website. The TeradataDBInputFormat approach is inspired by but not based on the Apache DBInputFormat approach.

DBInputFormat and DBOutputFormat, along with their Teradata versions, are good interfaces for ad hoc, small or medium volume data transfers. They make it easy to copy tables from the EDW into HDFS and vice versa. One good use of these interfaces is when a mapper program needs to do table look-ups but has no need to persist the data fetched. These interfaces are not efficient for high volume data transfers, where bulk data movement tools like Teradata Parallel Transporter are more appropriate. In many cases queries and bulk data movement are better optimized inside the database itself. While it’s an oversimplification, think of the input and output format class libraries as serving workloads similar to those processed by BTEQ. They are very flexible and useful, but do not support every workload.

 2.1 DBInputFormat with JDBC

DBInputFormat uses JDBC to connect to relational databases, typically MySQL or Oracle. The basic idea is that a MapReduce programmer provides a SQL query via the DBInputFormat class. The DBInputFormat class associates a modified SQL query with each mapper started by Hadoop. Then each mapper sends a query through a standard JDBC driver to the DBMS, gets back a portion of the query results, and works on the results in parallel. The DBInputFormat approach is correct because the union of all queries sent by all mappers is equivalent to the original SQL query.

The DBInputFormat approach provides two interfaces for a MapReduce program to directly access data from a DBMS. The underlying implementation is the same for the two interfaces. In the first interface, a MapReduce program provides a table name T, a list P of column names to be retrieved, optional filter conditions C on the table and column(s) O to be used in the Order-By clause, in addition to user name, password and DBMS URL values. The DBInputFormat implementation first generates a “count” query:

SELECT COUNT(*) FROM T WHERE C

and sends it to the DBMS to get the number of rows (R) in the query result. At runtime, the DBInputFormat implementation knows the number of mappers (M) started by Hadoop and associates the following query Q with each mapper. Each mapper will connect to the DBMS and send Q over JDBC connection and get back the results.

SELECT P FROM T WHERE C ORDER BY O
LIMIT  L                                                               (Q)
OFFSET X

The above query Q asks the DBMS to evaluate the query

SELECT P FROM T WHERE C ORDER BY O

but to return only L rows starting from offset X. In total, M queries are sent to the DBMS by the M mappers, and they are identical except for the values of L and X. For the ith mapper (where 1 ≤ i ≤ M − 1), which is not the last mapper, L = ⌊R/M⌋ and X = (i − 1) × ⌊R/M⌋. For the last mapper, L = R − (M − 1) × ⌊R/M⌋ and X = (M − 1) × ⌊R/M⌋.

Basically all mappers except the last one will receive an average number of rows and the last mapper will get more rows than other mappers when the total number of rows in the result cannot be evenly divided by the number of mappers.
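
The split arithmetic described above can be sketched in a few lines of Java. This is an illustration only, not the Hadoop source: the class and method names (LimitOffsetSplits, limit, offset, queryFor) are hypothetical, and the sketch assumes that every mapper except the last receives ⌊R/M⌋ rows while the last mapper absorbs the remainder.

```java
class LimitOffsetSplits {
    // Number of rows fetched by mapper i (1-based) out of m mappers over r result rows.
    static long limit(long r, int m, int i) {
        long base = r / m;                          // floor(R / M) for non-negative R
        return (i < m) ? base : r - (long) (m - 1) * base;
    }

    // Offset of the first row fetched by mapper i (1-based).
    static long offset(long r, int m, int i) {
        return (long) (i - 1) * (r / m);
    }

    // Append the LIMIT and OFFSET clauses for mapper i to an already ordered query.
    static String queryFor(String orderedQuery, long r, int m, int i) {
        return orderedQuery + " LIMIT " + limit(r, m, i) + " OFFSET " + offset(r, m, i);
    }

    public static void main(String[] args) {
        long r = 10;   // row count returned by the count query (assumed value)
        int m = 3;     // number of mappers (assumed value)
        for (int i = 1; i <= m; i++) {
            System.out.println(queryFor("SELECT P FROM T WHERE C ORDER BY O", r, m, i));
        }
    }
}
```

With R = 10 and M = 3, the three mappers receive 3, 3, and 4 rows at offsets 0, 3, and 6, which covers the result exactly once.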

In the second interface of the DBInputFormat class, a MapReduce program can provide an arbitrary SQL select query SQ (which could involve multiple tables) whose results are the input to the mappers. The program must also provide a count query QC which returns an integer, the number of rows returned by the query SQ. The DBInputFormat class sends the query QC to the DBMS to get the number of rows (R), and the rest of the processing is the same as described for the first interface.

While the DBInputFormat approach clearly streamlines the process of accessing relational data from products like MySQL, its performance does not scale. In both interfaces, each mapper sends essentially the same SQL query to the DBMS but with different LIMIT and OFFSET clauses to get a subset of the relational data; this is how parallel processing of relational data by mappers is achieved in the DBInputFormat class. Sorting is required on the DBMS side for every query sent by a mapper because of the ORDER BY clause introduced into each query, even if the program itself does not need sorted input. Furthermore, the DBMS has to execute as many queries as there are mappers in the Hadoop system, which is not efficient, especially when the number of mappers is large. These performance issues are especially serious for a parallel DBMS such as Teradata EDW, which tends to have a high number of concurrent queries and larger datasets. The required sorting is also an expensive operation in a parallel DBMS because the rows of a table are not stored on a single node, so sorting requires row redistribution across nodes.

DBInputFormat cannot be used to access Teradata EDW since the LIMIT and OFFSET clauses are not in ANSI Standard SQL and are not supported by Teradata EDW. However, a newer Apache Hadoop class named DataDrivenDBInputFormat, derived from DBInputFormat, can read input data from a Teradata EDW table. DataDrivenDBInputFormat operates like DBInputFormat. The only difference is that instead of using the non-standard LIMIT and OFFSET clauses to demarcate splits, DataDrivenDBInputFormat generates WHERE clauses which separate the data into roughly equivalent shards. DataDrivenDBInputFormat has all of the same DBInputFormat performance issues we have discussed above.
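
The idea of demarcating splits with WHERE clauses instead of LIMIT and OFFSET can be illustrated as follows. This is a simplified sketch and not the DataDrivenDBInputFormat code: it assumes an integer split column whose minimum and maximum values are already known, and the names RangeSplits and whereClauses are made up for the example.

```java
import java.util.ArrayList;
import java.util.List;

class RangeSplits {
    // Cut the closed interval [min, max] of an integer column into m contiguous ranges.
    static List<String> whereClauses(String column, long min, long max, int m) {
        List<String> clauses = new ArrayList<String>();
        long span = max - min + 1;                  // number of distinct values covered
        long lo = min;
        for (int i = 0; i < m; i++) {
            // Spread the remainder over the first (span % m) ranges.
            long size = span / m + (i < span % m ? 1 : 0);
            long hi = lo + size;                    // exclusive upper bound
            clauses.add(column + " >= " + lo + " AND " + column + " < " + hi);
            lo = hi;
        }
        return clauses;
    }

    public static void main(String[] args) {
        for (String clause : whereClauses("transaction_id", 1, 10, 3)) {
            System.out.println("SELECT P FROM T WHERE " + clause);
        }
    }
}
```

Note that the ranges are equal in key span, not in row count, so the shards are only roughly balanced when the column values are spread evenly. Each mapper still sends its own query to the DBMS.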

Figure 1 – DBInputFormat

2.2 TeradataDBInputFormat

The Teradata connector for Hadoop, TeradataDBInputFormat, sends the SQL query Q provided by a MapReduce program only once to Teradata EDW. Q is executed only once and the results are stored in a Partitioned Primary Index (PPI) table [4]. Then each mapper from Hadoop sends a new query Qi which just asks for the ith partition on every AMP. Depending on the number of mappers, the complexity of the SQL query provided by a MapReduce program, and the amount of data involved in the SQL query, the performance of the TeradataDBInputFormat approach can be orders of magnitude better than the DBInputFormat approach, as we have seen in our internal testing.

Now we describe the architecture behind TeradataDBInputFormat. First, the TeradataDBInputFormat class sends the query P to the EDW based on the query Q provided by the mapper program.

CREATE TABLE T AS (Q) WITH DATA
PRIMARY INDEX ( c1 )                                             (P)
PARTITION BY (c2 MOD M) + 1

The above query asks the EDW to evaluate Q and store the results (table layout and data) in a new PPI table T. The hash value of the primary index column c1 of each row in the query results determines which AMP should store that row. Then the value of the partition-by expression determines the physical partition (location) of each row on a particular AMP. The expression uses modulo M, which means divide by M and take the remainder; adding 1 yields partition numbers from 1 to M.

All rows on the same AMP with the same partition-by value are physically stored together and can be directly and efficiently located by the Teradata optimizer. There are different ways to automatically choose the primary index column and partition-by expression.
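
To make the partition-by expression concrete, the sketch below evaluates (c2 MOD M) + 1 for a handful of values. It is plain Java rather than anything executed by the EDW, it assumes non-negative integer values for c2, and the class name PartitionNumber is hypothetical.

```java
class PartitionNumber {
    // Partition number for a row: (c2 MOD M) + 1, always in 1..M when c2 >= 0.
    static int partitionOf(long c2, int m) {
        return (int) (c2 % m) + 1;
    }

    public static void main(String[] args) {
        int m = 4;                                  // number of mappers (assumed value)
        int[] rowsPerPartition = new int[m + 1];    // index 0 is unused
        for (long c2 = 0; c2 < 10; c2++) {
            rowsPerPartition[partitionOf(c2, m)]++;
        }
        for (int p = 1; p <= m; p++) {
            System.out.println("partition " + p + ": " + rowsPerPartition[p] + " rows");
        }
    }
}
```

How evenly the rows spread over the M partitions depends on the values of c2, which is one reason the choice of partition-by column matters.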

After the query Q is evaluated and the table T is created, each AMP has M partitions numbered from 1 to M (M is the number of mappers started in Hadoop). Then each mapper sends the following query Qi (1 ≤ i ≤ M) to the EDW,

SELECT * FROM T WHERE PARTITION = i                              (Qi)

Teradata EDW will directly locate all rows in the ith partition on every AMP in parallel and return them to the mapper. This operation is done in parallel for all mappers. After all mappers retrieve their data, the table is automatically deleted. Notice that if the original SQL query just selects data from a base table which is a PPI table, then we do not need to create another PPI table since we can directly use the existing partitions to partition the data each mapper should receive.
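
The two kinds of statements involved, the DDL P and the per-mapper queries Qi, could be assembled along these lines. The sketch only builds SQL text; the names PpiQueries, createTableSql, and partitionSql are illustrative and are not taken from the attached source code, and the choice of c1 and c2 is left to the caller.

```java
class PpiQueries {
    // DDL P: materialize the user query q into a PPI table with m partitions per AMP.
    static String createTableSql(String table, String q, String c1, String c2, int m) {
        return "CREATE TABLE " + table + " AS (" + q + ") WITH DATA"
             + " PRIMARY INDEX (" + c1 + ")"
             + " PARTITION BY (" + c2 + " MOD " + m + ") + 1";
    }

    // Query Qi: fetch the i-th partition (1-based) from every AMP.
    static String partitionSql(String table, int i) {
        return "SELECT * FROM " + table + " WHERE PARTITION = " + i;
    }

    public static void main(String[] args) {
        String userQuery = "select transaction_id, product_id from table_1 where transaction_id > 1000";
        int mappers = 3;                            // assumed value
        // "tmp_result" is a hypothetical name for the intermediate table.
        System.out.println(createTableSql("tmp_result", userQuery, "transaction_id", "product_id", mappers));
        for (int i = 1; i <= mappers; i++) {
            System.out.println(partitionSql("tmp_result", i));
        }
    }
}
```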

As mentioned at the beginning of Section 2.2, if the number of mappers is large and the complexity of the SQL query provided by a MapReduce program is high (for example, involving multi-table joins and grouping), the performance of the TeradataDBInputFormat approach can be orders of magnitude better than the DBInputFormat approach. This is because, in the DBInputFormat approach, the DBMS has to execute the same user SQL query as many times as there are mappers, sort the results, and send back only a portion of the final results to each mapper. In the TeradataDBInputFormat approach, however, the complex SQL query is executed only once and the results are stored in the multiple partitions of a PPI table, each of which is sent to a different mapper. As mentioned before, the TeradataDBInputFormat approach discussed here does not require any change to the Teradata EDW codebase. We have investigated a few areas where we can significantly improve the performance of the TeradataDBInputFormat approach with new enhancements to Teradata EDW, which we will probably discuss in a separate article.

Figure 2 – TeradataDBInputFormat

2.3 Known Issues

Notice that the data retrieved by a MapReduce program via the TeradataDBInputFormat approach or the DBInputFormat approach are not stored in Hadoop after the MapReduce program is finished, unless the MapReduce program intentionally stores them. Therefore, if some Teradata EDW data are frequently used by many MapReduce programs, it will be more efficient to copy these data and materialize them in HDFS. One approach to storing a Teradata table permanently in HDFS is to use Cloudera’s Sqoop [5], into which we have integrated TeradataDBInputFormat.

One potential issue in the current implementation provided in the Appendix is a column name conflict. For example, assume the business query in the DDL P in Section 2.2 is “select * from T1, T2 where T1.a=T2.b” and that T1 and T2 have columns with the same names. Currently the Teradata DBMS will complain about a column name conflict if we simply create a table to store the above query result. The workaround for now is for users to change the query so that it explicitly gives each column in the results a unique name. Either the EDW or the TeradataDBInputFormat class could be enhanced to resolve the column name conflict automatically, so that users do not need to change the query.
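
Until such an enhancement exists, the workaround of naming each result column explicitly can be automated in client code. The helper below is a hypothetical sketch: the class ColumnAliaser and the table_column alias pattern are assumptions for illustration and are not part of the connector. It builds a select list in which every column gets a unique alias.

```java
import java.util.Arrays;
import java.util.List;

class ColumnAliaser {
    // Build "T1.a AS T1_a, T1.b AS T1_b" for the columns of one table.
    static String aliasedColumns(String table, List<String> columns) {
        StringBuilder sb = new StringBuilder();
        for (String col : columns) {
            if (sb.length() > 0) {
                sb.append(", ");
            }
            sb.append(table).append('.').append(col)
              .append(" AS ").append(table).append('_').append(col);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // Both tables are assumed to have columns named a and b.
        String select = "SELECT "
            + aliasedColumns("T1", Arrays.asList("a", "b")) + ", "
            + aliasedColumns("T2", Arrays.asList("a", "b"))
            + " FROM T1, T2 WHERE T1.a = T2.b";
        System.out.println(select);
    }
}
```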

2.4 DBOutputFormat

The DBOutputFormat provided by Cloudera writes to the database by generating a set of INSERT statements in each reducer. The current DBOutputFormat approach, in which multiple reducers send batches of INSERT statements to the DBMS, can work with the Teradata EDW without modification. For more detail, please refer to [3].

3. Example using TeradataDBInputFormat

In this section, we first describe the requirements for running our TeradataDBInputFormat class, and then use an example to explain how to use the TeradataDBInputFormat approach.

3.1 Requirements

The TeradataDBInputFormat class is implemented in Java. The enclosed package can be compiled and run in an environment with the following features:

  • Sun JDK 1.6, update 8 or later versions
  • Hadoop version 0.20.2 +228 or later versions
  • Teradata version V2R5 or later release

Note that the Hadoop DFS and the Teradata DBMS may or may not be installed on the same hardware platform.

You should start by downloading the TeradataDBInputFormat connector. The JAR file (i.e., TeradataDBInputFormat.jar) should be placed into the $HADOOP_HOME/lib/ directory on your Hadoop TaskTracker machines. It is a good idea to also include it on any server you launch Hadoop jobs from.

3.2 Sample code using TeradataDBInputFormat class to access Teradata EDW data

Table Schema

Assume a MapReduce program needs to access some transaction data stored in the following table table_1, defined by the following CREATE TABLE statement:

CREATE TABLE table_1 (
   transaction_id int,
   product_id int,
   sale_date date,
   description varchar(64)
) PRIMARY INDEX(transaction_id);

To access the transaction and product information, a MapReduce program can simply provide a SQL query, like “select transaction_id, product_id from table_1 where transaction_id > 1000”.

Configuring the MapReduce job

The following code shows how a MapReduce job using TeradataDBInputFormat class is configured and run.

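import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
// The TeradataDB* classes come from the connector JAR; import them from the
// package used in the attached source code.

public class TransactionsMapReduceJob extends Configured implements Tool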
{
    private String query = "";
    private String output_file_path = "";
 
    /**
    * Constructor
    */
    public TransactionsMapReduceJob(final String query_, final String output_)
    {
        query = query_;
        output_file_path = output_;
    }
     
    @Override
    public int run(String[] args) throws Exception
    {
        Configuration conf = getConf();
        Job myJob = new Job(conf, conf.get("mapreduce.job.name"));
        
        // the following statement is very important!!!
        ///1. Set the class as the record reader
        myJob.getConfiguration().setClass("record.reader.class", TransactionTDBWritable.class, TeradataDBWritable.class);
        
        ///2. Store the query  
        TeradataDBInputFormat.setInput(myJob.getConfiguration(), query, TransactionTDBWritable.class);
        
        ///3. Specify the input format class
        myJob.setInputFormatClass(TransactionDBInputFormat.class);
             
        myJob.setJarByClass(TransactionsMapReduceJob.class);
            
        myJob.setOutputKeyClass(LongWritable.class);
        myJob.setOutputValueClass(LongWritable.class);
            
        myJob.setMapperClass(TransactionMapper.class);
        myJob.setReducerClass(TransactionReducer.class);
            
        myJob.setOutputFormatClass(TextOutputFormat.class);
                            
        FileOutputFormat.setOutputPath(myJob, new  Path(output_file_path));
            
        int ret = myJob.waitForCompletion(true) ? 0 : 1;
            
        // clean up ...
         TeradataDBInputFormat.cleanDB(myJob.getConfiguration());
            
        return ret;
    }
     
    public static void main(String[] args) throws Exception
    {
        int res = 0;
        try{
            int args_num = args.length;
         
            // Assumption 1: The second to last parameter: output file path
            String output_file_path = args[args_num-2];
            // Assumption 2: The last parameter: the query
            String query = args[args_num-1];
             
            Tool mapred_tool = new TransactionsMapReduceJob(query, output_file_path);
            res = ToolRunner.run(new Configuration(), mapred_tool, args);
             
        } catch (Exception e)
        {
            e.printStackTrace();
            res = 1; // report the failure instead of exiting with status 0
        } finally
        {
            System.exit(res);
        }
    }
}

Defining Java Class to represent DBMS data to be used in MapReduce program

The following class, TransactionTDBWritable, is defined to describe the data from Teradata EDW to be used by the MapReduce program.

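import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
import java.util.Vector;
import org.apache.hadoop.io.Writable;
// TDBWritable comes from the connector JAR; import it from the package used
// in the attached source code.

public class TransactionTDBWritable implements TDBWritable, Writable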
{
    private long transaction_id = 0;
    private long product_id = 0;
     
    // Static code: the programmer should explicitly declare the attributes (name and type) related to the query.
    static private List<String> attribute_names = new Vector<String>();
    static private List<String> attribute_types = new Vector<String>();
    static
    {
        // The corresponding query:
        /// SELECT transaction_id, product_id FROM ... WHERE ...
        //1. for the first item
        attribute_names.add("transaction_id");
        attribute_types.add("int");
        //2. for the second item
        attribute_names.add("product_id");
        attribute_types.add("int");
    }
    /**
    * Default constructor
    */
    public TransactionTDBWritable(){super();}
    @Override
    public void readFields(ResultSet resultSet) throws SQLException
    {
        transaction_id = resultSet.getLong("transaction_id");
        product_id = resultSet.getLong("product_id");
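    }

    // Accessors used by the mapper.
    public long getTransactionID()
    {
        return transaction_id;
    }

    public long getProductID()
    {
        return product_id;
    }
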
    @Override
    public void readFields(DataInput in) throws IOException
    {
        transaction_id = in.readLong();
        product_id = in.readLong();
    }
     
    @Override
    public void write(DataOutput out) throws IOException
    {
        out.writeLong(transaction_id);
        out.writeLong(product_id);
    }
     
    @Override
    public void addAttrbute(String name, String type)
    {
        attribute_names.add(name);
        attribute_types.add(type);
    }
      
    @Override
    public List<String> getAttributeNameList()
    {
        return attribute_names;
    }
     
    @Override
    public List<String> getAttributeValueList()
    {
        return attribute_types;
    }
}

Note that the TeradataDBInputFormat can be enhanced such that the above class does not need to be manually created since it can be automatically generated by looking at the resulting query’s schema.

A dummy class inheriting from TeradataDBInputFormat<T> is needed:

public class TransactionDBInputFormat extends TeradataDBInputFormat<TransactionTDBWritable>
{
    // Nothing needs to be implemented here.
    // The subclass only transfers the type information of TransactionTDBWritable down to TransactionDBInputFormat's constructor.
}

Using data in a Mapper

The TeradataDBInputFormat will read from the database and populate the retrieved data to the fields in the TransactionTDBWritable class.  A mapper then receives an instance of the TransactionTDBWritable implementation as its input value and can use the retrieved DBMS data as desired.  The following code simply shows how a mapper has direct access to DBMS data passed to it as an instance of the TransactionTDBWritable class.

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Mapper;

public class TransactionMapper extends Mapper<LongWritable, TransactionTDBWritable, LongWritable, LongWritable>
{
    public TransactionMapper(){}
    
    protected void map(LongWritable k1, TransactionTDBWritable v1, Context context)
    {
        try
        {
            context.write(new LongWritable(v1.getTransactionID()), new LongWritable(v1.getProductID()));           
        } catch (IOException e)
        {
            ...
        } catch (InterruptedException e)
        {
            ...
        }
    }
}

Prepare the configuration properties

To access the Teradata DB, the MapReduce program also needs the database connection information, such as the DB URL, user account, and password. This information can be stored in a property XML file, as shown in the following file TeraProps.xml.

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
    <!-- Parameters about the Teradata RDBMS -->
    <property>
        <name>teradata.db.url</name>
        <value>127.1.2.1</value>
        <description>
            The URL of the Teradata DB the program is going to interact with
        </description>
    </property>
    <property>
        <name>teradata.db.account</name>
        <value>user1</value>
        <description>The account name</description>
    </property>
    <property>
        <name>teradata.db.password</name>
        <value>b123</value>
        <description>Password</description>
    </property>
    
</configuration>
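
In the job itself, Hadoop loads this file when it is passed with the -conf option, so no parsing code is required. Purely to illustrate the name/value layout of the file, the following sketch reads the same structure with the JDK's DOM parser. The class TeraPropsReader is hypothetical and is not part of the connector.

```java
import java.io.StringReader;
import java.util.LinkedHashMap;
import java.util.Map;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;

class TeraPropsReader {
    // Parse a Hadoop-style <configuration> document into name -> value pairs.
    static Map<String, String> parse(String xml) {
        try {
            Document doc = DocumentBuilderFactory.newInstance()
                    .newDocumentBuilder()
                    .parse(new InputSource(new StringReader(xml)));
            Map<String, String> props = new LinkedHashMap<String, String>();
            NodeList properties = doc.getElementsByTagName("property");
            for (int i = 0; i < properties.getLength(); i++) {
                Element p = (Element) properties.item(i);
                String name = p.getElementsByTagName("name").item(0).getTextContent().trim();
                String value = p.getElementsByTagName("value").item(0).getTextContent().trim();
                props.put(name, value);
            }
            return props;
        } catch (Exception e) {
            throw new IllegalArgumentException("Could not parse configuration XML", e);
        }
    }

    public static void main(String[] args) {
        String xml = "<?xml version=\"1.0\"?>"
            + "<configuration>"
            + "<property><name>teradata.db.url</name><value>127.1.2.1</value></property>"
            + "<property><name>teradata.db.account</name><value>user1</value></property>"
            + "</configuration>";
        System.out.println(parse(xml));
    }
}
```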

Deploy and run

The whole source package should first be compiled and packaged as a JAR file, for example MyMapReduceJob.jar, before it can be run on Hadoop. Assume that we put the property XML file in the same directory as the JAR file. Then we may start the MapReduce job with the following command:

HADOOP_HOME/bin/hadoop jar MyMapReduceJob.jar TransactionsMapReduceJob -Dmapred.map.tasks=32 -conf TeraProps.xml output.tbl "select transaction_id, product_id from table_1 where transaction_id > 1000"

where:

  1. HADOOP_HOME stands for the Hadoop installation directory
  2. -Dmapred.map.tasks=32 sets the number of map tasks to 32
  3. -conf TeraProps.xml tells where to find the parameters about the Teradata DBMS
  4. the file output.tbl contains the job’s output, and
  5. “select transaction_id, product_id from table_1 where transaction_id > 1000” is the user query.

4. Conclusion

MapReduce-related research and development continues to be active and to attract interest from both industry and academia. MapReduce is particularly interesting to parallel DBMS vendors since both MapReduce and Teradata Data Warehouses use clusters of nodes and shared-nothing scale-out technology for large scale data analysis. The TeradataDBInputFormat approach in this article shows how MapReduce programs can efficiently and directly have parallel access to Teradata EDW data without the external steps of exporting and loading data from Teradata EDW to Hadoop.

5. References

[1] http://www.cloudera.com/blog/2009/03/database-access-with-hadoop/

[2] http://www.cloudera.com

[3] DBInputFormat http://www.cloudera.com/blog/2009/03/databaseaccess-with-hadoop

[4] Teradata Online Documentation http://www.info.teradata.com

[5] Cloudera Sqoop http://www.cloudera.com/blog/2009/06/introducing-sqoop/

6. Appendix

The TeradataDBInputFormat class is implemented in Java, and the related source code is included in the attached zip file. The source code is composed of three parts:

The core implementation of the TeradataDBInputFormat classes is built based on the PPI strategy in Section 2.2. Five classes are defined:

  • TeradataDBInputFormat
  • TeradataDBSplit
  • TeradataDBRecordReader
  • TeradataDBWritable
  • DummyDBWritable

The generation of the internal intermediate table in Teradata DBMS according to the user query.

  • IntermediateTableQueryGenerator
  • IntermediateTableGenerator

An example to show how to use the TeradataDBInputFormat class.

  • TransactionTDBWritable
  • TransactionDBInputFormat
  • TransactionMapper
  • TransactionReducer
  • TransactionsMapReduceJob

Database Access with Hadoop

Database Access with Hadoop | Apache Hadoop for the Enterprise | Cloudera.

Hadoop’s strength is that it enables ad-hoc analysis of unstructured or semi-structured data. Relational databases, by contrast, allow for fast queries of very structured data sources. A point of frustration has been the inability to easily query both of these sources at the same time. The DBInputFormat component provided in Hadoop 0.19 finally allows easy import and export of data between Hadoop and many relational databases, allowing relational data to be more easily incorporated into your data processing pipeline.

This blog post explains how the DBInputFormat works and provides an example of using DBInputFormat to import data into HDFS.

DBInputFormat and JDBC

First we’ll cover how DBInputFormat interacts with databases. DBInputFormat uses JDBC to connect to data sources. Because JDBC is widely implemented, DBInputFormat can work with MySQL, PostgreSQL, and several other database systems. Individual database vendors provide JDBC drivers to allow third-party applications (like Hadoop) to connect to their databases. Links to popular drivers are listed in the resources section at the end of this post.

To start using DBInputFormat to connect to your database, you’ll need to download the appropriate database driver from the list in the resources section (see the end of this post), and drop it into the $HADOOP_HOME/lib/ directory on your Hadoop TaskTracker machines, and on the machine where you launch your jobs from.

Reading Tables with DBInputFormat

The DBInputFormat is an InputFormat class that allows you to read data from a database. An InputFormat is Hadoop’s formalization of a data source; it can mean files formatted in a particular way, data read from a database, etc. DBInputFormat provides a simple method of scanning entire tables from a database, as well as the means to read from arbitrary SQL queries performed against the database. Most queries are supported, subject to a few limitations discussed at the end of this article.

Configuring the job

To use the DBInputFormat, you’ll need to configure your job. The following example shows how to connect to a MySQL database and load from a table:

CREATE TABLE employees (
  employee_id INTEGER NOT NULL PRIMARY KEY,
  name VARCHAR(32) NOT NULL);

Listing 1: Example table schema

JobConf conf = new JobConf(getConf(), MyDriver.class);
conf.setInputFormat(DBInputFormat.class);
DBConfiguration.configureDB(conf,
    "com.mysql.jdbc.Driver",
    "jdbc:mysql://localhost/mydatabase");
String [] fields = { "employee_id", "name" };
DBInputFormat.setInput(conf, MyRecord.class, "employees",
    null /* conditions */, "employee_id", fields);
// set Mapper, etc., and call JobClient.runJob(conf);

Listing 2: Java code to set up a MapReduce job with DBInputFormat

This example code will connect to mydatabase on localhost and read the two fields from the employees table.

The configureDB() and setInput() calls configure the DBInputFormat. The first call specifies the JDBC driver implementation to use and what database to connect to. The second call specifies what data to load from the database. The MyRecord class is the class where data will be read into in Java, and "employees" is the name of the table to read. The "employee_id" parameter specifies the table’s primary key, used for ordering results. The section “Limitations of the InputFormat” below explains why this is necessary. Finally, the fields array lists what columns of the table to read. An overloaded definition of setInput() allows you to specify an arbitrary SQL query to read from, instead.

After calling configureDB() and setInput(), you should configure the rest of your job as usual, setting the Mapper and Reducer classes, specifying any other data sources to read from (e.g., datasets in HDFS) and other job-specific parameters.

Retrieving the data

The DBInputFormat will read from the database, but how does data get to your mapper? The setInput() method used in the example above took, as a parameter, the name of a class which will hold the contents of one row. You’ll need to write an implementation of the DBWritable interface to allow DBInputFormat to populate your class with fields from the table. DBWritable is an adaptor interface that allows data to be read and written using both Hadoop’s internal serialization mechanism, and using JDBC calls. Once the data is read into your custom class, you can then read the class’ fields in the mapper.

The following example provides a DBWritable implementation that holds one record from the employees table, as described above:

class MyRecord implements Writable, DBWritable {
  long id;
  String name;

  public void readFields(DataInput in) throws IOException {
    this.id = in.readLong();
    this.name = Text.readString(in);
  }

  public void readFields(ResultSet resultSet) throws SQLException {
    this.id = resultSet.getLong(1);
    this.name = resultSet.getString(2);
  }

  public void write(DataOutput out) throws IOException {
    out.writeLong(this.id);
    Text.writeString(out, this.name);
  }

  public void write(PreparedStatement stmt) throws SQLException {
    stmt.setLong(1, this.id);
    stmt.setString(2, this.name);
  }
}

Listing 3: DBWritable implementation for records from the employees table

A java.sql.ResultSet object represents the data returned from a SQL statement. It contains a cursor representing a single row of the results. This row will contain the fields specified in the setInput() call. In the readFields() method of MyRecord, we read the two fields from the ResultSet. The readFields() and write() methods that operate on java.io.DataInput and DataOutput objects are part of the Writable interface used by Hadoop to marshal data between mappers and reducers, or pack results into SequenceFiles.
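
The Writable half of this contract can be tried out without a Hadoop installation. The sketch below mirrors the write(DataOutput) and readFields(DataInput) pair of MyRecord using only JDK classes. It is an approximation under stated assumptions: the class RecordRoundTrip is hypothetical, it does not implement Hadoop's Writable interface, and DataOutput.writeUTF stands in for Text.writeString.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

class RecordRoundTrip {
    long id;
    String name;

    // Same shape as Writable.write(DataOutput); writeUTF stands in for Text.writeString.
    void write(DataOutput out) throws IOException {
        out.writeLong(id);
        out.writeUTF(name);
    }

    // Same shape as Writable.readFields(DataInput); fields are read in the order written.
    void readFields(DataInput in) throws IOException {
        id = in.readLong();
        name = in.readUTF();
    }

    // Serialize a record to bytes and rebuild a fresh copy from those bytes.
    static RecordRoundTrip copyViaBytes(RecordRoundTrip original) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            original.write(new DataOutputStream(buffer));
            RecordRoundTrip copy = new RecordRoundTrip();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        RecordRoundTrip record = new RecordRoundTrip();
        record.id = 42L;
        record.name = "Alice";
        RecordRoundTrip copy = copyViaBytes(record);
        System.out.println(copy.id + " " + copy.name);
    }
}
```

The important property is symmetry: readFields must consume exactly the fields that write produced, in the same order and with the same types.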

Using the data in a mapper

The mapper then receives an instance of your DBWritable implementation as its input value. The input key is a row id provided by the database; you’ll most likely discard this value.

public class MyMapper extends MapReduceBase
    implements Mapper<LongWritable, MyRecord, LongWritable, Text> {
  public void map(LongWritable key, MyRecord val,
      OutputCollector<LongWritable, Text> output, Reporter reporter)
      throws IOException {
    // Use val.id, val.name here
    output.collect(new LongWritable(val.id), new Text(val.name));
  }
}

Listing 4: Example mapper using a custom DBWritable

Writing results back to the database

A companion class, DBOutputFormat, will allow you to write results back to a database. When setting up the job, call conf.setOutputFormat(DBOutputFormat.class); and then call DBConfiguration.configureDB() as before.

The DBOutputFormat.setOutput() method then defines how the results will be written back to the database. Its three arguments are the JobConf object for the job, a string defining the name of the table to write to, and an array of strings defining the fields of the table to populate. For example: DBOutputFormat.setOutput(job, "employees", "employee_id", "name");.

The same DBWritable implementation that you created earlier will suffice to inject records back into the database. The write(PreparedStatement stmt) method will be invoked on each instance of the DBWritable that you pass to the OutputCollector from the reducer. At the end of reducing, those PreparedStatement objects will be turned into INSERT statements to run against the SQL database.
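
To make the link between the setOutput() arguments and the resulting INSERT statements more concrete, here is a minimal sketch of how a parameterized statement could be assembled from a table name and field names. The class and method names (InsertStatementSketch, buildInsert) are hypothetical, and the exact text Hadoop generates may differ. What matters is that the number and order of the placeholders line up with the setLong(1, ...) and setString(2, ...) calls in write(PreparedStatement).

```java
import java.util.Collections;

// Hypothetical helper; illustrates the shape of the statement only.
class InsertStatementSketch {

    // Builds "INSERT INTO table (f1, f2) VALUES (?, ?)" with one placeholder per field.
    static String buildInsert(String table, String... fieldNames) {
        if (fieldNames.length == 0) {
            throw new IllegalArgumentException("at least one field name is required");
        }
        String columns = String.join(", ", fieldNames);
        String placeholders =
                String.join(", ", Collections.nCopies(fieldNames.length, "?"));
        return "INSERT INTO " + table + " (" + columns + ") VALUES (" + placeholders + ")";
    }

    public static void main(String[] args) {
        // Same table and fields as the setOutput() example in the text.
        System.out.println(buildInsert("employees", "employee_id", "name"));
    }
}
```

With the example arguments, the first placeholder corresponds to employee_id and the second to name, so the field order passed to setOutput() has to match the parameter indexes used in the DBWritable.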

Limitations of the InputFormat

JDBC allows applications to generate SQL queries which are executed against the database; the results are then returned to the calling application. Keep in mind that you will be interacting with your database via repeated SQL queries. Therefore:

  • Hadoop may need to execute the same query multiple times, and it must get the same results each time. Concurrent updates to your database therefore should not affect the query being run by your MapReduce job. This can be accomplished by disallowing writes to the table while your MapReduce job runs, by restricting your MapReduce job’s query with a clause such as “insert_date < yesterday”, or by dumping the data to a temporary table in the database before launching your MapReduce process.
  • In order to parallelize the processing of records from the database, Hadoop will execute SQL queries that use ORDER BY, LIMIT, and OFFSET clauses to select ranges out of tables. Your results, therefore, need to be orderable by one or more keys (either PRIMARY, like the one in the example, or UNIQUE).
  • In order to set the number of map tasks, the DBInputFormat needs to know how many records it will read. So if you’re writing an arbitrary SQL query against the database, you will need to provide a second query that returns the number of rows that the first query will return (e.g., by using COUNT and GROUP BY).
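
The second and third points can be sketched together: given a row count and a desired number of map tasks, each task gets its own LIMIT/OFFSET window over the ordered results. The following is a simplified illustration, not Hadoop's actual implementation. The class and method names are hypothetical, and it assumes the row count has already been obtained from a count query.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of range-based splitting with ORDER BY, LIMIT, and OFFSET.
class SplitQuerySketch {

    // Returns one query per split; the last split absorbs any remainder rows.
    static List<String> buildSplitQueries(String table, String fields, String orderBy,
            long rowCount, int numSplits) {
        if (numSplits < 1) {
            throw new IllegalArgumentException("numSplits must be at least 1");
        }
        List<String> queries = new ArrayList<>();
        long chunkSize = rowCount / numSplits;
        for (int i = 0; i < numSplits; i++) {
            long offset = i * chunkSize;
            long limit = (i == numSplits - 1) ? rowCount - offset : chunkSize;
            queries.add("SELECT " + fields + " FROM " + table
                    + " ORDER BY " + orderBy
                    + " LIMIT " + limit + " OFFSET " + offset);
        }
        return queries;
    }

    public static void main(String[] args) {
        // 10 rows over 3 splits: windows of 3, 3, and 4 rows.
        for (String q : buildSplitQueries("employees", "employee_id, name",
                "employee_id", 10, 3)) {
            System.out.println(q);
        }
    }
}
```

The windows only partition the table cleanly if the ordering is stable and the row count does not change between queries, which is the reason for the first restriction above.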

With these restrictions in mind, there’s still a great deal of flexibility available to you. You can bulk load entire tables into HDFS, or select large ranges of data. For example, if you want to read records from a table that is also being populated by another source concurrently, you might set up that table to attach a timestamp field to each record. Before doing the bulk read, pick the current timestamp, then select all records with timestamps earlier than that one. New records being fed in by the other writer will have later timestamps and will not affect the MapReduce job.
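
A minimal sketch of the timestamp approach follows. It fixes the cutoff once and reuses the same condition in both the data query and the count query, so the two always describe the same set of rows. The column name insert_ts and the class and method names are hypothetical, and the quoted timestamp literal is an assumption: the literal syntax your database accepts may differ.

```java
import java.sql.Timestamp;

// Hypothetical illustration of reading a consistent snapshot from a growing table.
class SnapshotQuerySketch {

    // Condition selecting only rows stamped before the fixed cutoff.
    static String condition(String timestampColumn, Timestamp cutoff) {
        return timestampColumn + " < '" + cutoff + "'";
    }

    static String dataQuery(String table, String fields, String orderBy, String condition) {
        return "SELECT " + fields + " FROM " + table
                + " WHERE " + condition + " ORDER BY " + orderBy;
    }

    static String countQuery(String table, String condition) {
        return "SELECT COUNT(*) FROM " + table + " WHERE " + condition;
    }

    public static void main(String[] args) {
        // Pick the cutoff once, before the job starts, and never recompute it.
        Timestamp cutoff = new Timestamp(System.currentTimeMillis());
        String cond = condition("insert_ts", cutoff);
        System.out.println(dataQuery("employees", "employee_id, name", "employee_id", cond));
        System.out.println(countQuery("employees", cond));
    }
}
```

Computing the cutoff inside each query (for example with a database function that returns the current time) would defeat the purpose, because repeated executions would then see different rows.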

Finally, be careful to understand the bottlenecks in your data processing pipeline. Launching a MapReduce job with 100 mappers performing queries against a database server may overload the server or its network connection. In this case, you’ll achieve less parallelism than theoretically possible, due to starvation, disk seeks, and other performance penalties.

Limitations of the OutputFormat

The DBOutputFormat writes to the database by generating a set of INSERT statements in each reducer. The reducer’s close() method then executes them in a bulk transaction. Performing a large number of these from several reduce tasks concurrently can swamp a database. If you want to export a very large volume of data, you may be better off generating the INSERT statements into a text file, and then using a bulk data import tool provided by your database to do the database import.
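
The text-file alternative can be sketched as follows. This is only an illustration with hypothetical names: the table and columns are the ones from the earlier example, and single quotes are escaped by doubling them, which is standard SQL but may not cover every database's quoting rules. Check what your bulk import tool expects before relying on it.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of writing literal INSERT statements to a file.
class InsertFileSketch {

    // Renders one literal INSERT statement; single quotes in the name are doubled.
    static String toInsertLine(String table, long id, String name) {
        String escaped = name.replace("'", "''");
        return "INSERT INTO " + table + " (employee_id, name) VALUES ("
                + id + ", '" + escaped + "');";
    }

    public static void main(String[] args) throws IOException {
        List<String> lines = new ArrayList<>();
        lines.add(toInsertLine("employees", 1L, "Alice"));
        lines.add(toInsertLine("employees", 2L, "O'Brien"));
        // A temporary file stands in for the job's real output location.
        Path file = Files.createTempFile("employees-", ".sql");
        Files.write(file, lines);
        System.out.println("Wrote " + lines.size() + " statements to " + file);
    }
}
```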

Conclusions

DBInputFormat provides a straightforward interface to read data from a database into your MapReduce applications. You can read database tables into HDFS, import them into Hive, or use them to perform joins in MapReduce jobs. By supporting JDBC, it provides a common interface to a variety of different database sources.

This is probably best not used as a primary data access mechanism; queries against database-driven data are most efficiently executed within the database itself, and large-scale data migration is better done using the bulk data export/import tools associated with your database. But when analysis of ad hoc data in HDFS can be improved by adding some relational data, DBInputFormat allows you to quickly perform the join without a large amount of setup overhead. DBOutputFormat then allows you to export results back to the same database for combining with other database-driven tables.

DBInputFormat is available in Hadoop 0.19 and is provided by HADOOP-2536, a patch started by Fredrik Hedberg and further developed by Enis Soztutar. A backport of this patch that can be applied to Hadoop 0.18.3 is available at the above link.

This article is based on a talk I gave at the SF Bay Hadoop User Group meetup on February 18th; the slides from that talk are available as a PDF.

Resources