首页 > 科技 > Hive的优化和压缩

Hive的优化和压缩

使用之前的数据库

执行语句

explain select count(*) from emp;

explain可以帮助我们看到有多少个任务

会出现下面的信息根标签,操作语法树等信息

根标签

操作语法树

之前在做日志分析的时候,创建表的语句,也有很多信息

依赖,是会构成有向无环图的, 根据有向无环图会按照顺序执行job。这样也会造成执行时间比较长,我们就需要做优化

优化一:大表拆小表

比如临时表、分区表、外部表。其中分区表:加载的表会更少,会直接加载到某个数据,不需要加载所有数据,提高检索速度。

优化二:sql语句

将复杂的语句,比如子查询简化拆分成多个简单的语句。

join和filter的使用,先过滤再join

优化三:合理设置map和reduce个数

reduce数目:可以通过参数设置,默认是一个

** hive中:set mapreduce.job.reduces=

** mapreduce编码中:job.setNumReduceTasks(tasks);

map的数目: 一个分片就是一个块,一个maptask,不可以通过参数去设置

在源码中有个计算方式,大体是:

** Math.max(minSize, Math.min(maxSize, blockSize));

minSize默认参数值是0M

maxSize默认参数值是256M

blockSize默认参数值128M

可以打开FileInputFormat类中的computeSplitSize的方法

修改块的大小是不可行的,因为集群投入生产,格式化后就不能再修改了,可以通过代码指定

** FileInputFormat.setMaxInputSplitSize(job, size);

** FileInputFormat.setMinInputSplitSize(job, size);

通过split的最大最小值来改变map任务数

优化四:开启并行执行

有多个job之间无依赖关系可以一起执行,充分利用资源,在工作中都会把它打开

hive.exec.parallel

设置同时运行的线程数,根据集群资源设置,默认是8

hive.exec.parallel.thread.number

优化五:jvm重用

mapreduce.job.jvm.numtasks默认是1,运行一个job会启动一个jvm上运行

用完就销毁,可以设置重用,节省资源,可以按照比例调整数目,会影响map任务,shuffer会有小幅度的下降

reduce影响不大,一般可以设置为3或5

还有一个推测执行(不建议使用,了解为主):

比如:运行十个map,十个reduce,等map结束后,等了一段时间后,九个执行reduce完毕,有一个还没结束,分布的数据量都是差不多的。推测执行:mapreduce会再重新开启一个跟这个一模一样的任务,,两个相同的任务完成同一件事,谁先完成,就会把另一个kill。缺点:会消耗更多的资源,一般不建议开启,有可能数据会重复写入,造成异常。

优化六:hive本地模式(了解为主)

hive的本地模式:hive.exec.mode.local.auto默认flase

hive底层运用的是hadoop集群,本地模式不会在集群所有机器上运行,会选择一台作为本地运行,一般处理小数据量级的

速度会很快

限制:job的输入数据不能大于128MB,map的个数不能超过4个,reduce的个数不能超过1个

优化七:

hive数据倾斜:在MR程序中由于某个key值分布不均匀,导致某个reduce运行速度严重过慢,严重影响了整个job的运行

原因有很多,解决办法也很多,有几个比较典型。

解决一:默认的分区是采用hash取值,可以自定义实现分区规则来避免产生倾斜

解决二:在key中加入随机数的侧率,打乱分区

在hive中

产生倾斜的主要语句:join、group by、distinct。

join,连接某个key值时,key值得数据量很多。

join:map join 、 reduce join 、SMB join(sort merge bucket)

map join:适合小表join大表的场景 【读取小表缓存到内存中,在map端完成reduce,减轻reduce压力】

开启mapjoin,默认值是true,开启了map join,符合条件就会去执行

属性配置


hive.auto.convert.join

true

执行map join的条件,默认是10M

属性配置


hive.auto.convert.join.noconditionaltask.size

10000000

reduce join:适合大表join大表的场景 【加上随机数,把倾斜的数据分到不同的reduce上】

SMB join:适合大表join大表的场景,简称:桶join,创建桶表

分区与分区之间的join,减少了join的范围。

桶join只适合桶与桶之间的join,适合抽样的统计。

注意:桶表之间的join,两张表的桶的个数要么是一致,要么就是成倍数关系

如何判断是大key导致的问题?

通过时间判断:如果每个reduce的运行时间都很长,那么可能是reduce数目设置过少造成的;如果大部分的reduce任务在几分钟之内完成了,而某一个reduce可能30分钟还没完成,可能是倾斜;可能也是某个节点造成的问题,可以考虑使用推测执行,如果推测执行的任务也很慢,就有可能是倾斜问题或者如果推测执行的新任务在短时间内完成,可能就是节点造成的某个任务运行过慢。

自定义counter判断:判断统计查看每个任务的信息,输入记录条数和输出字符数。

Hadoop和 hive 压缩

Hadoop数据压缩

MR操作过程中进行大量数据传输。

压缩技术能够有效的减少底层存储(HDFS)读写字节数。

压缩提高了网络带宽和磁盘空间的效率。

数据压缩能够有效的节省资源!

压缩是mr程序的优化策略!

通过压缩编码对mapper或者reducer数据传输进行数据的压缩,以减少磁盘IO。

bin/hadoop checknative可以查看hadoop是否开启了压缩

我们准备几个软件

解压

tar -zxvf cdh5.3.6-snappy-lib-natirve.tar.gz

将hadoop中对应的目录native直接删除覆盖

这是hadoop中的目录

