- 浏览: 196461 次
- 性别:
- 来自: 广州
文章分类
最新评论
-
永立s:
这篇博客帮我解决了一个问题,十分感谢.
HBase表增加snappy压缩 -
BlackWing:
日志是job运行日志,看你怎么配置了,一般就在hadoop安装 ...
解决Exception from container-launch: ExitCodeException exitCode=1的另类错误 -
heymaomao:
heymaomao 写道有两个问题,想请教下楼主 第一是日志楼 ...
解决Exception from container-launch: ExitCodeException exitCode=1的另类错误 -
heymaomao:
有两个问题,想请教下楼主 第一是日志楼主到底看的是哪个日志文件 ...
解决Exception from container-launch: ExitCodeException exitCode=1的另类错误 -
atomduan:
本地的Unix 进程创建失败,检查下服务器内存是否够用,是不是 ...
解决Exception from container-launch: ExitCodeException exitCode=1的另类错误
默认情况下,一个region是一个tableSplit,对应一个mapper进行读取,但单mapper读取速度较慢,因此想着把默认一个table split分拆成多个split,这样hadoop就能通过多个mapper读取。
由于HBase不能像hadoop一样通过以下参数调整split大小,而实现多个mapper读取
所以目前想到的方法有两种,一是修改TableInputFormatBase,把默认的一个TableSplit分拆成多个,另外一种方法是,通过Coprocessor处理。这里选择修改TableInputFormatBase类。
HBase权威指南里面有介绍怎么把HBase与MR结合,通过需要用到一下的辅助类实现把HBase表作为数据来源,读取数据:
而这个方法,最终是调用以下方法进行初始化设置的:
所以,思路就应该修改TableInputFormat这个类。而这个类的核心方法是继承了TableInputFormatBase:
最终要修改的则是TableInputFormatBase这个类,修改其以下方法:
这个方法的核心是,获得table对应所有region的起始row,把每个region作为一个tableSplit:
这里要做的就是,把本来属于一个tableSplit的row在细分,分成自己希望的多个小split。但没有找到轻巧的实现,唯有不断迭代,把一个tableSplit的row全部取出,再拆分了,有点蛮力。
以下是我的实现方法:
通过配置设置需要拆分的split数。
由于HBase不能像hadoop一样通过以下参数调整split大小,而实现多个mapper读取
mapred.min.split.size mapred.max.split.size
所以目前想到的方法有两种,一是修改TableInputFormatBase,把默认的一个TableSplit分拆成多个,另外一种方法是,通过Coprocessor处理。这里选择修改TableInputFormatBase类。
HBase权威指南里面有介绍怎么把HBase与MR结合,通过需要用到一下的辅助类实现把HBase表作为数据来源,读取数据:
TableMapReduceUtil.initTableMapperJob(table[0].getBytes(), scan, UserViewHisMapper2.class, Text.class, Text.class, genRecommendations);
而这个方法,最终是调用以下方法进行初始化设置的:
public static void initTableMapperJob(byte[] table, Scan scan, Class<? extends TableMapper> mapper, Class<? extends WritableComparable> outputKeyClass, Class<? extends Writable> outputValueClass, Job job, boolean addDependencyJars) throws IOException { initTableMapperJob(Bytes.toString(table), scan, mapper, outputKeyClass, outputValueClass, job, addDependencyJars, TableInputFormat.class); }
所以,思路就应该修改TableInputFormat这个类。而这个类的核心方法是继承了TableInputFormatBase:
public class TableInputFormat extends TableInputFormatBase implements Configurable
最终要修改的则是TableInputFormatBase这个类,修改其以下方法:
public List<InputSplit> getSplits(JobContext context) throws IOException {}
这个方法的核心是,获得table对应所有region的起始row,把每个region作为一个tableSplit:
public List<InputSplit> getSplits(JobContext context) throws IOException { if (table == null) { throw new IOException("No table was provided."); } Pair<byte[][], byte[][]> keys = table.getStartEndKeys(); if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) { throw new IOException("Expecting at least one region."); } int count = 0; List<InputSplit> splits = new ArrayList<InputSplit>(keys.getFirst().length); for (int i = 0; i < keys.getFirst().length; i++) { if ( !includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) { continue; } String regionLocation = table.getRegionLocation(keys.getFirst()[i]). getHostname(); byte[] startRow = scan.getStartRow(); byte[] stopRow = scan.getStopRow(); // determine if the given start an stop key fall into the region if ((startRow.length == 0 || keys.getSecond()[i].length == 0 || Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) && (stopRow.length == 0 || Bytes.compareTo(stopRow, keys.getFirst()[i]) > 0)) { byte[] splitStart = startRow.length == 0 || Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ? keys.getFirst()[i] : startRow; byte[] splitStop = (stopRow.length == 0 || Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0) && keys.getSecond()[i].length > 0 ? keys.getSecond()[i] : stopRow; InputSplit split = new TableSplit(table.getTableName(), splitStart, splitStop, regionLocation); splits.add(split); if (LOG.isDebugEnabled()) LOG.debug("getSplits: split -> " + (count++) + " -> " + split); } } return splits; }
这里要做的就是,把本来属于一个tableSplit的row在细分,分成自己希望的多个小split。但没有找到轻巧的实现,唯有不断迭代,把一个tableSplit的row全部取出,再拆分了,有点蛮力。
以下是我的实现方法:
public List<InputSplit> getSplits(JobContext context) throws IOException { if (table == null) { throw new IOException("No table was provided."); } Pair<byte[][], byte[][]> keys = table.getStartEndKeys(); if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) { throw new IOException("Expecting at least one region."); } int count = 0; List<InputSplit> splits = new ArrayList<InputSplit>( keys.getFirst().length); for (int i = 0; i < keys.getFirst().length; i++) { if (!includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) { continue; } String regionLocation = table.getRegionLocation(keys.getFirst()[i],true) .getHostname(); byte[] startRow = scan.getStartRow(); byte[] stopRow = scan.getStopRow(); // determine if the given start an stop key fall into the region if ((startRow.length == 0 || keys.getSecond()[i].length == 0 || Bytes .compareTo(startRow, keys.getSecond()[i]) < 0) && (stopRow.length == 0 || Bytes.compareTo(stopRow, keys.getFirst()[i]) > 0)) { byte[] splitStart = startRow.length == 0 || Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ? keys .getFirst()[i] : startRow; byte[] splitStop = (stopRow.length == 0 || Bytes.compareTo( keys.getSecond()[i], stopRow) <= 0) && keys.getSecond()[i].length > 0 ? keys.getSecond()[i] : stopRow; Scan scan1 = new Scan(); scan1.setStartRow(splitStart); scan1.setStopRow(splitStop); scan1.setFilter(new KeyOnlyFilter()); scan1.setBatch(500); ResultScanner resultscanner = table.getScanner(scan1); //用来保存该region的所有key List<String> rows = new ArrayList<String>(); //Iterator<Result> it = resultscanner.iterator(); for(Result rs : resultscanner) { if(rs.isEmpty()) continue; rows.add(new String(rs.getRow())); } int splitSize = rows.size() / mappersPerSplit; for (int j = 0; j < mappersPerSplit; j++) { TableSplit tablesplit = null; if (j == mappersPerSplit - 1) tablesplit = new TableSplit(table.getTableName(), rows.get(j * splitSize).getBytes(), rows.get(rows.size() - 1).getBytes(), regionLocation); else tablesplit = new TableSplit(table.getTableName(), rows.get(j * splitSize).getBytes(), rows.get(j * splitSize + splitSize).getBytes(), regionLocation); splits.add(tablesplit); if (LOG.isDebugEnabled()) LOG.debug((new StringBuilder()) .append("getSplits: split -> ").append(i++) .append(" -> ").append(tablesplit).toString()); } resultscanner.close(); } } return splits; }
通过配置设置需要拆分的split数。
评论
2 楼
BlackWing
2013-03-26
我的是可以拆分,表的数据只有一个region,多region情况下没有测试。
chenbaohua518 写道
你好!
我用你的代码貌似跑出来还是分不了每个region啊?
请问你自己测过这段代码吗?
感谢你的分享!
我用你的代码貌似跑出来还是分不了每个region啊?
请问你自己测过这段代码吗?
感谢你的分享!
1 楼
chenbaohua518
2013-03-25
你好!
我用你的代码貌似跑出来还是分不了每个region啊?
请问你自己测过这段代码吗?
感谢你的分享!
我用你的代码貌似跑出来还是分不了每个region啊?
请问你自己测过这段代码吗?
感谢你的分享!
发表评论
-
新版hadoop MultipleOutputs多文件输出
2015-03-11 14:22 3472转载请标明出处:http://blackwing.iteye. ... -
解决直接读HFile时因表数据写入而导致文件目录变化问题
2015-03-02 18:22 1491转载请标明出处:http://blackwing.iteye. ... -
解决Exception from container-launch: ExitCodeException exitCode=1的另类错误
2014-08-21 18:18 23658转载请标明出处:http://blackwing.iteye. ... -
LoadIncrementalHFiles是copy而不是move的疑惑
2013-12-19 10:57 4062转载请标明出处:http://blackwing.iteye. ... -
Hadoop生成HFile直接入库HBase心得
2013-12-18 16:15 5197转载请标明出处:http://blackwing.iteye. ... -
NullPointerException SerializationFactory.getSerializer解决
2013-12-04 17:30 1587转载请标明出处:http://blackwing.iteye. ... -
Hadoop的Text类getBytes字节数据put到HBase后有多余字符串问题
2013-11-21 15:53 2099转载请标明出处:http://blackwing.iteye. ... -
编译YCSB 解决Not a host:port pair问题
2013-09-18 17:25 1929转载请标明出处:http://blackwing.iteye. ... -
HBase使用SNAPPY压缩遇到compression test fail问题解决
2013-09-18 10:51 10782转载请标明出处:http://blackwing.iteye. ... -
HBase表增加snappy压缩
2013-09-13 17:54 4292转载请标明来源:http://blackwing.iteye. ... -
hadoop 1.0.3增加snappy压缩
2013-09-11 17:27 1767转载请标明来源:http://blackwing.iteye. ... -
把hadoop的metrics加入ganglia监控
2013-09-04 17:02 1545hadoop的metrics加入ganglia其实是很简单的, ... -
ROOT不在线的另外一种原因及解决办法
2013-07-29 14:28 1460转载请声明出处:http://blackwing.iteye. ... -
enable和disable表时出现表未disable/enable异常处理
2013-07-29 11:42 4893转载请标明出处:http://blackwing.iteye. ... -
MultithreadedMapper多线程读取数据
2013-04-27 15:51 0由于当前业务需求读取HBase表时,会存在数据倾斜,大部分数据 ... -
shuffle & sort解释
2013-04-16 17:31 1209转载请标明出处:http://blackwing.iteye. ... -
hadoop的 IncompatibleClassChangeError
2013-02-06 17:26 2048开发环境中,使用的是官方版的hadoop 1.0.1版,而集群 ... -
HBase的start key和end key疑惑
2013-02-05 15:57 4858转载请标明来源:http://blackwing.iteye. ... -
HBase的coprocessor分拆HRegion
2013-02-04 15:15 3269引用转载请注明出处,文章链接:http://blackwing ... -
GET查询HBase无结果时 Result的size也不为空
2012-11-28 11:15 2030用Get查询hbase某个row时,就算该row不存在,但还是 ...
相关推荐
FC的ROM独有的mapper使得制作卡带比较麻烦, 通常mapper4时最常用的mapper, 此教程讲解如何将mapper0, mapper1, mapper2, mapper3,mapper23(VRC2)转换为mapper4
mapper.py"""import sys# input comes from STDIN (standard input)for line in sys.stdin: # remove leading and trailing whitespace line = line.strip() # split the line into words words = ...
这个是我今天学的一个Java里面mybatis的主配置中的其中一个包,由于软件问题,只可以传一个,这是接上一个config.dtd的mapper.dtd,这个下载了,记得解压,这个是mapper.dtd还有一个config,等会发下一个教程和代码,...
springboot + mybatis(通用mapper) + druid多数据源
输入文件和输出文件名分别为为:humpName.txt(注意,格式为:每行一个字符,空格换行),humpTool.txt,默认在E盘根目录,humpTool.jar也默认E盘根目录, 然后在windows命令提示符中输入(快捷键:windows键 + R,...
Global Mapper软件操作教程.pdfGlobal Mapper软件操作教程.pdfGlobal Mapper软件操作教程.pdfGlobal Mapper软件操作教程.pdfGlobal Mapper软件操作教程.pdfGlobal Mapper软件操作教程.pdf
GlobalMapper17补丁,不懂E文,压缩包内有一个global_mapper14.1.0汉化资源的电子表格文件,希望懂的朋友在此基础上完成汉化。
<tk-mapper.version>4.1.5</tk-mapper.version> <pagehelper.version>1.2.10 <!-- Mybatis通用Mapper --> <groupId>tk.mybatis <artifactId>mapper ${tk-mapper.version} ...
global mapper的使用教程,很不错的。也正在学习中
Mybatis Mapper.xml中字符串形式传参,逗号分隔 AND中拼接OR。
尚硅谷-通用Mapper视频 帮助你快速理解映射文件的使用
Global Mapper系列教程
Mapper用于映射SQL语句,可以说是MyBatis操作数据库的核心特性之一,这里我们来讨论Java的MyBatis框架中Mapper映射配置的使用及原理解析,包括对mapper的xml配置文件的读取流程解读.
TKmybatis提供一种通用mapper,它不需要针对每个mapper写sql语句,只需继承通用mapper调用相应的接口来完成单表的增删改查。
FieldMapper用户手册
dao和mapper 抛弃了mybatis插件生成的一个POJO一个mapper 很冗余的数据,这里直接生成增删改查3个dao/mapper对照,依托于IDEA-DataBase-Tools能够直接选取需要生成POJOS 表模型的选择器。可以很方便的生成pojo/dao/...
mybatis自动生成实体类和mapper文件。自动添加实体类注释。
Global_Mapper中文教程 学习使用Global_Mapper软件
globalmapper操作手册(上帝之眼Global Mapper系列教程开课啦,本系列教程由风侠老师主讲,坐沙发蹲地板的同Global Mapper 是一款地图绘制软件,不仅能够将数据(例如:SRTM数据)显示为光栅地图、高程地图、矢量...
GlobalMapper14安装包