Publish/Subscribe
/* Pub/Sub Model / [...](Q1) -> subscriber1 P -> X(type fanout) \ [...](Q2) -> subscriber1 */
这是很常见的应用场景,生产者在某一时刻将一条消息发送给多个消费者。只是需要注意rabbitmq和其他典型的消息队列有些区别的是生产者并不直接将消息放入队列中,实际上生产者并不知道队列的存在。
它只能将消息发送给一个exchange交换组件,这个交换组件要做的事很简单。一方面接收消息,另一方面将消息放入队列。不过它需要知道根据什么规则来处理这些消息(是否放入一个特殊队列还是多个队列,或者被丢弃)。有一些规则可以使用direct,topic,headers,fanout.
pub/sub模型中我们用到fanout类型
channel.exchangeDeclare("chatroom","fanout"); channel.basicPublish("chatroom","",null,mesage.getBytes());
前面简单的producer/consumer与taskqueue都是需要指定一个队列名,而我们这里需要一个临时队列(缘由如下)
1,consumer无论何时接入rabbit我们都需要一个空的队列(自己随机命名或让rabbitmq给我们指定一个随机队列)
2,consumer不再监听的时候临时队列需要及时删除
String queueName = channel.queueDeclare().getQueue();
当创建完exchange和queue,需要绑定它们两
channel.queueBind(queueName, "chatroom", "");
Publisher: Send.java
package test; import java.io.IOException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Send { public static void main(String[] args) throws IOException { // 创建一个连接连接服务器 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); //factory.setPort(1987); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 声明一个队列,可以对队列做配置,如持久化等。然后往队列发送数据 channel.exchangeDeclare("chatroom","fanout"); for(int i = 1; i < 10; i ++){ String message = "message " + i; channel.basicPublish("chatroom", "", null, message.getBytes()); System.out.println(" [x] Sent '" + message +"'"); } channel.close(); connection.close(); } }
Subscriber: Recv.java
package test; import java.io.IOException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.ConsumerCancelledException; import com.rabbitmq.client.QueueingConsumer; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.ShutdownSignalException; public class Recv { public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException { // 创建一个连接接收数据 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); //factory.setPort(1987); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare("chatroom","fanout"); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, "chatroom", ""); // 等待消息 QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(queueName, true, consumer); while(true){ QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); } } }
$ javac -cp rabbitmq-client.jar Recv.java Send.java
运行多个Recv实例,可以看到每个接收端都能获取到所有的消息。(后启动的Recv实例只会获取后它启动后的消息)
相关推荐
rabbitmq 发布/订阅 java 实现
Linux 安装 RabbitMQ 应用 / RabbitMQ 延时队列
WCF/RabbitMQ绑定/负载均衡。修改RabbitMQ.ServiceModel,以支持WCF负载均衡;RabbitMQBinding WCF客户端/服务端示例;
socket.io-rabbitmq socket.io 适配器实现。 安装 $ npm install socket.io-rabbitmq 用法 var io = require ( 'socket.io' ) ( 3000 ) ; var rabbitmq = require ... port :连接到rabbitmq pub/sub服务器的端口
学习RabbitMQ的学习笔记
该资源为rabbitMQ,附件内容包含了,rabbitMQ安装说明,配置说明,三大消息代码示例
RabbitMQ客户连接池的Java实现。我们刚开始也是采用这种方式来实现的,但做压力测试时,发现这种每次新建Connection和新建Channel是非常耗时的,在大并发下,一般都要8毫秒左右,慢的话,好多都是几十毫秒。因此我们...
x-oss-process=style/pnp8 (42.73KB, 下载次数:227) 下载附件 2019-12-2023 :01 上传【课程介绍】:第一章 : RabbitMQ介绍:消息中间件概念、RabbitMQ安装、RabbitMq客户端调用的Java实现。 第 2 章:RabbitMQ 概念...
该文档介绍docker部署redis/mongod/rabbitmq/nacos/mysql等服务详细步骤
rabbitMQ实战java版-rabbitMQ-demo
Rabbitmq-pub Rabbitmq发布者
rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ...
RabbitMQ:安装、配置与使用初探
Java使用RabbitMq的一个简单demo,自留。
Java全能学习面试手册——Java面试题库.zip 01 7道消息队列ActiveMQ面试题!.pdf 02 10道Java高级必备的Netty面试题!.pdf 03 10道Java面试必备的设计模式面试题!.pdf 04 10个Java经典的List面试题!.pdf 05 10个...
javaAPI SpringMVC 集成rabbitMQ 很全的例子,实现了生产消费,重复消费等功能
导入里面的sql,也就是一个user表,直接运行,访问http://localhost:8080/rabbitMq/getUserSyc/1同步调用mq 访问http://localhost:8080/rabbitMq/getUser/1异步调用mq 通过反射调用业务代码,实现一个队列可以多种...
RabbitMQClientUtil是MQ的测试工具类,他封装了fanout、direct、topic三种exchange模式,并包括发送数据和接收数据。...rabbitmq.properties配置文件根据自己需要自行放在,放置完毕后,请自己修改util中的文件地址
RabbitMQ 三种Exchange.wps————————三种exchange解释及代码 rabbitmq结构.wps————————rabbitmq架构简介 rabbitmq入门.pdf——————入门的文档 RabbitMQ研究与应用.pdf——————简单的研究