TextFileNameInputFormat

 
I wrote one of these before, but I didn't take notes and forgot it after a while, so I'm keeping it here. The functionality is simple: read each line of text from different files, emit the file name as the key and each line of the file as the value. The RecordReader is basically copied from LineRecordReader, so there is nothing technically deep here, and it is still a bit messy.
/**
 * 
 */
package ledkk.util;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.LineReader;

/**
 * @author ledkk
 *
 */
public class TextFileNameInputFormat extends FileInputFormat<Text, Text> {


	/* (non-Javadoc)
	 * @see org.apache.hadoop.mapred.FileInputFormat#getRecordReader(org.apache.hadoop.mapred.InputSplit, org.apache.hadoop.mapred.JobConf, org.apache.hadoop.mapred.Reporter)
	 */
	@Override
	public RecordReader<Text, Text> getRecordReader(
			InputSplit split, JobConf job,
			Reporter reporter) throws IOException {
		return new TextFileNameRecordReader(split, job, reporter);
	}
	
	
	public static class TextFileNameRecordReader implements RecordReader<Text, Text>{
		
		FileSplit split;
		JobConf conf;
		Reporter reporter;
		LineReader dis;
		
		long start;
		long pos;
		long end;
			
		public TextFileNameRecordReader(InputSplit s, JobConf conf, Reporter reporter) throws IOException {
			this.split = (FileSplit) s;
			this.conf = conf;
			this.reporter = reporter;

			start = split.getStart();
			end = start + split.getLength();
			final Path file = split.getPath();
			FileSystem fs = file.getFileSystem(conf);
			FSDataInputStream fileIn = fs.open(file);
			// Handle split boundaries the way LineRecordReader does: seek to the split
			// start and, for every split but the first, skip the partial first line,
			// which belongs to the previous split.
			boolean skipFirstLine = false;
			if (start != 0) {
				skipFirstLine = true;
				--start;
				fileIn.seek(start);
			}
			dis = new LineReader(fileIn, conf);
			if (skipFirstLine) {
				start += dis.readLine(new Text(), 0, (int) Math.min((long) Integer.MAX_VALUE, end - start));
			}
			this.pos = start;
		}

		/* (non-Javadoc)
		 * @see org.apache.hadoop.mapred.RecordReader#next(java.lang.Object, java.lang.Object)
		 */
		@Override
		public synchronized boolean next(Text key, Text value) throws IOException {
			String fileName = split.getPath().getName();
			if (pos < end) {
				// Every record in this split uses the source file name as its key.
				key.set(fileName);
				// The original passed Integer.MAX_VALUE for both limits, so there is
				// effectively no line-length cap; readLine returns 0 at end of stream.
				int newSize = dis.readLine(value);
				if (newSize == 0) {
					return false;
				}
				pos += newSize;
				return true;
			}
			return false;
		}

		/* (non-Javadoc)
		 * @see org.apache.hadoop.mapred.RecordReader#createKey()
		 */
		@Override
		public Text createKey() {
			return new Text();
		}

		/* (non-Javadoc)
		 * @see org.apache.hadoop.mapred.RecordReader#createValue()
		 */
		@Override
		public Text createValue() {
			return new Text();
		}

		/* (non-Javadoc)
		 * @see org.apache.hadoop.mapred.RecordReader#getPos()
		 */
		@Override
		public long getPos() throws IOException {
			return pos;
		}

		/* (non-Javadoc)
		 * @see org.apache.hadoop.mapred.RecordReader#close()
		 */
		@Override
		public void close() throws IOException {
			if(dis!=null){
				dis.close();
			}
		}

		/* (non-Javadoc)
		 * @see org.apache.hadoop.mapred.RecordReader#getProgress()
		 */
		@Override
		public float getProgress() throws IOException {
			// Report how far we have read through the split, as LineRecordReader does.
			if (start == end) {
				return 0.0f;
			}
			return Math.min(1.0f, (pos - start) / (float) (end - start));
		}
		
	}
	
}
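For completeness, here is a minimal driver sketch (not part of the original code) showing one way to wire this InputFormat into an old-API (mapred) job. The class name TextFileNameJob and the command-line paths are placeholders; IdentityMapper and IdentityReducer just pass the (file name, line) pairs straight through.

package ledkk.util;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;

public class TextFileNameJob {

	public static void main(String[] args) throws Exception {
		// args[0] = input directory, args[1] = output directory (placeholders).
		JobConf job = new JobConf(TextFileNameJob.class);
		job.setJobName("lines-keyed-by-filename");

		// Each map input record is (source file name, one line of that file).
		job.setInputFormat(TextFileNameInputFormat.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);

		// Identity mapper/reducer: the output is just the (fileName, line) pairs,
		// grouped by file name in the reduce phase.
		job.setMapperClass(IdentityMapper.class);
		job.setReducerClass(IdentityReducer.class);

		FileInputFormat.setInputPaths(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		JobClient.runJob(job);
	}
}

Run against a directory of text files, this pairs every output line with the name of the file it came from, which is all this InputFormat is meant to do.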


