hive 压缩和优化

压缩

hadoop 执行阶段可以配置一些数据压缩策略以减少数据的传输。

</archives/39578da0.html#hadoop-%E6%95%B0%E6%8D%AE%E5%8E%8B%E7%BC%A9>

在hive执行的时候也可以指定启用压缩和压缩的相关的配置

开启数据压缩

1
2
set hive.exec.compress.intermediate=true;

hive 传输中的数据压缩开启

1
set mapreduce.map.output.compress=true;

map 阶段的输出的数据压缩开启

1
2
set mapreduce.map.output.compress.codec=
org.apache.hadoop.io.compress.SnappyCodec;

指定map阶段的数据压缩的实现方式

1
set hive.exec.compress.output=true;

hive 最终输出的数据的压缩开启

1
set mapreduce.output.fileoutputformat.compress=true;

开启mr 程序的输出的数据压缩

1
2
set mapreduce.output.fileoutputformat.compress.codec =
org.apache.hadoop.io.compress.SnappyCodec;

设置输出压缩的方式

1
set mapreduce.output.fileoutputformat.compress.type=BLOCK;

设置最终压缩类型为块压缩

存储格式

hive 支持多种存储格式 比如 TEXTFILE 、SEQUENCEFILE、ORC、PARQUET。

在数据存储的时候根据存储的方式不同,还可分为行存储和列存储。

假设 数据内容为

1
2
3
4
5
6
7
8
1  小明  

2 小红

3 小绿

4 小黑

如果是行存储,那么在文件中的排列方式将是

1
1  小明  2  小红 3  小绿 4  小黑

行存储的时候,获取一行的数据比较容易,因为一行的数据都是相邻的。

同理,列存储,就是一列的数据进行挨着存储

1
1 2 3 4 小明 小红 小绿 小黑

这种存储方式,在只查询对应的列信息的时候比较容易。

那么对于hive的存储格式中,

TEXTFILE和SEQUENCEFILE 是基于行存储。

ORC和PARQUET是列存储。

指定使用那种存储格式,可以在创建表的时候使用 stored as ocr 或 stored as parquet 即可

TEXTFILE

最简单的数据格式,默认格式,数据不压缩,数据解析的开销大。

OCR

ocr文件的wiki连接 https://cwiki.apache.org/confluence/display/Hive/LanguageManual+ORC

OCR 的文件结构

每个文件由多个stripe 组成。一个stripe 一般为一个hdfs的一个块大小(默认128M);针对这些行的数据做一个列的存储。
一个stripe 由三部分组成 IndexData,RowData,Stripe Footer。
IndexData 索引数据 存了记录某行的各字段在Row Data中的offset。
Row Data 是具体的数据。
Stripe Footer:存的是各个Stream的类型,长度等信息。

https://blog.csdn.net/maizi1045/article/details/79667857

Parquet

wiki链接 https://cwiki.apache.org/confluence/display/Hive/Parquet

Parquet文件是以二进制方式存储的,所以是不可以直接读取的,文件中包括该文件的数据和元数据,因此Parquet格式文件是自解析的。

  • 行组(Row Group):每一个行组包含一定的行数,在一个HDFS文件中至少存储一个行组,类似于orc的stripe的概念。
  • 列块(Column Chunk):在一个行组中每一列保存在一个列块中,行组中的所有列连续的存储在这个行组文件中。一个列块中的值都是相同类型的,不同的列块可能使用不同的算法进行压缩。
  • 页(Page):每一个列块划分为多个页,一个页是最小的编码的单位,在同一个列块的不同页可能使用不同的编码方式。

https://blog.csdn.net/javastart/article/details/124528452

使用存储和压缩处理文件输出格式

创建一个表使用ocr的输出格式

ocr 是用的比较多的一种输出格式

在创建表的tblproperties 信息中,可以添加一些ocr的相关参数信息。

创建表使用 ocr 格式和zlib压缩

1
2
3
4
5
6
7
8
9
create table info(id int ,name string)
row format delimited fields terminated by '\t'
stored as orc
tblproperties("orc.compress"="ZLIB");


//插入数据
insert into info(id,name) values(1,'erererer');

优化

查询执行计划

使用explain 可以类似于mysql 那样看到具体的执行计划信息。

EXPLAIN [EXTENDED | DEPENDENCY | AUTHORIZATION] query

为了能够得到更好的执行效率和执行效果,需要先通过explain 看下执行计划。此执行计划是hive已经优化了的计划。

对于hive的执行分为2种 ,一种走MR 程序的,一种不走MR程序的,不走MR程序的执行性能远远大于走MR程序的,通过explain 可以看出具体的执行是否走了MR 和走了几个MR。

1
explain select * from emp;

一个检查的查询所有的数据的执行计划

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

STAGE DEPENDENCIES: |
| Stage-0 is a root stage |
| |
| STAGE PLANS: |
| Stage: Stage-0 |
| Fetch Operator |
| limit: -1 |
| Processor Tree: |
| TableScan |
| alias: emp |
| Statistics: Num rows: 1 Data size: 6990 Basic stats: COMPLETE Column stats: NONE |
| Select Operator |
| expressions: empno (type: int), ename (type: string), job (type: string), mgr (type: int), hiredate (type: string), sal (type: double), comm (type: double), deptno (type: int) |
| outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7 |
| Statistics: Num rows: 1 Data size: 6990 Basic stats: COMPLETE Column stats: NONE |
| ListSink |

可以看到执行计划总共只有一步。做了一个Fetch 操作,也就是数据的拉取,limit -1 表示拉取所有的数据.

1
explain select job,count(job) from emp group by job;

查看一个聚合操作的执行计划;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51

STAGE DEPENDENCIES: |
| Stage-1 is a root stage |
| Stage-0 depends on stages: Stage-1 |
| |
| STAGE PLANS: |
| Stage: Stage-1 |
| Map Reduce |
| Map Operator Tree: |
| TableScan |
| alias: emp |
| Statistics: Num rows: 1 Data size: 6990 Basic stats: COMPLETE Column stats: NONE |
| Select Operator |
| expressions: job (type: string) |
| outputColumnNames: job |
| Statistics: Num rows: 1 Data size: 6990 Basic stats: COMPLETE Column stats: NONE |
| Group By Operator |
| aggregations: count(job) |
| keys: job (type: string) |
| mode: hash |
| outputColumnNames: _col0, _col1 |
| Statistics: Num rows: 1 Data size: 6990 Basic stats: COMPLETE Column stats: NONE |
| Reduce Output Operator |
| key expressions: _col0 (type: string) |
| sort order: + |
| Map-reduce partition columns: _col0 (type: string) |
| Statistics: Num rows: 1 Data size: 6990 Basic stats: COMPLETE Column stats: NONE |
| value expressions: _col1 (type: bigint) |
| Execution mode: vectorized |
| Reduce Operator Tree: |
| Group By Operator |
| aggregations: count(VALUE._col0) |
| keys: KEY._col0 (type: string) |
| mode: mergepartial |
| outputColumnNames: _col0, _col1 |
| Statistics: Num rows: 1 Data size: 6990 Basic stats: COMPLETE Column stats: NONE |
| File Output Operator |
| compressed: false |
| Statistics: Num rows: 1 Data size: 6990 Basic stats: COMPLETE Column stats: NONE |
| table: |
| input format: org.apache.hadoop.mapred.SequenceFileInputFormat |
| output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat |
| serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe |
| |
| Stage: Stage-0 |
| Fetch Operator |
| limit: -1 |
| Processor Tree: |
| ListSink |
|

聚合操作是需要走MR程序的。

STAGE DEPENDENCIES 看到总共的执行步骤,目前是有2步。 Stage-1 和 Stage-0 ,其中 Stage-0 依赖于 Stage-1

Stage-1 的执行计划就是一个MR程序.

可以看到Map 程序中 keys 用的是job ,聚合操作是 aggregations count(job),并且map输出的结果是 _col10

看到reduce 程序中 的key 是 KEY._col0 aggregations 是 count(VALUE._col0)

Stage-0 的执行计划是hive中 hdfs中的一个数据的fetch操作。

在执行大数据量的操作的时候,通过explain 进行优化是必须的,能够减少执行时间。默认的MR程序如果执行时间超过7天,将被系统停止运行。

fetch 抓取配置

hive 是如何决定那些查询走fetch 操作还是走MR程序呢?

内部是有个参数来控制的,可以修改hive-site.xml 来进行设置.

参数名称 hive.fetch.task.conversion

