`
bo_hai
  • 浏览: 554243 次
  • 性别: Icon_minigender_1
  • 来自: 武汉
社区版块
存档分类
最新评论

Hadoop 自定义数据类型实例

 
阅读更多

一、这是来自《Hadoop in Action》一书的实例,我在这里做了一个总结。输入文件的每一行由时间和 URL 组成,两者之间用制表符分隔,内容如下:

17:16:20	http://blackproof.iteye.com/blog/1806263
17:16:21	http://blackproof.iteye.com/blog/1806264
17:16:56	http://blackproof.iteye.com/blog/1806265
17:16:30	http://blackproof.iteye.com/blog/1806266
17:16:45	http://blackproof.iteye.com/blog/1806267
17:16:23	http://blackproof.iteye.com/blog/1806268

 需求是:把每行分隔符后面的 URL 字符串封装成自定义的 URLWritable 类型(内部持有 java.net.URL),并以时间作为 key。代码如下:

 

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;

import org.apache.hadoop.io.Writable;

/**
 * A Hadoop {@link Writable} value that carries a single {@link URL}.
 *
 * <p>The URL is serialized as its string form via
 * {@link DataOutput#writeUTF(String)} and rebuilt from that string on read.
 */
public class URLWritable implements Writable {

	/** The wrapped URL; {@code null} until set, read, or passed to the constructor. */
	protected URL url;

	/** Creates an empty instance; the URL is filled in later by {@link #set} or {@link #readFields}. */
	public URLWritable() {
	}

	/**
	 * Creates an instance wrapping the given URL.
	 *
	 * @param url the URL to carry
	 */
	public URLWritable(URL url) {
		this.url = url;
	}

	/**
	 * Serializes the URL as a UTF string.
	 *
	 * @param out the sink to write to
	 * @throws IOException if no URL has been set or the underlying write fails
	 */
	@Override
	public void write(DataOutput out) throws IOException {
		if (url == null) {
			// Fail with a descriptive, declared exception instead of a bare NPE.
			throw new IOException("URLWritable has no URL set; nothing to serialize");
		}
		out.writeUTF(url.toString());
	}

	/**
	 * Replaces the wrapped URL with the one read from {@code in}.
	 *
	 * @param in the source to read from
	 * @throws IOException if the read fails or the string is not a valid URL
	 *         ({@link MalformedURLException} is an {@code IOException})
	 */
	@Override
	public void readFields(DataInput in) throws IOException {
		this.url = new URL(in.readUTF());
	}

	/**
	 * Parses {@code string} and stores the resulting URL.
	 *
	 * @param string the textual URL
	 * @throws IllegalArgumentException (a {@code RuntimeException}) if the
	 *         string is not a valid URL; the parse failure is kept as the cause
	 */
	public void set(String string) {
		try {
			this.url = new URL(string);
		} catch (MalformedURLException e) {
			throw new IllegalArgumentException("Should not have happened " + e.toString(), e);
		}
	}

	/**
	 * Returns the textual form of the wrapped URL, or {@code "null"} when none
	 * is set. Text-based output calls this method, so without it the output
	 * would contain the default {@code Object} representation.
	 */
	@Override
	public String toString() {
		return String.valueOf(url);
	}
}

 

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;



/**
 * Record reader that turns each input line into a (time, URL) pair.
 *
 * <p>Lines are obtained from a wrapped {@link LineRecordReader}. Each line is
 * split at the first occurrence of the separator byte: the bytes before it
 * become the {@link Text} key and the bytes after it are parsed into the
 * {@link URLWritable} value.
 */
public class TimeUrlLineRecordReader extends RecordReader<Text, URLWritable> {
	/** Configuration key holding the separator; only its first character is used. */
	public static final String Time_URL_SEPERATOR = 
	    "mapreduce.input.keyvaluelinerecordreader.key.value.separator";
	
	/** Supplies the raw lines that this reader splits. */
	private final LineRecordReader lineRecordReader;
	
	/** Separator byte between key and value; tab unless overridden by configuration. */
	private byte separator = (byte) '\t';
	
	/** Current key, allocated on the first record and reused afterwards. */
	private Text key;
	  
	/** Current value, allocated on the first record and reused afterwards. */
	private URLWritable value;
	
	/**
	 * Finds the first occurrence of {@code sep} in a slice of {@code utf}.
	 *
	 * @param utf    the bytes to search
	 * @param start  index of the first byte to examine
	 * @param length number of bytes to examine
	 * @param sep    the byte to look for
	 * @return the index of the first match, or {@code -1} if there is none
	 */
	public static int findSeparator(byte[] utf, int start, int length, byte sep) {
		final int end = start + length;
		for (int i = start; i < end; i++) {
			if (utf[i] == sep) {
				return i;
			}
		}
		return -1;
	}
	
	/**
	 * Fills {@code key} and {@code value} from one line.
	 *
	 * <p>NOTE(review): when {@code pos} is {@code -1} the value is set from the
	 * empty string. {@code URLWritable.set} as written in this file rejects a
	 * malformed URL with a runtime exception, so a line without a separator
	 * presumably aborts the task — confirm that this is the intended policy.
	 *
	 * @param key     receives the bytes before the separator (the whole line
	 *                when there is no separator)
	 * @param value   receives the URL parsed from the bytes after the separator
	 * @param line    buffer holding the line; only the first {@code lineLen}
	 *                bytes are treated as content
	 * @param lineLen number of valid bytes in {@code line}
	 * @param pos     index of the separator, or {@code -1} if none was found
	 */
	public static void setKeyValue(Text key, URLWritable value, byte[] line,
			int lineLen, int pos) {
		if (pos == -1) {
			key.set(line, 0, lineLen);
			value.set(StringUtils.EMPTY);
		} else {
			key.set(line, 0, pos);
			// Decode the bytes after the separator. The previous code copied
			// them into a null String reference via System.arraycopy, which
			// always failed at runtime.
			String url = new String(line, pos + 1, lineLen - pos - 1,
					java.nio.charset.StandardCharsets.UTF_8);
			value.set(url);
		}
	}

	/**
	 * Creates the reader and picks up the separator from the configuration.
	 *
	 * @param conf job configuration; {@link #Time_URL_SEPERATOR} may override
	 *             the default tab separator
	 * @throws IOException declared for callers; not thrown by the code here
	 */
	public TimeUrlLineRecordReader(Configuration conf) throws IOException {
		lineRecordReader = new LineRecordReader();
		String sepStr = conf.get(Time_URL_SEPERATOR, "\t");
	    this.separator = (byte) sepStr.charAt(0);
	}
	
	/** Delegates initialization to the wrapped line reader. */
	@Override
	public void initialize(InputSplit split, TaskAttemptContext context)
			throws IOException, InterruptedException {
		 lineRecordReader.initialize(split, context);
	}

	/**
	 * Advances to the next line and splits it into key and value.
	 *
	 * @return {@code true} if a record was read, {@code false} when the wrapped
	 *         reader has no more lines
	 */
	@Override
	public boolean nextKeyValue() throws IOException, InterruptedException {
		if (!lineRecordReader.nextKeyValue()) {
			return false;
		}
		Text currentLine = lineRecordReader.getCurrentValue();
		byte[] line = currentLine.getBytes();
		int lineLen = currentLine.getLength();

		// Allocate the reusable holders on first use.
		if (key == null) {
			key = new Text();
		}
		if (value == null) {
			value = new URLWritable();
		}
		int pos = findSeparator(line, 0, lineLen, this.separator);
		setKeyValue(key, value, line, lineLen, pos);
	    return true;
	}

	/** Returns the key of the current record ({@code null} before the first record). */
	@Override
	public Text getCurrentKey() throws IOException, InterruptedException {
		return key;
	}

	/** Returns the value of the current record ({@code null} before the first record). */
	@Override
	public URLWritable getCurrentValue() throws IOException,
			InterruptedException {
		return value;
	}

	/** Reports the progress of the wrapped line reader. */
	@Override
	public float getProgress() throws IOException, InterruptedException {
		return lineRecordReader.getProgress();
	}

	/** Closes the wrapped line reader. */
	@Override
	public void close() throws IOException {
		lineRecordReader.close();
	}
}

 

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

/**
 * File input format whose records are produced by
 * {@link TimeUrlLineRecordReader}: a {@link Text} key and a
 * {@link URLWritable} value per record.
 */
public class TimeUrlTextInputFormat extends FileInputFormat<Text, URLWritable> {

	/**
	 * A file may be split only when no compression codec is registered for it.
	 *
	 * @param context job context supplying the configuration
	 * @param file    the input file under consideration
	 * @return {@code true} if no codec matches {@code file}
	 */
	@Override
	protected boolean isSplitable(JobContext context, Path file) {
		CompressionCodecFactory codecFactory =
				new CompressionCodecFactory(context.getConfiguration());
		final CompressionCodec matchingCodec = codecFactory.getCodec(file);
		if (matchingCodec != null) {
			return false;
		}
		return true;
	}

	/**
	 * Publishes the split description as the task status and returns a new
	 * reader built from the task's configuration.
	 *
	 * @param split   the split to be read
	 * @param context the task attempt context
	 * @return a fresh {@link TimeUrlLineRecordReader}
	 */
	@Override
	public RecordReader<Text, URLWritable> createRecordReader(InputSplit split,
			TaskAttemptContext context) throws IOException, InterruptedException {
		String splitDescription = split.toString();
		context.setStatus(splitDescription);
		RecordReader<Text, URLWritable> reader =
				new TimeUrlLineRecordReader(context.getConfiguration());
		return reader;
	}
}

 

import java.io.IOException;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Driver for a job that reads (time, URL) records through
 * {@link TimeUrlTextInputFormat} and writes them back out as text. Both the
 * mapper and the reducer pass records through unchanged.
 */
public class CustomTimeUrl extends Configured implements Tool {

	/** Input directory used when no paths are given on the command line. */
	private static final String DEFAULT_INPUT_PATH = "/timeurl/input/";

	/** Output directory used when no paths are given on the command line. */
	private static final String DEFAULT_OUTPUT_PATH = "/timeurl/output";

	/** Mapper that emits every (key, value) pair it receives unchanged. */
	public static class CustomTimeUrlMapper extends Mapper<Text, URLWritable, Text, URLWritable> {

		@Override
		protected void map(Text key, URLWritable value, Context context)
				throws IOException, InterruptedException {
			context.write(key, value);
		}
		
	}
	
	/** Reducer that emits every value of a key unchanged, one output record per value. */
	public static class CustomTimeUrlReducer extends Reducer<Text, URLWritable, Text, URLWritable> {

		@Override
		protected void reduce(Text key, Iterable<URLWritable> values, Context context)
				throws IOException, InterruptedException {
			for (URLWritable value : values) {
				context.write(key, value);
			}
		}
		
	}
	
	/**
	 * Configures and runs the job.
	 *
	 * @param args optional {@code <input> <output>} paths; when exactly two
	 *             arguments are not supplied the built-in defaults are used
	 * @return {@code 0} if the job completed successfully, {@code 1} otherwise
	 */
	@Override
	public int run(String[] args) throws Exception {
		String inputPath = DEFAULT_INPUT_PATH;
		String outputPath = DEFAULT_OUTPUT_PATH;
		if (args != null && args.length == 2) {
			inputPath = args[0];
			outputPath = args[1];
		}

		Job job = new Job(getConf());
		job.setJarByClass(getClass());
		job.setJobName("CustomTimeUrl");
		
		job.setInputFormatClass(TimeUrlTextInputFormat.class);
		job.setOutputFormatClass(TextOutputFormat.class);
		
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(URLWritable.class);
		
		job.setMapperClass(CustomTimeUrlMapper.class);
		job.setReducerClass(CustomTimeUrlReducer.class);
		
		FileInputFormat.setInputPaths(job, new Path(inputPath));
		FileOutputFormat.setOutputPath(job, new Path(outputPath));
		
		boolean success = job.waitForCompletion(true);
		return success ? 0 : 1;
	}
	
	/**
	 * Command-line entry point; exits with the status returned by {@link #run}.
	 */
	public static void main(String[] args) throws Exception {
		// The tool to run is this class; the previous code named a class
		// (TimeUrl) that is not defined alongside it.
		int result = ToolRunner.run(new CustomTimeUrl(), args);
		System.exit(result);
	}

}

 

分享到:
评论
1 楼 shankses 2015-01-13  
你好,这是个不错的方法,但是inputfile还是根据大小取的,能不能根据行数取,比如每10000行数据作为一个inputfile,希望回复,谢谢

相关推荐

Global site tag (gtag.js) - Google Analytics