`
jimmee
  • 浏览: 564547 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

rpc中怎么处理方法的调用的?

阅读更多

1. rpc请求中怎么分发请求方法

 

方法一: 直接使用反射, 通过方法名, 参数名等反射调用

实际使用中的示例, hadoop的实现, 具体可参见 http://jimmee.iteye.com/blog/1206598 例如:

 

org.apache.hadoop.ipc.RPC

 

 

 public Writable call(Class<?> protocol, Writable param, long receivedTime) 
    throws IOException {
      try {
        Invocation call = (Invocation)param;
        if (verbose) log("Call: " + call);

        Method method =
          protocol.getMethod(call.getMethodName(),
                                   call.getParameterClasses());
        method.setAccessible(true);

        long startTime = System.currentTimeMillis();
        Object value = method.invoke(instance, call.getParameters());

   .......
 }

 

 

 

 

方式二: 使用一个标记值来区分, 例如, 如果readInt()=1, 则表示method1, 若readInt()=2, 则表示method2

 

现实中的示例, 同样是hadoop, 例如:

 

org.apache.hadoop.hdfs.server.datanode.DataXceiver

 

 

 

  * Read/write data from/to the DataXceiveServer.
   */
  public void run() {
    DataInputStream in=null; 
    try {
      in = new DataInputStream(
          new BufferedInputStream(NetUtils.getInputStream(s), 
                                  SMALL_BUFFER_SIZE));
      short version = in.readShort();
      if ( version != DataTransferProtocol.DATA_TRANSFER_VERSION ) {
        throw new IOException( "Version Mismatch" );
      }
      boolean local = s.getInetAddress().equals(s.getLocalAddress());
      byte op = in.readByte();
      // Make sure the xciver count is not exceeded
      int curXceiverCount = datanode.getXceiverCount();
      if (curXceiverCount > dataXceiverServer.maxXceiverCount) {
        throw new IOException("xceiverCount " + curXceiverCount
                              + " exceeds the limit of concurrent xcievers "
                              + dataXceiverServer.maxXceiverCount);
      }
      long startTime = DataNode.now();
      switch ( op ) {
      case DataTransferProtocol.OP_READ_BLOCK:
        readBlock( in );
        datanode.myMetrics.readBlockOp.inc(DataNode.now() - startTime);
        if (local)
          datanode.myMetrics.readsFromLocalClient.inc();
        else
          datanode.myMetrics.readsFromRemoteClient.inc();
        break;
      case DataTransferProtocol.OP_WRITE_BLOCK:
    .....
 }

 

 

 

 

方式三: thrift的实现方式, 就是一个接口方法对应一个类, 接口的所有参数对应一个类.

 

 

 

public static class Processor<I extends Iface> extends org.apache.thrift.TBaseProcessor implements org.apache.thrift.TProcessor {
    private static final Logger LOGGER = LoggerFactory.getLogger(Processor.class.getName());
    public Processor(I iface) {
      super(iface, getProcessMap(new HashMap<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>>()));
    }

    protected Processor(I iface, Map<String,  org.apache.thrift.ProcessFunction<I, ? extends  org.apache.thrift.TBase>> processMap) {
      super(iface, getProcessMap(processMap));
    }

    private static <I extends Iface> Map<String,  org.apache.thrift.ProcessFunction<I, ? extends  org.apache.thrift.TBase>> getProcessMap(Map<String,  org.apache.thrift.ProcessFunction<I, ? extends  org.apache.thrift.TBase>> processMap) {
      // 一个接口对应一个类
      processMap.put("query", new query());
      .....
    }


    
    // 接口的参数
    public static class query_args implements org.apache.thrift.TBase<query_args, query_args._Fields>, java.io.Serializable, Cloneable   {
    private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("query_args");

 

 

 

 

执行过程:

(1) 线程池处理连接(题外话: 这里的实现允许连接进来, 实际上会排队, 线程池处理链接, 如果都是长连接, 则后面的会等很久很久)

 

 

 setServing(true);
    while (!stopped_) {
      int failureCount = 0;
      try {
        TTransport client = serverTransport_.accept();
        WorkerProcess wp = new WorkerProcess(client);
        executorService_.execute(wp);
      } catch (TTransportException ttx) {
        if (!stopped_) {
          ++failureCount;
          LOGGER.warn("Transport error occurred during acceptance of message.", ttx);
        }
      }
    }

 

 

(2) 读取消息:

 

 

 public boolean process(TProtocol in, TProtocol out) throws TException {
    TMessage msg = in.readMessageBegin();
    // 方法名称
    ProcessFunction fn = processMap.get(msg.name);
    if (fn == null) {
      TProtocolUtil.skip(in, TType.STRUCT);
      in.readMessageEnd();
      TApplicationException x = new TApplicationException(TApplicationException.UNKNOWN_METHOD, "Invalid method name: '"+msg.name+"'");
      out.writeMessageBegin(new TMessage(msg.name, TMessageType.EXCEPTION, msg.seqid));
      x.write(out);
      out.writeMessageEnd();
      out.getTransport().flush();
      return true;
    }
// 方法处理
    fn.process(msg.seqid, in, out, iface);
    return true;
  }


 public final void process(int seqid, TProtocol iprot, TProtocol oprot, I iface) throws TException {
    T args = getEmptyArgsInstance();
    try {
	// 读取参数值
      args.read(iprot);
    } catch (TProtocolException e) {
      iprot.readMessageEnd();
      TApplicationException x = new TApplicationException(TApplicationException.PROTOCOL_ERROR, e.getMessage());
      oprot.writeMessageBegin(new TMessage(getMethodName(), TMessageType.EXCEPTION, seqid));
      x.write(oprot);
      oprot.writeMessageEnd();
      oprot.getTransport().flush();
      return;
    }
    iprot.readMessageEnd();
    // 实际方法执行
    TBase result = getResult(iface, args);
    oprot.writeMessageBegin(new TMessage(getMethodName(), TMessageType.REPLY, seqid));
    result.write(oprot);
    oprot.writeMessageEnd();
    oprot.getTransport().flush();
  }


  protected query_result getResult(I iface, query_args args) throws org.apache.thrift.TException {
        query_result result = new query_result();
        try {
          result.success = iface.query(args.domainName, args.query, args.mode);
        } catch (OperationException excp) {
          result.excp = excp;
        }
        return result;
      }
 
  

 

 

  

 

分享到:
评论

相关推荐

    cmd-bat-批处理-脚本-98下获取当前路径.zip

    cmd-bat-批处理-脚本-98下获取当前路径.zip

    cmd脚本-bat批处理-去掉字符串不同部位的空格.zip

    cmd脚本-bat批处理-去掉字符串不同部位的空格.zip

    25年上半年湖师大学位报考附件.zip

    25年上半年湖师大学位报考附件.zip

    cmd-bat-批处理-脚本-清空指定大小的文件夹.zip

    cmd-bat-批处理-脚本-清空指定大小的文件夹.zip

    基于MATLAB实现的六种图像增强技术代码总结

    这是一份关于基础图像增强代码的整理,共包含六种常见的图像增强方法,分别是三种Retinex图像增强方法、灰度拉伸、直方图均衡化以及自适应直方图均衡化。这些代码经过验证,均可正常运行,能够帮助用户节省寻找相关代码的时间,从而更专注于自身的实验 。

    cmd-bat-批处理-脚本-局域网扫描.zip

    cmd-bat-批处理-脚本-局域网扫描.zip

    深度学习技术驱动的口罩佩戴情况检测

    首先,利用已有的人脸检测算法对图像进行人脸检测。检测完成后,将检测到的每个人脸单独切割出来,并进行是否戴口罩的二分类判断。在进行分类之前,需要对图像中的人脸进行标注,标注内容包括戴口罩和不戴口罩两种情况。对于标注好的人脸图片,佩戴口罩的人脸图片命名为mask_1,未佩戴口罩的人脸图片命名为nomask_1。当数据集准备完毕后,可以使用train.py文件进行训练。关于训练效果及详细解读,可以参考哔哩哔哩上的视频,视频链接为:基于深度学习的口罩佩戴检测。

    【数据库管理】MySQL元数据查询与管理:数据库结构、表信息及服务器状态获取方法详解

    内容概要:本文主要介绍了MySQL元数据的概念及其获取方式。MySQL元数据是关于数据库和其对象(如表、列、索引等)的信息,存储在系统表中,这些表位于information_schema数据库中。文章详细列举了多种常用的MySQL元数据查询命令,如查看所有数据库(SHOW DATABASES)、选择数据库(USE database_name)、查看数据库中的所有表(SHOW TABLES)、查看表的结构(DESC table_name)、查看表的索引(SHOW INDEX FROM table_name)、查看表的创建语句(SHOW CREATE TABLE table_name)、查看表的行数(SELECT COUNT(*) FROM table_name)、查看列的信息以及查看外键信息等。此外,还介绍了information_schema数据库中的多个表,包括SCHEMATA表、TABLES表、COLUMNS表、STATISTICS表、KEY_COLUMN_USAGE表和REFERENTIAL_CONSTRAINTS表,这些表提供了丰富的元数据信息,可用于查询数据库结构、表信息、列信息、索引信息等。最后,文章还给出了获取查询语句影响的记录数的Perl和PHP实例,以及获取数据库和数据表列表的方法。 适合人群:对MySQL数据库有一定了解,想要深入学习MySQL元数据获取和使用的数据库管理员或开发人员。 使用场景及目标:①帮助用户掌握MySQL元数据的获取方法,以便更好地管理和维护数据库;②通过查询information_schema数据库中的系统表,深入了解数据库结构、表信息、列信息、索引信息等;③提供Perl和PHP实例,方便用户在不同编程环境中获取查询语句影响的记录数和数据库及数据表列表。 其他说明:在使用上述SQL语句时,请注意将查询中的'your_database_name'和'your_table_name'替换为实际的数据库名和表名。此外,在获取数据库和数据表列表时,如果没有足够的权限,结果将返回null。

    cmd-bat-批处理-脚本-实现延时不完全总结.zip

    cmd-bat-批处理-脚本-实现延时不完全总结.zip

    基于经验模态分解(EMD)的信号去噪MATLAB代码实现

    经验模态分解(Empirical Mode Decomposition,EMD)是一种基于数据的信号处理技术,由Nigel Robert Hocking在1998年提出,主要用于分析非线性、非平稳信号。它能够将复杂的信号自适应地分解为若干个本征模态函数(Intrinsic Mode Function,IMF),每个IMF代表信号中不同的频率成分和动态特征。在MATLAB环境下实现EMD去噪,通常包括以下步骤: 信号预处理:对原始信号进行预处理,例如平滑处理或去除异常值,以提高后续分解的准确性。 EMD分解:利用EMD算法对预处理后的信号进行分解,将其拆分为多个IMF和一个残余项。每个IMF对应信号的一个内在频率成分,而残余项通常包含低频或直流成分。 希尔伯特变换:对每个IMF进行希尔伯特变换,计算其瞬时幅度和相位,形成希尔伯特谱,从而更直观地分析信号的时频特性。 去噪策略:常见的去噪策略有两种。一种是根据IMF的频率特性,选择保留低频或高频部分,去除噪声;另一种是利用IMF的Hurst指数,噪声IMF的Hurst指数通常较低,因此可以去除Hurst指数低于阈值的IMF。 重构信号:根据保留的IMF和残余项,通过逆希尔伯特变换和累加,重构出去噪后的信号。 Hurst分析:Hurst指数是评估时间序列长期依赖性的指标,用于区分随机性和自相似性。在EMD去噪中,Hurst分析有助于识别噪声IMF,从而提升去噪效果。 在提供的压缩包中,“license.txt”可能是软件的许可协议文件,用户需遵循其条款使用代码。“EMD-DFA”可能是包含EMD去噪和去趋势波动分析(Detrended Fluctuation Analysis,DFA)的MATLAB代码。DFA是一种用于计算信号长期自相关的统计方法,常与EMD结合,进一步分析信号的分形特征,帮助识别噪声并优化去噪效果。该MATLA

    Python的蚁群优化算法实现与多维函数优化示例

    通过Python实现一个改进型ACO算法,并探讨其在多维函数优化中的应用,为工程优化问题提供新的解决思路。

    cmd-bat-批处理-脚本-枚举显示.zip

    cmd-bat-批处理-脚本-枚举显示.zip

    python实现爬取网络图片爬虫

    python实现爬取网络图片爬虫,亲测可用

    Python实现随机森林算法及配套数据集

    本文件包含随机森林算法的代码实现、对应的数据集以及详细的中文注释,且代码已经经过调试并能正常运行。文件中有两份代码:一份是从网上下载的,另一份是经过自己整理并重新编写的。编程环境是Python 2.7。由于主要目的是用于学习随机森林算法,所以在参数调整方面没有花费太多精力,因此模型的正确率可能并不高。当然,数据集规模较小也是影响正确率的一个因素。如果有兴趣的同学可以自行调整参数,以提高模型的准确率。

    DXCG分析与原理04

    DXCG分析与原理04

    1743390592614.osm

    1743390592614.osm

    cmd脚本-bat批处理-变色+翻滚字符.zip

    cmd脚本-bat批处理-变色+翻滚字符.zip

    全球主动进气格栅市场报告:年复合增长率CAGR为9.9%(2025-2031)

    主动进气格栅是车辆前部空气动力学的关键部件,旨在通过减少空气阻力来提高燃油效率。高速行驶时,百叶窗会自动关闭,将空气从发动机舱中导流出去,从而增强车辆的空气动力学性能。发动机升温时,百叶窗会打开,使空气流过发动机舱并冷却发动机。 据QYResearch调研团队最新报告“全球主动进气格栅市场报告2025-2031”显示,预计2031年全球主动进气格栅市场规模将达到38.4亿美元,未来几年年复合增长率CAGR为9.9%。 根据QYResearch头部企业研究中心调研,全球范围内主动进气格栅生产商主要包括RochlingMagna InternationalValeoContinentalSRG GlobalOpmobilityWirthweinBATZ GroupSTARLITE东阳事业集团等。2024年,全球前五大厂商占有大约48.0%的市场份额。 主要驱动因素: D1:随着人们日益重视降低油耗和碳排放,汽车制造商正在采用AGS系统来优化空气动力学性能。AGS系统会根据车速、发动机温度和驾驶条件自动开启和关闭,调节气流,从而减少阻力并提高燃油效率。 D2:随着汽车行业向电动汽车(EV)转型,更高效的能源管理变得至关重要。AGS有助于提高冷却系统的效率,延长电动汽车的续航里程并提升车辆的整体性能。它们还有助于优化电动动力总成的冷却气流,确保最佳的温度管理。 D3:消费者越来越追求效率更高、运行成本更低、整体性能更佳的汽车。AGS技术通过提高燃油经济性和发动机性能,帮助汽车制造商满足这些需求。 主要阻碍因素: C1:AGS 系统虽然可以提高燃油效率并减少排放,但也会带来额外成本。这些系统的设计、材料以及与车辆的集成都会增加前期生产成本。这可能是一项重大挑战,尤其对于那些瞄准入门级或注重预算的消费者的制造商而言,他们可能认为为这项技术额外付费并不值得。 C2:AGS 系统本质上是机

    基于51单片机的温度报警器设计与Proteus仿真

    51单片机温度报警器的设计是一个典型的嵌入式系统开发项目,主要用于实时监测环境温度,并在温度超出预设阈值时发出报警信号。该项目以51系列单片机作为核心处理单元,结合温度传感器采集温度数据,通过C语言编程实现数据处理和报警功能,并借助Proteus仿真软件进行模拟验证。 51单片机是由Intel公司推出的8位微控制器,因其出色的兼容性和易用性,在众多嵌入式系统中得到广泛应用。它具备丰富的内置资源,例如定时器、中断系统和I/O端口等,能够满足基本的控制需求。 在本设计中,可选用的温度传感器包括DS18B20或LM35。这两种传感器可以将环境温度转换为相应的信号供单片机读取。DS18B20是一款数字温度传感器,能够通过单总线与51单片机直接通信;而LM35输出的是与温度成正比的模拟电压信号,需要借助模数转换器(ADC)将其转换为数字信号后才能被单片机读取。 C语言是51单片机开发中常用的高级编程语言,具有结构紧凑、运行效率高等优点。在本项目中,需要通过C语言编写程序来实现以下功能: 初始化温度传感器和I/O端口。 定时采集温度数据。 将采集到的温度值与预设的报警阈值进行比较。 当温度超过阈值时,通过GPIO端口输出报警信号,例如LED闪烁或蜂鸣器发声。 如有必要,还可以实现简单的显示功能,例如通过LCD显示屏显示当前温度。 Proteus是一款功能强大的电子电路仿真软件,支持多种微控制器和元器件模型。在51单片机温度报警器的设计中,Proteus的主要用途包括: 布局硬件电路,包括51单片机、温度传感器、LED、蜂鸣器、LCD等元件的连接。 仿真运行C程序,验证温度采集、比较和报警逻辑的正确性。 模拟真实环境下的温度变化,测试系统的响应性能。 在没有实物硬件的情况下,提供一个直观的调试平台,方便开发和优化。 设计硬件电路图:确定各元件之间的连接方式,例如单片机与传感器的接口、报警

    cmd-bat-批处理-脚本-OptimizeXp.zip

    cmd-bat-批处理-脚本-OptimizeXp.zip

Global site tag (gtag.js) - Google Analytics