`
yehao0716
  • 浏览: 22149 次
  • 性别: Icon_minigender_1
  • 来自: 武汉
社区版块
存档分类
最新评论

hadoop学习——arrayWritable的应用

阅读更多

 

 

 

 

package kpi;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ArrayWritableTest {
	public static void main(String[] args) throws IOException,
			ClassNotFoundException, InterruptedException, URISyntaxException {
		Configuration conf = new Configuration();
		FileSystem fileSystem = FileSystem.get(new URI("hdfs://hadoop:9000/"),
				conf);
		fileSystem.delete(new Path("/kpi__data_out_1"), true);

		Job job = new Job(conf, KpiWritable1.class.getName());
		job.setJarByClass(KpiWritable1.class);

		FileInputFormat.setInputPaths(job, new Path(
				"hdfs://hadoop:9000/kpi_data"));
		FileOutputFormat.setOutputPath(job, new Path(
				"hdfs://hadoop:9000/kpi__data_out_1"));

		job.setMapperClass(MyMapper.class);
		job.setReducerClass(MyReducer.class);

		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(LongArrayWritable.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(NullWritable.class);

		job.waitForCompletion(true);
	}

	static class MyMapper extends
			Mapper<LongWritable, Text, Text, LongArrayWritable> {
		Text key2 = new Text();

		@Override
		protected void map(
				LongWritable key,
				Text value,
				org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, Text, LongArrayWritable>.Context context)
				throws IOException, InterruptedException {
			String[] split = value.toString().split("\t");
			key2.set(split[1]);
			String[] traffic = new String[4];
			traffic[0] = split[6];
			traffic[1] = split[7];
			traffic[2] = split[8];
			traffic[3] = split[9];

			LongArrayWritable arrayWritable = new LongArrayWritable(traffic);
			context.write(key2, arrayWritable);
		}
	}

	static class MyReducer extends
			Reducer<Text, LongArrayWritable, Text, NullWritable> {
		private Text key3 = new Text();

		@Override
		protected void reduce(
				Text key2,
				Iterable<LongArrayWritable> val2s,
				org.apache.hadoop.mapreduce.Reducer<Text, LongArrayWritable, Text, NullWritable>.Context context)
				throws IOException, InterruptedException {
			long sum1 = 0;
			long sum2 = 0;
			long sum3 = 0;
			long sum4 = 0;
			for (LongArrayWritable traffic : val2s) {
				Writable[] writables = traffic.get();
				sum1 += Long.valueOf(writables[0].toString());
				sum2 += Long.valueOf(writables[1].toString());
				sum3 += Long.valueOf(writables[2].toString());
				sum4 += Long.valueOf(writables[3].toString());
			}
			key3.set(key2 + " " + sum1 + " " + sum2 + " " + sum3 + " " + sum4);
			context.write(key3, NullWritable.get());
		}
	}

	static class LongArrayWritable extends ArrayWritable {

		public LongArrayWritable() {
			super(LongWritable.class);
		}

		public LongArrayWritable(String[] string) {
			super(LongWritable.class);
			LongWritable[] longs = new LongWritable[string.length];
			for (int i = 0; i < longs.length; i++) {
				longs[i] = new LongWritable(Long.valueOf(string[i]));
			}
			set(longs);
		}
	}
}

 

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics