HBase的coprocessor分为两类,Observer和EndPoint。Observer相当于触发器,代码部署在服务端,相当于对API调用的代理。介绍这方面的文章不少,在此不赘述。这里想说一下EndPoint的使用。
EndPoint相当于存储过程。0.94.x之前使用EndPoint需要实现CoprocessorProtocol接口,而0.96.x的EndPoint改为用protobufs作为RPC的协议。在此用一个具体的例子说明一下新版的EndPoint该怎么使用。
例如:统计一张表的行数。
首先首先编写protobuf文件并编译。
option java_package = "linecounter";
option java_outer_classname = "LineCounterServer";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
option optimize_for=SPEED;
message CountRequest {
required string askWord = 1;
}
message CountResponse {
required int64 retWord = 1;
}
service LineCounter {
rpc countLine(CountRequest)
returns (CountResponse);
}
编译后会生成LineCounterServer.java
CountRequest是发送给服务端的消息,这里定义字符串askWord来存放具体消息内容。CounterResponse是返回的结果,统计的是行数,所以用long类型存放。LineCounter中定义一个方法countLine,传递请求,返回响应。具体说明请参见protobuf。
实现EndPoint
public class LineCounterEndPoint extends LineCounterServer.LineCounter implements Coprocessor, CoprocessorService {
private RegionCoprocessorEnvironment env;
@Override
public void start(CoprocessorEnvironment coprocessorEnvironment) throws IOException {
if (coprocessorEnvironment instanceof RegionCoprocessorEnvironment)
this.env = (RegionCoprocessorEnvironment) coprocessorEnvironment;
else throw new CoprocessorException("Must be loaded on a table region!!");
}
@Override
public void stop(CoprocessorEnvironment coprocessorEnvironment) throws IOException {
}
@Override
public Service getService() {
return this;
}
@Override
public void countLine(RpcController controller, LineCounterServer.CountRequest request, RpcCallback<LineCounterServer.CountResponse> done) {
RegionScanner scanner = null;
LineCounterServer.CountResponse.Builder respBuilder = LineCounterServer.CountResponse.newBuilder();
if (!"count".equals(request.getAskWord())) {
respBuilder.setRetWord(23333);
} else {
long count = 0;
try {
Scan scan = new Scan();
scan.setMaxVersions(1);
scanner = env.getRegion().getScanner(scan);
List<Cell> list = new ArrayList<>();
while (scanner.next(list))
count += 1;
respBuilder.setRetWord(count);
} catch (IOException e) {
e.printStackTrace();
} finally {
if (scanner != null)
try {
scanner.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
done.run(respBuilder.build());
}
}
LineCounterEndPoint需要继承抽象类LineCounter并实现Coprocessor和CoprocessorService接口。LineCounter在刚才生成的java文件里。
start和stop方法分别负责endpoint执行前的初始化和结束后的清理工作。start方法的参数是一个接口,需要根据实际环境将其转成需要的类型。
主要需要实现的是countLine方法,这也刚才在protobuf中定义的方法。为了测试效果,这里对请求做了一个区分:如果收到的请求信息不是“count”,那么返回23333;否则统计region的记录行数并返回。
实现Client端
public class LineCounterClient {
public static void main(String[] args) throws Throwable {
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "zk_host1:2181,zk_host2:2181,zk_host3:2181");
conf.set("hbase.master", "host_master:60000");
HTable table = new HTable(conf, "count_test");
final LineCounterServer.CountRequest req = LineCounterServer.CountRequest.newBuilder().setAskWord("count").build();
Map<byte[], Long> tmpRet = table.coprocessorService(LineCounterServer.LineCounter.class, null, null, new Batch.Call<LineCounterServer.LineCounter, Long>() {
@Override
public Long call(LineCounterServer.LineCounter instance) throws IOException {
ServerRpcController controller = new ServerRpcController();
BlockingRpcCallback<LineCounterServer.CountResponse> rpc = new BlockingRpcCallback<>();
instance.countLine(controller, req, rpc);
LineCounterServer.CountResponse resp = rpc.get();
return resp.getRetWord();
}
});
long ret = 0;
for (long l : tmpRet.values())
ret += l;
System.out.println("lines: " + ret);
}
}
首先设置zookeeper和master的地址和接口信息。然后构造请求即CountRequest,先将请求信息设置为“count”。调用HTable的coprocessorService方法
public <T extends Service, R> Map<byte[],R> coprocessorService(final Class<T> service,
byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable)
该方法有四个参数,第1个参数是protobuf生成的LineCounter类对象。第2个和第3个参数分别为起始和结束rowkey,这里的意思是范围内rowkey所在的region都会调用endpoint,这里设为null表明所有的region都会调用。第4个参数为接口,需要重写call方法。
方法的返回值是Map类型,Map的size与参与计算的region个数一致。所以最后需要做的一步是讲返回结果进行累加,得到最后的结果。
此程序返回5782,是表count_test的行数。若请求消息设置为“hello”,程序返回23333。
coprocessorService还有一个五参数方法,第五个参数是一个CallBack接口,还可以如此实现:
public class LineCounterClient {
public static void main(String[] args) throws Throwable {
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "zk_host1:2181,zk_host2:2181,zk_host3:2181");
conf.set("hbase.master", "host_master:60000");
HTable table = new HTable(conf, "count_test");
final LineCounterServer.CountRequest req = LineCounterServer.CountRequest.newBuilder().setAskWord("count").build();
final AtomicLong ret = new AtomicLong();
table.coprocessorService(LineCounterServer.LineCounter.class, null, null, new Batch.Call<LineCounterServer.LineCounter, Long>() {
@Override
public Long call(LineCounterServer.LineCounter instance) throws IOException {
ServerRpcController controller = new ServerRpcController();
BlockingRpcCallback<LineCounterServer.CountResponse> rpc = new BlockingRpcCallback<>();
instance.countLine(controller, req, rpc);
LineCounterServer.CountResponse resp = rpc.get();
return resp.getRetWord();
}
}, new Batch.Callback<Long>() {
@Override
public void update(byte[] region, byte[] row, Long result) {
ret.getAndAdd(result);
System.out.println(Bytes.toString(row)+": "+result);
}
});
System.out.println("lines: " + ret.get());
}
}
每调用一次call方法之后会调用一次update方法,因此在外部定义一个变量ret存放结果,每次调用update时更新ret的值即可。
分享到:
相关推荐
HBaseCoprocessor的实现与应用.pdf
在hbtc2012上的发言,介绍hbase coprocessor的优化。hbase的cp是其自带的分布式并行计算框架。
讲师:陈杨——快手大数据高级研发工程师 ...内容概要:(1)讲解hbase coprocessor的原理以及使用场景,(2) coprocessor整个流程实战,包括开发,加载,运行以及管理(3)结合1,2分析coprocessor在rsgroup中的具体使用
藏经阁-HBase Coprocessor-22.pdf
HBaseCoprocessor的实现与应用.zip
使用Hbase协作器(Coprocessor)同步数据到ElasticSearch(hbase 版本 1.2.0-cdh5.8.0, es 2.4.0 版本)源代码
hbase示例 twitbase
hbase-solr-coprocessor 测试代码,目的是借助solr实现hbase二级索引,以使hbase支持高效的多条件查询。主要通过hbase的coprocessor的Observer实现,通过coprocessor在记录插入hbase时向solr中创建索引。 项目核心为...
HBase-coprocessor
本人原创, 1.Hbase连接需要改Hbase包中的两个配置文件,加上Hbase所在机器ip及端口 2.HBaseDMLT初始化环境 3.MapDataIni为建表 4.MapDataInsert为写数据 5.MapDataDelete为删除数据 6.MapDataRead为读数据
本代码是java链接hbase数据库并对hbase进行增删改查操作的实例代码,包括批量操作
2-6+HBase+Coprocessor
java代码使用thrift2操作hbase示例,thrift2连接hbase添加数据,单条查找,删除数据,根据扫描器查找,修改数据等测试实例
经过3天测试,总结出可运行成功的C#For HBase示例代码 经过3天测试,总结出可运行成功的C#For HBase示例代码
javaapi实现hbase的触发器,包含hbase的工具类
HBase 示例 我在使用 HBase 时编写的一些 HBase 代码示例,用作更多相关工作的起点。 要编译示例,需要在编译前将位于 hbase/lib 目录中的 JAR 添加到 CLASSPATH。 包括一些有用的协处理器示例,其中一个用于 ...
增量式的Apriori算法,有点像分布式的Apriori,因为我们可以把已挖掘的事务集和新增的事务集看作两个互相独立的数据集,挖掘新增的事务集,获取所有新增频繁集,然后与已有的频繁集做并集,对于两边都同时频繁的项集...
HBase RDD示例 这是的示例项目。 它目前可以在CDH 5.5上运行,尽管它可以在CDH的其他版本上进行较小的修改。 跑步 首先,使用 sbt assembly 这将生成target/scala-2.12/hbase-rdd-examples-assembly-0.9.1.jar 。 ...
scala语言编写的spark streamming消费kafka数据存入hbase示例代码。打包成jar包可以在spark2.4下运行,测试环境是CDH6.2,运行没有问题。
Hbase本身只有一级索引rowkey,现在通过Hbase coprocessor协处理器把Hbase的数据索引存储到Elasticsearch,从而建立二级索引;ppt中讲述了一些注意事项,挺有用的,希望能有所帮忙!