写好Hive 程序的五个提示

写好Hive 程序的五个提示 – 淘宝共享数据平台 tbdata.org.

使用Hive可以高效而又快速地编写复杂的MapReduce查询逻辑。但是某些情况下,因为不熟悉数据特性,或没有遵循Hive的优化约定,Hive计算任务会变得非常低效,甚至无法得到结果。一个”好”的Hive程序仍然需要对Hive运行机制有深入的了解。

有一些大家比较熟悉的优化约定包括:Join中需要将大表写在靠右的位置;尽量使用UDF而不是transfrom……诸如此类。下面讨论5个性能和逻辑相关的问题,帮助你写出更好的Hive程序。

全排序

Hive的排序关键字是SORT BY,它有意区别于传统数据库的ORDER BY也是为了强调两者的区别–SORT BY只能在单机范围内排序。考虑以下表定义:

CREATE TABLE if not exists t_order(  id int, -- 订单编号  sale_id int, -- 销售ID  customer_id int, -- 客户ID  product _id int, -- 产品ID  amount int -- 数量  ) PARTITIONED BY (ds STRING);

在表中查询所有销售记录,并按照销售ID和数量排序:

set mapred.reduce.tasks=2;  Select sale_id, amount from t_order  Sort by sale_id, amount;

这一查询可能得到非期望的排序。指定的2reducer分发到的数据可能是(各自排序):

Reducer1

Sale_id | amount  0 | 100  1 | 30  1 | 50  2 | 20

Reducer2

Sale_id | amount  0 | 110  0 | 120  3 | 50  4 | 20

因为上述查询没有reduce keyhive会生成随机数作为reduce key。这样的话输入记录也随机地被分发到不同reducer机器上去了。为了保证reducer之间没有重复的sale_id记录,可以使用DISTRIBUTE BY关键字指定分发keysale_id。改造后的HQL如下:

set mapred.reduce.tasks=2;  Select sale_id, amount from t_order  Distribute by sale_id  Sort by sale_id, amount;

这样能够保证查询的销售记录集合中,销售ID对应的数量是正确排序的,但是销售ID不能正确排序,原因是hive使用hadoop默认的HashPartitioner分发数据。

这就涉及到一个全排序的问题。解决的办法无外乎两种:

1.) 不分发数据,使用单个reducer

set mapred.reduce.tasks=1;

这一方法的缺陷在于reduce端成为了性能瓶颈,而且在数据量大的情况下一般都无法得到结果。但是实践中这仍然是最常用的方法,原因是通常排序的查询是为了得到排名靠前的若干结果,因此可以用limit子句大大减少数据量。使用limit n后,传输到reduce端(单机)的数据记录数就减少到n* map个数)。

2.) 修改Partitioner,这种方法可以做到全排序。这里可以使用Hadoop自带的TotalOrderPartitioner(来自于Yahoo!TeraSort项目),这是一个为了支持跨reducer分发有序数据开发的Partitioner,它需要一个SequenceFile格式的文件指定分发的数据区间。如果我们已经生成了这一文件(存储在/tmp/range_key_list,分成100reducer),可以将上述查询改写为

set mapred.reduce.tasks=100;  set hive.mapred.partitioner=org.apache.hadoop.mapred.lib.TotalOrderPartitioner;  set total.order.partitioner.path=/tmp/ range_key_list;  Select sale_id, amount from t_order  Cluster by sale_id  Sort by amount;

有很多种方法生成这一区间文件(例如hadoop自带的o.a.h.mapreduce.lib.partition.InputSampler工具)。这里介绍用Hive生成的方法,例如有一个按id有序的t_sale表:

CREATE TABLE if not exists t_sale (  id int,  name string,  loc string  );

则生成按sale_id分发的区间文件的方法是:

create external table range_keys(sale_id int)  row format serde  'org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe'  stored as  inputformat  'org.apache.hadoop.mapred.TextInputFormat'  outputformat  'org.apache.hadoop.hive.ql.io.HiveNullValueSequenceFileOutputFormat'  location '/tmp/range_key_list';     insert overwrite table range_keys  select distinct sale_id  from source t_sale sampletable(BUCKET 100 OUT OF 100 ON rand()) s  sort by sale_id;

