`
lucky_xingxing
  • 浏览: 118058 次
  • 性别: Icon_minigender_1
  • 来自: 成都
社区版块
存档分类
最新评论

HBase Scan Filter 自定义 Comparator 比较器

阅读更多

    最近项目需求需要完善Sqoop的更多功能点,其中一项是将Hbase的数据导出到hdfs或hive,重点是Hbase出来的数据需要支持条件过滤。类似于Sql中的什么 > ,< ,=,主要是针对数字类型的数据过滤 等。

    研究了关于Hbase的过滤只能通过Filter来进行,其中符合我们条件的Filter有一个:

     SingleColumnValueFilter

    这个Filter支持根据字段值进行过滤。

    但是Filter 的 Comparator 没有一个支持数字类型比较器,BinaryComparator,BitComparator这些比较器没法实现我们的需求,使用他们过滤出来的数据不准确。于是目前想到的有两种方案

    1.Scan出数据以后自己通过条件过滤每一行数据满不满足条件。(不雅观)

     2.自定义满足条件的Comparator 。

 

     最终选择自定义Comparator这种方案。

 

     在网上搜索了一下,并且看了HBase现有的Comparator的源码,自定义Comparator需要做以下这些事:

     

 1.定义protobuf文件

        protobuf文件定义可以参考hbase源码的hbase-protocol模块下面的protobuf文件夹下面的Comparator.proto文件。我是直接拷贝过来然后修改修改。

至于为什么需要定义proto文件,是因为hbase所有的rpc数据交互都是通过protobuf来完成的。

下面是我定义的proto文件:

 ---------------------------------------------------------------------------------------------------------------      

// This file contains protocol buffers that are used for filters

 

option java_package = "com.star.hbase.defined.comparator";

option java_outer_classname = "ComparatorProtos";

option java_generic_services = true;

option java_generate_equals_and_hash = true;

option optimize_for = SPEED;

 

// This file contains protocol buffers that are used for comparators (e.g. in filters)

message NumberComparator{

required bytes value = 1;    

required string fieldType = 2;

---------------------------------------------------------------------------------------------------------------

通过以下命令生产java类:前提是protoc.exe必须在当前目录

protoc --java_out=D:\proto NumberComparator.proto

 

具体protobuf的用法及其他我就不说了,网上搜一下即可。

我定义了一个NumberComparator的vo类,它下面有两个字段 ,第一个是需要进行过滤的值,第二个是需要将hbase的指定列转成对应的类型进行比较 ,比如 int double等 只支持数字类型。

 

//          

 

2.创建比较器java类并且该类继承ByteArrayComparable

 

具体代码如下:

-------------------------------------------------------------------------------------------------------------------

package com.star.hbase.defined.comparator;

 

import com.google.protobuf.ByteString;

import com.google.protobuf.InvalidProtocolBufferException;

import org.apache.hadoop.hbase.exceptions.DeserializationException;

import org.apache.hadoop.hbase.filter.ByteArrayComparable;

import org.apache.hadoop.hbase.util.Bytes;

 

/**

 * Created with Intellij IDEA

 * User: star

 * Date: 2015-02-09

 * Time: 17:10

 * function:

 * To change this template use File | Settings | File Templates.

 */

public class NumberComparator extends ByteArrayComparable {

    private String fieldType;  

    private byte [] data;

 

    /**

     * Constructor

     * @param value value

     */

    public NumberComparator(byte[] value,String fieldType) {

        super(value);

        this.fieldType = fieldType;

        this.data = value;

    }

 

    @Override  //重写该方法

    public byte[] toByteArray() {

        ComparatorProtos.NumberComparator.Builder builder =

                ComparatorProtos.NumberComparator.newBuilder();

 

        builder.setValue(ByteString.copyFrom(this.data));

        builder.setFieldType(this.fieldType);

        return builder.build().toByteArray();

    }

    

   //定义该方法,用于对象反序列化操作

    public static NumberComparator parseFrom(final byte [] bytes) throws DeserializationException {

        ComparatorProtos.NumberComparator proto = null;

        try {

            proto = ComparatorProtos.NumberComparator.parseFrom(bytes);

        } catch (InvalidProtocolBufferException e) {

            throw new DeserializationException(e);

        }

        return new NumberComparator(proto.getValue().toByteArray(),proto.getFieldType());

    }

 

    @Override     //重写比较方法 里面就可以按照自己的意愿来实现自己的比较器

    public int compareTo(byte[] bytes, int offset, int length) {

        if(fieldType.equalsIgnoreCase("int") || fieldType.equalsIgnoreCase("integer")) {

            Integer paramValue = byteConvertObj(Integer.class,data);

            Integer currentValue = byteConvertObj(Integer.class,Bytes.copy(bytes, offset, length));

            return  paramValue.compareTo(currentValue);

        }else if(fieldType.equalsIgnoreCase("long") || fieldType.equalsIgnoreCase("bigint")){

            Long paramsValue =  byteConvertObj(Long.class,data);

            Long currentValue =  byteConvertObj(Long.class,Bytes.copy(bytes, offset, length));

            return paramsValue.compareTo(currentValue);

        }else if(fieldType.equalsIgnoreCase("float")){

            Float paramsValue =  byteConvertObj(Float.class,data);

            Float currentValue =  byteConvertObj(Float.class,Bytes.copy(bytes, offset, length));

            return paramsValue.compareTo(currentValue);

        }else if(fieldType.equalsIgnoreCase("double")){

            Double paramsValue =  byteConvertObj(Double.class,data);

            Double currentValue =  byteConvertObj(Double.class,Bytes.copy(bytes, offset, length));

            return paramsValue.compareTo(currentValue);

        }else if(fieldType.equalsIgnoreCase("short") || fieldType.equalsIgnoreCase("SMALLINT")){

            Short paramsValue =  byteConvertObj(Short.class,data);

            Short currentValue =  byteConvertObj(Short.class,Bytes.copy(bytes, offset, length));

            return paramsValue.compareTo(currentValue);

        }

        return 1;

    }

 

    private <T> T  byteConvertObj(Class<T> clazz,byte [] data){

        String clazzName  = clazz.getSimpleName();

        if(clazzName.equalsIgnoreCase("Integer")){

            Integer paramValue ;

            try {

                paramValue = Bytes.toInt(data);

            } catch (IllegalArgumentException  e) {

                paramValue = Integer.valueOf(Bytes.toString(data));

            }

            return (T)paramValue;

        }else if(clazzName.equalsIgnoreCase("Long")){

            Long paramValue ;

            try {

                paramValue = Bytes.toLong(data);

            } catch (IllegalArgumentException  e) {

                paramValue = Long.valueOf(Bytes.toString(data));

            }

            return (T)paramValue;

        }else if(clazzName.equalsIgnoreCase("Float")){

            Float paramValue ;

            try {

                paramValue = Bytes.toFloat(data);

            } catch (IllegalArgumentException  e) {

                paramValue = Float.valueOf(Bytes.toString(data));

            }

            return (T)paramValue;

        }else if(clazzName.equalsIgnoreCase("Double")){

            Double paramValue;

            try {

                paramValue = Bytes.toDouble(data);

            } catch (IllegalArgumentException  e) {

                paramValue = Double.valueOf(Bytes.toString(data));

            }

            return (T)paramValue;

        }else if(clazzName.equalsIgnoreCase("Short")){

            Short paramValue;

            try {

                paramValue = Bytes.toShort(data);

            } catch (IllegalArgumentException  e) {

                paramValue = Short.valueOf(Bytes.toString(data));

            }

            return (T)paramValue;

        }

        return null;

    }

}

---------------------------------------------------------------------------------------------------------------------------------

 

  至此 比较器定义完成,接着需要将该protobuf生产的java类和我们定义的Comparator类打成jar包,然后放到Hbase目录下面的lib目录里面,这样才真正执行的时候才能找到该类。放进去后需要重启以下hbase集群。

 

最后我们写一个测试来看下我们的自定义比较器是否生效:局部代码:

 -------------------------------------------------------------------------------------------------------------       

        Scan scan = new Scan();

        scan.addFamily(Bytes.toBytes("info"));

        FilterList filterList = new FilterList();

        NumberComparator comparator = new NumberComparator(Bytes.toBytes(1500),"int"); //自定义的比较器传入我们自己定义的两个参数

        SingleColumnValueFilter filter = new SingleColumnValueFilter(Bytes.toBytes("info"),Bytes.toBytes("id"),

                CompareFilter.CompareOp.GREATER,comparator);

        filterList.addFilter(filter);

        scan.setFilter(filterList);

 -------------------------------------------------------------------------------------------------------------  

最后成功实现该功能。

 

实现期间遇到的一个异常我在这里列出来 尤为深刻:

这个异常时因为自定义的Comparator里面出现了异常,然后数据传输遇到问题,数据一直传输不过去出现的问题。可能解释的不是特别好,希望大家可以完善,毕竟自身的对hbase还不是特别熟悉。

--------------------------------------------------------------------------------------------------------------------------

java.lang.RuntimeException: org.apache.hadoop.hbase.DoNotRetryIOException: Failed after retry of OutOfOrderScannerNextException: was there a rpc timeout?

at org.apache.hadoop.hbase.client.AbstractClientScanner$1.hasNext(AbstractClientScanner.java:94)

at org.springframework.data.hadoop.hbase.RowMapperResultsExtractor.extractData(RowMapperResultsExtractor.java:46)

at org.springframework.data.hadoop.hbase.RowMapperResultsExtractor.extractData(RowMapperResultsExtractor.java:30)

at org.springframework.data.hadoop.hbase.HbaseTemplate$1.doInTable(HbaseTemplate.java:131)

at org.springframework.data.hadoop.hbase.HbaseTemplate.execute(HbaseTemplate.java:58)

at org.springframework.data.hadoop.hbase.HbaseTemplate.find(HbaseTemplate.java:126)

at org.springframework.data.hadoop.hbase.HbaseTemplate.find(HbaseTemplate.java:155)

at com.csx.hbase.HBaseServiceImpl.scan(HBaseServiceImpl.java:116)

at com.csx.hbase.TestHbaseServiceImpl.test(TestHbaseServiceImpl.java:24)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)

at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)

at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)

at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)

at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)

at org.springframework.test.context.junit4.statements.RunBeforeTestMethodCallbacks.evaluate(RunBeforeTestMethodCallbacks.java:74)

at org.springframework.test.context.junit4.statements.RunAfterTestMethodCallbacks.evaluate(RunAfterTestMethodCallbacks.java:82)

at org.springframework.test.context.junit4.statements.SpringRepeat.evaluate(SpringRepeat.java:72)

at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:240)

at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)

at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)

at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)

at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)

at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)

at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)

at org.springframework.test.context.junit4.statements.RunBeforeTestClassCallbacks.evaluate(RunBeforeTestClassCallbacks.java:61)

at org.springframework.test.context.junit4.statements.RunAfterTestClassCallbacks.evaluate(RunAfterTestClassCallbacks.java:70)

at org.junit.runners.ParentRunner.run(ParentRunner.java:309)

at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:180)

at org.junit.runner.JUnitCore.run(JUnitCore.java:160)

at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:74)

at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:211)

at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:67)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)

at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)

Caused by: org.apache.hadoop.hbase.DoNotRetryIOException: Failed after retry of OutOfOrderScannerNextException: was there a rpc timeout?

at org.apache.hadoop.hbase.client.ClientScanner.next(ClientScanner.java:402)

at org.apache.hadoop.hbase.client.AbstractClientScanner$1.hasNext(AbstractClientScanner.java:91)

... 39 more

Caused by: org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException: org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException: Expected nextCallSeq: 1 But the nextCallSeq got from client: 0; request=scanner_id: 14 number_of_rows: 100 close_scanner: false next_call_seq: 0

at org.apache.hadoop.hbase.regionserver.HRegionServer.scan(HRegionServer.java:3098)

at org.apache.hadoop.hbase.protobuf.generated.ClientProtos$ClientService$2.callBlockingMethod(ClientProtos.java:29497)

at org.apache.hadoop.hbase.ipc.RpcServer.call(RpcServer.java:2012)

at org.apache.hadoop.hbase.ipc.CallRunner.run(CallRunner.java:98)

at org.apache.hadoop.hbase.ipc.SimpleRpcScheduler.consumerLoop(SimpleRpcScheduler.java:168)

at org.apache.hadoop.hbase.ipc.SimpleRpcScheduler.access$000(SimpleRpcScheduler.java:39)

at org.apache.hadoop.hbase.ipc.SimpleRpcScheduler$1.run(SimpleRpcScheduler.java:111)

at java.lang.Thread.run(Thread.java:745)

 

at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)

at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)

at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)

at java.lang.reflect.Constructor.newInstance(Constructor.java:525)

at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)

at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:95)

at org.apache.hadoop.hbase.protobuf.ProtobufUtil.getRemoteException(ProtobufUtil.java:285)

at org.apache.hadoop.hbase.client.ScannerCallable.call(ScannerCallable.java:204)

at org.apache.hadoop.hbase.client.ScannerCallable.call(ScannerCallable.java:59)

at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithRetries(RpcRetryingCaller.java:114)

at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithRetries(RpcRetryingCaller.java:90)

at org.apache.hadoop.hbase.client.ClientScanner.next(ClientScanner.java:354)

... 40 more

Caused by: org.apache.hadoop.hbase.ipc.RemoteWithExtrasException(org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException): org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException: Expected nextCallSeq: 1 But the nextCallSeq got from client: 0; request=scanner_id: 14 number_of_rows: 100 close_scanner: false next_call_seq: 0

at org.apache.hadoop.hbase.regionserver.HRegionServer.scan(HRegionServer.java:3098)

at org.apache.hadoop.hbase.protobuf.generated.ClientProtos$ClientService$2.callBlockingMethod(ClientProtos.java:29497)

at org.apache.hadoop.hbase.ipc.RpcServer.call(RpcServer.java:2012)

at org.apache.hadoop.hbase.ipc.CallRunner.run(CallRunner.java:98)

at org.apache.hadoop.hbase.ipc.SimpleRpcScheduler.consumerLoop(SimpleRpcScheduler.java:168)

at org.apache.hadoop.hbase.ipc.SimpleRpcScheduler.access$000(SimpleRpcScheduler.java:39)

at org.apache.hadoop.hbase.ipc.SimpleRpcScheduler$1.run(SimpleRpcScheduler.java:111)

at java.lang.Thread.run(Thread.java:745)

 

at org.apache.hadoop.hbase.ipc.RpcClient.call(RpcClient.java:1453)

at org.apache.hadoop.hbase.ipc.RpcClient.callBlockingMethod(RpcClient.java:1657)

at org.apache.hadoop.hbase.ipc.RpcClient$BlockingRpcChannelImplementation.callBlockingMethod(RpcClient.java:1715)

at org.apache.hadoop.hbase.protobuf.generated.ClientProtos$ClientService$BlockingStub.scan(ClientProtos.java:29900)

at org.apache.hadoop.hbase.client.ScannerCallable.call(ScannerCallable.java:174)

... 44 more

 

 

 

 

   

 

 

 

        

 

 

 

 

2
1
分享到:
评论

相关推荐

    hbase自定义Comparator进行数值比较

    hbase 自带的Comparator只能进行字符串的比较,不能进行数值比较,通过自定义代码实现该功能。 具体使用请参考 http://blog.csdn.net/mtj66/article/details/52574739

    hbase自定义数值型比较器

    hbase自定义数值型比较器,使用maven生成jar使用jar包;或者直接将生成的两个类复制到你的工程下用。

    hbasesink 自定义序列化类

    hbasesink 自定义序列化类 ,可实现自定义rowkey及去除字段两边索引,具体请看下代码。 hbasesink 自定义序列化类 ,可实现自定义rowkey及去除字段两边索引,具体请看下代码。

    HBASE-comparator.zip

    HBase中如果存储入字符串类型的,是按照字典序进行比较的,如9&gt;100,并不是按照数值的大小进行比较,我们可以自定义一个数值比较器进行比较。

    Hbase的JavaAPI

    Hbase的JavaAPI 包括表的增删改查

    hbase的rowkey设计与hbase的协处理器运用.docx

    该文档是介绍hbase的rowkey设计与hbase的协处理器运用,与大家分享!

    hbase性能测试.docx

    hbase性能报告

    HBase in Practise: 性能、监控和问题排查

    HBase在不同版本(1.x, 2.x, 3.0)中针对不同类型的硬件(以IO为例,HDD/SATA-SSD/PCIe-SSD/Cloud)和场景(single/batch, get/scan)做了(即将做)各种不同的优化,这些优化都有哪些?如何针对自己的生产业务和...

    hbase:habse自定义MR

    hbase从HBase表中导入数据到Hbase表中将fruit表中的一部分数据,通过MR迁入到fruit_mr表中从HDFS中导入数据到Hbase表中根据HDFS中的数据导入到fruit_hdfs表中

    HBase最佳实践–Scan用法大观园

    也算是Scan系列的其中一篇吧,后面对于Scan还会有一篇结合HDFS分析HBase数据读取在HDFS层面是怎么一个流程,敬请期待。HBase中Scan从大的层面来看主要有三种常见用法:ScanAPI、TableScanMR以及SnapshotScanMR

    hbase shell常用命令汇总

    总结常用的hbase shell操作命令。

    hbase phoenix sql

    对于使用了HBase API、协同处理器及自定义过滤器的Impala与OpenTSDB来说,进行相似的查询Phoenix的速度也会更快一些。 Phoenix查询引擎会将SQL查询转换为一个或多个HBase scan,并编排执行以生成标准的JDBC结果集...

    基于spring boot 的spring-boot-starter-hbase自动注解实现

    基于spring boot 的spring-boot-starter-hbase自动注解实现,HbaseTemplate的直接使用

    spring-boot-starter-hbase自定义的spring-boot的hbasestarter

    自定义的spring-boot的hbase starter,为hbase的query和更新等操作提供简易的api并集成spring-boot的auto configuration

    apache-phoenix-5.0.0-HBase-2.0-bin.tar.gz

    Apache Phoenix是构建在HBase之上的关系型数据库层,作为内嵌的客户端JDBC驱动用以对...直接使用HBase的API,结合协处理器(coprocessor)和自定义的过滤器的话,小范围的查询在毫秒级响应,千万数据的话响应速度为秒级

    hbasedatacompare:并发scan hbase,redis 的工具类

    Concurrent scanning of data sources 工具介绍 这是一个并发扫描数据源(hbase,redis)的工具,多线程scan提高效率,但会对机器以及集群造成一定压力。代码说明 1.concurrentRedisScan:多线程scan redis(每个线程...

    hbase shell命令详解

    hbase shell命令详解

    Hbase数据库界面管理器

    可以通过GUI管理Hbase数据库,支持多种过滤器查询。

    HbaseTemplate 操作hbase

    java 利用 sping-data-hadoop HbaseTemplate 操作hbase find get execute 等方法 可以直接运行

    HBase轻量级中间件simplehbase.zip

     hbase query封装:封装了hbase的filter,可以使用sql-like的方式操作hbase。  动态query封装:类似于myibatis,可以使用xml配置动态语句查询hbase。    insert,update支持: 建立在hbase的checkAndPut之上。  ...

Global site tag (gtag.js) - Google Analytics