`
wbj0110
  • 浏览: 1557153 次
  • 性别: Icon_minigender_1
  • 来自: 上海
文章分类
社区版块
存档分类
最新评论

RabbitMQ 使用教程 与 spring整合

阅读更多

spring integration系列 之RabbitMQ and spring

文章中 很多人都不熟悉讲这个 大家云里雾里,我确实忽略了 这个问题,这一集 就是基础篇 基本的RabbitMQ

在实际工作中我们常常使用RabbitMQ发布以及监听消息

其实大多数时候 我用它来做大量的异步消息通讯 数据的临时存储 用途

下面第一步 添加相关依赖

 

1 <!-- Spring RabbitMQ -->
2         <dependency>
3             <groupId>org.springframework.amqp</groupId>
4             <artifactId>spring-rabbit</artifactId>
5             <version>${spring.amqp.version}</version>
6         </dependency>

这里需要添加好相关的依赖支持

 

在我们常用系统日志服务 系统监控消息中,我们常常需要记录系统的异常 消息,系统的执行方法 时间 以及相应的指标 下面看看这个过程的实现

在上面的过程中 我们可以用AOP + RabbitMQ +数据库 很方便的实现,

下面讲讲这个过程 常用简单的方式有实现 ThrowAdvice 捕获异常 实现AfterReturningAdvice获取返回后 MethodBeforeAdvice 方法执行前 ,你还能实现自定义拦截器 这是简单的使用用法 ,关于aop的几种方式网上好多 自行百度 谷歌

配置好Aop 然后 定义好相关记录的实体 如Record 有执行时间 方法 结果  其他信息 等等

运行时候aop监听 在执行监听方法前记录当前微秒数 执行后记录微秒 方法执行结果 方法名称 获取存入Record实体bean中

然后加入rabbitMQ的相关配置 ,这里我说一下 前一章的那张图,rabbitMQ 客户端通过 exchange 转发消息到相应的channel通道中 而exchange通过绑定与chanenl建立联系

 

下面看看我们的配置文件 首先加入rabbit头schema 命名空间

 

01 <?xml version="1.0" encoding="UTF-8"?>
02 <beans   xmlns="http://www.springframework.org/schema/beans"
03         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
04         xmlns:p="http://www.springframework.org/schema/p"
05         xmlns:context="http://www.springframework.org/schema/context"
06         xmlns:rabbit="http://www.springframework.org/schema/rabbit"
07          
08         xsi:schemaLocation="
09             http://www.springframework.org/schema/beans
10             http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
11             http://www.springframework.org/schema/context
12             http://www.springframework.org/schema/context/spring-context-3.1.xsd
13             http://www.springframework.org/schema/rabbit
14             http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">

 

加入命名空间后,加入rabbitmq相关连接工厂

1 <bean id="rabbitConnectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory" p:username="admin" p:password="admin" p:port="5672">
2         <constructor-arg value="localhost" />
3     </bean>

这个编辑器真不好对齐这些代码。。。

下面加入相关的rabbitTemplate模板的整合

01 <rabbit:queue id="logErrorQueue"/>
02     <rabbit:queue id="logMessageQueue"/>
03      
04     <rabbit:topic-exchange name="zypExchange">
05         <rabbit:bindings>
06             <rabbit:binding queue="logErrorQueue" pattern="logerror.*"/>
07             <rabbit:binding queue="logMessageQueue" pattern="logMessageInfo.*"/>
08         </rabbit:bindings>
09     </rabbit:topic-exchange>
10      
11     <rabbit:template connection-factory="rabbitConnectionFactory" exchange="zypExchange"/>
12  
13     <rabbit:admin connection-factory="rabbitConnectionFactory"/>
14  
15     <rabbit:listener-container>
16         <rabbit:listener queues="logErrorQueue" ref="jmsService" method="handleError"/>
17         <rabbit:listener queues="logMessageQueue" ref="jmsService" method="handleMessageInfo"/>
18     </rabbit:listener-container>

下面就可以在方法中 前面的aop中 拦截器中 使用

@resource 或者@autoWire 或者@Inject注入模板amqpTemplate

1 amqpTemplate.convertAndSend(这是上面配置的exchange名称, 这是上面配置的队列名称, 这是我要发送的消息record+":"+new Date());

也就是说 amqpTemplate.convertAndSend("zypExchange","logMessageInfo.*","我要发送的消息 可以定制自己的格式 系列化方式"); 记住如果exchange传递“”空字符串那么代表使用默认default的exchange来委派channel也就是相当于没有绑定一样 当然你可以设置default的exchange的绑定方式,

当然还有amqpTemplate多种发送的方式 

下面的交给上面配置rabbitMq监听器 JmsService类 的方法处理 我是用队列接受的 

看下我的处理方式 

JmsService

01 import java.util.Queue;
02 import java.util.concurrent.LinkedBlockingQueue;
03  
04 import org.springframework.stereotype.Service;
05  
06 import com.alibaba.fastjson.JSON;
07 import com.google.common.base.Splitter;
08  
09 @Service
10 public class JmsService
11 {
12  
13      
14     private  Queue<String> logs = new LinkedBlockingQueue<String>();//可以用其他的队列都行 jdk5的并发阻塞队列也行 在多线程环境下用它
15     public void handleMessageInfo(String logMessage){
01                  logs.add(logMessage)//这个消息队列可以判断大小 传递到下一节点
02         //前面我们知道我们传递过来的消息是record的对象json字符串+":"+时间
03         String[] strs=logMessage.split(":");
04         String record=strs[0];
05          Record rd=(Record) JSON.parse(record);
06          String time=strs[1];
07          recordService.saveRecord(time,record);//写入数据库中 或者其他应用 直接交给前端的controller也行
08     }
09      
10      
11  
12      
13 }

 

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics