理清路由机制是了解RabbitMQ来龙去脉的关键。在前面的例子中我们常常遇见这三个概念:exchange,routingKey 和 queue。真正地消息传输流程是消息先到exchange,然后exchange根据对应的routingKey放入queue,如果routingKey不匹配则丢弃。网上网友的一张图很好的展示了这个流程:
0.9 版本的AMQP协议的exchange有如下4中类型:fanout,direct,topic 和 headers。RabbitMQ服务会在启动以后预先建立4个exchange,分别对应于4中类型:
默认的exchange
如果用空字符串去申明一个exchange,那么系统就会使用"amq.direct"这个exchange。我们在创建一个queue的时候,默认的都会有一个和新建queue同名的routingKey绑定到这个默认的exchange上去。就像我们在发送和接收例子中,发送者的发送代码:
channel.BasicPublish("", "TaskQueue", properties, bytes);
因为在第一个参数选择了默认的exchange,而我们申明的队列叫TaskQueue,所以默认的,它在新建一个也叫TaskQueue的routingKey,并绑定在默认的exchange上,导致了我们可以在第二个参数routingKey中写TaskQueue,这样它就会找到定义的同名的queue,并把消息放进去。
Direct Exchange
direct exchange 发送消息是要看routingKey的。举个例子,定义了一个direct exchange 名字是X1,然后一个queue名字为Q1 用routingKey=K1 绑定到exchange X1上,当一个routeKey为 K2 的消息到达X1上,那么只有K1=K2的时候,这个消息才能到达Q1上。代码例子如下:
channel.ExchangeDeclare("X1", "direct"); channel.QueueDeclare("Q1", true, false, false, null); channel.QueueBind("Q1", "X1", "K1"); channel.BasicPublish("X1", "K1", null, bytes);
这样Q1才能收到消息。
如果有两个接收程序都是用了同一个的queue和相同的routingKey去绑定direct exchange的话,分发的行为是负载均衡的,也就是说第一个是程序1收到,第二个是程序2收到,以此类推。
如果有两个接收程序用了各自的queue,但使用相同的routingKey去绑定direct exchange的话,分发的行为是复制的,也就是说每个程序都会收到这个消息的副本。行为相当于fanout类型的exchange。
官方图片展示direct exchange:
Fanout Exchange
fanout类型的exchange就比较好理解。就是简单的广播,而且是忽略routingKey的。所以只要是有queue绑定到fanout exchange上,通过这个exchange发送的消息都会被发送到那些绑定的queue中,不管你有没有输入routingKey。
您可以需要广播消息或分布式系统的消息同步等场景中广泛使用到它。
官方图片展示:
Topic Exchange
Topic类型的exchange给与我们更大的灵活性。通过定义routingKey可以有选择的订阅某些消息,此时routingKey就会是一个表达式。exchange会通过匹配绑定的routingKey来决定是否要把消息放入对应的队列中。有两种表达式符号可以让我们选择:#和*。
*(星号):代表任意的一个词。 例:*.a会匹配a.a,b.a,c.a等
#(井号):代码任意的0个或多个词。 例:#.a会匹配a.a,aa.a,aaa.a等
topic exchange 有时候的行为会像其他类型的exchange,比如说:
当routingKey只是有#号的时候,它的行为和fanout的行为是一样的。
当routingKey什么的没有,空字符串的时候,它的行为是和direct是一样的。
要注意的是,符号代表的是词不是字符。RabbitMQ中在表达式中词的定义是以.(点号)分隔的。
对于一个queue绑定到exchange,是可以多次绑定的:
channel.QueueBind(queue_name, "X1", "lazy.#"); channel.QueueBind(queue_name, "X1", "*.*.rabbit");
如上面的代码,表示这个queue即可以收lazy.开头的,又可以收.rabbit结尾的。
大家可以通过一些测试来理解这里面的规则,但是在测试的时候需要注意,每次修改表达式后,需要重置一下RabbitMQ。因为是可以多次绑定的,所以之前你改的所有的表达式都会记录下来,都会被尝试匹配。
重置的命令(RabbitMQ以控制台的形式运行):
Headers Exchange
Headers类型的exchange使用的比较少,它也是忽略routingKey的一种路由方式。是使用Headers来匹配的。Headers是一个键值对,可以定义成Hashtable。发送者在发送的时候定义一些键值对,接收者也可以再绑定时候传入一些键值对,两者匹配的话,则对应的队列就可以收到消息。匹配有两种方式all和any。这两种方式是在接收端必须要用键值"x-mactch"来定义。all代表定义的多个键值对都要满足,而any则代码只要满足一个就可以了。之前的几种exchange的routingKey都需要要字符串形式的,而headers exchange则没有这个要求,因为键值对的值可以是任何类型。代码示例如下:
发送端:
channel.ExchangeDeclare("X1", "headers");
IBasicProperties properties = channel.CreateBasicProperties();
properties.Headers = new Hashtable();
properties.Headers.Add("Key1", 123);
properties.Headers.Add("Key2", 345);
XmlSerializer xs = new XmlSerializer(typeof(RequestMessage));
MemoryStream ms = new MemoryStream();
xs.Serialize(ms, message);
byte[] bytes = ms.ToArray();
channel.BasicPublish("X1", "", properties, bytes);
接收端:
channel.ExchangeDeclare("X1", "headers");
//随机创建一个队列
string queue_name = channel.QueueDeclare("headerssubscriber2", true, false, false, null);
//绑定
IDictionary ht = new Hashtable();
ht.Add("x-match", "any");
ht.Add("Key1", 12345);
ht.Add("Key2", 34567);
channel.QueueBind(queue_name, "X1", "", ht);
//定义这个队列的消费者
QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel);
channel.BasicConsume(queue_name, true, consumer);
while (true)
{
BasicDeliverEventArgs ea =
(BasicDeliverEventArgs)consumer.Queue.Dequeue();
byte[] bytes = ea.Body;
XmlSerializer xs = new XmlSerializer(typeof(RequestMessage));
using (MemoryStream ms = new MemoryStream(bytes))
{
RequestMessage message = (RequestMessage)xs.Deserialize(ms);
Console.WriteLine("Receive a Message, Id:" + message.MessageId + " Message:" + message.Message);
}
}
相关推荐
RabbitMQ rabbitmq-server-3.6.12-1.el6.noarch 及其安装所需要的软件打包都在这里面,主要报卡一下软件:socat-1.7.3.2.tar.gz、rabbitmq-server-3.6.12-1.el6.noarch.rpm、rabbitmq-release-signing-key.asc、otp_...
rabbitmq-server-3.10.5-1.el8.noarch.rpm
linux rabbitmq安装包 rabbitmq-server-generic-unix-3.6.1.tar 实测
这里提供了rabbitmq-server-3.7.3.exe百度网盘下载,官网下载实在是太慢了,亲测有效! rabbitmq-server-3.7.3.exe rabbitmq-server-3.7.3.exe rabbitmq-server-3.7.3.exe
rabbitmq-server-3.4.1-1.noarch.rpm rabbitmq-server-3.4.1-1.noarch.rpm
RabbitMQ源码 rabbitmq-server-generic-unix-3.8.8.tar.xz
下载好的包,因为下载太慢 rabbitmq-server-generic-unix-3.7.8.tar.xz
rabbitmq-server-3.8.8-1.el7.noarch
rabbitmq-server-3.11.13rabbitmq-server-3.11.13rabbitmq-server-3.11.13rabbitmq-server-3.11.13rabbitmq-server-3.11.13rabbitmq-server-3.11.13rabbitmq-server-3.11.13rabbitmq-server-3.11.13rabbitmq-server-...
最新版linux rabbitmq-server-generic-unix-3.9.1.tar.xz最新版linux rabbitmq-server-generic-unix-3.9.1.tar.xz
rabbitmq-server-generic-unix-3.7.18.tar.xz版本服务器安装 有需要的可以去官网查看一下rabbitmq版本对erlang版本的一个支持情况,官网地址:http://www.rabbitmq.com/which-erlang.html
rabbitmq安装包,linux离线安装。后面会有教程及配置。先在windows解压再使用。软件名称:rabbitmq-server-generic-unix-3.5.7.tar.gz
rabbitmq-server-3.7.10-1.el7.noarch.rpm包 配合erlang在RHEL Linux 7.x, CentOS 7.x, Fedora 19+ (supports systemd)等系统运行
最新版linux rabbitmq-server-generic-unix-3.9.0.tar.xz最新版linux rabbitmq-server-generic-unix-3.9.0.tar.xz
最新版linux rabbitmq-server-generic-unix-3.8.7.tar.xz
rabbitmq-server-3.9.7-1.el7.noarch.rpm
rabbitmq-server-3.8.3.exe和erlang22.2.exe 2020年3月最新版本, 64位
使用命令安装:yum -y install rabbitmq-server-3.6.6-1.el6.noarch.rpm 启动rabbitmq服务: 前台运行:rabbitmq-server start (用户关闭连接后,自动结束进程) 后台运行:rabbitmq-server -detached
rabbitmq离线安装 - 语言库 erlang-21.2.6-1.el7.x86_64.rpm - 依赖 socat-1.7.3.2-2.el7.x86_64.rpm - rabbitmq 服务器 rabbitmq-server-3.7.13-1.el7.noarch.rpm
JAVA开发工具 rabbitmq-server-3.7.5.tar.xz,RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件