http://www.rabbitmq.com/tutorials/tutorial-three-java.html
在前面的章节中,我们创建了一个工作队列。工作队列假设每一个任务都交给一个工作者来处理。在这一章节,我们会处理一些完全不同的事情,我们会将消息发送给多个消费者。这种模式就叫做“publish/subscribe”
为了说明这种模式,我们会建立一个简单的日志系统。它包括两个程序:一个发送日志,另一个接受然后将他们打印出来。
在我们的日志系统中,每一个运行这的接受者,都将会收到消息。通过这种方式,我们可以在同一时间使一个接受者将消息保存到硬盘上,另一个接受者将消息打印到控制台。
本质上,被发布的消息将会广播到每一个接受者上
1:Exchanges(交换区)
在前面的章节中,我们都是将消息写到队列中,然后从队列中取出消息。现在是时候引入完整的Rabbit消息模型了
让我们回顾下前面章节都包括那些内容
一个producer发送消息的应用
一个queue缓存消息
一个consumer接受消息的应用
RabbitMQ消息模型的核心思想就是,生产者不把消息直接发送给队列。实际上,生产者在很多情况下都不知道消息是否会被发送到一个队列中。取而代之的是,生产者将消息发送到交换区。交换区是一个非常简单的东西,它一端接受生产者的消息,另一端将他们推送到队列中。交换区必须要明确的指导如何处理它接受到的消息。是放到一个队列中,还是放到多个队列中,亦或是被丢弃。这些规则可以通过交换区的类型来定义。
可用的交换区类型有:direct,topic,headers,fanout。下面将集中讨论最后一个:fanout。可以通过channel.exchangeDeclare(“logs”,”fanout”)来创建一个这种类型的交换区。它会把它接收到的所有消息,发送给所有和它绑定的队列。并且,这正是我们日志需要的。
可以通过客户端的rabbitmqctl list_exchanges查看所有的交换区可以通过rabbitmqctl list_exchanges列出所有的交换区。amp.*表示的是默认或没有命名的交换区。在前面的章节中,我们不知道交换区,但是我们还是可以发送消息到队列中。那是因为我们使用了默认的交换区,我们通过控制符串来制定。回想之前发布消息的代码channel.basicPublish(“”,”hello”,null,message.getBytes());第一个参数就是交换区的名字。空字符串表示这是默认的或者没有命名的交换区。消息通过指定的路由名routingkey被路由到队列中。
现在我们可以将消息发送到交换区
channel.basicPublish( "logs", "", null, message.getBytes());
2:Temporary queues 临时队列
之前使用的队列中有一个名字,给定一个名字对我们来说非常的重要,因为我们需要将消费者关联到队列。当我们希望在生产者和消费者之间共享队列时,指定队列的名字将变的非常重要。
但是,这不适用于我们的日志记录。我们希望记录所有的日志消息,而不是他们的一个子集。我们同时也对当前的信息感兴趣而不是老得信息。要解决这些问题,我们需要做如下两件事情:
首先:当我们连接到RabbitMQ时,我们需要一个新的,空的队列。为了解决这个问题,我们可以使用一个随机创建的名字;或者让RabbitMQ服务器随机帮我们创建一个队列名字。
其次:一旦我们断开连接,需要自动删除消费者队列
我们可以通过channel.queueDeclare()来创建一个非持久的队列名,并通过getQueue()来返回对列名。
3: Bindings(绑定)
我们已经创建了一个fanout交换区和一个队列,现在我们就可以让交换区来发送消息到我们的队列中来。交换区和队列的关系,可以通过调用channel.queueBind(queueName,”logs”,””);来绑定。
现在,所有的日志交换区将会将数据追加到我们的队列中
通过客户端Rabbitmqctl list_bindings命令,可以查看所有的绑定
4:Putting it all together
发送消息的生产者程序,和之前的代码看起来没有多大的区别。最主要的变化就是,我们现在希望将消息发送给我们的logs交换区,而不是之前的默认交换区。发送时,我们需要提供一个routingkey路由名,它的值对于fanout类型的交换区来说,将会被忽略。下面是EmitLog.java的代码片段
import java.io.IOException;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
public class EmitLog {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] argv)
throws java.io.IOException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String message = getMessage(argv);
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
channel.close();
connection.close();
}
//...
}
正如你所看到的,在建立到RabbitMQ的连接之后,我们就申明了一个交换区。这一步非常重要,因为发布消息到一个不存在的交换区是不被允许的。如果没有队列绑定到交换区,消息将会丢失。
ReceiveLogs.java
import java.io.IOException;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;
public class ReceiveLogs {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] argv)
throws java.io.IOException,
java.lang.InterruptedException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received '" + message + "'");
}
}
}
使用rabbitmqctl list_bindings命令,你可以确认程序却是创建了bindings和queue。
相关推荐
这种模式被称为” 发布 / 订阅”。 交换器(Exchanges) 在本教程的前面部分,我们发送和接收到队列中的消息,现在是时候在 RabbitMQ 中引入完整的消息传递模式了。 让我们快速回顾一下之前了解的内容: 生产者...
文章目录rabbitmq7种实现方式搭建maven项目引入依赖创建连接简单队列消息生产者消息消费者work queues 工作队列生产者消费者能者多劳(公平分发):消费能力强则消费更多消息Publish/Subscribe 发布订阅模式生产者...
MQTT 全称(Message Queue Telemetry Transport):一种基于发布/订阅(publish/subscribe)模式的轻量级通讯协议,通过订阅相应的主题来获取消息,是物联网(Internet of Thing)中的一个标准传输协议。 该协议将...
rabbitMq简单介绍还有下载,安装及配置,包括一些队列案例(Simble简单队列,.work queues 工作队列 公平分发 轮询分发,订阅模式 publish/subscribe,routing路由模式,Topic 主题模式,rabbitMq的消息确认机制)
3. 发布/订阅模式(Publish/Subscribe Mode): - 发布/订阅模式用于将消息广播到多个消费者。每个消费者都有自己的队列,并且订阅相同的交换机。生产者发送消息到交换机,然后交换机将消息广播到所有与之绑定的...
本例为.net下使用EasyNetQ操作RabbitMQ的Demo,例子采用MVC架构,包含完整的Publish/Subscribe。详情请看:https://www.cnblogs.com/imstrive/p/11078335.html
rabbit的各种工作模式的demo,包括work queues模式,publish/Subscribe模式,Routing模式等
一个简单的laravel的Rabbitmq库,基于Publish-Subscribe模式,其中订阅者是Consumer。 目录 2.1。 2.2。 2.3。 2.4。 2.5. 3.1。 3.2。 3.3。 可用的 CLI 命令 3.4. 自定义消息处理器 贡献 1.安装 跑步: ...
网络视频资源,如有侵权请留言/举报,资源过大上传乃是...7、Publish_Subscribe发布订阅模式3 d) y5 t6 A1 R 8、route 路由模式1 W. q/ p. b0 C 9、topic模式 10、整合spring方式-1 11、整合sring方式-2 12、消息持久化
ps publish/subscribe 发布订阅模式 routing topic 消息确认机制 基于AMQP事务实现的消息确认 同步confirm确认 异步confirm消息确认 延迟消息队列 基于插件rabbitmq_delayed_message_exchange去实现 后续待补充...
rabbitMQ官方Demo,包含01HelloWorld,02WorkQueues,03PublishSubscribe,04Routing,05Topics,06RPC六个示例
请在此处创建功能请求或错误:)用法阅读./docs文件夹中的文档能力 连接(CONNECT / CONACK) 平(PINGREQ / PINGRESP) 发布(发布/回传) 订阅(SUBSCRIBE / SUBACK) 退订 QoS 0消息 QoS 1消息 QoS 2消息 保留在...
兔子MQ RabbitMQ实践包括6种做法,例如01:Hello World 02:Work Queues 03:Publish Subscribe 04:Routing 05:Topics 06:RPC
发布者和订阅者都是事件发射者。 目前支持redis和amqp ,例如 RabbitMQ。目录渠道() 关闭() 渠道活动发布() 订阅活动关闭() 免责声明执照贡献例子 用法 发布 var queue = require ( 'message-queue' ) ( '...
publish/subscribe communication styles. Chapter 12, Securing an API, will describe varius ways of securing your microservices. We will implement a system consisting of all previously introduced ...
The publish/subscribe model Running a sample system Scaling and grouping Running multiple instances Consumer groups Partitioning Configuration options Spring Cloud Stream properties Binding properties...
easy_bunny_rpc 通用RPC客户端/工人库,用于处理基于兔子的数据序列... subscribe do | payload | publish_success ( payload ) # Send a success message to the client # publish_failure(payload) # Send a failu
使用RabbitMQ为Laravel发布事件 Nuwber的事件提供了一个简单的观察器实现,使您可以侦听当前应用程序和另一个应用程序中发生的各种事件。 例如,如果您需要对从另一个API发布的某个事件做出React。 不要将此软件包...