基于AMQP实现RPC(Remote Procedure Call)的设计
guibin.beigjing@gmail.com
首先回忆一下RPC的过程。客户端将请求发送给服务器端,服务器端处理完毕之后将结果返回给客户端。那么在AMQP之上,如何实现RPC呢?客户端应当将请求publish给服务器端,服务器处理完毕之后,再将结果publish回给客户端。在这过程中有两次publish,显然应该是不同的publish。服务器端publish的消息,客户端不应该收到更不应该消费;给客户端publish的结果消息,应该只有客户端能收到;并且对于每次请求和相应,都应该有一组唯一的标识,标识并匹配相应的一对请求和响应,防止由于多次请求而收到多次响应,混淆了请求和响应之间的对应关系。
基于AMQP实现RPC的逻辑如下图所示:
- 服务器S提前建好一个专门接受来自客户端RPC请求的队列rpc_queue,并且客户端知道如何向此队列publish消息的rountingKey。通常情况下会建立一个和该接受请求的队列同名的rountingKey,并和该rpc_queue绑定。这样客户端向rountingKey发送请求后,服务器端就能收到请求了。
- 当客户端C启动的时候,生成一个匿名的、排他的、自动删除的回调队列(an anonymous, exclusive and auto delete callback queue),用来接受来自服务器端的响应。
- 对于一个PRC请求而言,客户端发送的消息必须包含两个属性“ reply_to”和“ correlation_id”。“reply_to”用来告诉服务器端计算后的结果发送到哪个rountingKey,“correlation_id”用做每个请求的唯一标识,以区分不同请求和不同响应。
- 客户端C将请求publish到rpc_queue,图中上面从左向右的过程。
- 服务器端S在监听队列rpc_queue,收到了来自客户端的请求之后,处理并得到结果,将结果包装成响应消息,publish给“reply_to”。
- 客户端C一直在监听“reply_to”队列,收到了结果消息后,取到消息中的属性“correlation_id”值,如果这个“correlation_id”和之前记录的请求的“correlation_id”相匹配,则确认这个响应是有效的,并将结果返回给调用者。
基于此设计,用Scala+RabbitMQ的实现源代码,请参考
AIOTrade中的lib.amqp包下的
RpcClient.scala和
RpcServer.scala。
参考文献:
http://www.rabbitmq.com/tutorial-six-python.html
http://www.infoq.com/articles/AMQP-RabbitMQ
Guibin
2011-02-20
- 大小: 14.4 KB
分享到:
相关推荐
开源项目-vibhavp-amqp-rpc.zip,amqprpc - An AMQP driver for net/rpc, feedback?
dart_amqp_rpc 使用 JSON、协议缓冲区或自定义用户定义的编解码器通过 AMQP 执行 RPC 的可选库。 该库依赖于包提供的 AMQP 功能。快速开始定义 RPC 客户端和服务器使用的接口: abstract class ArithInterface { ...
它是一种可立即投入生产,简单,基于承诺的且与回调兼容的AMQP RPC客户端。快速开始npm install amqp-rpc-client 然后: var AmqpRpcClient = require('amqp-rpc-client');var rpcClient = new AmqpRpcClient('...
在分析高级消息队列协议AMQP的基础上, 结合主动状态证书的订阅/发布要求, 设计了一种基于AMQP消息中间件的主动状态证书发布与订阅模型。通过实验分析表明, 该消息中间件系统应用灵活, 系统性能高效、稳定。
aioamqp, AMQP实现使用 asyncio aioamqp aioamqp 库是一个纯python实现,它是协议的。软件基于在 PEP中引入的python I/O 支持,提供了基于协同程序的API,使得编写高并发应用程序变得容易。Bug 报告,补丁和建议欢迎...
spring amqp 配置实现rabbitmq 路由
需要下载amqp-client-3.4.4.jar和JmeterAMQP.jar,并放在Jmeter的lib/ext目录
spring-cloud-config + spring-cloud-bus-amqp实现分布式集群配置动态更新,可更新实例对象,包含源码包+Rabbitmq安装包和安装说明
一个使用springamqp实现的异步消息队列的股票系统,来自springamqp的官网,对于学习springamqp很有帮助。
rabbitmq 路由spring-amqp 配置实现
spring rabbitmq rpc 测试代码
发布消息 package main import ( github.com/streadway/amqp log ) //我们还需要一个辅助函数来检查每个amqp调用的返回值... rabbbimqConn, err := amqp.Dial(amqp://admin:admin@ip地址:5672/) failOnError(err, F
Apache Pulsar 是 Apache 软件基金会顶级项目,是下一代云原生分布式消息流平台,集消息、存储、轻量化函数式计算为一体,采用计算与存储分离架构设计,支持多租户、持久化存储、多机房跨区域数据复制,具有强一致性...
Spring AMQP API(Spring AMQP 开发文档).CHM 官网 Spring AMQP API。Spring AMQP 开发文档。
amqp, 基于EventMachine的RabbitMQ客户端更喜欢小兔子 ruby gem: 异步 ruby RabbitMQ客户端是一款功能丰富。基于eventmachine的RabbitMQ客户机,...它实现了 ,支持AMQP的 RabbitMQ扩展。警告: 仅在使用EventMachine时
amqp1.0协议翻译
Spring AMQP 是基于 Spring 框架的 AMQP 消息解决方案,提供模板化的发送和接收消息的抽象层,提供基于消息驱动的 POJO。同时有 Java 和 .NET 的版本。
用于JMeter测试RabbitMQ等基于AMQP协议的采样插件.下载后,解压缩 后放到JMeter的lib/ext目录下即可