我们将原先的删除备份

再将解压的拷贝过来

bin/hadoop checknative可以查看hadoop是否开启了压缩

压缩要支持可分割性

那什么叫可分割性呢,我们来看一个小例子

我桌面上有一个Tomcat文件,我们进行压缩

我们压缩的时候,进行分卷

这是压缩完的

我解压一个是可以的

但是我删除一个,就不可以了,这就是不可分割性

MR 压缩设置

设置这两个参数

map:输出

mapreduce.map.output.compress=true

mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.SnappyCodec

reduce:输出

mapreduce.output.fileoutputformat.compress=true

mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.SnappyCodec

代码中设置

FileOutputFormat.setCompressOutput(job, true)

FileOutputFormat.setOutputCompressorClass(job, SnappyCodec.class);

可以利用wordcount查看下


跑一个普通的看看(数据量大的效果比较明显):

bin/yarn jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.5.0-cdh5.3.6.jar wordcount /input /output

设置后再查看

bin/yarn jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.5.0-cdh5.3.6.jar wordcount -Dmapreduce.map.output.compress=true -Dmapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.SnappyCodec -Dmapreduce.output.fileoutputformat.compress=true -Dmapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.SnappyCodec /input /output2

Hive的压缩设置

map:输出


hive.exec.compress.intermediate

true

配置Map

set hive.exec.compress.intermediate=true;

set mapreduce.map.output.compress=true;

set mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.SnappyCodec;

reduce:输出


hive.exec.compress.output

true

配置reduce

set mapreduce.output.fileoutputformat.compress=true;

set mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.SnappyCodec;

set hive.exec.compress.output=true;

hive的高压缩存储格式

[STORED AS file_format]

file_format:

hive默认的存储格式是TEXTFILE

我们创建数据

【原文本数据】

create table file_source(

id string,

url string,

referer string,

keyword string,

type string,

guid string,

pageId string,

moduleId string,

linkId string,

attachedInfo string,

sessionId string,

trackerU string,

trackerType string,

ip string,

trackerSrc string,

cookie string,

orderCode string,

trackTime string,

endUserId string,

firstLink string,

sessionViewNo string,

productId string,

curMerchantId string,

provinceId string,

cityId string,

fee string,

edmActivity string,

edmEmail string,

edmJobId string,

ieVersion string,

platform string,

internalKeyword string,

resultSum string,

currentPage string,

linkPosition string,

buttonPosition string

)row format delimited fields terminated by '\t';

导入数据

load data local inpath '/data/test/data1' into table file_source;

创建表

create table file_textfile(

id string,

url string,

referer string,

keyword string,

type string,

guid string,

pageId string,

moduleId string,

linkId string,

attachedInfo string,

sessionId string,

trackerU string,

trackerType string,

ip string,

trackerSrc string,

cookie string,

orderCode string,

trackTime string,

endUserId string,

firstLink string,

sessionViewNo string,

productId string,

curMerchantId string,

provinceId string,

cityId string,

fee string,

edmActivity string,

edmEmail string,

edmJobId string,

ieVersion string,

platform string,

internalKeyword string,

resultSum string,

currentPage string,

linkPosition string,

buttonPosition string

)

row format delimited fields terminated by '\t'

stored as textfile;

导入数据

insert overwrite table file_textfile select * from file_source;

创建表

create table file_parquet(

id string,

url string,

referer string,

keyword string,

type string,

guid string,

pageId string,

moduleId string,

linkId string,

attachedInfo string,

sessionId string,

trackerU string,

trackerType string,

ip string,

trackerSrc string,

cookie string,

orderCode string,

trackTime string,

endUserId string,

firstLink string,

sessionViewNo string,

productId string,

curMerchantId string,

provinceId string,

cityId string,

fee string,

edmActivity string,

edmEmail string,

edmJobId string,

ieVersion string,

platform string,

internalKeyword string,

resultSum string,

currentPage string,

linkPosition string,

buttonPosition string

)

row format delimited fields terminated by '\t'

stored as PARQUET;

导入数据

insert overwrite table file_parquet select * from file_source;

创建表

create table file_orc(

id string,

url string,

referer string,

keyword string,

type string,

guid string,

pageId string,

moduleId string,

linkId string,

attachedInfo string,

sessionId string,

trackerU string,

trackerType string,

ip string,

trackerSrc string,

cookie string,

orderCode string,

trackTime string,

endUserId string,

firstLink string,

sessionViewNo string,

productId string,

curMerchantId string,

provinceId string,

cityId string,

fee string,

edmActivity string,

edmEmail string,

edmJobId string,

ieVersion string,

platform string,

internalKeyword string,

resultSum string,

currentPage string,

linkPosition string,

buttonPosition string

)

row format delimited fields terminated by '\t'

stored as orc;

导入数据

insert overwrite table file_orc select * from file_source;

【结果比较】

原始数据37.6 MB

textfile27.48 MB

parquet16.14 MB

orc4.4 MB

总结:

textfile 存储空间消耗比较大,并且压缩的text 无法分割和合并 查询的效率最低,可以直接存储,加载数据的速度最高

sequencefile 存储空间消耗最大,压缩的文件可以分割和合并 查询效率高,需要通过text文件转化来加载

rcfile 存储空间最小,查询的效率最高 ,需要通过text文件转化来加载,加载的速度最低

text,seqfile能不用就尽量不要用 最好是选择orc

本文来自投稿,不代表本人立场,如若转载,请注明出处:http://www.souzhinan.com/kj/255181.html