Hadoop MapReduce Connector to Teradata EDW

Hadoop MapReduce programmers often find that it is more convenient and productive to have direct access from their MapReduce programs to data stored in a RDBMS such as Teradata Enterprise Data Warehouse (EDW) because:

  1. There is no need to export relational data into a flat file.
  2. There is no need to upload the file into the Hadoop Distributed File System (HDFS).
  3. There is no need to change and rerun the scripts/commands in the first two steps when they need to use different tables/columns in their MapReduce programs.

1. Introduction

Connectivity from a Java (or other language) program to a database via JDBC or ODBC is well understood by most programmers. But when dealing with Hadoop and MPP databases, two factors outside the domain of connectors become crucial: scale and balance. Hadoop and MPP database deployments often span dozens or hundreds of server nodes and process tens or hundreds of terabytes per execution. For the shared-nothing architecture to perform at maximum throughput, the processing workload and data must be partitioned across execution threads. Otherwise, one server will have an inordinate amount of work compared to the others, making the total elapsed time slower. Consequently, a Hadoop connector is needed to support parallel efficiency.

In this document we first describe how MapReduce programs (a.k.a. mappers) can have parallel access to Teradata EDW data using the TeradataDBInputFormat approach discussed in Section 2. In Section 3, we provide a complete example with accompanying source code which can be used directly to access the EDW without any changes required in the data warehouse or Hadoop. The TeradataDBInputFormat class can be used directly by programmers without any changes for many applications. Overall, readers can get the following out of this article:

  1. Architecturally how MapReduce programs get direct and parallel access to the Teradata EDW.
  2. How the TeradataDBInputFormat class can be used without changes to Hadoop or the EDW. Step-by-step installation and deployment is included in the example.
  3. How programmers can extend the TeradataDBInputFormat approach for specific application needs.

2. The TeradataDBInputFormat approach

A common approach for a MapReduce program to access relational data is to first use the DBMS export utility to pass SQL answer sets to a local file and then load the local file to Hadoop.

However, there are several use cases where the export and load into HDFS is inconvenient for the programmer. Recognizing the need to access relational data in MapReduce programs, the Apache.org open source project for Hadoop provides the DBInputFormat class library.  The DBInputFormat and DBOutputFormat Java class libraries allow MapReduce programs to send SQL queries through the standard JDBC interface to the EDW in parallel. Teradata provides a version of DBInputFormat [3] that will be part of the Cloudera Distribution for Hadoop. Note that Cloudera has a good explanation of DBInputFormat on their website. The TeradataDBInputFormat approach is inspired by but not based on the Apache DBInputFormat approach.

DBInputFormat and DBOutputFormat, along with their Teradata versions, are good interfaces for ad hoc medium or small volume data transfers. They make it easy to copy tables from the EDW into HDFS and vice versa. One good use of these interfaces is when a mapper program needs to do table look-ups but has no need to persist the fetched data. These interfaces are not efficient for high volume data transfers, where bulk data movement tools like Teradata Parallel Transporter are more appropriate. In many cases, queries and bulk data movement are better optimized inside the database itself. While it is an oversimplification, think of the input and output format class libraries as similar to workloads processed by BTEQ. They are very flexible and useful, but do not support every workload.

2.1 DBInputFormat with JDBC

DBInputFormat uses JDBC to connect to relational databases, typically MySQL or Oracle. The basic idea is that a MapReduce programmer provides a SQL query via the DBInputFormat class. The DBInputFormat class associates a modified SQL query with each mapper started by Hadoop. Each mapper then sends its query through a standard JDBC driver to the DBMS, gets back a portion of the query results and works on those results in parallel. The DBInputFormat approach is correct because the union of the results returned to all mappers is equivalent to the result of the original SQL query.

The DBInputFormat approach provides two interfaces for a MapReduce program to directly access data from a DBMS. The underlying implementation is the same for the two interfaces. In the first interface, a MapReduce program provides a table name T, a list P of column names to be retrieved, optional filter conditions C on the table and column(s) O to be used in the Order-By clause, in addition to user name, password and DBMS URL values. The DBInputFormat implementation first generates a “count” query:

SELECT COUNT(*) FROM T WHERE C

and sends it to the DBMS to get the number of rows (R) in the query result. At runtime, the DBInputFormat implementation knows the number of mappers (M) started by Hadoop and associates the following query Q with each mapper. Each mapper connects to the DBMS, sends Q over a JDBC connection, and gets back its share of the results.

SELECT P FROM T WHERE C ORDER BY O
LIMIT L                                                                (Q)
OFFSET X

The above query Q asks the DBMS to evaluate the query

SELECT P FROM T WHERE C ORDER BY O,

but only return L rows starting from the offset X. In total, M queries are sent to the DBMS by the M mappers, and they are almost identical except that the values of L and X differ. For the ith mapper (where 1 ≤ i ≤ M − 1), which is not the last mapper, L = ⌊R/M⌋ and X = (i − 1) × ⌊R/M⌋. For the last mapper, L = R − (M − 1) × ⌊R/M⌋ and X = (M − 1) × ⌊R/M⌋.

Basically, all mappers except the last one receive the average number of rows, and the last mapper gets more rows than the others when the total number of rows in the result cannot be evenly divided by the number of mappers.
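
As a concrete illustration of this split arithmetic, here is a minimal sketch in Java (our own illustration, not code taken from the Hadoop class):

// Illustrative sketch of the LIMIT/OFFSET arithmetic described above.
// R is the row count returned by the count query; M is the number of mappers.
public class SplitArithmeticSketch
{
    public static void printSplits(long R, int M)
    {
        long avg = R / M;                          // rows per mapper, rounded down
        for (int i = 1; i <= M; i++)
        {
            long X = (i - 1) * avg;                // OFFSET: rows to skip
            long L = (i < M) ? avg                 // non-last mappers get the average
                             : R - (M - 1) * avg;  // last mapper absorbs the remainder
            System.out.printf("mapper %d: LIMIT %d OFFSET %d%n", i, L, X);
        }
    }

    public static void main(String[] args)
    {
        printSplits(10, 3);  // prints splits of 3, 3 and 4 rows
    }
}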

In the second interface of the DBInputFormat class, a MapReduce program can provide an arbitrary SQL select query SQ (which could involve multiple tables) whose results are the input to the mappers. The program also has to provide a count query QC which returns an integer, the number of rows returned by the query SQ. The DBInputFormat class sends the query QC to the DBMS to get the number of rows (R), and the rest of the processing is the same as described for the first interface.
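
For reference, the two interfaces correspond to the two setInput overloads of the Apache DBInputFormat class (shown here with the new-API org.apache.hadoop.mapreduce.lib.db package); the record class, driver and connection details below are placeholders of ours:

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

public class DBInputFormatSetupSketch
{
    // recordClass stands in for an application-specific DBWritable implementation.
    public static void configure(Job job, Class<? extends DBWritable> recordClass)
    {
        DBConfiguration.configureDB(job.getConfiguration(),
                "com.mysql.jdbc.Driver", "jdbc:mysql://dbhost/mydb", "user", "password");

        // Interface 1: table name T, filter condition C, order-by column O and column list P.
        DBInputFormat.setInput(job, recordClass,
                "T",            // table name
                "c3 > 0",       // condition C
                "c1",           // order-by column O
                "c1", "c2");    // columns P to retrieve

        // Interface 2 (alternative): an arbitrary query SQ plus its count query QC.
        // DBInputFormat.setInput(job, recordClass,
        //         "SELECT c1, c2 FROM T WHERE c3 > 0 ORDER BY c1",
        //         "SELECT COUNT(*) FROM T WHERE c3 > 0");
    }
}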

While this DBInputFormat approach clearly streamlines the process of accessing relational data from products like MySQL, its performance cannot scale. There are several performance issues with the DBInputFormat approach. In both interfaces, each mapper sends essentially the same SQL query to the DBMS but with different LIMIT and OFFSET clauses to get a subset of the relational data. Sorting is required on the DBMS side for every query sent by a mapper because of the ORDER BY clause introduced into each query, even if the program itself does not need sorted input; this is how parallel processing of relational data by mappers is achieved in the DBInputFormat class. Furthermore, the DBMS has to execute as many queries as there are mappers in the Hadoop system, which is not efficient, especially when the number of mappers is large. These performance issues are especially serious for a parallel DBMS such as Teradata EDW, which tends to have a high number of concurrent queries and larger datasets. Also, the required ordering/sorting is an expensive operation in a parallel DBMS because the rows in a table are not stored on a single node, and sorting requires row redistribution across nodes.

DBInputFormat cannot be used to access Teradata EDW since LIMIT and OFFSET clauses are not in ANSI standard SQL and are not supported by Teradata EDW. However, a newer Apache Hadoop class named DataDrivenDBInputFormat, derived from DBInputFormat, can read input data from a Teradata EDW table. DataDrivenDBInputFormat operates like DBInputFormat. The only difference is that instead of using non-standard LIMIT and OFFSET clauses to demarcate splits, DataDrivenDBInputFormat generates WHERE clauses which separate the data into roughly equivalent shards. DataDrivenDBInputFormat has all of the DBInputFormat performance issues we have discussed above.
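
For example, with a numeric split column the per-mapper WHERE clauses are conceptually range predicates such as the ones printed by this simplified sketch (our own illustration, not the exact SQL produced by the class):

// Simplified illustration of range-based splits over a numeric split column c1.
// DataDrivenDBInputFormat derives the overall bounds from the column's MIN and MAX.
public class RangeSplitSketch
{
    public static void main(String[] args)
    {
        long min = 1, max = 10000;   // assumed MIN(c1) and MAX(c1) of the input
        int mappers = 4;
        long width = (max - min + 1) / mappers;
        for (int i = 0; i < mappers; i++)
        {
            long lo = min + i * width;
            long hi = (i == mappers - 1) ? max + 1 : lo + width;
            System.out.printf("SELECT P FROM T WHERE C AND c1 >= %d AND c1 < %d%n", lo, hi);
        }
    }
}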

Figure 1 – DBInputFormat

2.2 TeradataDBInputFormat

The Teradata connector for Hadoop, TeradataDBInputFormat, sends the SQL query Q provided by a MapReduce program only once to Teradata EDW. Q is executed only once and the results are stored in a Partitioned Primary Index (PPI) table [4]. Then each mapper from Hadoop sends a new query Qi which asks only for the ith partition on every AMP. Depending on the number of mappers, the complexity of the SQL query provided by the MapReduce program and the amount of data involved in the query, the performance of the TeradataDBInputFormat approach can be orders of magnitude better than the DBInputFormat approach, as we have seen in our internal testing.

Now we describe the architecture behind TeradataDBInputFormat. First, the TeradataDBInputFormat class sends the following query P, built from the query Q provided by the MapReduce program, to the EDW:

CREATE TABLE T AS (Q) WITH DATA
PRIMARY INDEX ( c1 )                                             (P)
PARTITION BY (c2 MOD M) + 1

The above query asks the EDW to evaluate Q and store the results (table layout and data) in a new PPI table T. The hash value of the primary index column c1 of each row in the query results determines which AMP stores that row. Then the value of the PARTITION BY expression determines the physical partition (location) of each row on a particular AMP. This is done using modulo M, which means divide by M and take the remainder.

All rows on the same AMP with the same partition-by value are physically stored together and can be directly and efficiently located by the Teradata optimizer. There are different ways to automatically choose the primary index column and partition-by expression.

After the query Q is evaluated and the table T is created, each AMP has M partitions numbered from 1 to M (M is the number of mappers started in Hadoop). Then each mapper sends the following query Qi (1 ≤ i ≤ M) to the EDW:

SELECT * FROM T WHERE PARTITION = i                              (Qi)

Teradata EDW will directly locate all rows in the ith partition on every AMP in parallel and return them to the mapper. This operation is done in parallel for all mappers. After all mappers retrieve their data, the table is automatically deleted. Notice that if the original SQL query just selects data from a base table which is a PPI table, then we do not need to create another PPI table since we can directly use the existing partitions to partition the data each mapper should receive.
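
Putting the two steps together, the query generation can be sketched as follows. The table and column names are illustrative; in the attached package this logic is implemented by the IntermediateTableQueryGenerator and IntermediateTableGenerator classes listed in the Appendix.

// Illustrative sketch of the query generation described above: one CREATE TABLE
// over the user query Q, then one partition query Qi per mapper.
public class PpiQueryGenerationSketch
{
    // Query P: evaluate Q once and store the result in a PPI staging table.
    static String buildStagingDdl(String stagingTable, String userQuery,
                                  String primaryIndexColumn, String partitionColumn, int mappers)
    {
        return "CREATE TABLE " + stagingTable + " AS (" + userQuery + ") WITH DATA "
             + "PRIMARY INDEX (" + primaryIndexColumn + ") "
             + "PARTITION BY (" + partitionColumn + " MOD " + mappers + ") + 1";
    }

    // Query Qi for mapper i (1 <= i <= M): fetch only the i-th partition on every AMP.
    static String buildMapperQuery(String stagingTable, int i)
    {
        return "SELECT * FROM " + stagingTable + " WHERE PARTITION = " + i;
    }

    public static void main(String[] args)
    {
        String q = "SELECT transaction_id, product_id FROM table_1 WHERE transaction_id > 1000";
        System.out.println(buildStagingDdl("tmp_mr_input", q, "transaction_id", "transaction_id", 4));
        for (int i = 1; i <= 4; i++)
        {
            System.out.println(buildMapperQuery("tmp_mr_input", i));
        }
    }
}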

As mentioned at the beginning of Section 2.2, if the number of mappers is large and the complexity of the SQL query provided by a MapReduce program is high (for example, involving a multi-table join and grouping), the performance of the TeradataDBInputFormat approach can be orders of magnitude better than the DBInputFormat approach. This is because the DBMS has to execute the same user SQL query as many times as there are mappers, sort the results, and send back only a portion of the final results to each mapper. In the TeradataDBInputFormat approach, however, the complex SQL query is executed only once and the results are stored in a PPI table's multiple partitions, each of which is sent to a different mapper. As mentioned before, the TeradataDBInputFormat approach discussed here does not require any change to the Teradata EDW codebase. We have investigated a few areas where we can significantly improve the performance of the TeradataDBInputFormat approach with new enhancements to Teradata EDW, which we will probably discuss in a separate article.

Figure 2 – TeradataDBInputFormat

2.3 Known Issues

Notice that the data retrieved by a MapReduce program via the TeradataDBInputFormat approach or the DBInputFormat approach are not stored in Hadoop after the MapReduce program finishes, unless the MapReduce program intentionally stores them. Therefore, if some Teradata EDW data are frequently used by many MapReduce programs, it will be more efficient to copy those data and materialize them in HDFS. One approach to store a Teradata table permanently in the Hadoop DFS is to use Cloudera's Sqoop [5], into which we have integrated TeradataDBInputFormat.

One potential issue in the current implementation provided in the Appendix is a possible column name conflict. For example, assume the business query in the DDL P in Section 2.2 is "select * from T1, T2 where T1.a=T2.b" and that T1 and T2 have columns with the same names. Currently the Teradata DBMS will complain about a column name conflict if we simply create a table to store the above query result. Either the EDW can be enhanced to automatically resolve the column name conflict, or the TeradataDBInputFormat class can be enhanced to do so, so that users do not need to rewrite the query "select * from T1, T2 where T1.a=T2.b" to explicitly and uniquely name each column in the results, which is the workaround for now.
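
For example, the workaround amounts to aliasing the overlapping columns explicitly in the user query, as in this small illustration (the shared column c is hypothetical):

public class AliasedQueryExample
{
    // Instead of "select * from T1, T2 where T1.a=T2.b", alias overlapping column
    // names explicitly so that the intermediate CREATE TABLE ... AS (Q) succeeds.
    static final String QUERY =
        "SELECT T1.a AS t1_a, T1.c AS t1_c, T2.b AS t2_b, T2.c AS t2_c "
      + "FROM T1, T2 WHERE T1.a = T2.b";

    public static void main(String[] args)
    {
        System.out.println(QUERY);
    }
}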

2.4 DBOutputFormat

The DBOutputFormat provided by Cloudera writes to the database by generating a set of INSERT statements in each reducer. The current DBOutputFormat approach, in which multiple reducers send batches of INSERT statements to the DBMS, works with the Teradata EDW without modification. For more detail, please refer to [3].
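
Conceptually, each reducer writes its output through a JDBC PreparedStatement in batches, along the lines of the sketch below. This is a simplified illustration of the pattern rather than the DBOutputFormat source; the table, columns and connection string are placeholders of ours.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

// Simplified sketch of what a DBOutputFormat-style writer does: batch INSERT
// statements over one JDBC connection and commit when the batch is flushed.
public class BatchedInsertSketch
{
    public static void writeBatch(long[][] rows) throws SQLException
    {
        Connection conn = DriverManager.getConnection(
                "jdbc:teradata://dbhost/DATABASE=mydb", "user1", "b123");
        try
        {
            conn.setAutoCommit(false);
            PreparedStatement stmt = conn.prepareStatement(
                    "INSERT INTO output_table (transaction_id, product_id) VALUES (?, ?)");
            for (long[] row : rows)
            {
                stmt.setLong(1, row[0]);
                stmt.setLong(2, row[1]);
                stmt.addBatch();
            }
            stmt.executeBatch();
            conn.commit();
            stmt.close();
        }
        finally
        {
            conn.close();
        }
    }
}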

3. Example using TeradataDBInputFormat

In this section, we first describe the requirements for running our TeradataDBInputFormat class, and then use an example to explain how to use the TeradataDBInputFormat approach.

3.1 Requirements

The TeradataDBInputFormat class is implemented in Java. The enclosed package can be compiled and run in an environment with the following features:

  • Sun JDK 1.6, update 8 or later versions
  • Hadoop version 0.20.2 +228 or later versions
  • Teradata version V2R5 or later release

Note that the Hadoop DFS and the Teradata DBMS may or may not be installed on the same hardware platform.

You should start by downloading the TeradataDBInputFormat connector. The JAR file (i.e., TeradataDBInputFormat.jar) should be placed into the $HADOOP_HOME/lib/ directory on your Hadoop TaskTracker machines. It is a good idea to also include it on any server from which you launch Hadoop jobs.

3.2 Sample code using TeradataDBInputFormat class to access Teradata EDW data

Table Schema

Assume a MapReduce program needs to access some transaction data stored in a table table_1, defined by the following CREATE TABLE statement:

CREATE TABLE table_1 (
   transaction_id int,
   product_id int,
   sale_date date,
   description varchar(64)
) PRIMARY INDEX(transaction_id);

To access the transaction and product information, a MapReduce program can simply provide a SQL query, like "select transaction_id, product_id from table_1 where transaction_id > 1000".

Configuring the MapReduce job

The following code shows how a MapReduce job using the TeradataDBInputFormat class is configured and run.

public class TransactionsMapReduceJob extends Configured implements Tool
{
    private String query = "";
    private String output_file_path = "";
 
    /**
    * Constructor
    */
    public TransactionsMapReduceJob(final String query_, final String output_)
    {
        query = query_;
        output_file_path = output_;
    }
     
    @Override
    public int run(String[] args) throws Exception
    {
        Configuration conf = getConf();
        Job myJob = new Job(conf, conf.get("mapreduce.job.name"));
        
        // the following statement is very important!!!
        ///1. Set the class as the record reader
        myJob.getConfiguration().setClass("record.reader.class", TransactionTDBWritable.class, TeradataDBWritable.class);
        
        ///2. Store the query  
        TeradataDBInputFormat.setInput(myJob.getConfiguration(), query, TransactionTDBWritable.class);
        
        ///3. Specify the input format class
        myJob.setInputFormatClass(TransactionDBInputFormat.class);
             
        myJob.setJarByClass(TransactionsMapReduceJob.class);
            
        myJob.setOutputKeyClass(LongWritable.class);
        myJob.setOutputValueClass(LongWritable.class);
            
        myJob.setMapperClass(TransactionMapper.class);
        myJob.setReducerClass(TransactionReducer.class);
            
        myJob.setOutputFormatClass(TextOutputFormat.class);
                            
        FileOutputFormat.setOutputPath(myJob, new  Path(output_file_path));
            
        int ret = myJob.waitForCompletion(true) ? 0 : 1;
            
        // clean up ...
         TeradataDBInputFormat.cleanDB(myJob.getConfiguration());
            
        return ret;
    }
     
    public static void main(String[] args) throws Exception
    {
        int res = 0;
        try{
            int args_num = args.length;
         
            // Assumption 1: The second to last parameter: output file path
            String output_file_path = args[args_num-2];
            // Assumption 2: The last parameter: the query
            String query = args[args_num-1];
             
            Tool mapred_tool = new TransactionsMapReduceJob(query, output_file_path);
            res = ToolRunner.run(new Configuration(), mapred_tool, args);
             
        } catch (Exception e)
        {
            e.printStackTrace();
        } finally
        {
            System.exit(res);
        }
    }
}

Defining a Java class to represent DBMS data to be used in the MapReduce program

The following class TransactionTDBWritable is defined to describe the data from Teradata EDW to be used by the MapReduce program.

public class TransactionTDBWritable implements TeradataDBWritable, Writable
{
    private long transaction_id = 0;
    private long product_id = 0;
     
    // Static code: the programmer should explicitly declare the attributes (name and type) related to the query.
    static private List<String> attribute_names = new Vector<String>();
    static private List<String> attribute_types = new Vector<String>();
    static
    {
        // The corresponding query:
        /// SELECT transaction_id, product_id FROM ... WHERE ...
        //1. for the first item
        attribute_names.add("transaction_id");
        attribute_types.add("int");
        //2. for the second item
        attribute_names.add("product_id");
        attribute_types.add("int");
    }
    /**
    * Default constructor
    */
    public TransactionTDBWritable(){super();}
    @Override
    public void readFields(ResultSet resultSet) throws SQLException
    {
        transaction_id = resultSet.getLong("transaction_id");
        product_id = resultSet.getLong("product_id");
    }

    /** Getters used by the mapper to read the retrieved column values. */
    public long getTransactionID() { return transaction_id; }
    public long getProductID() { return product_id; }

    @Override
    public void readFields(DataInput in) throws IOException
    {
        transaction_id = in.readLong();
        product_id = in.readLong();
    }
     
    @Override
    public void write(DataOutput out) throws IOException
    {
        out.writeLong(transaction_id);
        out.writeLong(product_id);
    }
     
    @Override
    public void addAttrbute(String name, String type)
    {
        attribute_names.add(name);
        attribute_types.add(type);
    }
      
    @Override
    public List<String> getAttributeNameList()
    {
        return attribute_names;
    }
     
    @Override
    public List<String> getAttributeValueList()
    {
        return attribute_types;
    }
}

Note that TeradataDBInputFormat can be enhanced such that the above class does not need to be created manually, since it could be generated automatically from the schema of the query result.
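
One possible way to do that is to inspect the JDBC result-set metadata of the user query, as in the minimal sketch below (our own idea sketch, not part of the shipped connector; some JDBC drivers may require the query to be executed before metadata is available):

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

// Sketch: derive attribute names and type names from the query's metadata
// instead of hard-coding them in each TDBWritable implementation.
public class SchemaFromQuerySketch
{
    // Returns {column name, DBMS type name} pairs for the given query.
    public static List<String[]> describe(Connection conn, String query) throws SQLException
    {
        PreparedStatement stmt = conn.prepareStatement(query);
        try
        {
            ResultSetMetaData md = stmt.getMetaData();
            List<String[]> attrs = new ArrayList<String[]>();
            for (int i = 1; i <= md.getColumnCount(); i++)
            {
                attrs.add(new String[] { md.getColumnName(i), md.getColumnTypeName(i) });
            }
            return attrs;
        }
        finally
        {
            stmt.close();
        }
    }
}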

A dummy class inheriting from TeradataDBInputFormat<T> is needed:

public class TransactionDBInputFormat extends TeradataDBInputFormat<TransactionTDBWritable>
{
    // Nothing to implement here: this subclass exists only to pass the type
    // parameter TransactionTDBWritable down to TeradataDBInputFormat.
}

Using data in a Mapper

The TeradataDBInputFormat will read from the database and populate the fields of the TransactionTDBWritable class with the retrieved data. A mapper then receives an instance of the TransactionTDBWritable implementation as its input value and can use the retrieved DBMS data as desired. The following code simply shows how a mapper has direct access to the DBMS data passed to it as an instance of the TransactionTDBWritable class.

public class TransactionMapper extends Mapper<LongWritable, TransactionTDBWritable, LongWritable, LongWritable>
{
    public TransactionMapper(){}
    
    protected void map(LongWritable k1, TransactionTDBWritable v1, Context context)
    {
        try
        {
            context.write(new LongWritable(v1.getTransactionID()), new LongWritable(v1.getProductID()));           
        } catch (IOException e)
        {
            ...
        } catch (InterruptedException e)
        {
            ...
        }
    }
}

Prepare the configuration properties

To enable access to the Teradata database, the MapReduce program also needs the database connection information, such as the DB URL, user account and password. This information can be stored in a properties XML file, as shown in the following file TeraProps.xml:

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
    <!-- Parameters about the Teradata RDBMS -->
    <property>
        <name>teradata.db.url</name>
        <value>127.1.2.1</value>
        <description>
            The URL of the Teradata DB the program is going to interact with
        </description>
    </property>
    <property>
        <name>teradata.db.account</name>
        <value>user1</value>
        <description>The account name</description>
    </property>
    <property>
        <name>teradata.db.password</name>
        <value>b123</value>
        <description>Password</description>
    </property>
    
</configuration>

Deploy and run

The whole source package should first be compiled and packaged as a jar file, for example MyMapReduceJob.jar, before it can be run on the Hadoop cluster. Assume that we put the property XML file in the same directory as the jar file. Then we can start the MapReduce job with the following command:

HADOOP_HOME/bin/hadoop jar MyMapReduceJob.jar TransactionsMapReduceJob -Dmapred.map.tasks=32 -conf TeraProps.xml output.tbl "select transaction_id, product_id from table_1 where transaction_id > 1000"

where:

  1. HADOOP_HOME stands for the Hadoop installation directory
  2. -Dmapred.map.tasks=32 sets the number of map tasks to 32
  3. -conf TeraProps.xml tells where to find the parameters about the Teradata DBMS
  4. the file output.tbl contains the job’s output, and
  5. "select transaction_id, product_id from table_1 where transaction_id > 1000" is the user query.

4. Conclusion

MapReduce related research and development continues to be active and to attract interest from both industry and academia. MapReduce is particularly interesting to parallel DBMS vendors since both MapReduce and Teradata data warehouses use clusters of nodes and shared-nothing scale-out technology for large scale data analysis. The TeradataDBInputFormat approach in this article shows how MapReduce programs can efficiently and directly have parallel access to Teradata EDW data without the external steps of exporting and loading data from Teradata EDW to Hadoop.

5. References

[1] http://www.cloudera.com/blog/2009/03/database-access-with-hadoop/

[2] http://www.cloudera.com

[3] DBInputFormat http://www.cloudera.com/blog/2009/03/database-access-with-hadoop/

[4] Teradata Online Documentation http://www.info.teradata.com

[5] Cloudera Sqoop http://www.cloudera.com/blog/2009/06/introducing-sqoop/

6. Appendix

The TeradataDBInputFormat class is implemented in Java, and the related source code is included in the attached zip file. The source code is composed of three parts:

The core implementation of the TeradataDBInputFormat classes, based on the PPI strategy described in Section 2.2. Five classes are defined:

  • TeradataDBInputFormat
  • TeradataDBSplit
  • TeradataDBRecordReader
  • TeradataDBWritable
  • DummyDBWritable

The generation of the internal intermediate table in the Teradata DBMS according to the user query. Two classes are defined:

  • IntermediateTableQueryGenerator
  • IntermediateTableGenerator

An example to show how to use the TeradataDBInputFormat class.

  • TransactionTDBWritable
  • TransactionDBInputFormat
  • TransactionMapper
  • TransactionReducer
  • TransactionsMapReduceJob