`
乡里伢崽
  • 浏览: 108629 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
社区版块
存档分类
最新评论

hive自定义InputFormat

    博客分类:
  • hive
 
阅读更多
自定义分隔符
package com.lwz.inputf;

import java.io.IOException;   
import java.io.InputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;   
import org.apache.hadoop.io.Text;   
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapred.FileSplit;   
import org.apache.hadoop.mapred.InputSplit;   
import org.apache.hadoop.mapred.JobConf;   
import org.apache.hadoop.mapred.JobConfigurable;   
import org.apache.hadoop.mapred.RecordReader;   
import org.apache.hadoop.mapred.Reporter;   
import org.apache.hadoop.mapred.TextInputFormat; 
import org.apache.hadoop.util.LineReader;
/** 
* 自定义hadoop的 org.apache.hadoop.mapred.InputFormat 
*  
* @author winston 
*   hadoop将数据给到map进行处理前会使用InputFormat对数据进行两方面的预处理,最常见的FormatInput就是TextInputFormat
*/ 
public class ClickstreamInputFormat extends TextInputFormat implements   
        JobConfigurable {   
   
    public RecordReader<LongWritable, Text> getRecordReader(
            InputSplit genericSplit, JobConf job, Reporter reporter)   
            throws IOException {  
    //-----genericSplit----hdfs://vmtmstorm01:8020/user/hive/warehouse/hive_test/test:0+13
    System.out.println("-----genericSplit----"+genericSplit.toString());
        reporter.setStatus(genericSplit.toString());   
        /*---job----Configuration:
        core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml,
        yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml,
        org.apache.hadoop.hive.conf.LoopingByteArrayInputStream@351d4566,
        file:/etc/hive/conf.cloudera.hive/hive-site.xml
        */
        System.out.println("---job----"+job);
        System.out.println("----reporter----"+reporter);
        return new ClickstreamRecordReader((FileSplit) genericSplit,job);   
    }   
   
   
    public class ClickstreamRecordReader implements 
    RecordReader<LongWritable, Text> { 


private CompressionCodecFactory compressionCodecs = null; 
private long start; 
private long pos; 
private long end; 
private LineReader lineReader; 
int maxLineLength; 
/*FileSplit是一個輸入文件
LineReader 一个类,提供一个输入流行的读者。根据所使用的构造函数,线将被终止:
   下列之一:'\n'(LF),‘R’(CR),或“\r\n”(CR + LF)。
  或者,一个自定义的字节序列分隔符在这两种情况下,EOF也终止否则无端接线。
*/
public ClickstreamRecordReader(FileSplit inputSplit, Configuration job) 
        throws IOException { 
    maxLineLength = job.getInt("mapred.ClickstreamRecordReader.maxlength", 
            Integer.MAX_VALUE); 
        start = inputSplit.getStart(); 
    System.out.println("---start---"+start);
    System.out.println("----inputSplit.getLength----"+inputSplit.getLength());
    end = start + inputSplit.getLength(); 
    //获取文件的路径
    final Path file = inputSplit.getPath(); 
    System.out.println("---filepath---"+file);
    //一个工厂,将找到一个给定的文件名正确编解码。
    compressionCodecs = new CompressionCodecFactory(job); 
    // 发现对于给定的基于其文件名后缀的文件相关的压缩编解码器。
    final CompressionCodec codec = compressionCodecs.getCodec(file); 

    // Open file and seek to the start of the split 
    //获取该文件数据那个文件系统
    FileSystem fs = file.getFileSystem(job); 
    System.out.println("---FileSystem----"+fs.toString()+"----fs-----"+fs);
    //在指定的path下打开一个FSDataInputStream流
    FSDataInputStream fileIn = fs.open(file); 
    boolean skipFirstLine = false; 
    if (codec != null) { 
    //如果解压器不为空时创建一个LineReader实例
        lineReader = new LineReader(codec.createInputStream(fileIn), job); 
        end = Long.MAX_VALUE; 
    } else { 
        if (start != 0) { 
            skipFirstLine = true; 
            --start; 
            //寻找到了文件的起始偏移
            fileIn.seek(start); 
        } 
        //创建一个LineReader实例
        lineReader = new LineReader(fileIn, job); 
    } 
    if (skipFirstLine) { 
        start += lineReader.readLine(new Text(), 0, 
                (int) Math.min((long) Integer.MAX_VALUE, end - start)); 
    } 
    this.pos = start; 


将代码导出,传到主机,然后通过add jar /app/home/*.jar;添加jar包,此中法添加的都是临时性的,只在当前会话生效

建表
Create table hive_test(num int,name string,jj string) stored as INPUTFORMAT 'com.lwz.inputf.ClickstreamInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat';



分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics