- 浏览: 839048 次
文章分类
- 全部博客 (365)
- java (124)
- spring mvc (21)
- spring (22)
- struts2 (6)
- jquery (27)
- javascript (24)
- mybatis/ibatis (8)
- hibernate (7)
- compass (11)
- lucene (26)
- flex (0)
- actionscript (0)
- webservice (8)
- rabbitMQ/Socket (15)
- jsp/freemaker (5)
- 数据库 (27)
- 应用服务器 (21)
- Hadoop (1)
- PowerDesigner (3)
- EJB (0)
- JPA (0)
- PHP (2)
- C# (0)
- .NET (0)
- html (2)
- xml (5)
- android (7)
- flume (1)
- zookeeper (0)
- 证书加密 (2)
- maven (1)
- redis (2)
- cas (11)
最新评论
-
zuxianghuang:
通过pom上传报错 Artifact upload faile ...
nexus上传了jar包.通过maven引用当前jar,不能取得jar的依赖 -
流年末年:
百度网盘的挂了吧???
SSO单点登录系列3:cas-server端配置认证方式实践(数据源+自定义java类认证) -
953434367:
UfgovDBUtil 是什么类
Java发HTTP POST请求(内容为xml格式) -
smilease:
帮大忙了,非常感谢
freemaker自动生成源代码 -
syd505:
十分感谢作者无私的分享,仔细阅读后很多地方得以解惑。
Nginx 反向代理、负载均衡、页面缓存、URL重写及读写分离详解
到http://github.com/momania/spring-rabbitmq下载其示例程序
实行远程接口调用,主要在com.rabbitmq.spring.remoting下几个类:
发布服务端(Server):RabbitInvokerServiceExporter.java
接口调用客户端(Client):RabbitInvokerProxyFactoryBean.java,RabbitInvokerClientInterceptor.java,
发布服务端(Server)——RabbitInvokerServiceExporter.java说明:
package com.rabbitmq.spring.remoting;
import static java.lang.String.format;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.lang.SerializationUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Required;
import org.springframework.remoting.support.RemoteInvocation;
import org.springframework.remoting.support.RemoteInvocationBasedExporter;
import org.springframework.remoting.support.RemoteInvocationResult;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.RpcServer;
import com.rabbitmq.client.ShutdownListener;
import com.rabbitmq.client.ShutdownSignalException;
import com.rabbitmq.spring.ExchangeType;
import com.rabbitmq.spring.InvalidRoutingKeyException;
import com.rabbitmq.spring.channel.RabbitChannelFactory;
public class RabbitInvokerServiceExporter extends RemoteInvocationBasedExporter
implements InitializingBean, DisposableBean, ShutdownListener {
private final Log log = LogFactory
.getLog(RabbitInvokerServiceExporter.class);
private RabbitChannelFactory channelFactory;
private String exchange;
private ExchangeType exchangeType;
private String queueName;
private String routingKey;
private Object proxy;
private List<RpcServer> rpcServerPool;
private int poolsize = 1;
public void afterPropertiesSet() {
// 检查exchange type类型不能为fanout
if (exchangeType.equals(ExchangeType.FANOUT)) {
throw new InvalidRoutingKeyException(String.format(
"Exchange type %s not allowed for service exporter",
exchangeType));
}
exchangeType.validateRoutingKey(routingKey);
// 调用org.springframework.remoting.support.RemoteExporter的getProxyForService(),得到代理对象
proxy = getProxyForService();
// 初始化rpcServer池
rpcServerPool = new ArrayList<RpcServer>(poolsize);
// 初始化RpcServer,并开始接收请求
startRpcServer();
}
// 初始化RpcServer,并开始接收请求
private void startRpcServer() {
try {
log.info("Creating channel and rpc server");
// 创建临时的channel,用来定义queue,exchange,并进行bind
// 这里有两个用处:
// 1:在服务端也定义queue,避免因为先开服务端而出现queue没被定义的错误
// 2:这里先用一个channel定义一下qeueue,后面的for循环里面就不用每个都去定义了
Channel tmpChannel = channelFactory.createChannel();
tmpChannel.getConnection().addShutdownListener(this);
tmpChannel.queueDeclare(queueName, false, false, false, true, null);
if (exchange != null) {
tmpChannel.exchangeDeclare(exchange, exchangeType.toString());
tmpChannel.queueBind(queueName, exchange, routingKey);
}
// 创建poolsize个RpcServer,每个RpcServer使用一个单独的channel,并且分别使用单独的线程去接收请求,提升接收速度
for (int i = 1; i <= poolsize; i++) {
try {
// 每次都创建一个新的channel,因为一个channel在多个线程中使用是会有问题的(官方文档和channel的JavaDoc上是这样说的)
Channel channel = channelFactory.createChannel();
String format = "Starting rpc server %d on exchange [%s(%s)] - queue [%s] - routingKey [%s]";
log.info(String.format(format, i, exchange, exchangeType,
queueName, routingKey));
// 使用当前的channel创建一个RpcServer去处理请求
final RpcServer rpcServer = createRpcServer(channel);
rpcServerPool.add(rpcServer);
// 创建一个线程让当前的RpcServer去处理请求
Runnable main = new Runnable() {
@Override
public void run() {
try {
// rpcServer开始处理请求
throw rpcServer.mainloop();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
};
// 线程开始
new Thread(main).start();
} catch (IOException e) {
log.warn("Unable to create rpc server", e);
}
}
} catch (Exception e) {
log.error("Unexpected error trying to start rpc servers", e);
}
}
// 创建RpcServer对象
private RpcServer createRpcServer(Channel channel) throws IOException {
return new RpcServer(channel, queueName) {
// 重写处理接收到的消息的方法
public byte[] handleCall(byte[] requestBody,
AMQP.BasicProperties replyProperties) {
// 因为在客户端调用方法的时候,是将客户端调用的方法的信息封装成一个RemoteInvocation对象,然后序列化成一个byte数据再使用RpcClient发送到服务端的
// 所以在这里(服务端接收消息),将消息(requestBody)反序列化成RemoteInvocation对象
RemoteInvocation invocation = (RemoteInvocation) SerializationUtils
.deserialize(requestBody);
// 根据RemoteInvocation的信息,服务端使用代理对象执行相应的方法,并得到执行结果
RemoteInvocationResult result = invokeAndCreateResult(
invocation, proxy);
// 将执行结果序列化为byte数据,然后返回给客户端
return SerializationUtils.serialize(result);
}
};
}
public void setChannelFactory(RabbitChannelFactory channelFactory) {
this.channelFactory = channelFactory;
}
@Required
public void setQueueName(String queueName) {
this.queueName = queueName;
}
public Object getProxy() {
return proxy;
}
@Override
public void destroy() throws Exception {
clearRpcServers();
}
// 清除所有的RpcServer
private void clearRpcServers() {
if (log.isInfoEnabled()) {
log.info(format("Closing %d rpc servers", rpcServerPool.size()));
}
for (RpcServer rpcServer : rpcServerPool) {
try {
// 中止处理请求
rpcServer.terminateMainloop();
rpcServer.close();
} catch (Exception e) {
log.warn("Error termination rpcserver loop", e);
}
}
rpcServerPool.clear();
if (log.isInfoEnabled()) {
log.info("Rpc servers closed");
}
}
@Override
public void shutdownCompleted(ShutdownSignalException cause) {
if (log.isInfoEnabled()) {
log.info(String.format("Channel connection lost for reason [%s]",
cause.getReason()));
log.info(String.format("Reference [%s]", cause.getReference()));
}
if (cause.isInitiatedByApplication()) {
if (log.isInfoEnabled()) {
log.info("Sutdown initiated by application");
}
} else if (cause.isHardError()) {
log
.error("Shutdown is a hard error, trying to restart the RPC server...");
startRpcServer();
}
}
public void setExchange(String exchange) {
this.exchange = exchange;
}
@Required
public void setRoutingKey(String routingKey) {
this.routingKey = routingKey;
}
public void setPoolsize(int poolsize) {
this.poolsize = poolsize;
}
@Required
public void setExchangeType(ExchangeType exchangeType) {
this.exchangeType = exchangeType;
}
}
发表评论
-
rabbitmq 学习-13- 发送接收消息示例-2
2012-07-06 17:17 1726Basic RPC As a programming con ... -
rabbitmq 学习-12- 发送接收消息示例-1
2012-07-06 17:17 14392这里是同步发送消息,异步接收消息接收有两种方式:http:// ... -
rabbitmq 学习-积累
2012-07-06 17:17 13161,temporary queue(由server自动命名)在 ... -
rabbitmq 学习-11- 几个发送接收消息的重要类
2012-07-06 17:17 16141,ChannelbasicPublish() 用来发送消息, ... -
rabbitmq 学习-10-channel 说明
2012-07-06 17:17 5230rabbitmq java api 关于消息处理的一个重要的类 ... -
rabbitmq 学习-9- RpcClient发送消息和同步接收消息原理
2012-06-30 16:44 3015本身使用RpcClient发送消息与同步接收消息的代码是很 ... -
rabbitmq 学习-8- Exchange Queue RoutingKey关系说明
2012-06-30 16:44 3095String queue = channel.queueDec ... -
rabbitmq 学习-7-rabbitmq 支持场景
2012-06-30 16:43 1240What messaging scenarios are su ... -
rabbitmq 学习-6-rabbitmq基础
2012-06-30 16:43 1720rabbitmq的中文资料真少,和同事lucas经过两周的 ... -
rabbitmq 学习-4-初试2
2012-06-29 14:32 1048RpcClient,RpcServer同步发送接收消息Chan ... -
rabbitmq 学习-3-初试1
2012-06-29 14:32 1038本例是一个简单的异步 ... -
rabbitmq 学习-1-AMQP介绍
2012-06-29 14:30 1366Windows 1,下载下载erlang:erlang. ... -
rabbitmq学习1:hello world
2012-06-29 14:27 1345rabbitMQ是一个在AMQP基础上完整的,可服用的企业消息 ... -
rabbitmq操作命令
2012-06-14 13:54 201161.必需掌握的指令 添加用户: ...
相关推荐
NULL 博文链接:https://wubin850219.iteye.com/blog/1076093
基于spring-rabbitmq的异步消息发送和RPC远程调用实例
基于Vue+SpringCloud博客的设计与实现---微服务基础版本组件1.0版本 博客采用Vue+SpringCloud前后分离的方式。博客采用了高可用Eureka(可以替换成其他微服务组件)以及高可用Zuul,使用以Es搜索引擎作为Zpkin的存储...
spring rabbitmq rpc 测试代码
Spring Boot与RabbitMQ的整合,内容非常简单,纯API的调用操作。 操作之间需要加入依赖Jar Message Broker是一种消息验证、传输、路由的架构模式,其设计目标主要应用于下面这些场景: 消息路由到一个或多个目的地 ...
AMQP 服务:Spring Boot + Spring AMQP + Rabbitmq 部署:单个可运行 jar + 嵌入式 tomcat ###邮件服务后端 邮件服务:Spring Boot + Spring Mail AMQP 服务:Spring Boot + Spring AMQP + Rabbitmq 部署:单个...
RabbitMQ柔性事务方案、SpringCloud-Gateway网关、Feign远程调用、Sleuth+Zipkin链路追踪系统、Spring Cache缓存、SpringSession跨子域Session同步方案、基于ElasticSearch7全文检索、异步编排与线程池、压力测试...
zipkin、rabbitmq、mysql整合的代码类demo,导入即可运行,SpringCloud 必备神器,Zipkin是一个分布式跟踪系统,Sleuth负责为服务之间的调用提供链路追踪,而Zipkin用于收集在微服务体系结构中延迟问题排除所需的定时...
mall 高并发,分布式架构 尚硅谷谷粒...远程调用 - openFeign 对象存储 - 阿里云对象存储 OSS 全文检索 - Elasticsearch 消息队列 - RabbitMQ 链路追踪 - Zipkin + Sleuth 线上监控系统 - Prometheus 日志系统 - Ela
springcloud-gateway:网关接口,暴露给调用方调用,包含负载均衡、重试、熔断等功能。 springcloud-zipkin:链路跟踪工具,监控并就持久化微服务集群中调用链路的通畅情况,采用rabbitmq异步传输、elasticsearch...
在spring4.0x的项目背景下,封装的一个好用简便的rabbitmq相关的sdk. 封装内容 消息base: BaseMqMessage消息基类,所有的业务消息都需要继承它,其中的eventType(根据业务需求是否要填)和routingKey(根据...
zipkin、rabbitmq、mysql整合的代码类demo,导入即可运行,SpringCloud 必备神器,Zipkin是一个分布式跟踪系统,Sleuth负责为服务之间的调用提供链路追踪,而Zipkin用于收集在微服务体系结构中延迟问题排除所需的定时...
- chapter1:[基本项目构建(可作为工程脚手架),引入...由于Spring Cloud偏宏观架构,Spring Boot偏微观细节,内容上越来越多,为了两部分内容不互相干扰,所以迁移Spring Cloud内容到:[SpringCloud-Learning项目]...
chapter5-2-1:Spring Boot中使用RabbitMQ 其他功能 chapter6-1-1:使用Spring StateMachine框架实现状态机 Spring Boot Actuator监控端点小结 在传统Spring应用中使用spring-boot-actuator模块提供监控端点 Spring ...
期待订单(orderNumber,productId,数量) 调用API时,定单(Json格式)将发送到RabbitMQ 监控来自Kibana的数据主流1 / POST到API端点 (转到 ,您将看到更多详细信息) 2 / Spring-boot通过“ Exchange”将Order...
14.Spring Cloud中Hystrix的请求合并 15.Spring Cloud中Hystrix仪表盘与Turbine集群监控 16.Spring Cloud中声明式服务调用Feign 17.Spring Cloud中Feign的继承特性 18.Spring Cloud中Feign配置详解 19.Spring ...
用户安全中心:SMTP邮箱注册邮箱,阿里云短信API注册手机以及其他个人安全信息和调用安全认证服务的接口,安全完成度最全。 用户文件头像上传中心:博客所用到的所有的图片和用户的图片均用阿里云OSS文件服务器,外...
如果哪天在某个服务调用另外一个服务端额时候,调用链路上出现网络的闪断或者其他故障,层级简单的调用还容易排查定位,但是调用层级复杂的话这就有点儿坑了,这样一来,zipkin-server由于服务无法追踪而导致问题...
简介:本项目是基于guli商城做的开发,其功能与guli商城一模一样,但项目结构是基于企业模式进行搭建的,比如有微服务之间依赖的具有远程调用的feign client jar包。只需要本地服务开启feign远程调用并添加对应包的...