有3个选项

  • none 任何操作都会走mr程序
  • minimal SELECT STAR, FILTER on partition columns, LIMIT only 只有这些操作会有fetch操作
  • more SELECT, FILTER, LIMIT only (support TABLESAMPLE and virtual columns) 在minimal 的基础上,在全局 查找,字段查找,limit查找等都不走mapreduce.

测试:

  1. 查询当前的配置
1
2
set hive.fetch.task.conversion;

默认的配置是more,尽量的使用fetch操作

  1. 执行简单的查询过滤操作
1
2
3
explain select ename,job from emp where job = 'CLERK';
explain select ename,job from emp where job = 'CLERK' and mgr > 10;

可以看到执行这些操作的时候并没有走MR;

  1. 修改配置
1
set hive.fetch.task.conversion=minimal;
1
2
explain select ename,job from emp where job = 'CLERK' and mgr > 10;

执行简单的过滤也走MR了

  1. 修改所有走mr
1
set hive.fetch.task.conversion=none;

可以发现任何查询都走MR了.

1
explain select *  from emp;

本地模式配置

默认的hive 的查询提交到 MapReduce 通过集群 由yarn来调度任务。
这个过程会有一些性能的消耗,而对于有些小数据量的场景,可能开辟任务和调度的消耗 比处理任务的时候还长,这时候就是杀猪焉用宰牛刀。
可以配置本地运行默认,减少运行的消耗。

hive.exec.mode.local.auto=true 设置开启本地模式;
hive.exec.mode.local.auto.inputbytes.max=50000000 设置本地模式的最大数据量(50M 左右,本位byte)
hive.exec.mode.local.auto.input.files.max 本地模式的最大文件数量

下面2个配置是当开启了本地模式的时候,还得满足这2个条件才能使用本地模式

  1. 开始本地模式

默认是关闭的

1
2
3
set hive.exec.mode.local.auto=true;
set hive.exec.mode.local.auto.inputbytes.max=10000000;
set hive.exec.mode.local.auto.input.files.max=10;
  1. 执行任务
1
select deptno,avg(mgr) from emp group by deptno;

结果出现错误

查看hive 日志看出是内存溢出

调整内存大小,重新执行

修改hive-env.sh

1
2
3
//放开注释
export HADOOP_HEAPSIZE=1024

重启hive

重新执行分区查询

查看日志,可以看出目前已经是本地模式运行了;

  1. 小数据量下执行时间比较

比较本地模式的和集群模式的运行时间的区别,可以看出小数据量本地模式快于集群模式;

表查询优化

小表连接大表优化

表连接分为2种,一种是在Reduce 阶段连接,一种是把一个小表再Map阶段缓存起来,在Map阶段连接,称为MapJoin。

小表连接大表的场景非常适合MapJoin的方式。

并且在写sql的时候要使用小表来join大表。

新版 hive 已经优化小表join 大 和 大 join 小一个效果了。不过还是尽量用小的join 大的比较好。

开始mapjoin配置

hive.auto.convert.join = true 开启
hive.mapjoin.smalltable.filesize = 25000000 (25M) 小表的大小定义,只有小于这个值才是小表

注意:默认是开启的,默认值是25000000

  1. 开启
1
2
set hive.auto.convert.join = true;
set hive.mapjoin.smalltable.filesize = 25000000;

查看执行计划

1
2
explain select * from emp left join dept on emp.deptno = dept.deptno;

通过执行计划,可以看到执行使用了mapjoin

  1. 查看关闭的执行计划
1
2
set hive.auto.convert.join = false;

查看执行计划

1
2
explain select * from emp left join dept on emp.deptno = dept.deptno;

可以看到并没有使用mapjoin

大表连接大表优化

null key 过滤

比如一个连接查询

1
select * from emp left join dept on emp.deptno = dept.deptno;

如果不希望emp 中的 deptno 为null的出现在结果中,那么先对数据进行过滤,减少关联操作的数据量,对关联的数据能够减少处理的范围。

1
2

select t.* from (select * from emp where deptno is not null) t left join dept on t.deptno = dept.deptno;

空null转换

如果一个表中确实需要一些空值,但是又不能过滤,那么在执行的时候,这些空值就会由同一个maptask 来处理,空值很多的情况下,就会造成数据倾斜问题。

为了解决数据倾斜问题,把数据的空值处理成一个随机的值,让其能够分散到不同的maptask上。

  1. 测试数据准备一些有nullid的数据
