`
beat_it_
  • 浏览: 62959 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

hadoop mr file2hfile2hbase

阅读更多

写了个简单的mr 操作file到hfile,在把hfile倒入hbase的例子,在此记录一下:

File2HFile2HBase.java代码:

package com.lyq.study.example;

import java.io.IOException;
import java.security.PrivilegedAction;
import java.util.Arrays;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.security.UserGroupInformation;
import com.lyq.study.lib.HFileOutputFormatBase;
import com.lyq.study.util.HBaseConfigUtils;

public class File2HFile2HBase {
	private static final Log LOG = LogFactory.getLog(File2HFile2HBase.class);
	private String tableName = "testtable1";

	private static class MapperClass extends
			Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
		public byte[] family = Bytes.toBytes("info");
		public String[] columns = { "card", "type", "amount", "time", "many" };

		public void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			String[] columnVals = value.toString().split(",");
			String rowkey = columnVals[0] + columnVals[3];
			Put put = new Put(Bytes.toBytes(rowkey));
			for (int i = 0; i < columnVals.length; i++) {
				put.add(family, Bytes.toBytes(columns[i]),
						Bytes.toBytes(columnVals[i]));
			}
			context.write(new ImmutableBytesWritable(Bytes.toBytes(rowkey)),
					put);
		}
	}

	public int run(final String[] args) throws Exception {
		if (args.length < 2) {
			System.err.printf("Usage: %s [generic options] <input> <output>\n",
					getClass().getSimpleName());
			return -1;
		}
		LOG.info("Usage command args: " + Arrays.toString(args));
		final String hfile = args[1];
		UserGroupInformation ugi = UserGroupInformation
				.createRemoteUser("hadoop");
		ugi.doAs(new PrivilegedAction<Void>() {

			@Override
			public Void run() {
				try {
					// 1.获取Configuration
					Configuration conf = HBaseConfigUtils.getHBaseConfig(1);
					conf.set(TableOutputFormat.OUTPUT_TABLE, tableName);
					// 2.获取Job
					Job job = getJob(conf, args);
					// 3.执行Job
					if (!job.waitForCompletion(true)) {
						throw new IOException("【初始化数据】失败");
					}
					// 4.把hfile文件导入到HBase
					LoadIncrementalHFiles loader = new LoadIncrementalHFiles(
							conf);
					HTable htable = new HTable(conf, tableName);
					loader.doBulkLoad(new Path(hfile), htable);
					// 5.清理hfile目录
					deleteByDir(conf, new Path(hfile), true);

				} catch (IOException e) {
					// TODO: handle exception
				} catch (InterruptedException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				} catch (ClassNotFoundException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				} catch (Exception e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
				return null;
			}
		});
		return 0;
	}

	private Job getJob(Configuration conf, String[] args) throws IOException {
		Job job = Job.getInstance(conf);
		job.setJobName("File2HFile");
		job.setJarByClass(File2HFile2HBase.class);
		job.setMapperClass(MapperClass.class);

		job.setMapOutputKeyClass(ImmutableBytesWritable.class);
		job.setMapOutputValueClass(Put.class);

		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		HTable htable = new HTable(conf, tableName);
		HFileOutputFormatBase.configureIncrementalLoad(job, htable, HFileOutputFormatBase.class);
		return job;
	}

	public static boolean deleteByDir(Configuration conf, Path path,
			Boolean recursive) throws IOException {
		FileSystem fs = FileSystem.get(conf);
		boolean success = fs.delete(path, recursive);
		LOG.info("删除[" + path + "]成功? " + success);
		return success;

	}

	public static void main(String[] args) throws Exception {
		args = new String[] { "hdfs://master129:9000/test/input/data.txt",
				"hdfs://master129:9000/test/output" };
		File2HFile2HBase f2hf2hb = new File2HFile2HBase();
		System.exit(f2hf2hb.run(args));
	}
}

 HFileOutputFormatBase.java代码:

/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.lyq.study.lib;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.AbstractHFileWriter;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
import org.apache.hadoop.hbase.mapreduce.KeyValueSerialization;
import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
import org.apache.hadoop.hbase.mapreduce.MutationSerialization;
import org.apache.hadoop.hbase.mapreduce.PutSortReducer;
import org.apache.hadoop.hbase.mapreduce.ResultSerialization;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TextSortReducer;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;

/**
 * Writes HFiles. Passed Cells must arrive in order. Writes current time as the
 * sequence id for the file. Sets the major compacted attribute on created
 * hfiles. Calling write(null,null) will forceably roll all HFiles being
 * written.
 * <p>
 * Using this class as part of a MapReduce job is best done using
 * {@link #configureIncrementalLoad(Job, HTable)}.
 * 
 * @see KeyValueSortReducer
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class HFileOutputFormatBase extends
		FileOutputFormat<ImmutableBytesWritable, KeyValue> {// 把Cell改成了KeyValue
															
	static Log LOG = LogFactory.getLog(HFileOutputFormatBase.class);
	static final String COMPRESSION_CONF_KEY = "hbase.hfileoutputformat.families.compression";
	private static final String BLOOM_TYPE_CONF_KEY = "hbase.hfileoutputformat.families.bloomtype";
	private static final String DATABLOCK_ENCODING_CONF_KEY = "hbase.mapreduce.hfileoutputformat.datablock.encoding";
	private static final String BLOCK_SIZE_CONF_KEY = "hbase.mapreduce.hfileoutputformat.blocksize";

	public static final String _deleteRowkey = "_deleteRowkey";// 新增了该行

	public RecordWriter<ImmutableBytesWritable, KeyValue> getRecordWriter(// 把cell改成了KeyValue
	//去掉了static <V extends Cell> RecordWriter<ImmutableBytesWritable, V> createRecordWriter()
	//把其内容直接拿到了getRecordWriter()中
			final TaskAttemptContext context) throws IOException,
			InterruptedException {

		// Get the path of the temporary output file
		final Path outputPath = FileOutputFormat.getOutputPath(context);
		final Path outputdir = new FileOutputCommitter(outputPath, context)
				.getWorkPath();
		final Path ignoreOutputPath = getDeleteRowKeyFile(outputPath);// 新增了该行
																		
		final Configuration conf = context.getConfiguration();
		final FileSystem fs = outputdir.getFileSystem(conf);
		// These configs. are from hbase-*.xml
		final long maxsize = conf.getLong(HConstants.HREGION_MAX_FILESIZE,
				HConstants.DEFAULT_MAX_FILE_SIZE);
		// Invented config. Add to hbase-*.xml if other than default
		// compression.
		final String defaultCompression = conf.get("hfile.compression",
				Compression.Algorithm.NONE.getName());
		final boolean compactionExclude = conf.getBoolean(
				"hbase.mapreduce.hfileoutputformat.compaction.exclude", false);

		// create a map from column family to the compression algorithm
		final Map<byte[], String> compressionMap = createFamilyCompressionMap(conf);
		final Map<byte[], String> bloomTypeMap = createFamilyBloomMap(conf);
		final Map<byte[], String> blockSizeMap = createFamilyBlockSizeMap(conf);

		String dataBlockEncodingStr = conf.get(DATABLOCK_ENCODING_CONF_KEY);
		final HFileDataBlockEncoder encoder;
		if (dataBlockEncodingStr == null) {
			encoder = NoOpDataBlockEncoder.INSTANCE;
		} else {
			try {
				encoder = new HFileDataBlockEncoderImpl(
						DataBlockEncoding.valueOf(dataBlockEncodingStr));
			} catch (IllegalArgumentException ex) {
				throw new RuntimeException(
						"Invalid data block encoding type configured for the param "
								+ DATABLOCK_ENCODING_CONF_KEY + " : "
								+ dataBlockEncodingStr);
			}
		}

		return new RecordWriter<ImmutableBytesWritable, KeyValue>() {// 把V改成了KeyValue
																		
			// Map of families to writers and how much has been output on the
			// writer.
			private final Map<byte[], WriterLength> writers = new TreeMap<byte[], WriterLength>(
					Bytes.BYTES_COMPARATOR);
			private final FSDataOutputStream dos = fs.create(ignoreOutputPath);
			private byte[] previousRow = HConstants.EMPTY_BYTE_ARRAY;
			private final byte[] now = Bytes
					.toBytes(System.currentTimeMillis());
			private boolean rollRequested = false;

			public void write(ImmutableBytesWritable row, KeyValue kv)// 把V cell改成了KeyValue kv
																		
					throws IOException {
				// KeyValue kv = KeyValueUtil.ensureKeyValue(cell);//注释掉了该行

				// null input == user explicitly wants to flush
				if (row == null && kv == null) {
					rollWriters();
					return;
				}

				byte[] rowKey = kv.getRow();
				long length = kv.getLength();
				byte[] family = kv.getFamily();

				if (ignore(kv)) {// 新增了该if条件判断
					byte[] readBuf = rowKey;
					dos.write(readBuf, 0, readBuf.length);
					dos.write(Bytes.toBytes("\n"));
					return;
				}

				WriterLength wl = this.writers.get(family);

				// If this is a new column family, verify that the directory
				// exists
				if (wl == null) {
					fs.mkdirs(new Path(outputdir, Bytes.toString(family)));
				}

				// If any of the HFiles for the column families has reached
				// maxsize, we need to roll all the writers
				if (wl != null && wl.written + length >= maxsize) {
					this.rollRequested = true;
				}

				// This can only happen once a row is finished though
				if (rollRequested
						&& Bytes.compareTo(this.previousRow, rowKey) != 0) {
					rollWriters();
				}

				// create a new HLog writer, if necessary
				if (wl == null || wl.writer == null) {
					wl = getNewWriter(family, conf);
				}

				// we now have the proper HLog writer. full steam ahead
				kv.updateLatestStamp(this.now);
				wl.writer.append(kv);
				wl.written += length;

				// Copy the row so we know when a row transition.
				this.previousRow = rowKey;
			}

			private void rollWriters() throws IOException {
				for (WriterLength wl : this.writers.values()) {
					if (wl.writer != null) {
						LOG.info("Writer="
								+ wl.writer.getPath()
								+ ((wl.written == 0) ? "" : ", wrote="
										+ wl.written));
						close(wl.writer);
					}
					wl.writer = null;
					wl.written = 0;
				}
				this.rollRequested = false;
			}

			/*
			 * Create a new StoreFile.Writer.
			 * 
			 * @param family
			 * 
			 * @return A WriterLength, containing a new StoreFile.Writer.
			 * 
			 * @throws IOException
			 */
			private WriterLength getNewWriter(byte[] family, Configuration conf)
					throws IOException {
				WriterLength wl = new WriterLength();
				Path familydir = new Path(outputdir, Bytes.toString(family));
				String compression = compressionMap.get(family);
				compression = compression == null ? defaultCompression
						: compression;
				String bloomTypeStr = bloomTypeMap.get(family);
				BloomType bloomType = BloomType.NONE;
				if (bloomTypeStr != null) {
					bloomType = BloomType.valueOf(bloomTypeStr);
				}
				String blockSizeString = blockSizeMap.get(family);
				int blockSize = blockSizeString == null ? HConstants.DEFAULT_BLOCKSIZE
						: Integer.parseInt(blockSizeString);
				Configuration tempConf = new Configuration(conf);
				tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f);
				wl.writer = new StoreFile.WriterBuilder(conf, new CacheConfig(
						tempConf), fs, blockSize)
						.withOutputDir(familydir)
						.withCompression(
								AbstractHFileWriter
										.compressionByName(compression))
						.withBloomType(bloomType)
						.withComparator(KeyValue.COMPARATOR)
						.withDataBlockEncoder(encoder)
						.withChecksumType(HStore.getChecksumType(conf))
						.withBytesPerChecksum(HStore.getBytesPerChecksum(conf))
						.build();

				this.writers.put(family, wl);
				return wl;
			}

			private void close(final StoreFile.Writer w) throws IOException {
				if (w != null) {
					w.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY,
							Bytes.toBytes(System.currentTimeMillis()));
					w.appendFileInfo(StoreFile.BULKLOAD_TASK_KEY, Bytes
							.toBytes(context.getTaskAttemptID().toString()));
					w.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY,
							Bytes.toBytes(true));
					w.appendFileInfo(
							StoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY,
							Bytes.toBytes(compactionExclude));
					w.appendTrackedTimestampsToMetadata();
					w.close();
				}
			}

			public void close(TaskAttemptContext c) throws IOException,
					InterruptedException {
				dos.flush();// 新增了该行
				dos.close();// 新增了该行
				for (WriterLength wl : this.writers.values()) {
					close(wl.writer);
				}
			}

		};
	}

	/*
	 * Data structure to hold a Writer and amount of data written on it.
	 */
	static class WriterLength {
		long written = 0;
		StoreFile.Writer writer = null;
	}

	/**
	 * Return the start keys of all of the regions in this table, as a list of
	 * ImmutableBytesWritable.
	 */
	private static List<ImmutableBytesWritable> getRegionStartKeys(HTable table)
			throws IOException {
		byte[][] byteKeys = table.getStartKeys();
		ArrayList<ImmutableBytesWritable> ret = new ArrayList<ImmutableBytesWritable>(
				byteKeys.length);
		for (byte[] byteKey : byteKeys) {
			ret.add(new ImmutableBytesWritable(byteKey));
		}
		return ret;
	}

	/**
	 * Write out a {@link SequenceFile} that can be read by
	 * {@link TotalOrderPartitioner} that contains the split points in
	 * startKeys.
	 */
	@SuppressWarnings("deprecation")
	private static void writePartitions(Configuration conf,
			Path partitionsPath, List<ImmutableBytesWritable> startKeys)
			throws IOException {
		LOG.info("Writing partition information to " + partitionsPath);
		if (startKeys.isEmpty()) {
			throw new IllegalArgumentException("No regions passed");
		}

		// We're generating a list of split points, and we don't ever
		// have keys < the first region (which has an empty start key)
		// so we need to remove it. Otherwise we would end up with an
		// empty reducer with index 0
		TreeSet<ImmutableBytesWritable> sorted = new TreeSet<ImmutableBytesWritable>(
				startKeys);

		ImmutableBytesWritable first = sorted.first();
		if (!first.equals(HConstants.EMPTY_BYTE_ARRAY)) {
			throw new IllegalArgumentException(
					"First region of table should have empty start key. Instead has: "
							+ Bytes.toStringBinary(first.get()));
		}
		sorted.remove(first);

		// Write the actual file
		FileSystem fs = partitionsPath.getFileSystem(conf);
		SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf,
				partitionsPath, ImmutableBytesWritable.class,
				NullWritable.class);

		try {
			for (ImmutableBytesWritable startKey : sorted) {
				writer.append(startKey, NullWritable.get());
			}
		} finally {
			writer.close();
		}
	}

	/**
	 * Configure a MapReduce Job to perform an incremental load into the given
	 * table. This
	 * <ul>
	 * <li>Inspects the table to configure a total order partitioner</li>
	 * <li>Uploads the partitions file to the cluster and adds it to the
	 * DistributedCache</li>
	 * <li>Sets the number of reduce tasks to match the current number of
	 * regions</li>
	 * <li>Sets the output key/value class to match HFileOutputFormat2's
	 * requirements</li>
	 * <li>Sets the reducer up to perform the appropriate sorting (either
	 * KeyValueSortReducer or PutSortReducer)</li>
	 * </ul>
	 * The user should be sure to set the map output value class to either
	 * KeyValue or Put before running this function.
	 */
	public static void configureIncrementalLoad(Job job, HTable table)
			throws IOException {
		configureIncrementalLoad(job, table, HFileOutputFormatBase.class);
	}

	public static void configureIncrementalLoad(Job job, HTable table,
			Class<? extends OutputFormat<?, ?>> cls) throws IOException {
		Configuration conf = job.getConfiguration();

		job.setOutputKeyClass(ImmutableBytesWritable.class);
		job.setOutputValueClass(KeyValue.class);
		job.setOutputFormatClass(HFileOutputFormatBase.class);

		// Based on the configured map output class, set the correct reducer to
		// properly
		// sort the incoming values.
		// TODO it would be nice to pick one or the other of these formats.
		if (KeyValue.class.equals(job.getMapOutputValueClass())) {
			job.setReducerClass(KeyValueSortReducer.class);
		} else if (Put.class.equals(job.getMapOutputValueClass())) {
			job.setReducerClass(PutSortReducer.class);
		} else if (Text.class.equals(job.getMapOutputValueClass())) {
			job.setReducerClass(TextSortReducer.class);
		} else {
			LOG.warn("Unknown map output value type:"
					+ job.getMapOutputValueClass());
		}

		conf.setStrings("io.serializations", conf.get("io.serializations"),
				MutationSerialization.class.getName(),
				ResultSerialization.class.getName(),
				KeyValueSerialization.class.getName());

		// Use table's region boundaries for TOP split points.
		LOG.info("Looking up current regions for table "
				+ Bytes.toString(table.getTableName()));
		List<ImmutableBytesWritable> startKeys = getRegionStartKeys(table);
		LOG.info("Configuring " + startKeys.size() + " reduce partitions "
				+ "to match current region count");
		job.setNumReduceTasks(startKeys.size());

		configurePartitioner(job, startKeys);
		// Set compression algorithms based on column families
		configureCompression(table, conf);
		configureBloomType(table, conf);
		configureBlockSize(table, conf);

		// TableMapReduceUtil.addDependencyJars(job);// 注释掉了该行
		TableMapReduceUtil.initCredentials(job);
		LOG.info("Incremental table " + Bytes.toString(table.getTableName())
				+ " output configured.");
	}

	private static void configureBlockSize(HTable table, Configuration conf)
			throws IOException {
		StringBuilder blockSizeConfigValue = new StringBuilder();
		HTableDescriptor tableDescriptor = table.getTableDescriptor();
		if (tableDescriptor == null) {
			// could happen with mock table instance
			return;
		}
		Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
		int i = 0;
		for (HColumnDescriptor familyDescriptor : families) {
			if (i++ > 0) {
				blockSizeConfigValue.append('&');
			}
			blockSizeConfigValue.append(URLEncoder.encode(
					familyDescriptor.getNameAsString(), "UTF-8"));
			blockSizeConfigValue.append('=');
			blockSizeConfigValue.append(URLEncoder.encode(
					String.valueOf(familyDescriptor.getBlocksize()), "UTF-8"));
		}
		// Get rid of the last ampersand
		conf.set(BLOCK_SIZE_CONF_KEY, blockSizeConfigValue.toString());
	}

	/**
	 * Run inside the task to deserialize column family to compression algorithm
	 * map from the configuration.
	 * 
	 * Package-private for unit tests only.
	 * 
	 * @return a map from column family to the name of the configured
	 *         compression algorithm
	 */
	static Map<byte[], String> createFamilyCompressionMap(Configuration conf) {
		return createFamilyConfValueMap(conf, COMPRESSION_CONF_KEY);
	}

	private static Map<byte[], String> createFamilyBloomMap(Configuration conf) {
		return createFamilyConfValueMap(conf, BLOOM_TYPE_CONF_KEY);
	}

	private static Map<byte[], String> createFamilyBlockSizeMap(
			Configuration conf) {
		return createFamilyConfValueMap(conf, BLOCK_SIZE_CONF_KEY);
	}

	/**
	 * Run inside the task to deserialize column family to given conf value map.
	 * 
	 * @param conf
	 * @param confName
	 * @return a map of column family to the given configuration value
	 */
	private static Map<byte[], String> createFamilyConfValueMap(
			Configuration conf, String confName) {
		Map<byte[], String> confValMap = new TreeMap<byte[], String>(
				Bytes.BYTES_COMPARATOR);
		String confVal = conf.get(confName, "");
		for (String familyConf : confVal.split("&")) {
			String[] familySplit = familyConf.split("=");
			if (familySplit.length != 2) {
				continue;
			}
			try {
				confValMap
						.put(URLDecoder.decode(familySplit[0], "UTF-8")
								.getBytes(), URLDecoder.decode(familySplit[1],
								"UTF-8"));
			} catch (UnsupportedEncodingException e) {
				// will not happen with UTF-8 encoding
				throw new AssertionError(e);
			}
		}
		return confValMap;
	}

	/**
	 * Configure <code>job</code> with a TotalOrderPartitioner, partitioning
	 * against <code>splitPoints</code>. Cleans up the partitions file after job
	 * exists.
	 */
	static void configurePartitioner(Job job,
			List<ImmutableBytesWritable> splitPoints) throws IOException {

		// create the partitions file
		FileSystem fs = FileSystem.get(job.getConfiguration());
		Path partitionsPath = new Path("/tmp", "partitions_"
				+ UUID.randomUUID());
		fs.makeQualified(partitionsPath);
		fs.deleteOnExit(partitionsPath);
		writePartitions(job.getConfiguration(), partitionsPath, splitPoints);

		// configure job to use it
		job.setPartitionerClass(TotalOrderPartitioner.class);
		TotalOrderPartitioner.setPartitionFile(job.getConfiguration(),
				partitionsPath);
	}

	/**
	 * Serialize column family to compression algorithm map to configuration.
	 * Invoked while configuring the MR job for incremental load.
	 * 
	 * Package-private for unit tests only.
	 * 
	 * @throws IOException
	 *             on failure to read column family descriptors
	 */
	@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE")
	static void configureCompression(HTable table, Configuration conf)
			throws IOException {
		StringBuilder compressionConfigValue = new StringBuilder();
		HTableDescriptor tableDescriptor = table.getTableDescriptor();
		if (tableDescriptor == null) {
			// could happen with mock table instance
			return;
		}
		Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
		int i = 0;
		for (HColumnDescriptor familyDescriptor : families) {
			if (i++ > 0) {
				compressionConfigValue.append('&');
			}
			compressionConfigValue.append(URLEncoder.encode(
					familyDescriptor.getNameAsString(), "UTF-8"));
			compressionConfigValue.append('=');
			compressionConfigValue.append(URLEncoder.encode(familyDescriptor
					.getCompression().getName(), "UTF-8"));
		}
		// Get rid of the last ampersand
		conf.set(COMPRESSION_CONF_KEY, compressionConfigValue.toString());
	}

	/**
	 * Serialize column family to bloom type map to configuration. Invoked while
	 * configuring the MR job for incremental load.
	 * 
	 * @throws IOException
	 *             on failure to read column family descriptors
	 */
	static void configureBloomType(HTable table, Configuration conf)
			throws IOException {
		HTableDescriptor tableDescriptor = table.getTableDescriptor();
		if (tableDescriptor == null) {
			// could happen with mock table instance
			return;
		}
		StringBuilder bloomTypeConfigValue = new StringBuilder();
		Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
		int i = 0;
		for (HColumnDescriptor familyDescriptor : families) {
			if (i++ > 0) {
				bloomTypeConfigValue.append('&');
			}
			bloomTypeConfigValue.append(URLEncoder.encode(
					familyDescriptor.getNameAsString(), "UTF-8"));
			bloomTypeConfigValue.append('=');
			String bloomType = familyDescriptor.getBloomFilterType().toString();
			if (bloomType == null) {
				bloomType = HColumnDescriptor.DEFAULT_BLOOMFILTER;
			}
			bloomTypeConfigValue.append(URLEncoder.encode(bloomType, "UTF-8"));
		}
		conf.set(BLOOM_TYPE_CONF_KEY, bloomTypeConfigValue.toString());
	}

	// 新增了ignore()
	@SuppressWarnings("deprecation")
	public boolean ignore(KeyValue kv) {
		boolean ignore = Bytes.toString(kv.getValue()).indexOf("Del") >= 0;
		return ignore;
	}

	// 新增了getDeleteRowKeyPath()
	public static Path getDeleteRowKeyPath(Path outputPath) {
		return new Path(outputPath + HFileOutputFormatBase._deleteRowkey);
	}

	// 新增了getDeleteRowKeyFile()
	public static Path getDeleteRowKeyFile(Path outputPath) {
		return new Path(getDeleteRowKeyPath(outputPath) + "/"
				+ UUID.randomUUID().toString());
	}
}

 

data.txt文件的内容如下:

6222020405006,typeA,100000,201408081225,2000
6222020405006,typeA,100000,201408112351,1000
6222020405006,typeA,100000,201408140739,4000
6222020405008,typeB,50000,201408150932,5000
6222020405009,typeC,30000,201408181212,10000

 

在这里解说一下:HFileOutputFormatBase.java是重写了hbase-server-0.96.2-hadoop2.jar里面的HFileOutputFormat2.java文件,在File2HFile2HBase.java 里方法 getJob()里return前一行:

HFileOutputFormatBase.configureIncrementalLoad(job, htable, HFileOutputFormatBase.class);

刚开始写了HFileOutputFormat.configureIncrementalLoad(job, htable);但是老是报如下错误:

2014-08-26 22:31:47,183 INFO  [main] example.File2HFile2HBase (File2HFile2HBase.java:run(61)) - Usage command args: [hdfs://master129:9000/test/input/data.txt, hdfs://master129:9000/test/output]
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/D:/app/lib/slf4j-log4j12-1.6.4.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/D:/app/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
2014-08-26 22:31:51,256 INFO  [main] zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:zookeeper.version=3.4.5-1392090, built on 09/30/2012 17:52 GMT
2014-08-26 22:31:51,258 INFO  [main] zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:host.name=LiuYQ-PC
2014-08-26 22:31:51,258 INFO  [main] zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:java.version=1.6.0_45
2014-08-26 22:31:51,258 INFO  [main] zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:java.vendor=Sun Microsystems Inc.
2014-08-26 22:31:51,261 INFO  [main] zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:java.home=C:\Program Files\Java\jdk1.6.0_45\jre
2014-08-26 22:31:51,261 INFO  [main] zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:java.class.path=E:\workspace\.metadata\.plugins\org.apache.hadoop.eclipse\hadoop-conf-2965312243669964400;E:\workspace\hadoop_study\bin;D:\app\lib\activation-1.1.jar;D:\app\lib\annotations-api.jar;D:\app\lib\aopalliance-1.0.jar;D:\app\lib\asm-3.1.jar;D:\app\lib\asm-3.2.jar;D:\app\lib\avro-1.7.4.jar;D:\app\lib\catalina.jar;D:\app\lib\catalina-ant.jar;D:\app\lib\catalina-ha.jar;D:\app\lib\catalina-tribes.jar;D:\app\lib\commons-beanutils-1.7.0.jar;D:\app\lib\commons-beanutils-core-1.8.0.jar;D:\app\lib\commons-cli-1.2.jar;D:\app\lib\commons-codec-1.4.jar;D:\app\lib\commons-codec-1.7.jar;D:\app\lib\commons-collections-3.2.1.jar;D:\app\lib\commons-compress-1.4.1.jar;D:\app\lib\commons-configuration-1.6.jar;D:\app\lib\commons-daemon-1.0.13.jar;D:\app\lib\commons-digester-1.8.jar;D:\app\lib\commons-el-1.0.jar;D:\app\lib\commons-httpclient-3.1.jar;D:\app\lib\commons-io-2.1.jar;D:\app\lib\commons-io-2.4.jar;D:\app\lib\commons-lang-2.5.jar;D:\app\lib\commons-lang-2.6.jar;D:\app\lib\commons-logging-1.1.1.jar;D:\app\lib\commons-math-2.1.jar;D:\app\lib\commons-net-3.1.jar;D:\app\lib\ecj-3.7.2.jar;D:\app\lib\el-api.jar;D:\app\lib\findbugs-annotations-1.3.9-1.jar;D:\app\lib\gmbal-api-only-3.0.0-b023.jar;D:\app\lib\grizzly-framework-2.1.2.jar;D:\app\lib\grizzly-http-2.1.2.jar;D:\app\lib\grizzly-http-server-2.1.2.jar;D:\app\lib\grizzly-http-servlet-2.1.2.jar;D:\app\lib\grizzly-rcm-2.1.2.jar;D:\app\lib\guava-11.0.2.jar;D:\app\lib\guava-12.0.1.jar;D:\app\lib\guice-3.0.jar;D:\app\lib\guice-servlet-3.0.jar;D:\app\lib\hadoop-annotations-2.2.0.jar;D:\app\lib\hadoop-archives-2.2.0.jar;D:\app\lib\hadoop-auth-2.2.0.jar;D:\app\lib\hadoop-client-2.2.0.jar;D:\app\lib\hadoop-common-2.2.0.jar;D:\app\lib\hadoop-common-2.2.0-tests.jar;D:\app\lib\hadoop-datajoin-2.2.0.jar;D:\app\lib\hadoop-distcp-2.2.0.jar;D:\app\lib\hadoop-extras-2.2.0.jar;D:\app\lib\hadoop-gridmix-2.2.0.jar;D:\app\lib\hadoop-hdfs-2.2.0.jar;D:\app\lib\hadoop-hdfs-2.2.0-tests.jar;D:\app\lib\hadoop-hdfs-nfs-2.2.0.jar;D:\app\lib\hadoop-mapreduce-client-app-2.2.0.jar;D:\app\lib\hadoop-mapreduce-client-common-2.2.0.jar;D:\app\lib\hadoop-mapreduce-client-core-2.2.0.jar;D:\app\lib\hadoop-mapreduce-client-hs-2.2.0.jar;D:\app\lib\hadoop-mapreduce-client-hs-plugins-2.2.0.jar;D:\app\lib\hadoop-mapreduce-client-jobclient-2.2.0.jar;D:\app\lib\hadoop-mapreduce-client-jobclient-2.2.0-tests.jar;D:\app\lib\hadoop-mapreduce-client-shuffle-2.2.0.jar;D:\app\lib\hadoop-mapreduce-examples-2.2.0.jar;D:\app\lib\hadoop-nfs-2.2.0.jar;D:\app\lib\hadoop-rumen-2.2.0.jar;D:\app\lib\hadoop-streaming-2.2.0.jar;D:\app\lib\hadoop-yarn-api-2.2.0.jar;D:\app\lib\hadoop-yarn-applications-distributedshell-2.2.0.jar;D:\app\lib\hadoop-yarn-applications-unmanaged-am-launcher-2.2.0.jar;D:\app\lib\hadoop-yarn-client-2.2.0.jar;D:\app\lib\hadoop-yarn-common-2.2.0.jar;D:\app\lib\hadoop-yarn-server-common-2.2.0.jar;D:\app\lib\hadoop-yarn-server-nodemanager-2.2.0.jar;D:\app\lib\hadoop-yarn-server-resourcemanager-2.2.0.jar;D:\app\lib\hadoop-yarn-server-tests-2.2.0.jar;D:\app\lib\hadoop-yarn-server-tests-2.2.0-tests.jar;D:\app\lib\hadoop-yarn-server-web-proxy-2.2.0.jar;D:\app\lib\hadoop-yarn-site-2.2.0.jar;D:\app\lib\hamcrest-core-1.1.jar;D:\app\lib\hamcrest-core-1.3.jar;D:\app\lib\hbase-client-0.96.2-hadoop2.jar;D:\app\lib\hbase-common-0.96.2-hadoop2.jar;D:\app\lib\hbase-common-0.96.2-hadoop2-tests.jar;D:\app\lib\hbase-examples-0.96.2-hadoop2.jar;D:\app\lib\hbase-hadoop2-compat-0.96.2-hadoop2.jar;D:\app\lib\hbase-hadoop-compat-0.96.2-hadoop2.jar;D:\app\lib\hbase-it-0.96.2-hadoop2.jar;D:\app\lib\hbase-it-0.96.2-hadoop2-tests.jar;D:\app\lib\hbase-prefix-tree-0.96.2-hadoop2.jar;D:\app\lib\hbase-protocol-0.96.2-hadoop2.jar;D:\app\lib\hbase-server-0.96.2-hadoop2.jar;D:\app\lib\hbase-server-0.96.2-hadoop2-tests.jar;D:\app\lib\hbase-shell-0.96.2-hadoop2.jar;D:\app\lib\hbase-testing-util-0.96.2-hadoop2.jar;D:\app\lib\hbase-thrift-0.96.2-hadoop2.jar;D:\app\lib\hsqldb-2.0.0.jar;D:\app\lib\htrace-core-2.04.jar;D:\app\lib\httpclient-4.1.3.jar;D:\app\lib\httpcore-4.1.3.jar;D:\app\lib\jackson-core-asl-1.8.8.jar;D:\app\lib\jackson-jaxrs-1.8.8.jar;D:\app\lib\jackson-mapper-asl-1.8.8.jar;D:\app\lib\jackson-xc-1.8.8.jar;D:\app\lib\jamon-runtime-2.3.1.jar;D:\app\lib\jasper.jar;D:\app\lib\jasper-compiler-5.5.23.jar;D:\app\lib\jasper-el.jar;D:\app\lib\jasper-runtime-5.5.23.jar;D:\app\lib\javax.inject-1.jar;D:\app\lib\javax.servlet-3.1.jar;D:\app\lib\javax.servlet-api-3.0.1.jar;D:\app\lib\jaxb-api-2.2.2.jar;D:\app\lib\jaxb-impl-2.2.3-1.jar;D:\app\lib\jersey-client-1.9.jar;D:\app\lib\jersey-core-1.8.jar;D:\app\lib\jersey-core-1.9.jar;D:\app\lib\jersey-grizzly2-1.9.jar;D:\app\lib\jersey-guice-1.9.jar;D:\app\lib\jersey-json-1.8.jar;D:\app\lib\jersey-json-1.9.jar;D:\app\lib\jersey-server-1.8.jar;D:\app\lib\jersey-server-1.9.jar;D:\app\lib\jersey-test-framework-core-1.9.jar;D:\app\lib\jersey-test-framework-grizzly2-1.9.jar;D:\app\lib\jets3t-0.6.1.jar;D:\app\lib\jettison-1.1.jar;D:\app\lib\jettison-1.3.1.jar;D:\app\lib\jetty-6.1.26.jar;D:\app\lib\jetty-sslengine-6.1.26.jar;D:\app\lib\jetty-util-6.1.26.jar;D:\app\lib\jruby-complete-1.6.8.jar;D:\app\lib\jsch-0.1.42.jar;D:\app\lib\jsp-2.1-6.1.14.jar;D:\app\lib\jsp-api.jar;D:\app\lib\jsp-api-2.1.jar;D:\app\lib\jsp-api-2.1-6.1.14.jar;D:\app\lib\jsr305-1.3.9.jar;D:\app\lib\junit-4.8.2.jar;D:\app\lib\junit-4.10.jar;D:\app\lib\junit-4.11.jar;D:\app\lib\libthrift-0.9.0.jar;D:\app\lib\log4j-1.2.17.jar;D:\app\lib\management-api-3.0.0-b012.jar;D:\app\lib\metrics-core-2.1.2.jar;D:\app\lib\mockito-all-1.8.5.jar;D:\app\lib\netty-3.6.2.Final.jar;D:\app\lib\netty-3.6.6.Final.jar;D:\app\lib\paranamer-2.3.jar;D:\app\lib\protobuf-java-2.5.0.jar;D:\app\lib\servlet-api.jar;D:\app\lib\servlet-api-2.5.jar;D:\app\lib\servlet-api-2.5-6.1.14.jar;D:\app\lib\slf4j-api-1.6.4.jar;D:\app\lib\slf4j-api-1.7.5.jar;D:\app\lib\slf4j-log4j12-1.6.4.jar;D:\app\lib\slf4j-log4j12-1.7.5.jar;D:\app\lib\snappy-java-1.0.4.1.jar;D:\app\lib\stax-api-1.0.1.jar;D:\app\lib\tomcat-coyote.jar;D:\app\lib\tomcat-dbcp.jar;D:\app\lib\tomcat-i18n-es.jar;D:\app\lib\tomcat-i18n-fr.jar;D:\app\lib\tomcat-i18n-ja.jar;D:\app\lib\xmlenc-0.52.jar;D:\app\lib\xz-1.0.jar;D:\app\lib\zookeeper-3.4.5.jar
2014-08-26 22:31:51,261 INFO  [main] zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:java.library.path=C:\Program Files\Java\jdk1.6.0_45\bin;C:\Windows\Sun\Java\bin;C:\Windows\system32;C:\Windows;C:/Program Files (x86)/Java/jdk1.6.0_35/bin/../jre/bin/client;C:/Program Files (x86)/Java/jdk1.6.0_35/bin/../jre/bin;C:/Program Files (x86)/Java/jdk1.6.0_35/bin/../jre/lib/i386;D:\app\hadoop-2.2.0\bin;C:\Program Files (x86)\Java\jdk1.6.0_35\bin;C:\Program Files (x86)\NVIDIA Corporation\PhysX\Common;D:\app\Oracle11g\product\11.1.0\db_1\bin;C:\Program Files (x86)\Intel\iCLS Client\;C:\Program Files\Intel\iCLS Client\;C:\Windows\system32;C:\Windows;C:\Windows\System32\Wbem;C:\Windows\System32\WindowsPowerShell\v1.0\;C:\Program Files\Intel\WiFi\bin\;C:\Program Files\Common Files\Intel\WirelessCommon\;C:\Program Files\Intel\Intel(R) Management Engine Components\DAL;C:\Program Files\Intel\Intel(R) Management Engine Components\IPT;C:\Program Files (x86)\Intel\Intel(R) Management Engine Components\DAL;C:\Program Files (x86)\Intel\Intel(R) Management Engine Components\IPT;D:\Program Files\TortoiseSVN\bin;D:\Program Files\MySQL\MySQL Server 5.5\bin;C:\Program Files (x86)\Intel\OpenCL SDK\3.0\bin\x86;C:\Program Files (x86)\Intel\OpenCL SDK\3.0\bin\x64;C:\Program Files\Intel\WiFi\bin\;C:\Program Files\Common Files\Intel\WirelessCommon\;D:\Program Files\SSH Communications Security\SSH Secure Shell;.;;D:\app\eclipse;;.
2014-08-26 22:31:51,261 INFO  [main] zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:java.io.tmpdir=C:\Users\LiuYQ\AppData\Local\Temp\
2014-08-26 22:31:51,261 INFO  [main] zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:java.compiler=<NA>
2014-08-26 22:31:51,263 INFO  [main] zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:os.name=Windows 7
2014-08-26 22:31:51,263 INFO  [main] zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:os.arch=amd64
2014-08-26 22:31:51,264 INFO  [main] zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:os.version=6.1
2014-08-26 22:31:51,264 INFO  [main] zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:user.name=LiuYQ
2014-08-26 22:31:51,264 INFO  [main] zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:user.home=C:\Users\LiuYQ
2014-08-26 22:31:51,264 INFO  [main] zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:user.dir=E:\workspace\hadoop_study
2014-08-26 22:31:51,272 INFO  [main] zookeeper.ZooKeeper (ZooKeeper.java:<init>(438)) - Initiating client connection, connectString=slave130:2181,master129:2181,slave132:2181,slave131:2181 sessionTimeout=180000 watcher=hconnection-0x76b20352, quorum=slave130:2181,master129:2181,slave132:2181,slave131:2181, baseZNode=/hbase
2014-08-26 22:31:51,389 INFO  [main] zookeeper.RecoverableZooKeeper (RecoverableZooKeeper.java:<init>(120)) - Process identifier=hconnection-0x76b20352 connecting to ZooKeeper ensemble=slave130:2181,master129:2181,slave132:2181,slave131:2181
2014-08-26 22:31:51,404 INFO  [main-SendThread(master129:2181)] zookeeper.ClientCnxn (ClientCnxn.java:logStartConnect(966)) - Opening socket connection to server master129/192.168.24.129:2181. Will not attempt to authenticate using SASL (无法定位登录配置)
2014-08-26 22:31:51,419 INFO  [main-SendThread(master129:2181)] zookeeper.ClientCnxn (ClientCnxn.java:primeConnection(849)) - Socket connection established to master129/192.168.24.129:2181, initiating session
2014-08-26 22:31:51,464 INFO  [main-SendThread(master129:2181)] zookeeper.ClientCnxn (ClientCnxn.java:onConnected(1207)) - Session establishment complete on server master129/192.168.24.129:2181, sessionid = 0x48129e3b750006, negotiated timeout = 150000
2014-08-26 22:31:53,018 INFO  [main] mapreduce.HFileOutputFormat2 (HFileOutputFormat2.java:configureIncrementalLoad(366)) - Looking up current regions for table testtable1
2014-08-26 22:31:53,046 INFO  [main] mapreduce.HFileOutputFormat2 (HFileOutputFormat2.java:configureIncrementalLoad(368)) - Configuring 1 reduce partitions to match current region count
2014-08-26 22:31:53,164 INFO  [main] mapreduce.HFileOutputFormat2 (HFileOutputFormat2.java:writePartitions(287)) - Writing partition information to /tmp/partitions_8f008fe0-9170-48ac-940a-83c2813f1378
2014-08-26 22:31:53,356 WARN  [main] zlib.ZlibFactory (ZlibFactory.java:<clinit>(50)) - Failed to load/initialize native-zlib library
2014-08-26 22:31:53,359 INFO  [main] compress.CodecPool (CodecPool.java:getCompressor(150)) - Got brand-new compressor [.deflate]
2014-08-26 22:31:53,904 INFO  [main] Configuration.deprecation (Configuration.java:warnOnceIfDeprecated(840)) - hadoop.native.lib is deprecated. Instead, use io.native.lib.available
2014-08-26 22:31:53,964 INFO  [main] Configuration.deprecation (Configuration.java:warnOnceIfDeprecated(840)) - hadoop.native.lib is deprecated. Instead, use io.native.lib.available
2014-08-26 22:31:54,006 INFO  [main] Configuration.deprecation (Configuration.java:warnOnceIfDeprecated(840)) - hadoop.native.lib is deprecated. Instead, use io.native.lib.available
2014-08-26 22:31:54,059 INFO  [main] Configuration.deprecation (Configuration.java:warnOnceIfDeprecated(840)) - hadoop.native.lib is deprecated. Instead, use io.native.lib.available
2014-08-26 22:31:54,110 INFO  [main] Configuration.deprecation (Configuration.java:warnOnceIfDeprecated(840)) - hadoop.native.lib is deprecated. Instead, use io.native.lib.available
2014-08-26 22:31:54,310 INFO  [main] mapreduce.HFileOutputFormat2 (HFileOutputFormat2.java:configureIncrementalLoad(380)) - Incremental table testtable1 output configured.
2014-08-26 22:31:54,362 INFO  [main] Configuration.deprecation (Configuration.java:warnOnceIfDeprecated(840)) - session.id is deprecated. Instead, use dfs.metrics.session-id
2014-08-26 22:31:54,365 INFO  [main] jvm.JvmMetrics (JvmMetrics.java:init(76)) - Initializing JVM Metrics with processName=JobTracker, sessionId=
2014-08-26 22:31:54,998 WARN  [main] mapreduce.JobSubmitter (JobSubmitter.java:copyAndConfigureFiles(149)) - Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
2014-08-26 22:31:55,083 INFO  [main] mapreduce.JobSubmitter (JobSubmitter.java:submitJobInternal(439)) - Cleaning up the staging area file:/tmp/hadoop-LiuYQ/mapred/staging/hadoop5469487/.staging/job_local5469487_0001
java.lang.IllegalArgumentException: Pathname /D:/app/lib/hadoop-mapreduce-client-core-2.2.0.jar from hdfs://master129:9000/D:/app/lib/hadoop-mapreduce-client-core-2.2.0.jar is not a valid DFS filename.
	at org.apache.hadoop.hdfs.DistributedFileSystem.getPathName(DistributedFileSystem.java:184)
	at org.apache.hadoop.hdfs.DistributedFileSystem.access$000(DistributedFileSystem.java:92)
	at org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:1106)
	at org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:1102)
	at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
	at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1102)
	at org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager.getFileStatus(ClientDistributedCacheManager.java:288)
	at org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager.getFileStatus(ClientDistributedCacheManager.java:224)
	at org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager.determineTimestamps(ClientDistributedCacheManager.java:93)
	at org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager.determineTimestampsAndCacheVisibilities(ClientDistributedCacheManager.java:57)
	at org.apache.hadoop.mapreduce.JobSubmitter.copyAndConfigureFiles(JobSubmitter.java:264)
	at org.apache.hadoop.mapreduce.JobSubmitter.copyAndConfigureFiles(JobSubmitter.java:300)
	at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:387)
	at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1268)
	at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1265)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:396)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
	at org.apache.hadoop.mapreduce.Job.submit(Job.java:1265)
	at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:1286)
	at com.lyq.study.example.File2HFile2HBase$1.run(File2HFile2HBase.java:76)
	at com.lyq.study.example.File2HFile2HBase$1.run(File2HFile2HBase.java:1)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:337)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1471)
	at com.lyq.study.example.File2HFile2HBase.run(File2HFile2HBase.java:65)
	at com.lyq.study.example.File2HFile2HBase.main(File2HFile2HBase.java:135)

 

最终重写了HFileOutputFormat2 为HFileOutputFormatBase,使用HFileOutputFormatBase,至此成功运行。

 

附件是

File2HFile2HBase.ava

HFileOutputFormatBase.java

data.txt

和源码HFileOutputFormat2.java

1
0
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics