public static void initConnectDB(){ primaryKey = "id"; rdbmsUrl = "jdbc:mysql://hadoop/DB" ; rdbmsUserName = ""; rdbmsPassword = ""; connector = new RDBMSConnector(); try { con = connector.getConnection(rdbmsUrl, rdbmsUserName, rdbmsPassword); communicator = new RDBMSCommunicator2UFN(con); } catch (Exception e){ System.out.println("connect to db exception in initConnectDB()"); e.printStackTrace(); } } public static class GetUserID extends BaseBasicBolt{ //private RDBMSCommunicator communicator = null; private ResultSet rs = null; @Override public void prepare(Map stormConf, TopologyContext context) { System.out.println("in prepare con : "+con); //this.communicator = new RDBMSCommunicator(con); System.out.println("in pretpare communicator :"+communicator); } public void execute(Tuple input, BasicOutputCollector collector) { Object id = input.getValue(0); String userName = input.getString(1); String sql = String.format("select userID from usersinfo where username='%s'", userName); System.out.println("sql in get-user-id: "+sql); rs = communicator.selecteExec(sql); String userID = null; if (rs != null){ try { rs.next(); userID = rs.getString("userID"); } catch (Exception e){ e.printStackTrace(); } collector.emit(new Values(id, userID)); } } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("id", "userID")); } } public static class GetUserFunctionsID extends BaseBasicBolt{ //private RDBMSCommunicator communicator = null; private ResultSet rs = null; @Override public void prepare(Map stormConf, TopologyContext context) { //communicator = new RDBMSCommunicator(con); } public void execute(Tuple input, BasicOutputCollector collector) { Object id = input.getValue(0); String userID = input.getString(1); if (userID == null || userID.trim().length() == 0){ return; } String sql = String.format("select functionID from userfunctions where userID='%s'", userID); System.out.println("sql in get-user-functionid : "+sql); rs = communicator.selecteExec(sql); String functionID = null; if (rs != null){ try { while(rs.next()){ functionID = rs.getString("functionID"); collector.emit(new Values(id,functionID)); } } catch(Exception e){ e.printStackTrace(); } } } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("id","functionID")); } } public static class GetUserFunctionsName extends BaseBatchBolt{ //private RDBMSCommunicator communicator = null; private ResultSet rs = null; List<String> functionsName = new ArrayList<String>(); BatchOutputCollector _collector; Object _id; public void execute(Tuple tuple) { String functionID = tuple.getString(1); if (functionID == null || functionID.trim().length() == 0){ return ; } String sql = String.format("select functionName from functionsinfo where functionID='%s'",functionID); System.out.println("sql in get-user-functionname : "+sql ); rs = communicator.selecteExec(sql); String functionName = null; if(rs != null){ try { rs.next(); functionName = rs.getString("functionName"); functionsName.add(functionName); } catch (Exception e){ e.printStackTrace(); } } } public void finishBatch() { _collector.emit(new Values(_id,functionsName.toString())); } public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) { _collector = collector; _id = id; } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("id", "user-funcions-name")); } } public static LinearDRPCTopologyBuilder construct(){ initConnectDB(); LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("user-functions-name"); builder.addBolt(new GetUserID(), 2); builder.addBolt(new GetUserFunctionsID(),2).shuffleGrouping(); builder.addBolt(new GetUserFunctionsName(),2).fieldsGrouping(new Fields("id","functionID")); return builder; } public static void main(String[] args) throws Exception{ LinearDRPCTopologyBuilder builder = construct(); Config conf = new Config(); if(args==null || args.length==0) { conf.setMaxTaskParallelism(3); LocalDRPC drpc = new LocalDRPC(); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("user-fn-drpc", conf, builder.createLocalTopology(drpc)); String[] userNames = new String[] { "qingwu.fu"}; for(String un: userNames) { System.out.println("Functions name of : " + un + ": " + drpc.execute("user-functions-name", un)); } cluster.shutdown(); drpc.shutdown(); } else { conf.setNumWorkers(6); StormSubmitter.submitTopology(args[0], conf, builder.createRemoteTopology()); } }
相关推荐
这是storm中drpc应用的一个例子。
storm DRPC简单例程,服务器端是运行在集群环境中的,客户端去调用DRPC服务
storm之drpc操作demo示例
Storm-drpc节点适用于Node.js的Apache Storm DRPC客户端受启发,但不同之处在于可以选择将其设置为保持活动状态,它不需要在每个execute()调用中都创建连接,并且可以喜欢的传统方式或promise方式使用它。...
2、注重实践,对较抽象难懂的技术点如Grouping策略、并发度及线程安全、批处理事务、DRPC、Storm Trident均结合企业场景开发案例进行讲解,简单易懂; 3、分享积累的经验和技巧,从架构的角度剖析场景和设计实现...
1、课程中完整开发3个Storm项目,均为企业实际项目,其中一个是完全由Storm Trident开发。 项目源码均可以直接运行,也可直接用于商用或企业。 2、Storm全面、系统、深入讲解 3、注重实践,对较抽象难懂的技术点如...
第2章通过实际运行一个简单的例子,以及介绍本地环境和集群环境的搭建,让读者对Storm有了直观的认识;第3章深入讲解了Storm的基本概念,同时实现一个Topology运行;第4章和第5章阐述了Storm的并发度、可靠处理的...
01.Storm基础知识02.Storm集群安装-1-new .avi.baiduyun.p05.Storm配置文件配置项讲解07.Storm基本API介绍08.Storm Topology的并发度09.Strom消息机制原理讲解10.Storm DRPC实战讲解
• BasicBolt • Storm 批处理 • Storm TOPN • Storm 流程聚合 • Storm DRPC • Storm executor、worker、task之间的关系和调优 • Storm异常解决
内容概要: • Storm 记录级容错原理 • Storm 配置详解 • Storm 批处理 • Storm TOPN • Storm 流程聚合 • Storm DRPC • Storm executor、worker、task之间的关系和调优 • Storm异常解决
【Storm篇】--Storm中的同步服务DRPC 【Storm篇】--Storm从初始到分布式搭建 【Storm篇】--Storm 容错机制 【Storm篇】--Storm并发机制 【Storm篇】--Storm分组策略 【Storm篇】--Storm基础概念
dRPC 一些描述。要求移液器> = 0.1.4安装pip install dRPC特征一些功能。文献资料例子一些描述。 # Some code 新分支测试
风暴Debian包装 用于分布式实时计算系统的Debian打包。 这个项目的目标是提供一个灵活的工具来构建一个debian软件包,该软件包遵循debian标准并使用风暴发行版提供的默认配置。... 还有一个storm软件包,
Storm常见模式——分布式RPC 分布式RPC(distributedRPC,DRPC)用于对Storm上大量的...DRPC可以单独作为一个独立于Storm的库发布,但由于其重要性还是和Storm捆绑在了一起。1.接收到一个RPC调用请求;2.发送请求到Sto
go.mod 中只有 3 个要求,还有 9 行go mod graph ! 兼容的。 适用于许多 gRPC 用例! 。 DRPC 具有闪电般的快速。 可扩展。 DRPC 与传输无关,支持中间件,并围绕接口设计。 战斗测试。 已在数万台服务器的...
linux实验环境,storm搭建完毕后的开发。eclipse开发环境,大数据界hello world——wordcount详解,bolt、分组机制、storm DRPC详解
目前包括几个服务: storm-mesos - Nimbus(Mesos 调度程序) storm-ui - Web 界面(默认端口为8080 ) storm-drpc - DRPC 守护进程storm-log - 用于在 Web UI 中显示日志的服务(从计算节点获取日志) 请注意,不...
DRPC:简单的Discord RPC程序