1
2
create table nullidtable(id bigint, t bigint, uid string, keyword string, url_rank int, click_num int, click_url string) row format delimited fields terminated by '\t';

加载数据

1
2
load data local inpath '/home/hadoop/data/nullid' into table nullidtable;

没有null 的表

1
create table bigtable(id bigint, t bigint, uid string, keyword string, url_rank int, click_num int, click_url string) row format delimited fields terminated by '\t';

加载数据

1
2
load data local inpath '/home/hadoop/data/bigtable' into table bigtable;

两个文件的地址: https://www.aliyundrive.com/s/e62Xpw7WzD1 提取码: 3u9t

  1. 查看数据的个数

  1. 执行关联查询

//设置5个 reducejob 数量

1
set mapreduce.job.reduces=5;
1
insert overwrite directory '/join/join1'  select n.* from nullidtable n left join bigtable b on n.id = b.id;

在reduce阶段有一些数据倾斜;

  1. 通过为null增加随机值。
1
2
insert overwrite directory '/join/join2' select n.* from nullidtable n full join bigtable o on nvl(n.id,rand()) = o.id;

使用nvl 将空值处理成随机数,注意随机数不能正好关联上错误的数据.

现在reduce 节点的执行将是比较均衡的。

SMB(Sort Merge Bucket join)

当数据量比较大的时候,为了达到连接查询的高效果,可以通过连接键进行分桶 的方式。

比如 数据表 A 和数据表 B ,数据量都是10w 。如果直接关联将是 10w 数据量去关联10
w数据,当分成4 个桶的时候, 每个表都根据桶的数量hash 分为了4个文件,并且只会两两关联,不会出现一个文件去其他的文件关联,达到一种拆分的目的。

开启参数:

1
2
3
4
set hive.optimize.bucketmapjoin = true;
set hive.optimize.bucketmapjoin.sortedmerge = true;
set hive.input.format=org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;

默认关闭的

  1. 创建分桶表

分桶表1

1
2
3
4
5
6
7
8
9
10
11
12
create table bigtable_buck1(
id bigint,
t bigint,
uid string,
keyword string,
url_rank int,
click_num int,
click_url string)
clustered by(id)
sorted by(id)
into 3 buckets
row format delimited fields terminated by '\t';
1
insert into bigtable_buck1 select * from bigtable; 

分桶表2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
create table bigtable_buck2(
id bigint,
t bigint,
uid string,
keyword string,
url_rank int,
click_num int,
click_url string)
clustered by(id)
sorted by(id)
into 3 buckets
row format delimited fields terminated by '\t';

insert into bigtable_buck2 select * from bigtable;

  1. 执行开启命令
1
2
3
set hive.optimize.bucketmapjoin = true;
set hive.optimize.bucketmapjoin.sortedmerge = true;
set hive.input.format=org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
  1. 测试关联时间
1
2
3
insert overwrite directory '/join/join3' select b1.* from bigtable_buck1 b1 full join bigtable_buck2 b2 on b1.id = b2.id;


  1. 测试不使用分桶的表的关联时间
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
create table bigtable_test_1(
id bigint,
t bigint,
uid string,
keyword string,
url_rank int,
click_num int,
click_url string)
row format delimited fields terminated by '\t';


create table bigtable_test_2(
id bigint,
t bigint,
uid string,
keyword string,
url_rank int,
click_num int,
click_url string)
row format delimited fields terminated by '\t';

insert into bigtable_test_1 select * from bigtable;

insert into bigtable_test_2 select * from bigtable;

可以看到通过使用分桶的比不分桶的要快,当数据量大的时候结果对比才明显。

执行关联查询

1
insert overwrite directory '/join/join6' select t1.* from bigtable_test_1 t1 full join bigtable_test_2 t2 on t1.id = t2.id;

Group By 优化

主要针对有些聚合操作是可以在map阶段先完成一部分的。先在map阶段完成一部分,在reduce阶段得到最终的结果。

比如一个 分组后的 count 操作,原本是 map阶段分组,而reduce阶段计算 总数,而将计算总数的操作 让多个map先计算了,reduce来做个总和,可以防止数据倾斜,同时提高效率。

开启map端聚合

set hive.map.aggr = true 是否开启,(默认开启)

