发布/订阅:分发一个消息给多个消费者(consumers)接收一个生产者生产的消息
交换器(Exchanges)
rabbitmq完整的消息模型
- 发布者(producer)是发布消息的应用程序。
- 队列(queue)用于消息存储的缓冲。
- 消费者(consumer)是接收消息的应用程序。
RabbitMQ消息模型的核心理念是:发布者(producer)不会直接发送任何消息给队列。事实上,发布者(producer)甚至不知道消息是否已经被投递到队列。
发布者(producer)只需要把消息发送给一个交换器(exchange)。交换器非常简单,它一边从发布者方接收消息,一边把消息推入队列。交换器必须知道如何处理它接收到的消息,是应该推送到指定的队列还是是多个队列,或者是直接忽略消息。这些规则是通过exchange type来定义的。
有几个可供选择的交换器类型:direct, topic, headers和 fanout。
创建一个fanout类型的交换器,命名为logs
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
fanout交换器很简单,从名字上就能猜测出来,它把消息发送给它所知道的所有队列。
交换器列表 rabbitmqctl能够列出服务器上所有的交换器: rabbitmqctl list_exchanges Listing exchanges ... direct amq.direct direct amq.fanout fanout amq.headers headers amq.match headers amq.rabbitmq.log topic amq.rabbitmq.trace topic amq.topic topic ...done.
这个列表中有一些叫做amq.*的交换器。这些都是默认创建的,不过这时候你还不需要使用他们。
匿名的交换器
我们对交换器一无所知,但仍然能够发送消息到队列中。因为使用了命名为空字符串("")默认的交换器。
之前是如何发布一则消息:
channel.basicPublish("", "hello", null, message.getBytes());
exchange参数就是交换器的名称。空字符串代表默认或者匿名交换器:消息将会根据指定的routing_key分发到指定的队列。
发送消息到一个具名交换器了:
channel.basicPublish( "logs", "", null, message.getBytes());
临时队列
一个消息同时分发给多个消费者,需要指定exchange名称,另外,当我们连接上RabbitMQ的时候,我们需要一个全新的、空的队列。我们可以手动创建一个随机的队列名,或者让服务器为我们选择一个随机的队列名(推荐)。我们只要在调用queue_declare方法的时候,不提供queue参数就可以了:
String queueName = channel.queueDeclare().getQueue();
绑定
已经创建了一个fanout类型的交换器和一个队列。现在我们需要告诉交换器如何发送消息给我们的队列。交换器和队列之间的关系我们称之为绑定(binding)
channel.queueBind(queueName, "logs", "");
logs交换器将会把消息添加到我们的队列中logs交换器将会把消息添加到我们的队列中
使用rabbitmqctl list_bindings队列出所有存在的绑定
生产者代码
public class EmitLog { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] argv) throws java.io.IOException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String message = getMessage(argv); channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); channel.close(); connection.close(); } private static String getMessage(String[] strings){ if (strings.length < 1) return "Hello World!"; return joinStrings(strings, " "); } private static String joinStrings(String[] strings, String delimiter) { int length = strings.length; if (length == 0) return ""; StringBuilder words = new StringBuilder(strings[0]); for (int i = 1; i < length; i++) { words.append(delimiter).append(strings[i]); } return words.toString(); } }
消费者代码
public class ReceiveLogs { private final static String EXCHANGE_NAME = "logs"; public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME,"fanout"); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, ""); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(queueName, true, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); } } }
查看banding的exchange
Listing bindings ... exchange amq.gen-zKK_8tEJV79W5r1twM87lg queue amq.gen-zKK_8 V79W5r1twM87lg [] exchange hello queue hello [] logs exchange amq.gen-zKK_8tEJV79W5r1twM87lg queue [] ...done.
相关推荐
1. docker load < rabbitmq.tar.gz 1. 简单模式 3. 发布/订阅模式 4. 路由模式 5. 主题模式
最近整理学习的RabbitMQ入门Dome,文件是一个普通java项目导入完成后在lib文件夹中amqp-client-5.2.0.jar,slf4j-api-1.7.25.jar添加进去即可,里面有5个dome分是 dome1 : 简单队列,dome2 :work模式,dome3 : 订阅...
这个指南将引导你建立一个RabbitMQ AMQP服务器发布和订阅消息的过程。 声明 可以使用本人阿里云安装好的RabbitMQ服务器 host:http://120.27.114.229 username:root password:root port:5672 web management: ...
RabbitMQ相关的代码,包括了入门案例,发布订阅模式,路由模式还有SpringBoot整合RabbitMQ的代码
水车 Watermill是一个Go库,用于有效处理消息... 灵活地使用中间件,插件和发布/订阅配置。 弹性-使用成熟的技术并通过压力测试(请参阅)。入门选择您最喜欢的东西或按顺序查看: 遵循《 。 请参阅下面的示例。 阅读完
python入门到高级全栈工程师培训视频学习资料;本资料仅用于学习,请查看后24小时之内删除。 【课程内容】 第1章 01 计算机发展史 02 计算机系统 03 小结 04 数据的概念 05 进制转换 06 原码补码反码 07 物理层和...
CAP提供了一种更简单的方法来实现事件发布和订阅。 在订阅和发送过程中,您不需要继承或实现任何接口。架构概述 CAP实现了描述的发件箱模式。入门NuGet 可以使用以下命令在您的项目中安装CAP。 PM> Install-Package...
发布/订阅 WebRTC gRPC :high_voltage: 特征 生命的产生者 创造生活的观点 创造生活 WebRTC对等 Web套接字 gRPC服务器 流媒体 直播 看法 聊天显示和Chat互动 聊天 参与者之间的实时交流 储存讯息 将消息发送到...
openagua-engine是一个Python软件包,可帮助将模型引擎连接到OpenAgua,以便可以从OpenAgua进行控制并与之通信。 这包括 任务队列订阅者来处理在OpenAgua应用程序中实例化的模型任务 向OpenAgua应用报告进度的功能 ...
入门: 创建一个表 创建控制器 订阅更改 文档 用于 fastify 的 Api 构建库,使用可重用的 json 模式、开放的 api v3 兼容文档和可生成的集成测试简化了快速构建企业级 api 的过程。 支持事件驱动架构的事件层。 ...