`
BlackWing
  • 浏览: 196461 次
  • 性别: Icon_minigender_1
  • 来自: 广州
社区版块
存档分类
最新评论

分拆TableSplit 让多个mapper同时读取

阅读更多
默认情况下,一个region是一个tableSplit,对应一个mapper进行读取,但单mapper读取速度较慢,因此想着把默认一个table split分拆成多个split,这样hadoop就能通过多个mapper读取。

由于HBase不能像hadoop一样通过以下参数调整split大小,而实现多个mapper读取
mapred.min.split.size
mapred.max.split.size


所以目前想到的方法有两种,一是修改TableInputFormatBase,把默认的一个TableSplit分拆成多个,另外一种方法是,通过Coprocessor处理。这里选择修改TableInputFormatBase类。

HBase权威指南里面有介绍怎么把HBase与MR结合,通过需要用到一下的辅助类实现把HBase表作为数据来源,读取数据:
TableMapReduceUtil.initTableMapperJob(table[0].getBytes(), scan,
					UserViewHisMapper2.class, Text.class, Text.class,
					genRecommendations);

而这个方法,最终是调用以下方法进行初始化设置的:
 public static void initTableMapperJob(byte[] table, Scan scan,
      Class<? extends TableMapper> mapper,
      Class<? extends WritableComparable> outputKeyClass,
      Class<? extends Writable> outputValueClass, Job job,
      boolean addDependencyJars)
  throws IOException {
      initTableMapperJob(Bytes.toString(table), scan, mapper, outputKeyClass,
              outputValueClass, job, addDependencyJars, TableInputFormat.class);
  }


所以,思路就应该修改TableInputFormat这个类。而这个类的核心方法是继承了TableInputFormatBase:

public class TableInputFormat extends TableInputFormatBase
implements Configurable 


最终要修改的则是TableInputFormatBase这个类,修改其以下方法:

public List<InputSplit> getSplits(JobContext context) throws IOException {}


这个方法的核心是,获得table对应所有region的起始row,把每个region作为一个tableSplit:
  public List<InputSplit> getSplits(JobContext context) throws IOException {
	if (table == null) {
	    throw new IOException("No table was provided.");
	}
    Pair<byte[][], byte[][]> keys = table.getStartEndKeys();
    if (keys == null || keys.getFirst() == null ||
        keys.getFirst().length == 0) {
      throw new IOException("Expecting at least one region.");
    }
    int count = 0;
    List<InputSplit> splits = new ArrayList<InputSplit>(keys.getFirst().length);
    for (int i = 0; i < keys.getFirst().length; i++) {
      if ( !includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) {
        continue;
      }
      String regionLocation = table.getRegionLocation(keys.getFirst()[i]).
        getHostname();
      byte[] startRow = scan.getStartRow();
      byte[] stopRow = scan.getStopRow();
      // determine if the given start an stop key fall into the region
      if ((startRow.length == 0 || keys.getSecond()[i].length == 0 ||
           Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) &&
          (stopRow.length == 0 ||
           Bytes.compareTo(stopRow, keys.getFirst()[i]) > 0)) {
        byte[] splitStart = startRow.length == 0 ||
          Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ?
            keys.getFirst()[i] : startRow;
        byte[] splitStop = (stopRow.length == 0 ||
          Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0) &&
          keys.getSecond()[i].length > 0 ?
            keys.getSecond()[i] : stopRow;
        InputSplit split = new TableSplit(table.getTableName(),
          splitStart, splitStop, regionLocation);
        splits.add(split);
        if (LOG.isDebugEnabled())
          LOG.debug("getSplits: split -> " + (count++) + " -> " + split);
      }
    }
    return splits;
  }


这里要做的就是,把本来属于一个tableSplit的row在细分,分成自己希望的多个小split。但没有找到轻巧的实现,唯有不断迭代,把一个tableSplit的row全部取出,再拆分了,有点蛮力。
以下是我的实现方法:

	public List<InputSplit> getSplits(JobContext context) throws IOException {
		if (table == null) {
			throw new IOException("No table was provided.");
		}
		Pair<byte[][], byte[][]> keys = table.getStartEndKeys();
		if (keys == null || keys.getFirst() == null
				|| keys.getFirst().length == 0) {
			throw new IOException("Expecting at least one region.");
		}
		int count = 0;
		List<InputSplit> splits = new ArrayList<InputSplit>(
				keys.getFirst().length);
		for (int i = 0; i < keys.getFirst().length; i++) {
			if (!includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) {
				continue;
			}
			String regionLocation = table.getRegionLocation(keys.getFirst()[i],true)
					.getHostname();
			byte[] startRow = scan.getStartRow();
			byte[] stopRow = scan.getStopRow();
			// determine if the given start an stop key fall into the region
			if ((startRow.length == 0 || keys.getSecond()[i].length == 0 || Bytes
					.compareTo(startRow, keys.getSecond()[i]) < 0)
					&& (stopRow.length == 0 || Bytes.compareTo(stopRow,
							keys.getFirst()[i]) > 0)) {
				byte[] splitStart = startRow.length == 0
						|| Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ? keys
						.getFirst()[i] : startRow;
				byte[] splitStop = (stopRow.length == 0 || Bytes.compareTo(
						keys.getSecond()[i], stopRow) <= 0)
						&& keys.getSecond()[i].length > 0 ? keys.getSecond()[i]
						: stopRow;

				Scan scan1 = new Scan();
				scan1.setStartRow(splitStart);
				scan1.setStopRow(splitStop);
				scan1.setFilter(new KeyOnlyFilter());
				scan1.setBatch(500);
				
				ResultScanner resultscanner = table.getScanner(scan1);
				
				//用来保存该region的所有key
				List<String> rows = new ArrayList<String>();
				//Iterator<Result>  it = resultscanner.iterator();
				
				for(Result rs : resultscanner)
				{
					if(rs.isEmpty())
						continue;
					rows.add(new String(rs.getRow()));
				}
				
				int splitSize = rows.size() / mappersPerSplit;
				
				for (int j = 0; j < mappersPerSplit; j++) {
					TableSplit tablesplit = null;
					if (j == mappersPerSplit - 1)
						tablesplit = new TableSplit(table.getTableName(),
								rows.get(j * splitSize).getBytes(),
								rows.get(rows.size() - 1).getBytes(),
								regionLocation);
					else
						tablesplit = new TableSplit(table.getTableName(),
								rows.get(j * splitSize).getBytes(),
								rows.get(j * splitSize + splitSize).getBytes(), regionLocation);
					splits.add(tablesplit);
					if (LOG.isDebugEnabled())
						LOG.debug((new StringBuilder())
								.append("getSplits: split -> ").append(i++)
								.append(" -> ").append(tablesplit).toString());
				}
				resultscanner.close();				
			}
		}
		return splits;
	}


通过配置设置需要拆分的split数。




分享到:
评论
2 楼 BlackWing 2013-03-26  
我的是可以拆分,表的数据只有一个region,多region情况下没有测试。

chenbaohua518 写道
你好!
我用你的代码貌似跑出来还是分不了每个region啊?
请问你自己测过这段代码吗?
感谢你的分享!

1 楼 chenbaohua518 2013-03-25  
你好!
我用你的代码貌似跑出来还是分不了每个region啊?
请问你自己测过这段代码吗?
感谢你的分享!

相关推荐

Global site tag (gtag.js) - Google Analytics