在hadoop中使用lzo的压缩算法可以减小数据 的大小和数据的磁盘读写时间,不仅如此,lzo是基于block分块的,这样他就允许数据被分解成chunk,并行的被hadoop处理。这样的特点,就可以让lzo在hadoop上成为一种非常好用的压缩格式。
lzo本身不是splitable的,所以当数据为text格式时,用lzo压缩出来的数据当做job的输入是一个文件作为一个map。但是sequence file本身是分块的,所以sequence file格式的文件,再配上lzo的压缩格式,就可实现lzo文件方式的splitable。
由于压缩的数据通常只有原始数据的1/4,在HDFS中存储压缩数据,可以使集群能保存更多的数据,延长集群的使用寿命。不仅如此,由于 mapreduce作业通常瓶颈都在IO上,存储压缩数据就意味这更少的IO操作,job运行更加的高效。但是,在hadoop上使用压缩也有两个比较麻 烦的地方:第一,有些压缩格式不能被分块,并行的处理,比如gzip。第二,另外的一些压缩格式虽然支持分块处理,但是解压的过程非常的缓慢,使job的 瓶颈转移到了cpu上,例如bzip2。比如我们有一个1.1GB的gzip文件,该文件 被 分成128MB/chunk存储在hdfs上,那么它就会被分成9块。为了能够在mapreduce中并行的处理各个chunk,那么各个mapper之 间就有了依赖。而第二个mapper就会在文件的某个随机的byte出进行处理。那么gzip解压时要用到的上下文字典就会为空,这就意味这gzip的压 缩文件无法在hadoop上进行正确的并行处理。也就因此在hadoop上大的gzip压缩文件只能被一个mapper来单个的处理,这样就很不高效,跟 不用mapreduce没有什么区别了。而另一种bzip2压缩格式,虽然bzip2的压缩非常的快,并且甚至可以被分块,但是其解压过程非常非常的缓 慢,并且不能被用streaming来读取,这样也无法在hadoop中高效的使用这种压缩。即使使用,由于其解压的低效,也会使得job的瓶颈转移到 cpu上去。
如果能够拥有一种压缩算法,即能够被分块,并行的处理,速度也非常的快,那就非常的理想。这种方式就是lzo。lzo的压缩文件是由许多的小的 blocks组成(约256K),使的hadoop的job可以根据block的划分来split job。不仅如此,lzo在设计时就考虑到了效率问题,它的解压速度是gzip的两倍,这就让它能够节省很多的磁盘读写,它的压缩比的不如gzip,大约 压缩出来的文件比gzip压缩的大一半,但是这样仍然比没有经过压缩的文件要节省20%-50%的存储空间 ,这样就可以在效率上大大的提高job执行的速度。以下是一组压缩对比数据,使用一个8.0GB的未经过压缩的数据来进行对比:
压缩格式 文件 大小(GB) 压缩时间 解压时间
None some_logs 8.0 - -
Gzip some_logs.gz 1.3 241 72
LZO some_logs.lzo 2.0 55 35
可以看出,lzo压缩文件会比gzip压缩文件稍微大一些,但是仍然比原始文件要小很多倍,并且lzo文件压缩的速度几乎相当于gzip的5倍,而解压的 速度相当于gzip的两倍。lzo文件可以根据block boundaries来进行分块,比如一个1.1G的lzo压缩文件,那么处理第二个128MB block的mapper就必须能够确认下一个block的boundary,以便进行解压操作。lzo并没有写什么数据头来做到这一点,而是实现了一个 lzo index文件,将这个文件(foo.lzo.index)写在每个foo.lzo文件中。这个index文件只是简单的包含了每个block在数据中的 offset,这样由于offset已知的缘故,对数据的读写就变得非常的快。通常能达到90-100MB/秒,也就是10-12秒就能读完一个GB的文 件。一旦该index文件被创建,任何基于lzo的压缩文件就能通过load该index文件而进行相应的分块,并且一个block接一个block的被 读取。也因此,各个mapper都能够得到正确的block,这就是说,可以只需要进行一个LzopInputStream的封装,就可以在hadoop 的mapreduce中并行高效的使用lzo。如果现在有一个job的InputFormat是TextInputFormat,那么就可以用lzop来 压缩文件,确保它正确的创建了index,将TextInputFormat换成LzoTextInputFormat,然后job就能像以前一样正确的 运行,并且更加的快。有时候,一个大的文件被lzo压缩过之后,甚至都不用分块就能被单个mapper高效的处理了。
【同转LZO算法】
// 本程序采用 Lempel-Ziv 压缩算法, 云风没有对这个算法做深入研究
// 代码是根据 Markus Franz Xaver Johannes Oberhumer 的 LZO 改写
// 而成, 所以算法上的问题请不要问我, 如果你对这个压缩算法有兴趣,
// 请拜访 LZO 的主页
// http://wildsau.idv.uni-linz.ac.at/mfx/lzo.html
// LZO 属于 GNU 软件, 在此声明, 本文件中的代码使用权利不同于风魂游
// 戏程序库中的其他部分, 请遵守 GNU 协议使用.
//#include <stdio.h>
//#include <string.h>
#include <stdlib.h>
#define byte unsigned char
int _stdcall compress(void *src, unsigned src_len, void *dst);
int _stdcall decompress(void *src, unsigned src_len, void *dst);
static unsigned _do_compress (byte *in, unsigned in_len, byte *out, unsigned *out_len)
{
static long wrkmem [16384L];
register byte *ip;
byte *op;
byte *in_end = in + in_len;
byte *ip_end = in + in_len - 13;
byte *ii;
byte **dict = (byte **)wrkmem;
op = out;
ip = in;
ii = ip;
ip += 4;
for(;;)
{
register byte *m_pos;
unsigned m_off;
unsigned m_len;
unsigned dindex;
dindex = ((0x21*(((((((unsigned)(ip[3])<<6)^ip[2])<<5)^ip[1])<<5)^ip[0]))>>5) & 0x3fff;
m_pos = dict [dindex];
if(((unsigned)m_pos < (unsigned)in) ||
(m_off = (unsigned)((unsigned)ip-(unsigned)m_pos) ) <= 0 ||
m_off > 0xbfff)
goto literal;
if(m_off <= 0x0800 || m_pos[3] == ip[3])
goto try_match;
dindex = (dindex & 0x7ff ) ^ 0x201f;
m_pos = dict[dindex];
if((unsigned)(m_pos) < (unsigned)(in) ||
(m_off = (unsigned)( (int)((unsigned)ip-(unsigned)m_pos))) <= 0 ||
m_off > 0xbfff)
goto literal;
if (m_off <= 0x0800 || m_pos[3] == ip[3])
goto try_match;
goto literal;
try_match:
if(*(unsigned short*)m_pos == *(unsigned short*)ip && m_pos[2]==ip[2])
goto match;
literal:
dict[dindex] = ip;
++ip;
if (ip >= ip_end)
break;
continue;
match:
dict[dindex] = ip;
if(ip - ii > 0)
{
register unsigned t = ip - ii;
if (t <= 3)
op[-2] |= (byte)t;
else if(t <= 18)
*op++ = (byte)(t - 3);
else
{
register unsigned tt = t - 18;
*op++ = 0;
while(tt > 255)
{
tt -= 255;
*op++ = 0;
}
*op++ = (byte)tt;
}
do *op++ = *ii++; while (--t > 0);
}
ip += 3;
if(m_pos[3] != *ip++ || m_pos[4] != *ip++ || m_pos[5] != *ip++ ||
m_pos[6] != *ip++ || m_pos[7] != *ip++ || m_pos[8] != *ip++ )
{
--ip;
m_len = ip - ii;
if(m_off <= 0x0800 )
{
--m_off;
*op++ = (byte)(((m_len - 1) << 5) | ((m_off & 7) << 2));
*op++ = (byte)(m_off >> 3);
}
else
if (m_off <= 0x4000 )
{
-- m_off;
*op++ = (byte)(32 | (m_len - 2));
goto m3_m4_offset;
}
else
{
m_off -= 0x4000;
*op++ = (byte)(16 | ((m_off & 0x4000) >> 11) | (m_len - 2));
goto m3_m4_offset;
}
}
else
{
{
byte *end = in_end;
byte *m = m_pos + 9;
while (ip < end && *m == *ip)
m++, ip++;
m_len = (ip - ii);
}
if(m_off <= 0x4000)
{
--m_off;
if (m_len <= 33)
*op++ = (byte)(32 | (m_len - 2));
else
{
m_len -= 33;
*op++=32;
goto m3_m4_len;
}
}
else
{
m_off -= 0x4000;
if(m_len <= 9)
*op++ = (byte)(16|((m_off & 0x4000) >> 11) | (m_len - 2));
else
{
m_len -= 9;
*op++ = (byte)(16 | ((m_off & 0x4000) >> 11));
m3_m4_len:
while (m_len > 255)
{
m_len -= 255;
*op++ = 0;
}
*op++ = (byte)m_len;
}
}
m3_m4_offset:
*op++ = (byte)((m_off & 63) << 2);
*op++ = (byte)(m_off >> 6);
}
ii = ip;
if (ip >= ip_end)
break;
}
*out_len = op - out;
return (unsigned) (in_end - ii);
}
int _stdcall compress(void *in, unsigned in_len,
void *out)
{
byte *op = out;
unsigned t,out_len;
if (in_len <= 13)
t = in_len;
else
{
t = _do_compress (in,in_len,op,&out_len);
op += out_len;
}
if (t > 0)
{
byte *ii = (byte*)in + in_len - t;
if (op == (byte*)out && t <= 238)
*op++ = (byte) ( 17 + t );
else
if (t <= 3)
op[-2] |= (byte)t ;
else
if (t <= 18)
*op++ = (byte)(t-3);
else
{
unsigned tt = t - 18;
*op++ = 0;
while (tt > 255)
{
tt -= 255;
*op++ = 0;
}
*op++ = (byte)tt;
}
do *op++ = *ii++; while (--t > 0);
}
*op++ = 17;
*op++ = 0;
*op++ = 0;
return (op - (byte*)out);
}
int _stdcall decompress (void *in, unsigned in_len,
void *out)
{
register byte *op;
register byte *ip;
register unsigned t;
register byte *m_pos;
byte *ip_end = (byte*)in + in_len;
op = out;
ip = in;
if(*ip > 17)
{
t = *ip++ - 17;
if (t < 4)
goto match_next;
do *op++ = *ip++; while (--t > 0);
goto first_literal_run;
}
for(;;)
{
t = *ip++;
if (t >= 16) goto match;
if (t == 0)
{
while (*ip == 0)
{
t += 255;
ip++;
}
t += 15 + *ip++;
}
* (unsigned *) op = * ( unsigned *) ip;
op += 4; ip += 4;
if (--t > 0)
{
if (t >= 4)
{
do
{
* (unsigned * ) op = * ( unsigned * ) ip;
op += 4; ip += 4; t -= 4;
} while (t >= 4);
if (t > 0) do *op++ = *ip++; while (--t > 0);
}
else
do *op++ = *ip++; while (--t > 0);
}
first_literal_run:
t = *ip++;
if (t >= 16)
goto match;
m_pos = op - 0x0801;
m_pos -= t >> 2;
m_pos -= *ip++ << 2;
*op++ = *m_pos++; *op++ = *m_pos++; *op++ = *m_pos;
goto match_done;
for(;;)
{
match:
if (t >= 64)
{
m_pos = op - 1;
m_pos -= (t >> 2) & 7;
m_pos -= *ip++ << 3;
t = (t >> 5) - 1;
goto copy_match;
}
else
if (t >= 32)
{
t &= 31;
if (t == 0)
{
while (*ip == 0)
{
t += 255;
ip++;
}
t += 31 + *ip++;
}
m_pos = op - 1;
m_pos -= (* ( unsigned short * ) ip) >> 2;
ip += 2;
}
else
if (t >= 16)
{
m_pos = op;
m_pos -= (t & 8) << 11;
t &= 7;
if (t == 0)
{
while (*ip == 0)
{
t += 255;
ip++;
}
t += 7 + *ip++;
}
m_pos -= (* ( unsigned short *) ip) >> 2;
ip += 2;
if (m_pos == op)
goto eof_found;
m_pos -= 0x4000;
}
else
{
m_pos = op - 1;
m_pos -= t >> 2;
m_pos -= *ip++ << 2;
*op++ = *m_pos++; *op++ = *m_pos;
goto match_done;
}
if (t >= 6 && (op - m_pos) >= 4)
{
* (unsigned *) op = * ( unsigned *) m_pos;
op += 4; m_pos += 4; t -= 2;
do
{
* (unsigned *) op = * ( unsigned *) m_pos;
op += 4; m_pos += 4; t -= 4;
}while (t >= 4);
if (t > 0)
do *op++ = *m_pos++; while (--t > 0);
}
else
{
copy_match:
*op++ = *m_pos++; *op++ = *m_pos++;
do *op++ = *m_pos++; while (--t > 0);
}
match_done:
t = ip[-2] & 3;
if (t == 0) break;
match_next:
do *op++ = *ip++; while (--t > 0);
t = *ip++;
}
}
eof_found:
if (ip != ip_end) return -1;
return (op - (byte*)out);
}
分享到:
相关推荐
hadoop支持LZO压缩配置 将编译好后的hadoop-lzo-0.4.20.jar 放入hadoop-2.7.2/share/hadoop/common/ core-site.xml增加配置支持LZO压缩 <name>io.compression.codecs org.apache.hadoop.io....
配置hadoop支持LZO和snappy压缩
hadoop-lzo-0.4.21-SNAPSHOT.jar是hadoop数据压缩lzo工具包
hadoop lzo 压缩算法的所有工程,包括hadoop-lzo-master,编译好之后的target文件夹和hadoop-lzo-0.4.20-SNAPSHOT.jar文件。复制到eclipse中,可以直接使用lzo压缩算法。
hadoop配置支持LZO压缩必备,版本号hadoop-lzo-0.4.20-SNAPSHOT.jar,
hadoop lzo 压缩jar包,本人已经编译好,提供大家下载。
将生成的 build/hadoop-lzo-0.4.15.jar cp 到 /usr/local/hadoop-1.0.2/lib 测试解压程序 bin/hadoop jar /usr/local/hadoop-1.0.2/lib/hadoop-lzo-0.4.15.jar ...
LZO:实时数据压缩库 hadoop-LZO:hadoop中可切割数据的LZO压缩 资源包含:lzo-2.06.tar.gz, hadoop-lzo-master.zip
22、MapReduce使用Gzip压缩、Snappy压缩和Lzo压缩算法写文件和读取相应的文件 网址:https://blog.csdn.net/chenwewi520feng/article/details/130456088 本文的前提是hadoop环境正常。 本文最好和MapReduce操作常见...
hdfs默认不支持lzo压缩,需要通过将lzo源码融入hadoop源码,重新编译hadoop源码;或者编译lzo源码生成jar,作为插件使用
Hadoop IO操作压缩的lzo算法的jar包,中间包含LzoCodec,和LzoPCodec
Lzo压缩、解压缩安装包。包括Lzo,Lzop,hadoop-lzo的安装包。在Linux,AIX亲测可用。
java用于解压缩lzo文件的jar包
hadoop 的lzo压缩源码包tar.gz,其中包括lzo-2.06.tar.gz和lzo-2.10.tar.gz两个版本,可供选择