生成的文件(/tmp/range_key_list目录下)可以让TotalOrderPartitionersale_id有序地分发reduce处理的数据。区间文件需要考虑的主要问题是数据分发的均衡性,这有赖于对数据深入的理解。

怎样做笛卡尔积?

Hive设定为严格模式(hive.mapred.mode=strict)时,不允许在HQL语句中出现笛卡尔积,这实际说明了Hive对笛卡尔积支持较弱。因为找不到Join keyHive只能使用1reducer来完成笛卡尔积。

当然也可以用上面说的limit的办法来减少某个表参与join的数据量,但对于需要笛卡尔积语义的需求来说,经常是一个大表和一个小表的Join操作,结果仍然很大(以至于无法用单机处理),这时MapJoin才是最好的解决办法。

MapJoin,顾名思义,会在Map端完成Join操作。这需要将Join操作的一个或多个表完全读入内存。

MapJoin的用法是在查询/子查询的SELECT关键字后面添加/*+ MAPJOIN(tablelist) */提示优化器转化为MapJoin(目前Hive的优化器不能自动优化MapJoin)。其中tablelist可以是一个表,或以逗号连接的表的列表。tablelist中的表将会读入内存,应该将小表写在这里。

PS:有用户说MapJoin在子查询中可能出现未知BUG。在大表和小表做笛卡尔积时,规避笛卡尔积的方法是,给Join添加一个Join key,原理很简单:将小表扩充一列join key,并将小表的条目复制数倍,join key各不相同;将大表扩充一列join key为随机数。

怎样写exist in子句?

Hive不支持where子句中的子查询,SQL常用的exist in子句需要改写。这一改写相对简单。考虑以下SQL查询语句:

SELECT a.key, a.value  FROM a  WHERE a.key in  (SELECT b.key  FROM B);

可以改写为

SELECT a.key, a.value  FROM a LEFT OUTER JOIN b ON (a.key = b.key)  WHERE b.key <> NULL;

一个更高效的实现是利用left semi join改写为:

SELECT a.key, a.val  FROM a LEFT SEMI JOIN b on (a.key = b.key);

left semi join0.5.0以上版本的特性。

Hive怎样决定reducer个数?

Hadoop MapReduce程序中,reducer个数的设定极大影响执行效率,这使得Hive怎样决定reducer个数成为一个关键问题。遗憾的是Hive的估计机制很弱,不指定reducer个数的情况下,Hive会猜测确定一个reducer个数,基于以下两个设定:

