`

.Net下RabbitMQ的使用(4) -- 订阅和发布

阅读更多

消息的订阅和发布是使用消息队列的常用场景。在上一篇文章中,虽然有多个消费者,但是一个消息只会有一个消费者来处理。而订阅和发布则是每个订阅该消息的消费者都会收到这个消息。RabbitMQ的路由机制让我们实现这个功能轻而易举。

 

要了解RabbitMQ的路由机制,exchange是一个关键。exchange可以叫做交换机,也似乎可以叫做路由器,反正它是用来选择路由的。前文说到,RabbitMQ的核心思想就是消息的发布者不是直接把消息发送到目标队列中的,事实上,通常它并不知道消息要发到哪个队列中,它只知道把消息队列发送到exchange中。exchange一边接收发送者发过来的消息,而另一边则把消息发送到目标队列中去。exchange一定知道哪些队列需要接收这个消息,是加到一个队列里还是加到好几个队列里,还是直接扔掉。下图中的X就是exchange。

 

 

RabbitMQ的exchange有一些类型,这些类型决定了exchange的行为。分别是 direct, topic, headers 和 fanout四种类型。在这一篇文章中介绍的最简单的发送和接收例子中使用的如下代码中

//指定发送的路由,通过默认的exchange直接发送到指定的队列中。
channel.BasicPublish("", "esbtest.rmq.consoleserver", null, bytes);

第一个参数我们输入了空字符串来代表一个exchange。空字符串的exchange在RabbitMQ中时默认的exchange,类型是direct。在这个例子中它会直接将消息发送到第二个参数route_key定义的同名的队列中。

而我们这篇介绍的订阅和发布是用了fanout这个类型。由于默认类型是direct的,所以需要使用fanout就需要额外定义。如下代码就是定义了一个名字叫publish的fanout类型的exchange。

 

channel.ExchangeDeclare("publish", "fanout");

 

这种exchange有分发的意思。那分发到哪些队列中呢?因为发布者不需要知道,所以这段代码页就在订阅者那边来实现。来看发送方得代码片段:

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Xml.Serialization;

namespace SendService
{
    class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory();
            factory.HostName = "192.168.12.111";
            factory.Port = 5672;
            factory.UserName = "admin";
            factory.Password = "admin";
            //定义要发送的数据

            List<RequestMessage> messages = new List<RequestMessage>();
            for (int i = 0; i < 100; i++)
            {
                RequestMessage message = new RequestMessage() { MessageId = Guid.NewGuid(), Message = "Send1this is a 请求。" + i, Type="email" };
                messages.Add(message);
            }
            for (int i = 0; i < 100; i++)
            {
                RequestMessage message = new RequestMessage() { MessageId = Guid.NewGuid(), Message = "Send1this is a 请求。" + i, Type = "sms" };
                messages.Add(message);
            }

            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    channel.ExchangeDeclare("publish-topic", "topic", true);//定义一个交换机,且采用广播类型,并持久化该交换机
                    string smsqueue = channel.QueueDeclare("all.sms.message", true, false, false, null);//创建一个队列,第2个参数为true表示为持久队列
                    //绑定到名字叫publish的exchange上
                    channel.QueueBind(smsqueue, "publish-topic", "sms");
                    string emailqueue = channel.QueueDeclare("all.email.message", true, false, false, null);//创建一个队列,第2个参数为true表示为持久队列
                    channel.QueueBind(emailqueue, "publish-topic", "email");
                    var properties = channel.CreateBasicProperties();
                    properties.DeliveryMode = 2;//1表示不持久,2.表示持久化
                    foreach (var item in messages)
                    {
                        if (item.Type == "sms")
                        {
                            XmlSerializer xs = new XmlSerializer(typeof(RequestMessage));
                            using (MemoryStream ms = new MemoryStream())
                            {
                                xs.Serialize(ms, item);
                                byte[] bytes = ms.ToArray();
                                channel.BasicPublish("publish-topic", "sms", properties, bytes); //发送消息,这里指定了交换机名称,且routeKey会被忽略
                                Console.WriteLine(" [x] Sent {0}", item.Message);
                            }
                        }
                        if (item.Type == "email")
                        {
                            XmlSerializer xs = new XmlSerializer(typeof(RequestMessage));
                            using (MemoryStream ms = new MemoryStream())
                            {
                                xs.Serialize(ms, item);
                                byte[] bytes = ms.ToArray();
                                channel.BasicPublish("publish-topic", "email", properties, bytes); //发送消息,这里指定了交换机名称,且routeKey会被忽略
                                Console.WriteLine(" [x] Sent {0}", item.Message);
                            }
                        }
                    }
                }
            }
        }
    }

    public class RequestMessage
    {
        public Guid MessageId { set; get; }
        public string Message { set; get; }
        public string Type { set; get; }
    }
}

 接受方SMS:

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Xml.Serialization;

namespace ReceiveSMSService
{
    class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory();
            factory.HostName = "192.168.12.111";
            factory.Port = 5672;
            factory.UserName = "admin";
            factory.Password = "admin";
            
            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    //交换机持久化
                    channel.ExchangeDeclare("publish-topic", "topic", true);
                    bool durable = true;//队列持久化
                    string queue_name = channel.QueueDeclare("all.sms.message", durable, false, false, null);//hello是queue的名字
                    //绑定到名字叫publish的exchange上
                    channel.QueueBind(queue_name, "publish-topic", "sms");
                    //定义这个队列的消费者
                    QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel);
                    channel.BasicConsume(queue_name, false, consumer);
                    Console.WriteLine(" [*] Waiting for messages." + "To exit press CTRL+C");
                    while (true)
                    {
                        //阻塞函数,获取队列中的消息
                        var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();//挂起的操作
                        //模拟长时间运行
                        Thread.Sleep(3000);

                        byte[] body = ea.Body;
                        XmlSerializer xs = new XmlSerializer(typeof(RequestMessage));
                        using (MemoryStream ms = new MemoryStream(body))
                        {

                            RequestMessage message = (RequestMessage)xs.Deserialize(ms);

                            Console.WriteLine("Receive a Message, Id:" + message.MessageId + " Message:" + message.Message + " Type:" + message.Type);

                        }
                        //发送应答包,消息持久化时候使用
                        channel.BasicAck(ea.DeliveryTag, false);
                    }
                }
            }
        }
    }

    public class RequestMessage
    {
        public Guid MessageId { set; get; }
        public string Message { set; get; }
        public string Type { set; get; }
    }
}

 接收方邮件:

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Xml.Serialization;

namespace ReceiveEmailService
{
    class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory();
            factory.HostName = "192.168.12.111";
            factory.Port = 5672;
            factory.UserName = "admin";
            factory.Password = "admin";

            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    channel.ExchangeDeclare("publish-topic", "topic", true);
                    bool durable = true;//队列持久化
                    string queue_name = channel.QueueDeclare("all.email.message", durable, false, false, null);//hello是queue的名字
                    //绑定到名字叫publish的exchange上
                    channel.QueueBind(queue_name, "publish-topic", "email");
                    //定义这个队列的消费者
                    QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel);
                    channel.BasicConsume(queue_name, false, consumer);
                    Console.WriteLine(" [*] Waiting for messages." + "To exit press CTRL+C");
                    while (true)
                    {
                        //阻塞函数,获取队列中的消息
                        var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();//挂起的操作
                        //模拟长时间运行
                        Thread.Sleep(3000);

                        byte[] body = ea.Body;
                        XmlSerializer xs = new XmlSerializer(typeof(RequestMessage));
                        using (MemoryStream ms = new MemoryStream(body))
                        {

                            RequestMessage message = (RequestMessage)xs.Deserialize(ms);

                            Console.WriteLine("Receive a Message, Id:" + message.MessageId + " Message:" + message.Message + " Type:" + message.Type);

                        }
                        //发送应答包,消息持久化时候使用
                        channel.BasicAck(ea.DeliveryTag, false);
                    }
                }
            }
        }
    }

    public class RequestMessage
    {
        public Guid MessageId { set; get; }
        public string Message { set; get; }
        public string Type { set; get; }
    }
}

 

同样先定义一个fanout的exchange。对于为什么要在发布者和订阅者都要定义同名的exchange,我的理解是如果没有定义的一方先启动的话则会报错说找不到那个exchange。我测试过如果有定义的先启动,没定义的后启动也是没有问题的。

因为每个订阅者都需要一个队列来存放发给自己的消息,所以需要创建一个队列。通过QueueBind来和exchange关联了。所有发送给名字为publish的exchange的消息,都会被它分发给所有与之绑定的队列中,这样,每个对应的消费者都会收到一个副本。

image

 

在浏览器的管理界面上我们可以看见,RabbitMQ为每一个消费者(订阅者)创建了一个队列,而没有为发送者创建队列。

分享到:
评论

相关推荐

    RabbitMQ安装和C#使用案列(.net core)

    包括rabbitmq-server-3.6.12,otp_win64_20.1(64位系统)otp_win32_20.1(32位系统),其中opt(文件过大限制上传,自己在网上下载)要在server之前安装,安装完了后需配置server的用户名和密码,并附上C#的代码...

    rabbitmq发布订阅

    看了一天官方文档,自己写了一个发布订阅,已经用于我们的公网环境。

    RabbitMQ工具类封装实现

    封装了RabbitMQ的订阅者线程和发布者线程(还有个初始化工厂的连接工具类),另外附加一个安卓的使用demo

    NET Core 使用RabbitMQ源码 NETCORERABBITMQ.rar

    RabbitMQ从信息接收者角度可以看做三种模式,一对一,一对多(此一对多并不是发布订阅,而是每条信息只有一个接收者)和发布订阅。其中一对一是简单队列模式,一对多是Worker模式,而发布订阅包括发布订阅模式,路由...

    NET Core 使用RabbitMQ源码 LPNETCORERABBITMQ.rar

    RabbitMQ从信息接收者角度可以看做三种模式,一对一,一对多(此一对多并不是发布订阅,而是每条信息只有一个接收者)和发布订阅。其中一对一是简单队列模式,一对多是Worker模式,而发布订阅包括发布订阅模式,路由...

    EasyNetQ:一个易于使用的.NET API for RabbitMQ

    尽可能简化在.NET上使用RabbitMQ的工作。 要连接到RabbitMQ代理... var bus = RabbitHutch . CreateBus ( " host=localhost " ); 要发布消息... await bus . PubSub . PublishAsync ( message ); 要以5秒的...

    rabbitmq-labs:我的RabbitMQ教程的源代码-docker source code

    在ASP.NET Core Web API项目中发布RabbitMQ消息,使用辅助服务使用消息,并使用Docker Compose运行多容器应用程序。 演示版 具有一个消息生成者和两个消费者的基本发布/订阅模型。 交换类型: fanout 。 该演示演示...

    轻松使用rabbitmq.rar

    AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。 RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、...

    RabbitMQ实战 高效部署分布式消息队列完整版带书签

    AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。 RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、...

    RabbitMQ的原理和使用

    AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、...

    RabbitMQ代码示例(java maven)

    http://blog.csdn.net/lmj623565791/article/category/2386657 提供了很多代码示例,代码中的01-05分别为:hello world、工作队列、发布订阅、路由选择和主题

    .Net最流行的开源企业服务总线 NServiceBus.zip

    NServiceBus 是一个.Net平台下开源的消息服务框架,这类产品有时也被称作ESB(Enterprise Service Bus)——企业服务总线。NServiceBus也是dotnet世界里面最流行的开源企业服务总线。 NServiceBus 是一个用于构建企业...

    RabbitMQ实战 高效部署分布式消息队列

    AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。 RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、...

    RabbitMQ的一个调试工具

    在微服务架构下,少不了使用MQ和Redis,...工具具备创建Exchange,Queue,发布和订阅,自动应答处理等常用的消息处理操作。 工具使用.net开发,需要安装net461,其他的也没啥,有什么需要的,可以提出要求,方便开发嘛。

    RabbitMQ原理、集群、基本操作及常见故障处理

    AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、

    CAP:基于最终一致性的微服务中的分布式事务解决方案,也是具有发件箱模式的事件总线

    CAP提供了一种更简单的方法来实现事件发布和订阅。 在订阅和发送过程中,您不需要继承或实现任何接口。架构概述 CAP实现了描述的发件箱模式。入门NuGet 可以使用以下命令在您的项目中安装CAP。 PM&gt; Install-Package...

    .NET 分布式组件库 Exceptionless Foundatio.zip

    消息允许通过你的应用程序发布订阅消息。Foundatio 提供了一个IMessageBus接口,并且有4 种不同的实现。1、InMemoryMessageBus:一个内存消息总线实现。这个消息总线的生命周期为当前进程。2、RedisMessageBus:一个...

    RabbitMQTrial.zip

    rabbitmq的工作者模式、发布订阅模式实例代码 博文:https://blog.csdn.net/u010476739/article/details/115876011

    springCloud

    AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。 Github:https://github.com/rabbitmq 官网地址:http://www.rabbitmq.com 安装RabbitMQ 安装RabbitMQ 可以参考之前的文章 ...

    MetroBus:MassTransit的轻量级消息传递包装

    目前仅支持RabbitMQ传输提供简便的方法来为发布/订阅创建生产者和消费者提供简单的方法来处理请求/响应对话提供消息调度包括可选的增量自动重试策略包括可选的断路器包括可选的速率限制器Autofac支

Global site tag (gtag.js) - Google Analytics