前面我们已经学习了发送消息及同步接收消息的例子了。下面我们来看看如何通过Spring配置来实现异步接收消息。
现在我们建立两个WEB项目。发送消息的项目命名为”rabbitmq-demo-producer“ ,异步接受的消息项目名称”rabbitmq-demo-consumer“。
下面来看看rabbitmq-demo-producer项目中发送信息的程序及配置。
MessageProducer类是用于发送消息的类。实现如下
- package com.abin.rabbitmq;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- public class MessageProducer {
- private RabbitTemplate rabbitTemplate;
- public void sendMessage(Integer i) {
- String message = "Hello World wubin " + "#" + i;
- //Exchange的名称为"hello.topic",routingkey的名称为"hello.world.q123ueue"
- rabbitTemplate.convertAndSend("hello.topic", "hello.world.q123ueue",
- message);
- System.out.println("发送第" + i + "个消息成功!内容为:" + message);
- // String messages = "Hello World direct " + "#" + i;
- // rabbitTemplate.convertAndSend("hello.direct", "hello.world.queue",
- // messages);
- // System.out.println("发送第" + i + "个消息成功!内容为:" + messages);
- }
- public void setRabbitTemplate(RabbitTemplate rabbitTemplate) {
- this.rabbitTemplate = rabbitTemplate;
- }
- }
spring的配置文件如下:applicationContext-rabbitmq.xml
- <?xml version="1.0" encoding="UTF-8"?>
- <beans xmlns="http://www.springframework.org/schema/beans"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd">
- <bean id="connectionFactory"
- class="org.springframework.amqp.rabbit.connection.SingleConnectionFactory">
- <constructor-arg value="localhost" />
- <property name="username" value="guest" />
- <property name="password" value="guest" />
- </bean>
- <bean id="amqpAdmin"
- class="org.springframework.amqp.rabbit.core.RabbitAdmin">
- <constructor-arg ref="connectionFactory" />
- </bean>
- <bean id="rabbitTemplate"
- class="org.springframework.amqp.rabbit.core.RabbitTemplate">
- <constructor-arg ref="connectionFactory"></constructor-arg>
- </bean>
- <bean id="messageProducer"
- class="com.abin.rabbitmq.MessageProducer">
- <property name="rabbitTemplate">
- <ref bean="rabbitTemplate" />
- </property>
- </bean>
- </beans>
对于发送消息的程序自己可以实现,我是通过Struts2来实现的,例如
- package com.abin.action;
- import java.util.Date;
- import com.abin.rabbitmq.MessageProducer;
- import com.opensymphony.xwork2.ActionSupport;
- public class SendAction extends ActionSupport {
- private MessageProducer messageProducer;
- public String execute() throws Exception {
- Date a = new Date();
- long b = System.currentTimeMillis();
- for (int i = 0; i <= 10000; i++) {
- messageProducer.sendMessage(i);
- }
- System.out.println(a);
- System.out.println(new Date());
- System.out.println("共花了" + (System.currentTimeMillis() - b) + "ms");
- return null;
- }
- public void setMessageProducer(MessageProducer messageProducer) {
- this.messageProducer = messageProducer;
- }
- }
发送消息项目的程序差不多就这些了
下面来看看接受消息的程序如下
HelloWorldHandler类用于接收消息的处理类,如下
- package com.abin.rabbitmq;
- import java.util.Date;
- public class HelloWorldHandler {
- public void handleMessage(String text) {
- System.out.println("Received: " + text);
- System.out.println(new Date());
- }
- }
spring的配置文件如下:applicationContext-rabbitmq.xml
- <?xml version="1.0" encoding="UTF-8"?>
- <beans xmlns="http://www.springframework.org/schema/beans"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd">
- <!-- 创建connectionFactory -->
- <bean id="connectionFactory"
- class="org.springframework.amqp.rabbit.connection.SingleConnectionFactory">
- <constructor-arg value="localhost" />
- <property name="username" value="guest" />
- <property name="password" value="guest" />
- </bean>
- <!-- 创建rabbitAdmin 代理类 -->
- <bean id="rabbitAdmin"
- class="org.springframework.amqp.rabbit.core.RabbitAdmin">
- <constructor-arg ref="connectionFactory" />
- </bean>
- <!-- 创建rabbitTemplate 消息模板类 -->
- <bean id="rabbitTemplate"
- class="org.springframework.amqp.rabbit.core.RabbitTemplate">
- <constructor-arg ref="connectionFactory"></constructor-arg>
- </bean>
- <!-- 声明Queue并设定Queue的名称 -->
- <bean id="helloWorldQueue"
- class="org.springframework.amqp.core.Queue">
- <constructor-arg value="hello.world.queue"></constructor-arg>
- </bean>
- <!-- 声明消息转换器为SimpleMessageConverter -->
- <bean id="messageConverter"
- class="org.springframework.amqp.support.converter.SimpleMessageConverter">
- </bean>
- <!-- 声明Exchange的类型为topic并设定Exchange的名称 -->
- <bean id="hellotopic"
- class="org.springframework.amqp.core.TopicExchange">
- <constructor-arg value="hello.topic"></constructor-arg>
- </bean>
- <!-- 声明Exchange的类型为direct并设定Exchange的名称 -->
- <bean id="hellodirect"
- class="org.springframework.amqp.core.DirectExchange">
- <constructor-arg value="hello.direct"></constructor-arg>
- </bean>
- <!-- 通过Binding来判定Queue、Exchange、routingKey -->
- <!-- 其中构建Binding的参数1是Queue,参数2是Exchange,参数3是routingKey -->
- <bean id="queuebling"
- class="org.springframework.amqp.core.Binding">
- <constructor-arg index="0" ref="helloWorldQueue"></constructor-arg>
- <constructor-arg index="1" ref="hellotopic"></constructor-arg>
- <constructor-arg index="2" value="hello.world.#"></constructor-arg>
- </bean>
- <!-- 监听生产者发送的消息开始 -->
- <!-- 用于接收消息的处理类 -->
- <bean id="helloWorldHandler"
- class="com.abin.rabbitmq.HelloWorldHandler">
- </bean>
- <!-- 用于消息的监听的代理类MessageListenerAdapter -->
- <bean id="helloListenerAdapter"
- class="org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter">
- <constructor-arg ref="helloWorldHandler" />
- <property name="defaultListenerMethod" value="handleMessage"></property>
- <property name="messageConverter" ref="messageConverter"></property>
- </bean>
- <!-- 用于消息的监听的容器类SimpleMessageListenerContainer,对于queueName的值一定要与定义的Queue的值相同 -->
- <bean id="listenerContainer"
- class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
- <property name="queueName" value="hello.world.queue"></property>
- <property name="connectionFactory" ref="connectionFactory"></property>
- <property name="messageListener" ref="helloListenerAdapter"></property>
- </bean>
- <!-- 监听生产者发送的消息结束 -->
- </beans>
相关推荐
你会使用Spring AMQP的RabbitTemplate内置应用系统来发布消息和使用一个MessageListenerAdapter POJO来订阅消息 git克隆 gradle bootRun 需要 大约几十分钟 一款文本编辑器或者IDE 你也可以从这个项目中入门代码...
一个使用springamqp实现的异步消息队列的股票系统,来自springamqp的官网,对于学习springamqp很有帮助。
使用Camunda,Spring Boot和RabbitMQ进行事件驱动的微服务编排 这是我的文章的示例项目: 。 概述 该样本包括三个子项目: amqp适配器 预制的通用适配器,隐藏RabbitMQ / AMQP配置的详细信息。 购物车服务 样本...
本项目基于Spring的AMQP模块,整合流行的开源消息队列中间件rabbitMQ,实现一个向rabbitMQ添加和读取消息的功能。并比较了两种模式:生产者-消费者模式和发布-订阅模式的区别。AMQP作为比JMS更加高级的消息协议,支持...
本项目基于Spring的AMQP模块,整合流行的开源消息队列中间件rabbitMQ,实现一个向rabbitMQ添加和读取消息的功能。并比较了两种模式:生产者-消费者模式和发布-订阅模式的区别。AMQP作为比JMS更加高级的消息协议,支持...
SpringBoot高级-消息-RabbitMQ基本概念简介 15、尚硅谷-SpringBoot高级-消息-RabbitMQ运行机制 16、尚硅谷-SpringBoot高级-消息-RabbitMQ安装测试 17、尚硅谷-SpringBoot高级-消息-RabbitTemplate发送接受消息&序列...
AMQP 消息传递通常用于异步事件驱动架构(发布/订阅)。 随着我们朝着拥有大量需要相互通信的微服务迈进,这通常是一种很好的方法。 但是仍然存在这样一种情况,即一个微服务需要从另一个微服务获取数据,并在对该...
RabbitMQ生产者消息确认:publisher-confirms(发送到交换机确认),publisher-returns(路由到队列确认) 定时任务:@EnableScheduling,@Scheduled,cron表达式 RabbitMQ消费者消息应答:@RabbitListener,listener.simple....
着重介绍SpringBoot的与各大场景的整合使用,内容包括:缓存(整合Redis),消息中间件(整合RabbitMQ),检索(整合ElasticSearch),任务(异步任务,定时任务,邮件任务),安全(整合SpringSecurity),分布式...
RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue )的开源实现,AMQP就是一个协议,是一个高级抽象层消息通信协议。 虽然在同步消息通讯的世界里有很多公开标准(如 COBAR的 IIOP ,或者是 SOAP 等),...
SpringFramework为与消息传递系统的集成提供了广泛的支持,从使用JmsTemplate简化JMSAPI的使用到异步接收消息的完整基础结构。SpringAMQP为高级消息队列协议提供了类似的功能集。SpringBoot还为RabbitTemplate和...