`

rabbitmq 学习-9- RpcClient发送消息和同步接收消息原理

阅读更多

本身使用RpcClient发送消息与同步接收消息的代码是很简单的,如下:

RpcClient client = new RpcClient(channel, exchange, routingKey);

String msg = "hello world!";

byte[] result = client.primitiveCall(msg.getBytes());

这里的primitiveCall调用后,当前线程会进行同步等待,等待消息接收端给自己的回复消息

一个完整的发送消息与接收回复消息的图例:

rabbitmq 学习-9- RpcClient发送消息和同步接收消息原理 - micro sun - 学无止境

整个流程详解:

 

  • l  创建RpcClient实例

 

RpcClient client = new RpcClient(channel, exchange, routingKey);

创建RpcClient时会做两件事:

A:创建一个回复queue,接收当前RpcClient发送的消息的消息接收人会将回复消息发到这个replyQueue上供当前RpcClient去接收回复消息

_replyQueue = setupReplyQueue();

   

protected String setupReplyQueue() throws IOException {

return _channel.queueDeclare("", false, false, true, true, null).getQueue();

//这里实际上是由rabbitmq server去定义一个唯一的queue(因为queueName是空的,所以是由server去生成queueName),最后返回这个queueNamequeueName是由server生成的,使用的是以下这个方法:

Queue.DeclareOk queueDeclare(String queueName, boolean passive, boolean durable, boolean exclusive, boolean autoDelete,

                                 Map<String, Object> arguments)

}

 

B:创建一个接收回复消息的consumer

_consumer = setupConsumer();

 

protected DefaultConsumer setupConsumer() throws IOException {

//创建一个接收消息的DefaultConsumer实例

DefaultConsumer consumer = new DefaultConsumer(_channel) {

    @Override //发生shutdown的时候回调

    public void handleShutdownSignal(String consumerTag,

                ShutdownSignalException signal) {

synchronized (_continuationMap) {

    for (Entry<String, BlockingCell<Object>> entry : _continuationMap.entrySet()) {

    entry.getValue().set(signal);

    }

    _consumer = null;

}

    }

 

    @Override //处理消息交付

    public void handleDelivery(String consumerTag,

               Envelope envelope,

               AMQP.BasicProperties properties,

               byte[] body)

    throws IOException {

//这部分就是和下面的代码一起协作来实现将异步接收强制变成同步接收

synchronized (_continuationMap) {

    String replyId = properties.getCorrelationId();

    BlockingCell<Object> blocker = _continuationMap.get(replyId);

    _continuationMap.remove(replyId);

    blocker.set(body);

}

    }

};

//让接收消息的consumerreplyQueue上去接收消息,这个过程对于主线程来说是异步进行的,只要replyQueue上有消息了,consumer就会去replyQueue上去接收消息,并回调它的handleDelivery方法

_channel.basicConsume(_replyQueue, true, consumer);

return consumer;

}

 

 

  • l  发送消息

 

byte[] result = rpcClient.primitiveCall(msg.getBytes());

使用rpcClientprimitiveCall发送消息,看看是怎么做的

public byte[] primitiveCall(byte[] message) throws IOException, ShutdownSignalException {

return primitiveCall(null, message);

}

继续跟踪,核心方法是这个

public byte[] primitiveCall(AMQP.BasicProperties props, byte[] message) throws IOException, ShutdownSignalException{

//检查consumer是否为空,若为空,抛出异常

checkConsumer();

 

BlockingCell<Object> k = new BlockingCell<Object>();

synchronized (_continuationMap) {

    _correlationId++;

    String replyId = "" + _correlationId;

//如果props不为空,则将上一步骤创建的replyQueue设置到props上去,还有replyId

    if (props != null) {

        props.setCorrelationId(replyId);

        props.setReplyTo(_replyQueue);

    }

    else {

//如果props为空,则创建一个,并将replyIdreplyQueue都设置到props

        props = new AMQP.BasicProperties(null, null, null, null,

                    null, replyId,

                    _replyQueue, null, null, null,

                    null, null, null, null);

    }

    _continuationMap.put(replyId, k);

}

//使用上面的props发送消息,这样replyQueuereplyId就跟着传递到了接收消息的那一方去了,接收消息的clientprops上去取到replyQueue,它就知道了它接收的消息的回复queue,然后它会将回复消息发送到replyQueue上去,而在上一步骤我们已经指定了一个consumerreplyQueue上去取消息,所以整个发送和接收消息的所有client是有条不紊的进行着

publish(props, message);  //这行代码执行完后,只是将消息发送出去了,接收回复消息是异步的,由上一步骤的consumer去接收回复消息

//这里就是进行同步等待接收回复消息,将异步接收变成同步回复接收的核心就在这里

Object reply = k.uninterruptibleGet();

if (reply instanceof ShutdownSignalException) {

    ShutdownSignalException sig = (ShutdownSignalException) reply;

    ShutdownSignalException wrapper =

    new ShutdownSignalException(sig.isHardError(),

                   sig.isInitiatedByApplication(),

                   sig.getReason(),

                   sig.getReference());

    wrapper.initCause(sig);

    throw wrapper;

} else {

    return (byte[]) reply;

}

}


完整描述
  • 创建RpcClient实例:
1,定义一个Map,用于存放每个消息的相关信息:
    private final Map<String, BlockingCell<Object>> _continuationMap = new HashMap<String, BlockingCell<Object>>();
    Key是一个correlationId,相当于当前rpcClient实例发送消息的一个计数器,初始化时是0,每发送一个消息时,加1
    Value是一个com.rabbitmq.utility.BlockingCell对象,它是在发送消息前创建,并和当前的correlationId进行关联,放进来
    _continuationMap.put(correlationId, blockingCell);   
2,correlationId初始化为0
3,创建一个回复queue,replyQueue=channel.queueDeclare("", false, false, true, true, null).getQueue();
4,创建一个接收回复消息的consumer
5,指定consumer接收replyQueue上的消息,channel.basicConsume(replyQueue, true, consumer);

  • RpcClient发送消息:
1,创建一个BlockingCell<Object>对象blockingCell
1,correlationId++
2,创建BasicProperties对象,并将correlationId,replyQueue设置到它上面,发送消息时,它会被传递到接收方
3,以correlationId为Key,将blockingCell放入到_continuationMap中
4,发送消息:channel.basicPublish(exchange,  routingKey,  上面 步骤得到的BasicProperties对象,  message);
5,获取回复消息,Object reply = blockingCell.uninterruptibleGet();这里就是同步等待回复消息

  • RpcServer接收消息:
1,接收消息
2,从request中获取BasicProperties对象requestProperties,requestProperties=request.getProperties()
3,从requestProperties中得到correlationId,replyQueue
4,创建一个回复消息用的BasicProperties对象replyProperties,并将correlationId设置到它上面
4,发送回复消息:channel.basicPublish("", replyQueue, replyProperties, replyMessage);

  • RpcClient接收回复:
1,replyQueue一有消息,consumer就会接收到并回调consumer的handleDelivery方法
2,获取传递过来的BasicProperties获取correlationId
3,根据correlationId去continuationMap中取BlockingCell对象,BlockingCell<Object> blocker = continuationMap.get(correlationId);
4,从continuationMap中删除,continuationMap.remove(correlationId);
5,将回复消息设置到blocker对象里面,blocker.set(replyMessage);

  • 同步等待回复消息:
1,【RpcClient发送消息】第4步主线程,发送消息后,第5步就去获取回复消息
2,【RpcClient发送消息】第5步主线程,blockingCell.uninterruptibleGet(),如果blockingCell没有被set(value)过,那么让当前主线程处于等待wait(),等待状态
3,【RpcClient接收回复】第5步blocker.set(replyMessage);这里的blocker其实就是上面主线程创建的blockingCell,因为它是根据correlationId去continuationMap中取的,set(replyMessage),blocker会用一个属性将replyMessage保存起来,供get的时候去返回这个属性,然后调用notify();唤醒处于等待的主线程(当前这步所在的线程和上一步主线程是在两个线程,所以主线程的等待是可以被这个线程唤醒的),主线程被唤醒后,get()就会取到replyMessage,最终整个步骤实现了将异步接收强制转换为同步等待接收

  • BlockingCell类
public class BlockingCell<T> {

    private boolean _filled = false;
    private T _value;
   
    private static final long NANOS_IN_MILLI = 1000 * 1000;
    private static final long INFINITY = -1;

    public BlockingCell() {
    }

    public synchronized T get() throws InterruptedException {
        while (!_filled) {    //如果value没有被设置过
            wait();  //让当前线程处于等待,直到其它线程调用当前对象的notify()或notifyAll()为止
        }
        return _value;
    }
   
    //带超时的get
    public synchronized T get(long timeout) throws InterruptedException, TimeoutException {
        if (timeout < 0 && timeout != INFINITY)
            throw new AssertionError("Timeout cannot be less than zero");
        if (!_filled && timeout != 0) {
            wait(timeout == INFINITY ? 0 : timeout);
        }
        if (!_filled)
            throw new TimeoutException();
        return _value;
    }
   
    //无限制的等待,直到取到值为止
    public synchronized T uninterruptibleGet() {
        while (true) {
            try {
                return get();
            } catch (InterruptedException ex) {
            }
        }
    }
   
    public synchronized T uninterruptibleGet(int timeout) throws TimeoutException {
        long now = System.nanoTime() / NANOS_IN_MILLI;
        long runTime = now + timeout;
        do {
            try {
                return get(runTime - now);
            } catch (InterruptedException e) {
            }
        } while ((timeout == INFINITY) || ((now = System.nanoTime() / NANOS_IN_MILLI) < runTime));
        throw new TimeoutException();
    }

    public synchronized void set(T newValue) {
        if (_filled) {
            throw new AssertionError("BlockingCell can only be set once");
        }
        _value = newValue;
        _filled = true;
        notify(); //唤醒当前线程(处于等待状态)
    }

    //保证只能被set(value)一次
    public synchronized boolean setIfUnset(T newValue) {
        if (_filled) {
            return false;
        }
        set(newValue);
        _filled = true;
        return true;
    }
}
分享到:
评论

相关推荐

    基于rabbitmq的rpc调用的3中方式实战-rabbitmq-rpc.zip

    在代码实现上,通常会使用RabbitMQ的客户端库,如Java的`rabbitmq-client`,Python的`pika`等,它们提供了方便的API来创建连接、通道、声明交换器和队列,以及发送和接收消息。 理解并掌握这些RPC实现方式有助于...

    RabbitMQ学习案例Demo

    - **打开通道**:在连接上打开一个Channel,这是实际发送和接收消息的地方。 - **声明队列**:在点对点和应答模式中,需要声明队列。在发布/订阅模式中,订阅者也需要声明订阅的队列。 - **发送消息**:使用...

    JAVA 面试入坟系列(0.0.1) By_萧曵丶.pdf

    - **RabbitMQ**:AMQP标准的消息代理和队列服务器,适用于复杂的路由场景。 #### 五、并发与多线程 - **Synchronized和ReentrantLock的区别** - **Synchronized**:内置锁,使用关键字修饰方法或代码块。 - **...

    IT 单词1

    25. **Consumer**:在微服务或消息队列中,消费者是指接收和处理消息的应用程序。 26. **Reference**:在编程中,引用通常指代对象的一个别名,可以用来访问和操作该对象。 27. **Registry**:注册表在Windows操作...

    spring-boot-2.3.6.RELEASE.jar中文-英文对照文档.zip

    # 压缩文件中包含: 中文-英文对照文档 jar包下载地址 Maven依赖 Gradle依赖 源代码下载地址 # 本文件关键字: jar中文-英文对照文档.zip,java,jar包,Maven,第三方jar包,组件,开源组件,第三方组件,Gradle,中文API文档,手册,开发手册,使用手册,参考手册 # 使用方法: 解压最外层zip,再解压其中的zip包,双击 【index.html】 文件,即可用浏览器打开、进行查看。 # 特殊说明: ·本文档为人性化翻译,精心制作,请放心使用。 ·只翻译了该翻译的内容,如:注释、说明、描述、用法讲解 等; ·不该翻译的内容保持原样,如:类名、方法名、包名、类型、关键字、代码 等。 # 温馨提示: (1)为了防止解压后路径太长导致浏览器无法打开,推荐在解压时选择“解压到当前文件夹”(放心,自带文件夹,文件不会散落一地); (2)有时,一套Java组件会有多个jar,所以在下载前,请仔细阅读本篇描述,以确保这就是你需要的文件;

    scratch少儿编程逻辑思维游戏源码-糖果狩猎 多人游戏.zip

    scratch少儿编程逻辑思维游戏源码-糖果狩猎 多人游戏.zip

    spring-boot-1.1.12.RELEASE.jar中文文档.zip

    # 压缩文件中包含: 中文文档 jar包下载地址 Maven依赖 Gradle依赖 源代码下载地址 # 本文件关键字: jar中文文档.zip,java,jar包,Maven,第三方jar包,组件,开源组件,第三方组件,Gradle,中文API文档,手册,开发手册,使用手册,参考手册 # 使用方法: 解压最外层zip,再解压其中的zip包,双击 【index.html】 文件,即可用浏览器打开、进行查看。 # 特殊说明: ·本文档为人性化翻译,精心制作,请放心使用。 ·只翻译了该翻译的内容,如:注释、说明、描述、用法讲解 等; ·不该翻译的内容保持原样,如:类名、方法名、包名、类型、关键字、代码 等。 # 温馨提示: (1)为了防止解压后路径太长导致浏览器无法打开,推荐在解压时选择“解压到当前文件夹”(放心,自带文件夹,文件不会散落一地); (2)有时,一套Java组件会有多个jar,所以在下载前,请仔细阅读本篇描述,以确保这就是你需要的文件;

    scratch少儿编程逻辑思维游戏源码-小船.zip

    scratch少儿编程逻辑思维游戏源码-小船.zip

    【车间调度】基于遗传算法GA求解车间调度优化问题【含Matlab源码 13251期】.zip

    985研究生,Matlab领域优质创作者 (1)如需代码 加腾讯企鹅号,见评论区或私信; (2)代码运行版本 Matlab 2019b (3)其他仿真咨询 1 完整代码包运行+运行有问题可咨询 2 期刊或论文复现; 3 程序定制; 4 期刊写作或指导; 5 科研合作;

    spring-boot-1.4.4.RELEASE.jar中文-英文对照文档.zip

    # 压缩文件中包含: 中文-英文对照文档 jar包下载地址 Maven依赖 Gradle依赖 源代码下载地址 # 本文件关键字: jar中文-英文对照文档.zip,java,jar包,Maven,第三方jar包,组件,开源组件,第三方组件,Gradle,中文API文档,手册,开发手册,使用手册,参考手册 # 使用方法: 解压最外层zip,再解压其中的zip包,双击 【index.html】 文件,即可用浏览器打开、进行查看。 # 特殊说明: ·本文档为人性化翻译,精心制作,请放心使用。 ·只翻译了该翻译的内容,如:注释、说明、描述、用法讲解 等; ·不该翻译的内容保持原样,如:类名、方法名、包名、类型、关键字、代码 等。 # 温馨提示: (1)为了防止解压后路径太长导致浏览器无法打开,推荐在解压时选择“解压到当前文件夹”(放心,自带文件夹,文件不会散落一地); (2)有时,一套Java组件会有多个jar,所以在下载前,请仔细阅读本篇描述,以确保这就是你需要的文件;

    spring-boot-1.4.4.RELEASE.jar中文文档.zip

    # 压缩文件中包含: 中文文档 jar包下载地址 Maven依赖 Gradle依赖 源代码下载地址 # 本文件关键字: jar中文文档.zip,java,jar包,Maven,第三方jar包,组件,开源组件,第三方组件,Gradle,中文API文档,手册,开发手册,使用手册,参考手册 # 使用方法: 解压最外层zip,再解压其中的zip包,双击 【index.html】 文件,即可用浏览器打开、进行查看。 # 特殊说明: ·本文档为人性化翻译,精心制作,请放心使用。 ·只翻译了该翻译的内容,如:注释、说明、描述、用法讲解 等; ·不该翻译的内容保持原样,如:类名、方法名、包名、类型、关键字、代码 等。 # 温馨提示: (1)为了防止解压后路径太长导致浏览器无法打开,推荐在解压时选择“解压到当前文件夹”(放心,自带文件夹,文件不会散落一地); (2)有时,一套Java组件会有多个jar,所以在下载前,请仔细阅读本篇描述,以确保这就是你需要的文件;

    Java超市管理系统样本.doc

    Java超市管理系统样本

    基于MATLAB的柔性车间调度系统设计与实现:遗传算法优化及甘特图生成

    内容概要:本文介绍了基于MATLAB的柔性车间调度系统的设计与实现。该系统旨在通过遗传算法优化车间调度,以最大化生产效率并最小化生产成本。系统不仅考虑了机器的柔性(即不同类型的机器具有不同的加工能力和成本),还实现了甘特图和收敛曲线的自动生成,便于分析和优化调度结果。主要内容包括:问题描述与需求分析、系统设计(算法选择、机器柔性设计)、系统实现(MATLAB源代码编写、柔性车间调度模型构建)以及实验与分析。 适合人群:从事制造行业、工业工程、自动化控制领域的研究人员和技术人员,尤其是对车间调度优化感兴趣的读者。 使用场景及目标:适用于需要优化生产调度的企业和研究机构,帮助他们提升生产效率、降低成本,并提供可视化的调度结果分析工具。 阅读建议:读者可以通过本文详细了解遗传算法在车间调度中的应用,掌握MATLAB编程技巧,学会利用甘特图和收敛曲线进行调度结果的分析和优化。

    GIS和旅游景点规划视域分析专题培训课件.ppt

    GIS和旅游景点规划视域分析专题培训课件.ppt

    spring-boot-1.4.5.RELEASE.jar中文-英文对照文档.zip

    # 压缩文件中包含: 中文-英文对照文档 jar包下载地址 Maven依赖 Gradle依赖 源代码下载地址 # 本文件关键字: jar中文-英文对照文档.zip,java,jar包,Maven,第三方jar包,组件,开源组件,第三方组件,Gradle,中文API文档,手册,开发手册,使用手册,参考手册 # 使用方法: 解压最外层zip,再解压其中的zip包,双击 【index.html】 文件,即可用浏览器打开、进行查看。 # 特殊说明: ·本文档为人性化翻译,精心制作,请放心使用。 ·只翻译了该翻译的内容,如:注释、说明、描述、用法讲解 等; ·不该翻译的内容保持原样,如:类名、方法名、包名、类型、关键字、代码 等。 # 温馨提示: (1)为了防止解压后路径太长导致浏览器无法打开,推荐在解压时选择“解压到当前文件夹”(放心,自带文件夹,文件不会散落一地); (2)有时,一套Java组件会有多个jar,所以在下载前,请仔细阅读本篇描述,以确保这就是你需要的文件;

    NU1680低成本、无固件、高集成度无线充电电源接收器电路图和BOM

    BOM建议直接使用原理图,BOM只供参考 支持QI协议的无线充电接收端芯片 可编程的3.5-9V输出电压 5W BPP 无线功率接收 Rx 极简的电路设计解决方案:1 个线圈+1 片 NU1680 + 12 颗外围器件 无固件烧入,可节省研发和生产时间和资源 去除了同步整流桥上的自举电容,使成本更低 具备 I2C 功能,可通过它配置 FOD 等寄存器参数 小尺寸,16 脚 QFN 封装,3.0mm x 3.0mm,脚间距 0.5mm 原理图和BOM可点绑定资源下载,LC部分电容建议X7R。

    spring-boot-2.2.6.RELEASE.jar中文文档.zip

    # 压缩文件中包含: 中文文档 jar包下载地址 Maven依赖 Gradle依赖 源代码下载地址 # 本文件关键字: jar中文文档.zip,java,jar包,Maven,第三方jar包,组件,开源组件,第三方组件,Gradle,中文API文档,手册,开发手册,使用手册,参考手册 # 使用方法: 解压最外层zip,再解压其中的zip包,双击 【index.html】 文件,即可用浏览器打开、进行查看。 # 特殊说明: ·本文档为人性化翻译,精心制作,请放心使用。 ·只翻译了该翻译的内容,如:注释、说明、描述、用法讲解 等; ·不该翻译的内容保持原样,如:类名、方法名、包名、类型、关键字、代码 等。 # 温馨提示: (1)为了防止解压后路径太长导致浏览器无法打开,推荐在解压时选择“解压到当前文件夹”(放心,自带文件夹,文件不会散落一地); (2)有时,一套Java组件会有多个jar,所以在下载前,请仔细阅读本篇描述,以确保这就是你需要的文件;

    spring-boot-1.1.4.RELEASE.jar中文文档.zip

    # 压缩文件中包含: 中文文档 jar包下载地址 Maven依赖 Gradle依赖 源代码下载地址 # 本文件关键字: jar中文文档.zip,java,jar包,Maven,第三方jar包,组件,开源组件,第三方组件,Gradle,中文API文档,手册,开发手册,使用手册,参考手册 # 使用方法: 解压最外层zip,再解压其中的zip包,双击 【index.html】 文件,即可用浏览器打开、进行查看。 # 特殊说明: ·本文档为人性化翻译,精心制作,请放心使用。 ·只翻译了该翻译的内容,如:注释、说明、描述、用法讲解 等; ·不该翻译的内容保持原样,如:类名、方法名、包名、类型、关键字、代码 等。 # 温馨提示: (1)为了防止解压后路径太长导致浏览器无法打开,推荐在解压时选择“解压到当前文件夹”(放心,自带文件夹,文件不会散落一地); (2)有时,一套Java组件会有多个jar,所以在下载前,请仔细阅读本篇描述,以确保这就是你需要的文件;

    spring-data-redis-1.4.4.RELEASE.jar中文-英文对照文档.zip

    # 压缩文件中包含: 中文-英文对照文档 jar包下载地址 Maven依赖 Gradle依赖 源代码下载地址 # 本文件关键字: jar中文-英文对照文档.zip,java,jar包,Maven,第三方jar包,组件,开源组件,第三方组件,Gradle,中文API文档,手册,开发手册,使用手册,参考手册 # 使用方法: 解压最外层zip,再解压其中的zip包,双击 【index.html】 文件,即可用浏览器打开、进行查看。 # 特殊说明: ·本文档为人性化翻译,精心制作,请放心使用。 ·只翻译了该翻译的内容,如:注释、说明、描述、用法讲解 等; ·不该翻译的内容保持原样,如:类名、方法名、包名、类型、关键字、代码 等。 # 温馨提示: (1)为了防止解压后路径太长导致浏览器无法打开,推荐在解压时选择“解压到当前文件夹”(放心,自带文件夹,文件不会散落一地); (2)有时,一套Java组件会有多个jar,所以在下载前,请仔细阅读本篇描述,以确保这就是你需要的文件;

    spring-boot-2.2.1.RELEASE.jar中文文档.zip

    # 压缩文件中包含: 中文文档 jar包下载地址 Maven依赖 Gradle依赖 源代码下载地址 # 本文件关键字: jar中文文档.zip,java,jar包,Maven,第三方jar包,组件,开源组件,第三方组件,Gradle,中文API文档,手册,开发手册,使用手册,参考手册 # 使用方法: 解压最外层zip,再解压其中的zip包,双击 【index.html】 文件,即可用浏览器打开、进行查看。 # 特殊说明: ·本文档为人性化翻译,精心制作,请放心使用。 ·只翻译了该翻译的内容,如:注释、说明、描述、用法讲解 等; ·不该翻译的内容保持原样,如:类名、方法名、包名、类型、关键字、代码 等。 # 温馨提示: (1)为了防止解压后路径太长导致浏览器无法打开,推荐在解压时选择“解压到当前文件夹”(放心,自带文件夹,文件不会散落一地); (2)有时,一套Java组件会有多个jar,所以在下载前,请仔细阅读本篇描述,以确保这就是你需要的文件;

Global site tag (gtag.js) - Google Analytics