摘要: RabbitMQ发送消息时,都是先把消息发送给ExChange(交换机),然后再分发给有相应RoutingKey(路由)关系的Queue(队列)。ExChange和Queue之前是多对多的关系。RabbitMQ 3.0之后创建ExChange时,有四种类型可选“fanout、direct、topic、headers”。
RabbitMQ发送消息时,都是先把消息发送给ExChange(交换机),然后再分发给有相应RoutingKey(路由)关系的Queue(队列)。
ExChange和Queue之前是多对多的关系。
RabbitMQ 3.0之后创建ExChange时,有四种类型可选“fanout、direct、topic、headers”。
一、fanout
当向一个fanout发送一个消息时,RoutingKey的设置不起作用。
消息会被发送给同一个交换机下的所有队列,每个队列接收到的消息是一样的;
一个队列内有所有消费者(包含那些并没有相应RoutingKey的消费者),将平分队列接收到的消息。
----------------消息生产者----------------
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(S_RabbitMQ.QUEUE_IP);// MQ主机
factory.setPort(S_RabbitMQ.QUEUE_PORT);// MQ端口
factory.setUsername(S_RabbitMQ.QUEUE_USER);// MQ用户名
factory.setPassword(S_RabbitMQ.QUEUE_PWD);// MQ密码
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 声明路由名字和类型
channel.exchangeDeclare(EXCHANGE_NAME, "fanout", true, false, null);
String message = "hello world! ";
for(int i=0;i<100;i++)
{
channel.basicPublish(EXCHANGE_NAME, "", null, (message+i).getBytes());
}
System.out.println("Sent msg finish");
channel.close();
connection.close();
----------------消息消费者----------------
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(S_RabbitMQ.QUEUE_IP);// MQ主机
factory.setPort(S_RabbitMQ.QUEUE_PORT);// MQ端口
factory.setUsername(S_RabbitMQ.QUEUE_USER);// MQ用户名
factory.setPassword(S_RabbitMQ.QUEUE_PWD);// MQ密码
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//声明路由名字和类型
channel.exchangeDeclare(EXCHANGE_NAME, "fanout", true, false, null);
//声明队列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//绑定路由和队列
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "routkey2", null);
System.out.println(" Waiting for msg....");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
String message = "";
try
{
message = new String(body, "UTF-8");
}
catch (UnsupportedEncodingException e)
{
e.printStackTrace();
}
catch (Throwable ex)
{
ex.printStackTrace();
}
System.out.println("Received msg='" + message + "'");
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
二、direct
当向一个direct发送一个消息时,消息会被发送给同一个交换机下的拥有相应RoutingKey的队列,每个队列接收到的消息是一样的;
一个队列内拥有相应RoutingKey的消费者,将平分队列接收到的消息。
----------------消息生产者----------------
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(S_RabbitMQ.QUEUE_IP);// MQ主机
factory.setPort(S_RabbitMQ.QUEUE_PORT);// MQ端口
factory.setUsername(S_RabbitMQ.QUEUE_USER);// MQ用户名
factory.setPassword(S_RabbitMQ.QUEUE_PWD);// MQ密码
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 声明路由名字和类型
channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, null);
String message = "hello world! ";
for(int i=0;i<100;i++)
{
channel.basicPublish(EXCHANGE_NAME, "routingkey1", null, (message+i).getBytes());
}
System.out.println("Sent msg is '" + message + "'");
channel.close();
connection.close();
----------------消息消费者----------------
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(S_RabbitMQ.QUEUE_IP);// MQ主机
factory.setPort(S_RabbitMQ.QUEUE_PORT);// MQ端口
factory.setUsername(S_RabbitMQ.QUEUE_USER);// MQ用户名
factory.setPassword(S_RabbitMQ.QUEUE_PWD);// MQ密码
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//声明路由名字和类型
channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, null);
//声明队列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//绑定路由和队列
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "routingkey1", null);
System.out.println(" Waiting for msg....");
Consumer consumer = new DefaultConsumer(channel)
{
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body)
{
String message = "";
try
{
message = new String(body, "UTF-8");
}
catch (UnsupportedEncodingException e)
{
e.printStackTrace();
}
catch (Throwable ex)
{
ex.printStackTrace();
}
System.out.println("1 Received msg='" + message + "'");
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
三、topic
当向一个topic发送一个消息时,消息会被发送给同一个交换机下的拥有相应RoutingKey的队列,每个队列接收到的消息是一样的;
一个队列内有所有消费者(包含那些并没有相应RoutingKey的消费者),将平分队列接收到的消息。
----------------消息生产者----------------
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(S_RabbitMQ.QUEUE_IP);// MQ主机
factory.setPort(S_RabbitMQ.QUEUE_PORT);// MQ端口
factory.setUsername(S_RabbitMQ.QUEUE_USER);// MQ用户名
factory.setPassword(S_RabbitMQ.QUEUE_PWD);// MQ密码
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 声明路由名字和类型
channel.exchangeDeclare(EXCHANGE_NAME, "topic", true, false, null);
String message = "hello world! ";
// int i=101;
for (int i = 0; i < 100; i++)
{
channel.basicPublish(EXCHANGE_NAME, "routingkey1", null, (message + i).getBytes());
}
System.out.println("Sent msg is '" + message + "'");
channel.close();
connection.close();
----------------消息消费者----------------
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(S_RabbitMQ.QUEUE_IP);// MQ主机
factory.setPort(S_RabbitMQ.QUEUE_PORT);// MQ端口
factory.setUsername(S_RabbitMQ.QUEUE_USER);// MQ用户名
factory.setPassword(S_RabbitMQ.QUEUE_PWD);// MQ密码
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 声明路由名字和类型
channel.exchangeDeclare(EXCHANGE_NAME, "topic", true, false, null);
//声明队列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//绑定路由和队列// 把队列绑定到路由上并指定headers
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "routingkey1", null);
System.out.println("1 Waiting for msg....");
Consumer consumer = new DefaultConsumer(channel)
{
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
{
String message = "";
try
{
message = new String(body, "UTF-8");
}
catch (UnsupportedEncodingException e)
{
e.printStackTrace();
}
catch (Throwable ex)
{
ex.printStackTrace();
}
System.out.println("1 Received msg='" + message + "'");
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
相关推荐
jmeter测试Rabbitmq的方法 AMQP Publisher/Consumer 请求 介绍 Exchange(交换机):如果此处不填写,使用默认名称--(AMQP default) Exchange Type:分别是direct(单播),fanout(广播),topic(组播),headers,每种类型...
问题五:RabbitMQ 概念里的 channel、exchange 和 queue 这些东东是逻辑概念,还是对应着进程实体?这些东东分别起什么作用? 问题六:vhost 是什么?起什么作用? 问题七:在单 node 系统和多 node 构成的 cluster ...
本文实例讲述了Python队列RabbitMQ 使用方法。分享给大家供大家参考,具体如下: 目前的exchange的路由策略是:每个需要队列的服务独享一个队列(queue),消费者(consumer)采用ACK自动应答模式处理队列消息。 ...
rabbitmq的提供的各个接口函数使用说明,函数有 1、amqp_new_connection(声明一个新的connection) 2、amqp_open_socket(获取socket) 3、amqp_set_sockfd(将connection和sockfd进行绑定) 4、amqp_login(用于登录...
1.rabbitMQ消息中间件(注意:安装mq之前先安装erlang,原因在于RabbitMQ服务端代码是使用并发式语言erlang编写的) 2.Redis(这里redis的目的是保存消费次数) 二、Mq消息机制: 1、mq消息机制几个重要的组件: 1)、...
RabbitMQController RabbitMQController类用于通过RabbitMQ连接,公开和使用消息的简便方法。 要使用该类,您将需要pika模块python -m pip install pika --upgrade要使用该类,您需要导入它并创建一个对象: from ...
厨师-logstash食谱 适用于Chef的基本Logstash食谱,可从官方中安装Logstash。 要求 在Ubuntu / Debian上进行了测试,但也应在其他平台上工作。 用法 只需包含默认配方logstash::... " exchange " : " rawlogs " ,
RabbitMQ SpringBoot入门基本网址: : 兔子MQ 安装并启动服务器 使用命令“ rabbitmq-server”启动服务器,然后使用来宾/来宾启动 或添加用户 使用Spring Boot 了解AMQP结构在/ config中queue()方法创建一个AMQP...
自动队列声明和绑定死字支持轻松发布简单的基于承诺的 API示例用法 const AMQP = require ( 'simple-amqplib' ) ;var config = { url : process . env . AMQP_URL , exchange : process . env . AMQP_EXCHANGE , ...
用法 shovel --pipeline pipeline.yml --config config.yml --workers 1 --log-level debug --metrics-port 3001 普罗米修斯指标 管道指标在指定端口上以Prometheus格式公开 管道配置 name : message input : use ...
MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。消 息传递指的是程序之间通过在消息中发送...
此种情况便只能使用socket编程了,然而不同程序之间的通信便不再像线程进程之间的那么简单了,要考虑多种情况(比如其中一方断线另一方如何处理;消息群发,多个程序之间的通信等等),如果每遇到一次程序间的通信,...
buneary,发音为bun-ear-y,是易于使用的RabbitMQ命令行客户端,用于管理交换,管理队列以及将消息发布到交换。 buneary buneary,发音为bun-ear-y,是易于使用的RabbitMQ命令行客户端,用于管理交换,管理队列以及...
该项目是一个测试微服务项目,用于测试对另一个名为currency-exchange-service的微服务的调用。在此项目中,这是对调用微服务的不同方式的测试 1- RestTemplate。 调用其他服务的旧方法。 2-假装。 简化了对其他...
这是librabbitmq-c的一个简单的Objective-C包装器,librabbitmq-c是开发RabbitMQ AMQP服务器的人们的C AMQP库。 要求 可可 librabbitmq-c包括头文件 建造 没有什么可建的。 只需将源文件和头文件包含到您的Xcode项目...
Laravel AMPQ助手在Laravel中使用RabbitMQ进行发布/订阅。发布配置php artisan vendor:发布--tag = laravel-ampq --force听众在laravel-ampq.cfg设置我们的监听laravel-ampq.cfg 'callbacks' => [ 'example_stack' ...
ci防火墙用于在防火墙后运行CI并将结果流收集到公共场所免责声明警告:通过使用此方法,您正在防火墙上打一个洞。 请确保您的触发机制配置正确,以便不允许随机人以滥用系统(通过触发随机构建等)或公开方式更改您...
用法 安装 pip install franz (推荐) pip -e git+git@github.com:eshares/franz.git@master#egg=franz 将@master更改为版本或根据需要提交。 兔子MQ 传送讯息 import random import time import franz class ...
工作中经常用到rabbitmq,而用的语言主要是python,所以也就经常会用到python中的pika模块,但是这个模块的使用,也给我带了很多问题,这里整理一下关于这个模块我在使用过程的改变历程已经中间碰到一些问题的解决...