Hadoop MapReduce Connector to Teradata EDW | Teradata Developer Exchange.
Hadoop MapReduce programmers often find it more convenient and productive to have direct access from their MapReduce programs to data stored in an RDBMS such as the Teradata Enterprise Data Warehouse (EDW), because:
- There is no need to export relational data into a flat file.
- There is no need to upload the file into the Hadoop Distributed File System (HDFS).
- There is no need to change and rerun the scripts/commands in the first two steps when they need to use different tables/columns in their MapReduce programs.
1. Introduction
Connecting a Java (or other language) program to a database through JDBC or ODBC is well understood by most programmers. But when dealing with Hadoop and MPP databases, two factors outside the domain of connectors become crucial: scale and balance. Hadoop and MPP databases often run into dozens or hundreds of server nodes, consuming tens or hundreds of terabytes per execution. For the shared-nothing architecture to perform at maximum throughput, the processing workload and data must be partitioned evenly across execution threads. Otherwise, one server will have an inordinate amount of work compared to the others, and the total elapsed time will be longer. Consequently, a Hadoop connector needs to support parallel efficiency.
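The next sketch makes the balance argument concrete. It is a toy model of our own and is not part of the connector. It assumes each node's work is proportional to the number of rows it holds, so a parallel job finishes only when its busiest node finishes. The class name SkewModel and the row-count inputs are hypothetical.

```java
import java.util.Arrays;

/**
 * Illustrative model only: in a shared-nothing system the job finishes when
 * the most heavily loaded node finishes, so elapsed time tracks the maximum
 * per-node load rather than the average.
 */
final class SkewModel {
    private SkewModel() {}

    /** Elapsed time in "row units": the largest per-node row count. */
    static long elapsed(long[] rowsPerNode) {
        return Arrays.stream(rowsPerNode).max().orElse(0L);
    }

    /** Ratio of the actual elapsed time to the ideal, perfectly balanced time. */
    static double skewFactor(long[] rowsPerNode) {
        long total = Arrays.stream(rowsPerNode).sum();
        if (total == 0) {
            return 1.0; // no work at all counts as perfectly balanced
        }
        double ideal = (double) total / rowsPerNode.length;
        return elapsed(rowsPerNode) / ideal;
    }
}
```

Take four nodes holding 100 rows in total. An even 25/25/25/25 split finishes in 25 units. A 70/10/10/10 split takes 70 units, which is 2.8 times the balanced time, even though the total work is identical.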
In this document we first describe, in Section 2, how MapReduce programs (a.k.a. mappers) can have parallel access to Teradata EDW data using the TeradataDBInputFormat approach. In Section 3, we provide a complete example with accompanying source code that can be used directly to access the EDW without any changes to the data warehouse or Hadoop. For many applications, programmers can use the TeradataDBInputFormat class directly without modification. Overall, readers can get the following out of this article:
- Architecturally, how MapReduce programs get direct and parallel access to the Teradata EDW.
- How the TeradataDBInputFormat class can be used without changes to Hadoop or the EDW. Step-by-step installation and deployment is included in the example.
- How programmers can extend the TeradataDBInputFormat approach for specific application needs.
2. The TeradataDBInputFormat approach
A common approach for a MapReduce program to access relational data is to first use the DBMS export utility to write SQL answer sets to a local file, and then load the local file into Hadoop.
However, there are several use cases where exporting the data and loading it into HDFS is inconvenient for the programmer. Recognizing the need to access relational data in MapReduce programs, the Apache Hadoop open source project provides the DBInputFormat class library. The DBInputFormat and DBOutputFormat Java class libraries allow MapReduce programs to send SQL queries through the standard JDBC interface to a DBMS in parallel. Teradata provides a version of DBInputFormat [3] that will be part of the Cloudera Distribution for Hadoop. Note that Cloudera has a good explanation of DBInputFormat on its website [1]. The TeradataDBInputFormat approach is inspired by, but not based on, the Apache DBInputFormat approach.
DBInputFormat and DBOutputFormat, along with their Teradata versions, are good interfaces for ad hoc small or medium volume data transfers. They make it easy to copy tables from the EDW into HDFS and vice versa. One good use of these interfaces is when a mapper program needs to do table look-ups but has no need to persist the data fetched. These interfaces are not efficient for high volume data transfers, where bulk data movement tools like Teradata Parallel Transporter are more appropriate. In many cases, queries and bulk data movement are better optimized inside the database itself. While it is an oversimplification, think of the input and output format class libraries as similar to workloads processed by BTEQ: they are very flexible and useful, but they do not support every workload.
2.1 DBInputFormat with JDBC
DBInputFormat uses JDBC to connect to relational databases, typically MySQL or Oracle. The basic idea is that a MapReduce programmer provides a SQL query via the DBInputFormat class. The DBInputFormat class associates a modified SQL query with each mapper started by Hadoop. Each mapper then sends its query through a standard JDBC driver to the DBMS, gets back a portion of the query results, and works on those results in parallel with the other mappers. The DBInputFormat approach is correct because the union of the results of all queries sent by all mappers is equivalent to the result of the original SQL query.
The DBInputFormat approach provides two interfaces for a MapReduce program to directly access data from a DBMS. The underlying implementation is the same for both. In the first interface, a MapReduce program provides the following, in addition to the user name, password and DBMS URL values:
- a table name T;
- a list P of column names to be retrieved;
- optional filter conditions C on the table;
- the column(s) O to be used in the ORDER BY clause.

The DBInputFormat implementation first generates a “count” query:
```sql
SELECT count(*) FROM T WHERE C
```
and sends it to the DBMS to get the number of rows (R) in the query result. At runtime, the DBInputFormat implementation knows the number of mappers (M) started by Hadoop and associates the following query Q with each mapper. Each mapper connects to the DBMS, sends Q over a JDBC connection and gets back the results.
```sql
SELECT P FROM T WHERE C ORDER BY O LIMIT L OFFSET X    -- query Q
```
Query Q asks the DBMS to evaluate the query

```sql
SELECT P FROM T WHERE C ORDER BY O
```

but to return only L rows, starting from offset X. In total, the M mappers send M queries to the DBMS. These queries are almost identical and differ only in the values of L and X. For the ith mapper (where 1 ≤ i ≤ M − 1), which is not the last mapper, $L = \lfloor R/M \rfloor$ and $X = (i-1)\lfloor R/M \rfloor$. For the last mapper, $L = R - (M-1)\lfloor R/M \rfloor$ and $X = (M-1)\lfloor R/M \rfloor$.
Basically all mappers except the last one will receive an average number of rows and the last mapper will get more rows than other mappers when the total number of rows in the result cannot be evenly divided by the number of mappers.
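The split arithmetic above can be restated in code. The sketch below computes (L, X) for each mapper from R and M and builds the per-mapper query. It is a restatement of the formulas and is not Hadoop's DBInputFormat source code. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch of the LIMIT/OFFSET split arithmetic used by the DBInputFormat approach. */
final class LimitOffsetSplits {
    private LimitOffsetSplits() {}

    /** One mapper's share of the sorted result: LIMIT limit OFFSET offset. */
    static final class Split {
        final long limit;
        final long offset;

        Split(long limit, long offset) {
            this.limit = limit;
            this.offset = offset;
        }
    }

    /** Computes (L, X) for mappers 1..m, given r rows in the query result. */
    static List<Split> splits(long r, int m) {
        if (m <= 0) {
            throw new IllegalArgumentException("need at least one mapper");
        }
        long chunk = r / m; // floor(R / M)
        List<Split> out = new ArrayList<>(m);
        for (int i = 1; i <= m; i++) {
            long offset = (i - 1) * chunk;             // X = (i - 1) * floor(R / M)
            long limit = (i < m) ? chunk : r - offset; // the last mapper takes the remainder
            out.add(new Split(limit, offset));
        }
        return out;
    }

    /** Builds query Q for one mapper (first interface: T, P, C, O). */
    static String query(String p, String t, String c, String o, Split s) {
        return "SELECT " + p + " FROM " + t + " WHERE " + c
                + " ORDER BY " + o + " LIMIT " + s.limit + " OFFSET " + s.offset;
    }
}
```

For R = 10 and M = 3, this yields (L, X) = (3, 0), (3, 3) and (4, 6). The limits sum to R, and the ranges [X, X + L) cover the sorted result without gaps or overlaps.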
In the second interface of the DBInputFormat class, a MapReduce program can provide an arbitrary SQL SELECT query SQ (which could involve multiple tables) whose results are the input to the mappers. The program also has to provide a count query QC, which returns an integer: the number of rows returned by SQ. The DBInputFormat class sends QC to the DBMS to get the number of rows (R). The rest of the processing is the same as described for the first interface.
The DBInputFormat approach clearly streamlines access to relational data in products like MySQL, but its performance does not scale. The approach has several performance issues.
- In both interfaces, each mapper sends essentially the same SQL query to the DBMS, differing only in its LIMIT and OFFSET clauses, to get a subset of the relational data. This is how the DBInputFormat class achieves parallel processing of relational data by mappers.
- Because of the ORDER BY clause introduced into each query, the DBMS must sort the results for every query sent by a mapper, even if the program itself does not need sorted input.
- The DBMS has to execute as many queries as there are mappers in the Hadoop system. This is not efficient, especially when the number of mappers is large.

These issues are especially serious for a parallel DBMS such as Teradata EDW, which tends to have a high number of concurrent queries and larger datasets. In addition, the required ordering/sorting is expensive in a parallel DBMS. The rows of a table are not stored on a single node, so sorting requires row redistribution across nodes.
DBInputFormat cannot be used to access Teradata EDW, because LIMIT and OFFSET clauses are not part of ANSI standard SQL and are not supported by Teradata EDW. However, a newer Apache Hadoop class named DataDrivenDBInputFormat, derived from DBInputFormat, can read input data from a Teradata EDW table. DataDrivenDBInputFormat operates like DBInputFormat. The only difference is that DataDrivenDBInputFormat does not use the non-standard LIMIT and OFFSET clauses to demarcate splits. Instead, it generates WHERE clauses that separate the data into roughly equal shards. DataDrivenDBInputFormat has all of the same DBInputFormat performance issues discussed above.
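The WHERE-clause idea can be illustrated as follows. Given the minimum and maximum of an integer split column, the value range is cut into M contiguous ranges, and one predicate is emitted per mapper. This is a simplified sketch and is not Hadoop's actual splitter code, which handles more column types and edge cases. The column name and bounds are illustrative, and NULL values are ignored.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified sketch: derive per-mapper WHERE predicates from a column's value range. */
final class RangeSplits {
    private RangeSplits() {}

    /**
     * Splits the inclusive range [min, max] of an integer column into at most
     * m contiguous ranges and returns one predicate per range.
     */
    static List<String> predicates(String col, long min, long max, int m) {
        if (m <= 0 || max < min) {
            throw new IllegalArgumentException("bad range or mapper count");
        }
        long span = max - min + 1;      // number of distinct values in the range
        long step = (span + m - 1) / m; // ceil(span / m)
        List<String> out = new ArrayList<>();
        for (long lo = min; lo <= max; lo += step) {
            long hi = lo + step;        // exclusive upper bound of this range
            if (hi > max) {
                // The final range is closed so that max itself is included.
                out.add(col + " >= " + lo + " AND " + col + " <= " + max);
            } else {
                out.add(col + " >= " + lo + " AND " + col + " < " + hi);
            }
        }
        return out;
    }
}
```

For transaction_id values from 1 to 100 and four mappers, the first predicate is "transaction_id >= 1 AND transaction_id < 26" and the last is "transaction_id >= 76 AND transaction_id <= 100". The ranges have equal width, so the shards have equal row counts only when values are spread evenly. This is why the shards are only "roughly" equal.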
Figure 1 – DBInputFormat
2.2 TeradataDBInputFormat
The Teradata connector for Hadoop, TeradataDBInputFormat, sends the SQL query Q provided by a MapReduce program to Teradata EDW only once. Q is executed once, and the results are stored in a Partitioned Primary Index (PPI) table T [4]. Each mapper from Hadoop then sends a new query Qi that simply asks for the ith partition on every AMP. The performance gain depends on the number of mappers, the complexity of the SQL query provided by the MapReduce program, and the amount of data involved. In our internal testing, the TeradataDBInputFormat approach performed orders of magnitude better than the DBInputFormat approach.
We now describe the architecture behind TeradataDBInputFormat. First, the TeradataDBInputFormat class sends the following query P to the EDW, based on the query Q provided by the MapReduce program:
```sql
CREATE TABLE T AS (Q) WITH DATA
PRIMARY INDEX (c1)
PARTITION BY (c2 MOD M) + 1    -- query P
```
The above query asks the EDW to evaluate Q and store the results (both table layout and data) in a new PPI table T. The hash value of the primary index column c1 of each row in the query results determines which AMP stores that row. The value of the partition-by expression then determines the physical partition (location) of each row on that AMP. The expression uses modulo M, which means dividing by M and taking the remainder. Adding 1 maps the remainders 0 to M − 1 onto partition numbers 1 to M.
All rows on the same AMP with the same partition-by value are physically stored together and can be directly and efficiently located by the Teradata optimizer. There are different ways to automatically choose the primary index column and partition-by expression.
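For illustration, the DDL P can be assembled as a plain string, as shown below. PpiDdl is a hypothetical helper and is not the connector's IntermediateTableQueryGenerator. The usage example uses transaction_id as both c1 and c2, and the table name tmp_t, which is our assumption. The text above deliberately leaves the choice of columns open.

```java
/** Hypothetical helper that assembles the intermediate-table DDL (query P). */
final class PpiDdl {
    private PpiDdl() {}

    /**
     * @param table      name of the intermediate PPI table T
     * @param userQuery  the MapReduce program's query Q
     * @param piColumn   primary index column c1 (decides the AMP)
     * @param partColumn partition column c2 (decides the partition on each AMP)
     * @param m          number of mappers M
     */
    static String createIntermediateTable(String table, String userQuery,
                                          String piColumn, String partColumn, int m) {
        if (m < 1) {
            throw new IllegalArgumentException("need at least one mapper");
        }
        return "CREATE TABLE " + table
                + " AS (" + userQuery + ") WITH DATA"
                + " PRIMARY INDEX (" + piColumn + ")"
                + " PARTITION BY (" + partColumn + " MOD " + m + ") + 1";
    }
}
```

With 32 mappers and the sample query from Section 3, the helper produces `CREATE TABLE tmp_t AS (select transaction_id, product_id from table_1 where transaction_id > 1000) WITH DATA PRIMARY INDEX (transaction_id) PARTITION BY (transaction_id MOD 32) + 1`.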
After the query Q is evaluated and the table T is created, each AMP has M partitions numbered from 1 to M (M is the number of mappers started in Hadoop). Each mapper then sends the following query Qi (1 ≤ i ≤ M) to the EDW:

```sql
SELECT * FROM T WHERE PARTITION = i    -- query Qi
```
Teradata EDW will directly locate all rows in the ith partition on every AMP in parallel and return them to the mapper. This operation is done in parallel for all mappers. After all mappers retrieve their data, the table T is automatically deleted. Notice that if the original SQL query just selects data from a base table which is a PPI table, then we do not need to create another PPI table since we can directly use the existing partitions to partition the data each mapper should receive.
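The toy simulation below uses plain Java and no database. It shows why each mapper receives a disjoint slice of the data that can span every AMP. Rows are placed on an AMP by a hash of c1 and into partition (c2 MOD M) + 1 within that AMP. The query for mapper i then gathers partition i from every AMP. Long.hashCode is only a stand-in for Teradata's row-hash function, which this sketch does not reproduce. The AMP count is arbitrary, and c2 is assumed to be non-negative.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of the PPI layout used by the TeradataDBInputFormat approach. */
final class PpiSimulation {
    private PpiSimulation() {}

    /** Returns storage.get(amp).get(partition) -> indices of the rows stored there. */
    static List<Map<Integer, List<Integer>>> load(long[] c1, long[] c2, int amps, int m) {
        List<Map<Integer, List<Integer>>> storage = new ArrayList<>();
        for (int a = 0; a < amps; a++) {
            storage.add(new HashMap<>());
        }
        for (int row = 0; row < c1.length; row++) {
            // Stand-in for Teradata's row hash on the primary index column c1.
            int amp = Math.floorMod(Long.hashCode(c1[row]), amps);
            // (c2 MOD m) + 1; assumes c2 >= 0 so partitions fall in 1..m.
            int partition = (int) (c2[row] % m) + 1;
            storage.get(amp).computeIfAbsent(partition, k -> new ArrayList<>()).add(row);
        }
        return storage;
    }

    /** Rows returned to mapper i by SELECT * FROM T WHERE PARTITION = i. */
    static List<Integer> readPartition(List<Map<Integer, List<Integer>>> storage, int i) {
        List<Integer> rows = new ArrayList<>();
        for (Map<Integer, List<Integer>> amp : storage) {
            rows.addAll(amp.getOrDefault(i, Collections.emptyList()));
        }
        return rows;
    }
}
```

Consider 20 rows, 3 AMPs and M = 4, with c1 = c2 = 0..19. Each partition number selects exactly 5 rows (those with c2 MOD 4 = i − 1), collected from the AMPs that hold them, and no row is read twice.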
As mentioned at the beginning of Section 2.2, the TeradataDBInputFormat approach can perform orders of magnitude better than the DBInputFormat approach. This applies when the number of mappers is large and the SQL query provided by a MapReduce program is complex (for example, involving multi-table joins and grouping). The reason is that with DBInputFormat the DBMS has to execute the same user SQL query as many times as there are mappers, sort the results, and send back only a portion of the final results to each mapper. In the TeradataDBInputFormat approach, by contrast, the complex SQL query is executed only once. Its results are stored in the multiple partitions of a PPI table, and each partition is sent to a different mapper. As mentioned before, TeradataDBInputFormat does not require any change to the Teradata EDW codebase. We have investigated a few areas where new enhancements to Teradata EDW could significantly improve the performance of the TeradataDBInputFormat approach. We will probably discuss them in a separate article.
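The argument above is essentially counting, and the sketch below restates it for M mappers. It counts query executions and sorts, not elapsed time. It treats the count query as one more evaluation of the user's query. That holds for the first interface's `SELECT count(*) FROM T WHERE C`; in the second interface, the count query QC is supplied by the user. The class name is hypothetical.

```java
/**
 * Back-of-the-envelope model (an illustration, not a benchmark) of how often
 * the DBMS evaluates the user's query and sorts, for m mappers.
 */
final class ApproachCost {
    private ApproachCost() {}

    /** DBInputFormat: one count query plus one ORDER BY ... LIMIT/OFFSET query per mapper. */
    static int dbInputFormatEvaluations(int m) {
        return 1 + m;
    }

    /** Every per-mapper query carries an ORDER BY, so the DBMS sorts m times. */
    static int dbInputFormatSorts(int m) {
        return m;
    }

    /** TeradataDBInputFormat: Q is evaluated once, inside CREATE TABLE T AS (Q). */
    static int teradataEvaluations() {
        return 1;
    }

    /** No ORDER BY is added to any query. */
    static int teradataSorts() {
        return 0;
    }

    /** Each mapper then issues one SELECT * FROM T WHERE PARTITION = i. */
    static int teradataPartitionReads(int m) {
        return m;
    }
}
```

With 32 mappers, as in the Section 3 command line, DBInputFormat evaluates the user query 33 times and sorts 32 times. TeradataDBInputFormat evaluates it once, sorts nothing, and serves 32 partition reads.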
Figure 2 – TeradataDBInputFormat
2.3 Known Issues
Note that data retrieved by a MapReduce program via the TeradataDBInputFormat or DBInputFormat approach are not stored in Hadoop after the MapReduce program finishes, unless the program intentionally does so. Therefore, if some Teradata EDW data are frequently used by many MapReduce programs, it is more efficient to copy these data and materialize them in HDFS. One way to store a Teradata table permanently in HDFS is to use Cloudera's Sqoop [5], into which we have integrated TeradataDBInputFormat.
One potential issue in the current implementation provided in the Appendix is column name conflicts. For example, assume the business query Q used in the DDL P in Section 2.2 is “select * from T1, T2 where T1.a=T2.b”, and that T1 and T2 have columns with the same names. Currently, the Teradata DBMS reports a column name conflict if we simply create a table to store the above query result. The workaround for now is to rewrite the query so that each result column is given a unique name explicitly. Either the EDW or the TeradataDBInputFormat class could be enhanced to resolve such conflicts automatically, so that users would not need to rewrite “select * from T1, T2 where T1.a=T2.b” this way.
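The enhancement on the TeradataDBInputFormat side could be sketched as follows. This is a hypothetical example and not code from the connector. It assumes the connector has already expanded `select *` into the list of qualified columns, for example by looking up the table schemas. Every column whose unqualified name appears more than once then receives a unique alias, so that `CREATE TABLE ... AS` can succeed.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

/** Hypothetical sketch: alias duplicated column names in a generated select list. */
final class ColumnAliaser {
    private ColumnAliaser() {}

    static String selectList(List<String> qualifiedColumns) {
        // Count unqualified names; compared case-insensitively, as Teradata does for object names.
        Map<String, Integer> counts = new HashMap<>();
        for (String q : qualifiedColumns) {
            counts.merge(key(q), 1, Integer::sum);
        }
        List<String> items = new ArrayList<>();
        for (String q : qualifiedColumns) {
            if (counts.get(key(q)) > 1) {
                items.add(q + " AS " + q.replace('.', '_')); // e.g. T1.x AS T1_x
            } else {
                items.add(q);
            }
        }
        return String.join(", ", items);
    }

    /** Unqualified, lower-cased column name used for conflict detection. */
    private static String key(String qualified) {
        int dot = qualified.lastIndexOf('.');
        String name = dot < 0 ? qualified : qualified.substring(dot + 1);
        return name.toLowerCase(Locale.ROOT);
    }
}
```

Consider the columns T1.a, T1.x, T2.b and T2.X. The sketch yields `T1.a, T1.x AS T1_x, T2.b, T2.X AS T2_X`, and columns without conflicts are left untouched. A production version would also need to check that a generated alias does not itself collide with an existing column name.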
2.4 DBOutputFormat
The DBOutputFormat provided by Cloudera writes to the database by generating a set of INSERT statements in each reducer. The current DBOutputFormat approach, in which multiple reducers send batches of INSERT statements to the DBMS, works with the Teradata EDW without modification. For more detail, please refer to [3].
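The reducer-side pattern can be sketched with plain JDBC. This is not Hadoop's DBOutputFormat source code. One parameterized INSERT is prepared, rows are queued with PreparedStatement.addBatch(), and each full batch is sent with executeBatch(). Table names, column names and the batch size are illustrative. Only the string and arithmetic helpers can be exercised without a database.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;

/** Sketch of batched, parameterized INSERTs as a reducer might issue them. */
final class BatchInsertSketch {
    private BatchInsertSketch() {}

    /** Builds e.g. "INSERT INTO t (a, b) VALUES (?, ?)". */
    static String insertStatement(String table, String... columns) {
        StringBuilder sb = new StringBuilder("INSERT INTO ").append(table).append(" (");
        sb.append(String.join(", ", columns)).append(") VALUES (");
        for (int i = 0; i < columns.length; i++) {
            sb.append(i == 0 ? "?" : ", ?");
        }
        return sb.append(")").toString();
    }

    /** Number of executeBatch() round trips needed for rowCount rows. */
    static int batchCount(int rowCount, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        return (rowCount + batchSize - 1) / batchSize;
    }

    /** Writes rows in batches of batchSize over an open JDBC connection. */
    static void write(Connection conn, String table, String[] columns,
                      List<Object[]> rows, int batchSize) throws SQLException {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        try (PreparedStatement ps = conn.prepareStatement(insertStatement(table, columns))) {
            int pending = 0;
            for (Object[] row : rows) {
                for (int c = 0; c < row.length; c++) {
                    ps.setObject(c + 1, row[c]); // JDBC parameters are 1-based
                }
                ps.addBatch();
                if (++pending == batchSize) {
                    ps.executeBatch();
                    pending = 0;
                }
            }
            if (pending > 0) {
                ps.executeBatch(); // flush the final partial batch
            }
        }
    }
}
```

With a batch size of 1,000, writing 2,500 rows takes three round trips instead of 2,500 single-row INSERTs. The batch size is the main tuning knob for this trade-off.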
3. Example using TeradataDBInputFormat
In this section, we first describe the requirements for running our TeradataDBInputFormat class, and then use an example to explain how to use the TeradataDBInputFormat approach.
3.1 Requirements
The TeradataDBInputFormat class is implemented in Java. The enclosed package can be compiled and run in an environment with the following features:
- Sun JDK 1.6, update 8 or later versions
- Hadoop version 0.20.2+228 or later versions
- Teradata version V2R5 or later release
Note that the Hadoop DFS and the Teradata DBMS may or may not be installed on the same hardware platform.
You should start by downloading the TeradataDBInputFormat connector. The JAR file (i.e., TeradataDBInputFormat.jar) should be placed in the $HADOOP_HOME/lib/ directory on your Hadoop TaskTracker machines. It is a good idea to also include it on any server you launch Hadoop jobs from.
3.2 Sample code using TeradataDBInputFormat class to access Teradata EDW data
Table Schema
Assume a MapReduce program needs to access some transaction data stored in the table table_1, defined by the following CREATE TABLE statement:
```sql
CREATE TABLE table_1 (
    transaction_id INT,
    product_id     INT,
    sale_date      DATE,
    description    VARCHAR(64)
) PRIMARY INDEX (transaction_id);
```
To access the transaction and product information, a MapReduce program can simply provide a SQL query, such as “select transaction_id, product_id from table_1 where transaction_id > 1000”.
Configuring the MapReduce job
The following code shows how a MapReduce job using the TeradataDBInputFormat class is configured and run.
```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
// TeradataDBInputFormat and TeradataDBWritable come from the connector JAR
// (TeradataDBInputFormat.jar); add the matching import for its package.

public class TransactionsMapReduceJob extends Configured implements Tool {
    private final String query;
    private final String outputFilePath;

    /**
     * @param query      the SQL query whose results feed the mappers
     * @param outputPath HDFS path for the job output
     */
    public TransactionsMapReduceJob(final String query, final String outputPath) {
        this.query = query;
        this.outputFilePath = outputPath;
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        // Fall back to a fixed job name if none is configured.
        Job myJob = new Job(conf, conf.get("mapreduce.job.name", "TransactionsMapReduceJob"));

        // The following three statements are essential.
        // 1. Register the class that each record is read into.
        myJob.getConfiguration().setClass("record.reader.class",
                TransactionTDBWritable.class, TeradataDBWritable.class);
        // 2. Store the query.
        TeradataDBInputFormat.setInput(myJob.getConfiguration(), query,
                TransactionTDBWritable.class);
        // 3. Specify the input format class.
        myJob.setInputFormatClass(TransactionDBInputFormat.class);

        myJob.setJarByClass(TransactionsMapReduceJob.class);
        myJob.setOutputKeyClass(LongWritable.class);
        myJob.setOutputValueClass(LongWritable.class);
        myJob.setMapperClass(TransactionMapper.class);
        myJob.setReducerClass(TransactionReducer.class);
        myJob.setOutputFormatClass(TextOutputFormat.class);
        FileOutputFormat.setOutputPath(myJob, new Path(outputFilePath));

        try {
            return myJob.waitForCompletion(true) ? 0 : 1;
        } finally {
            // Clean up the intermediate table in the EDW, even if the job fails.
            TeradataDBInputFormat.cleanDB(myJob.getConfiguration());
        }
    }

    public static void main(String[] args) throws Exception {
        int res = 1; // non-zero unless the job completes successfully
        try {
            int argsNum = args.length;
            // Assumption 1: the second-to-last parameter is the output file path.
            String outputFilePath = args[argsNum - 2];
            // Assumption 2: the last parameter is the query.
            String query = args[argsNum - 1];
            Tool mapredTool = new TransactionsMapReduceJob(query, outputFilePath);
            res = ToolRunner.run(new Configuration(), mapredTool, args);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            System.exit(res);
        }
    }
}
```
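main() relies on position: the output path is the second-to-last argument, and the query is the last. The helper below isolates that convention. JobArgs is hypothetical and is not in the original package. It also adds the argument-count check that the listing lacks, so a missing argument produces a usage message instead of an ArrayIndexOutOfBoundsException.

```java
/** Hypothetical helper restating main()'s positional-argument convention. */
final class JobArgs {
    final String outputPath;
    final String query;

    private JobArgs(String outputPath, String query) {
        this.outputPath = outputPath;
        this.query = query;
    }

    /** The second-to-last argument is the output path; the last is the SQL query. */
    static JobArgs parse(String[] args) {
        if (args.length < 2) {
            throw new IllegalArgumentException(
                    "usage: [generic options] <output path> <query>");
        }
        return new JobArgs(args[args.length - 2], args[args.length - 1]);
    }
}
```

In main(), `JobArgs a = JobArgs.parse(args);` would replace the two index lookups. The query must be passed as a single quoted argument so that the shell does not split it.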
Defining a Java class to represent DBMS data used in the MapReduce program
The following class, TransactionTDBWritable, which implements the TDBWritable interface, describes the Teradata EDW data to be used by the MapReduce program.
```java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
import java.util.Vector;
import org.apache.hadoop.io.Writable;
// TDBWritable comes from the connector JAR; add the matching import for its package.

public class TransactionTDBWritable implements TDBWritable, Writable {
    private long transaction_id = 0;
    private long product_id = 0;

    // Static code: the programmer must explicitly declare the attributes
    // (name and type) returned by the query.
    private static List<String> attribute_names = new Vector<String>();
    private static List<String> attribute_types = new Vector<String>();

    static {
        // The corresponding query:
        //   SELECT transaction_id, product_id FROM ... WHERE ...
        // 1. The first item
        attribute_names.add("transaction_id");
        attribute_types.add("int");
        // 2. The second item
        attribute_names.add("product_id");
        attribute_types.add("int");
    }

    /** Default constructor, so the framework can instantiate the class. */
    public TransactionTDBWritable() {
        super();
    }

    /** Populates the fields from the current row of a JDBC result set. */
    @Override
    public void readFields(ResultSet resultSet) throws SQLException {
        transaction_id = resultSet.getLong("transaction_id");
        product_id = resultSet.getLong("product_id");
    }

    // … (elided in the original listing; presumably includes the
    // getTransactionID() and getProductID() accessors used by TransactionMapper)

    /** Hadoop deserialization: reads fields in the same order write() emits them. */
    @Override
    public void readFields(DataInput in) throws IOException {
        transaction_id = in.readLong();
        product_id = in.readLong();
    }

    /** Hadoop serialization. */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(transaction_id);
        out.writeLong(product_id);
    }

    // The method name addAttrbute is kept exactly as in the original listing;
    // it must match the declaration in the TDBWritable interface.
    @Override
    public void addAttrbute(String name, String type) {
        attribute_names.add(name);
        attribute_types.add(type);
    }

    @Override
    public List<String> getAttributeNameList() {
        return attribute_names;
    }

    // Despite its name, this returns the attribute types declared above.
    @Override
    public List<String> getAttributeValueList() {
        return attribute_types;
    }
}
```
Note that TeradataDBInputFormat could be enhanced so that the above class does not need to be written by hand. It could instead be generated automatically from the schema of the query's result.
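One way such generation might work is sketched below; it is hypothetical and not part of the connector. JDBC's PreparedStatement.getMetaData() can describe a query's result columns before execution, through ResultSetMetaData's getColumnCount(), getColumnName() and getColumnTypeName(). It may return null if the driver cannot determine them. To run without a database, the sketch takes the names and types directly and only generates the static block. Type names reported by a driver (for example "INTEGER") may differ from the "int" used in the listing, so a real generator would need a mapping; this sketch passes them through unchanged.

```java
import java.util.List;

/** Hypothetical generator for the hand-written static block in TransactionTDBWritable. */
final class AttributeBlockGenerator {
    private AttributeBlockGenerator() {}

    /** Emits Java source declaring one (name, type) attribute pair per result column. */
    static String staticBlock(List<String> names, List<String> types) {
        if (names.size() != types.size()) {
            throw new IllegalArgumentException("names/types length mismatch");
        }
        StringBuilder sb = new StringBuilder("static {\n");
        for (int i = 0; i < names.size(); i++) {
            sb.append("    attribute_names.add(\"").append(names.get(i)).append("\");\n");
            sb.append("    attribute_types.add(\"").append(types.get(i)).append("\");\n");
        }
        return sb.append("}\n").toString();
    }
}
```

For the sample query, the names (transaction_id, product_id) and types (int, int) reproduce the static block shown in TransactionTDBWritable.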
A dummy class inheriting from TeradataDBInputFormat<T> is also needed:
```java
public class TransactionDBInputFormat
        extends TeradataDBInputFormat<TransactionTDBWritable> {
    // Nothing needs to be done here.
    // This subclass transfers the type information of TransactionTDBWritable
    // down to TransactionDBInputFormat's constructor.
}
```
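The comment in the dummy class explains why it exists. Java erases generic type arguments at run time, so a generic class such as TeradataDBInputFormat<T> cannot discover T on its own. A concrete subclass that fixes T, however, records it in its class signature. The sketch below shows one common way a base-class constructor can read that information back: the "super type token" idiom, using getGenericSuperclass(). TypedInputFormat, SampleRecord and SampleInputFormat are hypothetical stand-ins, and we have not verified that the connector uses exactly this mechanism.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

/** Base class that recovers its type argument from the concrete subclass. */
abstract class TypedInputFormat<T> {
    private final Class<T> recordClass;

    @SuppressWarnings("unchecked")
    protected TypedInputFormat() {
        // e.g. TypedInputFormat<SampleRecord> when called from SampleInputFormat
        Type superType = getClass().getGenericSuperclass();
        if (!(superType instanceof ParameterizedType)) {
            throw new IllegalStateException("subclass must specify the type argument");
        }
        Type arg = ((ParameterizedType) superType).getActualTypeArguments()[0];
        if (!(arg instanceof Class)) {
            throw new IllegalStateException("type argument must be a concrete class");
        }
        this.recordClass = (Class<T>) arg;
    }

    Class<T> recordClass() {
        return recordClass;
    }

    /** Creates a record through its public no-argument constructor. */
    T newRecord() throws ReflectiveOperationException {
        return recordClass.getDeclaredConstructor().newInstance();
    }
}

/** Stand-in for TransactionTDBWritable. */
class SampleRecord {
    public SampleRecord() {}
}

/** Stand-in for TransactionDBInputFormat: empty body, it only fixes T. */
class SampleInputFormat extends TypedInputFormat<SampleRecord> {}
```

The lookup only works for a direct subclass that names a concrete class as the type argument, which is exactly the shape of TransactionDBInputFormat.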
Using data in a Mapper
TeradataDBInputFormat reads from the database and populates the fields of the TransactionTDBWritable class with the retrieved data. A mapper then receives an instance of TransactionTDBWritable as its input value and can use the retrieved DBMS data as desired. The following code simply shows how a mapper has direct access to the DBMS data passed to it as an instance of the TransactionTDBWritable class.
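```java
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Mapper;

public class TransactionMapper
        extends Mapper<LongWritable, TransactionTDBWritable, LongWritable, LongWritable> {

    public TransactionMapper() {}

    /** Emits (transaction_id, product_id) for every row read from the EDW. */
    @Override
    protected void map(LongWritable k1, TransactionTDBWritable v1, Context context)
            throws IOException, InterruptedException {
        // The original listing caught IOException and InterruptedException with
        // elided handlers (...); propagating them lets Hadoop fail and retry the task.
        context.write(new LongWritable(v1.getTransactionID()),
                      new LongWritable(v1.getProductID()));
    }
}
```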
Prepare the configuration properties
To access the Teradata database, the program also needs the database connection details, such as the DB URL, user account and password. This information can be stored in a property XML file, as shown in the following file TeraProps.xml.
```xml
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <!-- Parameters about the Teradata RDBMS -->
  <property>
    <name>teradata.db.url</name>
    <value>127.1.2.1</value>
    <description>The URL of the Teradata DB the program is going to interact with</description>
  </property>
  <property>
    <name>teradata.db.account</name>
    <value>user1</value>
    <description>The account name</description>
  </property>
  <property>
    <name>teradata.db.password</name>
    <value>b123</value>
    <description>Password</description>
  </property>
  <!-- … -->
</configuration>
```
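Hadoop's Configuration class loads this file when it is passed with -conf, so the job itself needs no parsing code. Purely to show the structure being read, here is a JDK-only sketch that extracts the name/value pairs. PropsReader is a hypothetical name, and the description elements are ignored.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;

/** Minimal JDK-only reader for Hadoop-style <configuration>/<property> files. */
final class PropsReader {
    private PropsReader() {}

    /** Returns property name -> value, in document order. */
    static Map<String, String> read(String xml) throws Exception {
        Document doc = DocumentBuilderFactory.newInstance()
                .newDocumentBuilder()
                .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));
        Map<String, String> props = new LinkedHashMap<>();
        NodeList properties = doc.getElementsByTagName("property");
        for (int i = 0; i < properties.getLength(); i++) {
            Element p = (Element) properties.item(i);
            String name = text(p, "name");
            if (name != null) {
                props.put(name, text(p, "value"));
            }
        }
        return props;
    }

    /** Trimmed text of the first child element with the given tag, or null if absent. */
    private static String text(Element parent, String tag) {
        NodeList nodes = parent.getElementsByTagName(tag);
        return nodes.getLength() == 0 ? null : nodes.item(0).getTextContent().trim();
    }
}
```

Applied to TeraProps.xml, the sketch would map teradata.db.url to 127.1.2.1 and teradata.db.account to user1.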
Deploy and run
The whole source package should first be compiled and packaged as a JAR file, for example MyMapReduceJob.jar, before it can be run on the Hadoop cluster. Assume that we put the property XML file in the same directory as the JAR file. We can then start the MapReduce job with the following command:
```shell
$HADOOP_HOME/bin/hadoop jar MyMapReduceJob.jar TransactionsMapReduceJob -Dmapred.map.tasks=32 -conf TeraProps.xml output.tbl "select transaction_id, product_id from table_1 where transaction_id > 1000"
```
where:
- HADOOP_HOME stands for the Hadoop installation directory;
- -Dmapred.map.tasks=32 sets the number of map tasks to 32;
- -conf TeraProps.xml tells Hadoop where to find the parameters about the Teradata DBMS;
- output.tbl is the output path to which the job's output is written; and
- “select transaction_id, product_id from table_1 where transaction_id > 1000” is the user query.
4. Conclusion
MapReduce-related research and development continues to be active and to attract interest from both industry and academia. MapReduce is particularly interesting to parallel DBMS vendors, since both MapReduce and Teradata Data Warehouses use clusters of nodes and shared-nothing scale-out technology for large-scale data analysis. The TeradataDBInputFormat approach described in this article shows how MapReduce programs can efficiently and directly have parallel access to Teradata EDW data. It avoids the external steps of exporting data from Teradata EDW and loading it into Hadoop.
5. References
[1] http://www.cloudera.com/blog/2009/03/database-access-with-hadoop/
[3] DBInputFormat. http://www.cloudera.com/blog/2009/03/databaseaccess-with-hadoop
[4] Teradata Online Documentation. http://www.info.teradata.com
[5] Cloudera Sqoop. http://www.cloudera.com/blog/2009/06/introducing-sqoop/
6. Appendix
The TeradataDBInputFormat class is implemented in Java, and the related source code is included in the attached zip file. The source code is composed of three parts:

1. The core implementation of the TeradataDBInputFormat classes, built on the PPI strategy described in Section 2.2. Five classes are defined:
   - TeradataDBInputFormat
   - TeradataDBSplit
   - TeradataDBRecordReader
   - TeradataDBWritable
   - DummyDBWritable
2. The generation of the internal intermediate table in the Teradata DBMS according to the user query:
   - IntermediateTableQueryGenerator
   - IntermediateTableGenerator
3. An example showing how to use the TeradataDBInputFormat class:
   - TransactionTDBWritable
   - TransactionDBInputFormat
   - TransactionMapper
   - TransactionReducer
   - TransactionsMapReduceJob