et hive.groupby.mapaggr.checkinterval = 100000 聚合操作的数量,每多少条数据执行一次聚合,最终reduce端将所有的结果合并。

set hive.groupby.skewindata = true 有数据倾斜的时候进行负载均衡

数据倾斜的数据进行负载开启后,生成的查询会有2个MR job。第一个MR Job中,Map的输出结果会随机分布到Reduce中,每个Reduce做部分聚合操作,并输出结果,这样处理的结果是相同的Group By Key有可能被分发到不同的Reduce中,从而达到负载均衡的目的;第二个MR Job再根据预处理的数据结果按照Group By Key分布到Reduce中(这个过程可以保证相同的Group By Key被分布到同一个Reduce中),最后完成最终的聚合操作。

  1. 开启测试
1
2
set hive.map.aggr=true;

1
select deptno from emp group by deptno;
  1. 关闭测试
1
2
set hive.map.aggr=false;
select deptno from emp group by deptno;

注意:当数据量小的时候可能看不出效果

COUNT(Distinct) 优化

默认的Distinct 使用reduce 来完成,当数据量大的时候,可能会很慢。
可以通过group by 的方式来完成,group by的方式还能用到map 端聚合。

1
select count(id) from (select id from bigtable group by id) a;

行列过滤

列过滤

列过滤就是查询的时候只查询特定的列减少数据的传输。

行过滤

在关联查询的时候,尽量将数据缩小到一个比较小的范围。

比如查询:

1
2
3

select * from emp left join dept on emp.deptno = dept.deptno where emp.job = 'CLERK';

改写成:

1
2
3

select emp_table.* from (select * from emp where job = 'CLERK') emp_table left join dept on emp_table.deptno = dept.deptno;

通过下面的写法,可以在关联之前做到将行的数据过滤掉,减少mapreduce 的处理的数据范围;

注意:上面的那种写法,默认也会优化成下面的过滤,这种优化叫谓词下推。但是仅仅适用于简单的查询,复杂的查询无法自动优化,所以还是手动写成这样比较好。

设置map 和reduce 数量

针对map 和reduce 的数量设置,需要 根据多方面进行考虑。

主要的决定因素有:input的文件总个数,input的文件大小,集群设置的文件块大小。

如何调整map数量

  • 每个map 数量处理的文件数尽量接近文件块(默认128m)的数量
  • 针对比较耗时的文件,需要大量计算性能的文件,需要增加map数量,提高执行效率。

如何调reduce 数量

  1. 系统 自动方式

每个Reduce处理的数据量默认是256MB

参数1:set hive.exec.reducers.bytes.per.reducer=256000000

每个任务最大的reduce数,默认为1009
参数2:hive.exec.reducers.max=1009

N=min(参数2,总输入数据量/参数1)

  1. 设置方式 手动设置和修改

在hadoop的mapred-default.xml文件中修改
设置每个job的Reduce个数
set mapreduce.job.reduces = 15;

注意:reduce 数量等于生成的文件数,需要根据具体的业务场景选择具体的数量

小文件进行合并CombineHiveInputFormat

在hive中可以指定 hive的输入格式 CombineHiveInputFormat 来合并小文件。

在map执行前将小文件合并,减少了map数量。

map输入时的设置:

1
2
3
//设置格式
set hive.input.format= org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;

map-reduce 任务结束时的设置:

1
2
3

SET hive.merge.mapfiles = true;

是否合并map 阶段执行完毕的小文件 (默认开启)

1
SET hive.merge.mapredfiles = true;

是否合并reduce 阶段执行完毕的小文件 (默认关闭)

1
SET hive.merge.size.per.task = 268435456;

合并文件的最大大小,默认是 256000000

1
SET hive.merge.smallfiles.avgsize = 16777216; 16M 左右

当小文件的平均大小,小于此值的时候,单独起一个mr任务执行合并任务。 此目的是因为文件太小,合并会耗费性能,加快合并。

并行模式

将map 阶段和reduce 阶段的多个任务之间,是可以并行执行的。
在map还未完全执行完的时候就启动了reduce,已经开始执行了部分的数据。

开启

1
2
set hive.exec.parallel=true;              //打开任务并行执行
set hive.exec.parallel.thread.number=16; //同一个sql允许最大并行度,默认为8

默认是关闭的,在性能满足的情况下可以适当开启,减少程序处理时间。