
Hadoop Streaming with HBase: an example of running C code


For the past couple of days I have been working on getting Hadoop Streaming to read from HBase and run C code.

Yesterday, in Eclipse, I wrote my own TextTableInputFormat class that implements the InputFormat interface to read from HBase, and it worked fine when I tested it with a MapReduce job I wrote myself.

The class below is the base InputFormat for reading from HBase.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapred.TableInputFormat;
import org.apache.hadoop.hbase.mapred.TableSplit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.RecordReader;

public abstract class TextTableInputFormatBase implements
        InputFormat<Text, Text>, Configurable {

    protected static final Log LOG = LogFactory.getLog(TextTableInputFormatBase.class);

    /** Holds the details for the internal scanner. */
    private Scan scan = null;
    /** The table to scan. */
    private HTable table = null;

    public InputSplit[] getSplits(JobConf job, int numSplits)
            throws IOException {
        if (table == null) {
            throw new IOException("No table was provided.");
        }
        Pair<byte[][], byte[][]> keys = table.getStartEndKeys();
        if (keys == null || keys.getFirst() == null
                || keys.getFirst().length == 0) {
            throw new IOException("Expecting at least one region.");
        }
        int count = 0;
        // Collect splits in a list so that skipped regions do not leave null
        // entries in the returned array.
        List<InputSplit> splits = new ArrayList<InputSplit>(keys.getFirst().length);
        for (int i = 0; i < keys.getFirst().length; i++) {
            if (!includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) {
                continue;
            }
            String regionLocation = table.getRegionLocation(keys.getFirst()[i])
                    .getServerAddress().getHostname();
            byte[] startRow = scan.getStartRow();
            byte[] stopRow = scan.getStopRow();
            // determine if the given start and stop keys fall into the region
            if ((startRow.length == 0 || keys.getSecond()[i].length == 0 || Bytes
                    .compareTo(startRow, keys.getSecond()[i]) < 0)
                    && (stopRow.length == 0 || Bytes.compareTo(stopRow,
                            keys.getFirst()[i]) > 0)) {
                byte[] splitStart = startRow.length == 0
                        || Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ? keys
                        .getFirst()[i] : startRow;
                byte[] splitStop = (stopRow.length == 0 || Bytes.compareTo(
                        keys.getSecond()[i], stopRow) <= 0)
                        && keys.getSecond()[i].length > 0 ? keys.getSecond()[i]
                        : stopRow;
                InputSplit split = new TableSplit(table.getTableName(),
                        splitStart, splitStop, regionLocation);
                splits.add(split);
                if (LOG.isDebugEnabled())
                    LOG.debug("getSplits: split -> " + (count++) + " -> "
                            + split);
            }
        }
        return splits.toArray(new InputSplit[splits.size()]);
    }

    public RecordReader<Text, Text> getRecordReader(InputSplit split, JobConf job,
            Reporter reporter) throws IOException {
        if (table == null) {
            throw new IOException(
                    "Cannot create a record reader because of a"
                            + " previous error. Please look at the previous logs lines from"
                            + " the task's full log for more details.");
        }
        // Delegate the actual scanning to HBase's mapred TableInputFormat and wrap its
        // (ImmutableBytesWritable, Result) pairs as (Text, Text).
        TableInputFormat inputFormat = new TableInputFormat();
        return new TextTableRecordReader(inputFormat.getRecordReader(split, job, reporter));
    }

    protected boolean includeRegionInSplit(final byte[] startKey,
            final byte[] endKey) {
        return true;
    }

    protected HTable getHTable() {
        return this.table;
    }

    protected void setHTable(HTable table) {
        this.table = table;
    }

    public Scan getScan() {
        if (this.scan == null)
            this.scan = new Scan();
        return scan;
    }

    public void setScan(Scan scan) {
        this.scan = scan;
    }

    public abstract String formatRowResult(Result row);

    public class TextTableRecordReader implements RecordReader<Text, Text> {

        private RecordReader<ImmutableBytesWritable, Result> tableRecordReader;

        public TextTableRecordReader(RecordReader<ImmutableBytesWritable, Result> reader) {
            tableRecordReader = reader;
        }

        public void close() throws IOException {
            tableRecordReader.close();
        }

        public Text createKey() {
            return new Text("");
        }

        public Text createValue() {
            return new Text("");
        }

        public long getPos() throws IOException {
            return tableRecordReader.getPos();
        }

        public float getProgress() throws IOException {
            return tableRecordReader.getProgress();
        }

        public boolean next(Text key, Text value) throws IOException {
            // Pull the next (row key, Result) pair from the wrapped reader and
            // render it as two Text objects for the streaming job.
            ImmutableBytesWritable rowKey = tableRecordReader.createKey();
            Result row = tableRecordReader.createValue();
            boolean hasNext = tableRecordReader.next(rowKey, row);
            if (hasNext) {
                key.set(row.getRow());
                value.set(formatRowResult(row));
            }
            return hasNext;
        }
    }
}
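
To make it clearer what the streaming side actually receives, here is a minimal sketch (not part of the original post; the class and method names are made up for illustration). It drives the record reader above by hand and assumes an already-configured concrete subclass, for example the StringTableInputFormat shown further down. Each iteration yields the row key and the formatted row, which is what Hadoop Streaming writes to the C program's stdin: one line per record, key first, then a tab, then the value.

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

public class TableDumpSketch {

    // Dump every region split of an already-configured input format as the
    // tab-separated lines a streaming mapper would read on stdin.
    public static void dump(TextTableInputFormatBase format, JobConf job) throws Exception {
        for (InputSplit split : format.getSplits(job, 1)) {
            RecordReader<Text, Text> reader =
                    format.getRecordReader(split, job, Reporter.NULL);
            Text key = reader.createKey();
            Text value = reader.createValue();
            while (reader.next(key, value)) {
                System.out.println(key + "\t" + value);
            }
            reader.close();
        }
    }
}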

 

 

The following class is mainly used to fetch HBase data row by row.

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Base64;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.StringUtils;

public class StringTableInputFormat extends TextTableInputFormatBase {

    /** Job parameter that specifies the input table. */
    public static final String INPUT_TABLE = "hbase.mapreduce.inputtable";
    /**
     * Base-64 encoded scanner. All other SCAN_ confs are ignored if this is
     * specified. See {@link TableMapReduceUtil#convertScanToString(Scan)} for
     * more details.
     */
    public static final String SCAN = "hbase.mapreduce.scan";
    /** Column Family to Scan */
    public static final String SCAN_COLUMN_FAMILY = "hbase.mapreduce.scan.column.family";
    /** Space delimited list of columns to scan. */
    public static final String SCAN_COLUMNS = "hbase.mapreduce.scan.columns";
    /** The timestamp used to filter columns with a specific timestamp. */
    public static final String SCAN_TIMESTAMP = "hbase.mapreduce.scan.timestamp";
    /**
     * The starting timestamp used to filter columns with a specific range of
     * versions.
     */
    public static final String SCAN_TIMERANGE_START = "hbase.mapreduce.scan.timerange.start";
    /**
     * The ending timestamp used to filter columns with a specific range of
     * versions.
     */
    public static final String SCAN_TIMERANGE_END = "hbase.mapreduce.scan.timerange.end";
    /** The maximum number of version to return. */
    public static final String SCAN_MAXVERSIONS = "hbase.mapreduce.scan.maxversions";
    /** Set to false to disable server-side caching of blocks for this scan. */
    public static final String SCAN_CACHEBLOCKS = "hbase.mapreduce.scan.cacheblocks";
    /** The number of rows for caching that will be passed to scanners. */
    public static final String SCAN_CACHEDROWS = "hbase.mapreduce.scan.cachedrows";

    /** The configuration. */
    private Configuration conf = null;

    @Override
    public Configuration getConf() {
        return conf;
    }
   
    /** Deserializes a base-64 encoded Scan; the inverse of convertScanToString() in the driver below. */
    Scan convertStringToScan(String base64) throws IOException {
        ByteArrayInputStream bis = new ByteArrayInputStream(Base64.decode(base64));
        DataInputStream dis = new DataInputStream(bis);
        Scan scan = new Scan();
        scan.readFields(dis);
        return scan;
    }

    @Override
    public void setConf(Configuration configuration) {
        this.conf = configuration;
        String tableName = conf.get(INPUT_TABLE);
        try {
            setHTable(new HTable(new Configuration(conf), tableName));
        } catch (Exception e) {
            LOG.error(StringUtils.stringifyException(e));
        }

        Scan scan = null;

        if (conf.get(SCAN) != null) {
            try {
                scan = this.convertStringToScan(conf.get(SCAN));
            } catch (IOException e) {
                LOG.error("An error occurred.", e);
            }
        } else {
            try {
                scan = new Scan();

                if (conf.get(SCAN_COLUMNS) != null) {
                    scan.addColumns(conf.get(SCAN_COLUMNS));
                }

                if (conf.get(SCAN_COLUMN_FAMILY) != null) {
                    scan.addFamily(Bytes.toBytes(conf.get(SCAN_COLUMN_FAMILY)));
                }

                if (conf.get(SCAN_TIMESTAMP) != null) {
                    scan.setTimeStamp(Long.parseLong(conf.get(SCAN_TIMESTAMP)));
                }

                if (conf.get(SCAN_TIMERANGE_START) != null
                        && conf.get(SCAN_TIMERANGE_END) != null) {
                    scan.setTimeRange(
                            Long.parseLong(conf.get(SCAN_TIMERANGE_START)),
                            Long.parseLong(conf.get(SCAN_TIMERANGE_END)));
                }

                if (conf.get(SCAN_MAXVERSIONS) != null) {
                    scan.setMaxVersions(Integer.parseInt(conf
                            .get(SCAN_MAXVERSIONS)));
                }

                if (conf.get(SCAN_CACHEDROWS) != null) {
                    scan.setCaching(Integer.parseInt(conf.get(SCAN_CACHEDROWS)));
                }

                scan.setCacheBlocks((conf.getBoolean(SCAN_CACHEBLOCKS, false)));
            } catch (Exception e) {
                LOG.error(StringUtils.stringifyException(e));
            }
        }

        setScan(scan);
    }

    @Override
    public String formatRowResult(Result row) {
        StringBuilder builder = new StringBuilder();
        for (KeyValue kv : row.list()) {
            builder.append(new String(kv.getRow())).append("  ")
                    .append(new String(kv.getFamily())).append(":")
                    .append(new String(kv.getQualifier())).append("  ")
                    .append(kv.getTimestamp()).append("  ")
                    .append(new String(kv.getValue()));
        }
        return builder.toString();
    }
}
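
As a complement to the serialized-Scan path used by the WordCount driver below, here is a minimal sketch (not from the original post; the class name is made up, while the "blog" table and "article" family are reused from the WordCount example) of the property-based path through setConf(): each aspect of the scan is set through its own configuration key instead of a base-64 encoded Scan.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class ScanConfSketch {

    public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        conf.set(StringTableInputFormat.INPUT_TABLE, "blog");
        conf.set(StringTableInputFormat.SCAN_COLUMN_FAMILY, "article");
        conf.set(StringTableInputFormat.SCAN_CACHEDROWS, "500");
        conf.setBoolean(StringTableInputFormat.SCAN_CACHEBLOCKS, false);

        // setConf() builds the HTable and the Scan from the keys set above.
        StringTableInputFormat format = new StringTableInputFormat();
        format.setConf(conf);
    }
}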

 

 

Everything can then be exercised with this simple WordCount MapReduce job.

 

package com.cp.hbase.mapreduce;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.util.Base64;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.cp.hadoop.hbase.mapreduce.StringTableInputFormat;

public class WordCount {

    public static class TokenizerMapper extends
            Mapper<Object, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    public static class IntSumReducer extends
            Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values,
                Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    static String convertScanToString(Scan scan) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        DataOutputStream dos = new DataOutputStream(out);
        scan.write(dos);
        return Base64.encodeBytes(out.toByteArray());
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args)
                .getRemainingArgs();
       
        Scan scan = new Scan();
        scan.addColumn(Bytes.toBytes("author"),Bytes.toBytes("nickname"));
        scan.addColumn(Bytes.toBytes("article"),Bytes.toBytes("tags"));
       
        Job job = new Job(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);

        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.setInputFormatClass(StringTableInputFormat.class);

        HBaseConfiguration.addHbaseResources(job.getConfiguration());
        job.getConfiguration().set(TableInputFormat.INPUT_TABLE, "blog");
        job.getConfiguration().set(TableInputFormat.SCAN, convertScanToString(scan));

        // FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path("/home/cp/output"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
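
Because TextTableInputFormatBase implements the old mapred InputFormat interface, which is the API Hadoop Streaming itself runs on, the same input format can also be wired up through a plain JobConf. The sketch below is not from the original post (the driver class name is made up; the table, family and output path are reused from the WordCount example). It relies on the default identity mapper and reducer, so it simply copies the "row key, tab, formatted row" lines into the output directory.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

public class TableDumpDriver {

    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(HBaseConfiguration.create(), TableDumpDriver.class);
        conf.setJobName("hbase table dump");

        // Our mapred-API input format and the keys its setConf() understands.
        conf.setInputFormat(StringTableInputFormat.class);
        conf.set(StringTableInputFormat.INPUT_TABLE, "blog");
        conf.set(StringTableInputFormat.SCAN_COLUMN_FAMILY, "article");

        // Default IdentityMapper/IdentityReducer: the (Text, Text) pairs pass straight through.
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(Text.class);
        FileOutputFormat.setOutputPath(conf, new Path("/home/cp/output"));

        JobClient.runJob(conf);
    }
}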

 

 

 