1. hive.exec.reducers.bytes.per.reducer(默认为1000^3

2. hive.exec.reducers.max(默认为999

计算reducer数的公式很简单:

N=min(参数2,总输入数据量/参数1)

通常情况下,有必要手动指定reducer个数。考虑到map阶段的输出数据量通常会比输入有大幅减少,因此即使不设定reducer个数,重设参数2还是必要的。依据Hadoop的经验,可以将参数2设定为0.95*(集群中TaskTracker个数)

合并MapReduce操作

Multi-group by

Multi-group byHive的一个非常好的特性,它使得Hive中利用中间结果变得非常方便。例如,

FROM (SELECT a.status, b.school, b.gender  FROM status_updates a JOIN profiles b  ON (a.userid = b.userid and  a.ds='2009-03-20' )  ) subq1  INSERT OVERWRITE TABLE gender_summary  PARTITION(ds='2009-03-20')  SELECT subq1.gender, COUNT(1) GROUP BY subq1.gender  INSERT OVERWRITE TABLE school_summary  PARTITION(ds='2009-03-20')  SELECT subq1.school, COUNT(1) GROUP BY subq1.school

上述查询语句使用了Multi-group by特性连续group by2次数据,使用不同的group by key。这一特性可以减少一次MapReduce操作。

Multi-distinct

Multi-distinct是淘宝开发的另一个multi-xxx特性,使用Multi-distinct可以在同一查询/子查询中使用多个distinct,这同样减少了多次MapReduce操作。

Hadoop状态页面的Browse the filesystem链接无效的问题

From: Hadoop状态页面的Browse the filesystem链接无效的问题 – 姚明技术博文—创新才是硬道理 – ITeye技术网站.

Note:简而言之,在本地host文件中添加所有hadoop cluster中节点的IP/域名,即可解决该问题。

NameNode ‘192.168.1.164:9000’

Started: Tue Jul 06 14:37:10 CST 2010
Version: 0.20.2, r911707
Compiled: Fri Feb 19 08:07:34 UTC 2010 by chrisdo
Upgrades: There are no upgrades in progress.

Browse the filesystem 
Namenode Logs


Cluster Summary

4 files and directories, 1 blocks = 5 total. Heap Size is 16.12 MB / 888.94 MB (1%)

Configured Capacity : 1.59 TB
DFS Used : 72 KB
Non DFS Used : 120.87 GB
DFS Remaining : 1.47 TB
DFS Used% : 0 %
DFS Remaining% : 92.57 %
Live Nodes : 2
Dead Nodes : 0

NameNode Storage:

Storage Directory Type State
/opt/hadoop/data/dfs.name.dir IMAGE_AND_EDITS Active

Hadoop , 2010.

  以上是我们安装完成,并且正常运行后的HDFS状态页面(访问地址:http://210.66.44.88:50070/dfshealth.jsp),其中的Browse the filesystem 是查看文件系统的入口,但是有可能会出现无法访问的问题,我就遇到过,上网查了很久的解决办法都无果,后来我通过firebug拦截请求发现,Browse the filesystem这个链接的页面会跳转到另外一个页面,而这个页面的地址是http://192.168.1.164:50075/browseDirectory.jsp?namenodeInfoPort=50070&dir=%2F(192.168.1.164是服务器的内网地址,在masters和slaves文件中配置的是局域网IP),但是需要通过外网才能访问(类似于http://210.66.44.88:50075/browseDirectory.jsp?namenodeInfoPort=50070&dir=%2F才行),这就是问题所在。

  我们来看看nn_browsedfscontent.jsp的源代码:

  1. package org.apache.hadoop.hdfs.server.namenode;  
  2.   
  3. import java.io.IOException;  
  4. import java.net.InetAddress;  
  5. import java.net.InetSocketAddress;  
  6. import java.net.URLEncoder;  
  7. import java.util.Vector;  
  8. import javax.servlet.ServletConfig;  
  9. import javax.servlet.ServletContext;  
  10. import javax.servlet.ServletException;  
  11. import javax.servlet.http.HttpServletRequest;  
  12. import javax.servlet.http.HttpServletResponse;  
  13. import javax.servlet.http.HttpSession;  
  14. import javax.servlet.jsp.JspFactory;  
  15. import javax.servlet.jsp.JspWriter;  
  16. import javax.servlet.jsp.PageContext;  
  17. import javax.servlet.jsp.SkipPageException;  
  18. import org.apache.hadoop.util.ServletUtil;  
  19. import org.apache.jasper.runtime.HttpJspBase;  
  20. import org.apache.jasper.runtime.JspSourceDependent;  
  21. import org.apache.jasper.runtime.ResourceInjector;  
  22.   
  23. public final class nn_005fbrowsedfscontent_jsp extends HttpJspBase  
  24.   implements JspSourceDependent  
  25. {  
  26.   private static final JspFactory _jspxFactory = JspFactory.getDefaultFactory();  
  27.   private static Vector _jspx_dependants;  
  28.   private ResourceInjector _jspx_resourceInjector;  
  29.   
  30.   //此处是将请求随机转发到一个DataNode节点上  
  31.   public void redirectToRandomDataNode(NameNode nn, HttpServletResponse resp)  
  32.     throws IOException  
  33.   {  
  34.     String nodeToRedirect;  
  35.     int redirectPort;  
  36.     FSNamesystem fsn = nn.getNamesystem();  
  37.     String datanode = fsn.randomDataNode();  
  38.   
  39.     if (datanode != null) {  
  40.       redirectPort = Integer.parseInt(datanode.substring(datanode.indexOf(58) + 1));  
  41.       nodeToRedirect = datanode.substring(0, datanode.indexOf(58));  
  42.     }  
  43.     else {  
  44.       nodeToRedirect = nn.getHttpAddress().getHostName();  
  45.       redirectPort = nn.getHttpAddress().getPort();  
  46.     }  
  47.     // 此处是得到服务器的名称(hostname)  
  48.     String fqdn = InetAddress.getByName(nodeToRedirect).getCanonicalHostName();  
  49.     String redirectLocation = http://&#8221; + fqdn + “:” + redirectPort + “/browseDirectory.jsp?namenodeInfoPort=” + nn.getHttpAddress().getPort() + “&dir=” + URLEncoder.encode(“/”“UTF-8”);  
  50.   
  51.     resp.sendRedirect(redirectLocation);  
  52.   }  
  53.   
  54.   public Object getDependants()  
  55.   {  
  56.     return _jspx_dependants;  
  57.   }  
  58.   
  59.   public void _jspService(HttpServletRequest request, HttpServletResponse response)  
  60.     throws IOException, ServletException  
  61.   {  
  62.     PageContext pageContext = null;  
  63.     HttpSession session = null;  
  64.     ServletContext application = null;  
  65.     ServletConfig config = null;  
  66.     JspWriter out = null;  
  67.     Object page = this;  
  68.     JspWriter _jspx_out = null;  
  69.     PageContext _jspx_page_context = null;  
  70.     try  
  71.     {  
  72.       response.setContentType(“text/html; charset=UTF-8”);  
  73.       pageContext = _jspxFactory.getPageContext(this, request, response, nulltrue8192true);  
  74.   
  75.       _jspx_page_context = pageContext;  
  76.       application = pageContext.getServletContext();  
  77.       config = pageContext.getServletConfig();  
  78.       session = pageContext.getSession();  
  79.       out = pageContext.getOut();  
  80.       _jspx_out = out;  
  81.       this._jspx_resourceInjector = ((ResourceInjector)application.getAttribute(“com.sun.appserv.jsp.resource.injector”));  
  82.   
  83.       out.write(10);  
  84.       out.write(“\n\n<html>\n\n<title></title>\n\n<body>\n”);  
  85.   
  86.       NameNode nn = (NameNode)application.getAttribute(“name.node”);  
  87.       redirectToRandomDataNode(nn, response);  
  88.   
  89.       out.write(“\n<hr>\n\n<h2>Local logs</h2>\n<a href=\”/logs/\”>Log</a> directory\n\n”);  
  90.   
  91.       out.println(ServletUtil.htmlFooter());  
  92.   
  93.       out.write(10);  
  94.     } catch (Throwable t) {  
  95.       if (!(t instanceof SkipPageException)) {  
  96.         out = _jspx_out;  
  97.         if ((out != null) && (out.getBufferSize() != 0))  
  98.           out.clearBuffer();  
  99.         if (_jspx_page_context != null) _jspx_page_context.handlePageException(t);  
  100.       }  
  101.     } finally {  
  102.       _jspxFactory.releasePageContext(_jspx_page_context);  
  103.     }  
  104.   }  
  105. }  

  从代码可以看出,当我们点击Browse the filesystem 时后台会将请求随机转发到一台DataNode节点,使用的是slaves文件中配置的服务器列表所反解的域名或主机名,而通过局域网IP未能反解出域名和主机名,所以用的是IP,这样就出问题了,解决办法有两个:视redirectToRandomDataNode方法内生成的URL而定,如果反转域名是主机名的话,你只需要修改本地HOSTS映射就可以了(推荐使用Windows Hosts Editor,软件地址:http://yymmiinngg.iteye.com/blog/360779);如果反转出的域名是主机局域网IP的话,那就需要配置slaves和masters使用域名或外网IP