`
jiq408694711
  • 浏览: 33229 次
  • 性别: Icon_minigender_1
  • 来自: 南京
文章分类
社区版块
存档分类
最新评论

利用MSMQ发送消息(对象)到NServiceBus终结点(不采用Send-Only方式)

 
阅读更多

待解决的问题:

某些应用程序需要异步发送消息到远端机器的NServiceBus终结点,但是又不希望知道接收并处理异步消息的是NServiceBus这种ESB终结点,也许异步消息处理者将来会换成其他的ESB终结点,所以不能采用Send-Only的方式,否则应用程序(消息发送方)还是需要引入NserviceBus.dll等类库。


所以我的思路是:

应用程序调用我的接口发送消息到指定的私有消息队列,然后NServiceBus终结点有一个线程,接收这样一个消息,这个消息没有实现IComman/Imessage接口,所以还要再将这个消息转换为NServiceBus能够处理的消息类型(实现ICommand,IMessage等接口,可以出发MessageHandler调用),然后将这个消息通过:

Bus.Send("EndPointName@localhost", message);

发送给本机的NServiceBus终结点。

后面会展示详细的代码。。。。


其实我还想过提供给应用程序的接口是一个WCF服务,服务的实现中采用Send-Only的方式将接收到的消息转换为NServiceBus能够识别的消息,然后采用Send方式发送到本机的NServiceBus终结点,但是我试了一下,WCF服务的实现中配置Send-Only方式那段代码会抛出一个很奇怪的异常,到现在我还没有分析出来为什么。


所以综上所述,提供给应用程序一个接口,将消息发送到我的ESB终结点,同时又没有任何耦合,我们采用的方式是直接操作消息队列,代码如下(关键片段),如要下载我的完整代码,地址为百度云盘


提供给应用程序的接口,包括消息定义,和消息发送接口:

namespace MSMQMessageSender
{
    [Serializable]
    public class AsynMessage
    {
        public AsynMessage() { }

        public int id { set; get; }
        public string body { set; get; }
    }
}
public static bool SendMessage(AsynMessage msg)
        {
            try
            {
                MessageQueue msmq = new MessageQueue(@"FormatName:Direct=TCP:192.1.11.186\Private$\NServiceBus.EndPoint"); //这里的消息队列地址是能够将消息发送到远端机器的消息队列的关键。
                msmq.Formatter = new XmlMessageFormatter(new Type[] { typeof(AsynMessage) });

                System.Messaging.Message message = new System.Messaging.Message();
                message.Label = "消息标题";
                message.Body = msg;
                msmq.Send(message);
            }
            catch (Exception ee)
            {
                Console.WriteLine("发送消息出现异常,原因是:" + ee.Message);
                return false;
            }
            return true;
        }


下面是NserviceBus中,首先是启动的时候需要创建消息队列:

namespace CBIP.Server
{
    class EndConfigPoint : IConfigureThisEndpoint, AsA_Publisher { }

    class ConfiguringTheDistributorWithTheFluentApi : INeedInitialization
    {
        public void Init()
        {
            if (!MessageQueue.Exists(@".\Private$\NServiceBus.EndPoint"))
            {
                System.Messaging.MessageQueue.Create(@".\Private$\NServiceBus.EndPoint");
            }
        }
    }
}

然后定义NserviceBus中能够识别的消息:

namespace NServiceBus.Messages
{
    [Serializable]
    public class CBIPAsynMessage : ICommand
    {
        public int id { get; set; }
        public string body { get; set; }
    }
}

然后是在NServiceBus的Start()方法中开启一个轮训线程,轮训消息队列中的消息:

namespace CBIP.Server
{
    public class ServerEndPoint : IWantToRunWhenBusStartsAndStops
    {
        public IBus Bus { get; set; }

        public void Start()
        {
            Thread thread = new Thread(MyThread);
            thread.Start();        
        }

        public void MyThread()
        {

            while (true)
            {
                MessageQueue MQ = new MessageQueue(@".\Private$\NServiceBus.EndPoint");

                //调用MessageQueue的Receive方法接收消息
                System.Messaging.Message message = null;
                try
                {
                    message = MQ.Receive(TimeSpan.FromSeconds(5));
                }
                catch (MessageQueueException ee)
                {
                    message = null;
                    Console.WriteLine("超时:" + ee.Message);
                }

                if (message != null)
                {                    
                    message.Formatter = new XmlMessageFormatter(new Type[] { typeof(MSMQMessageSender.AsynMessage) });
                    AsynMessage msg = (AsynMessage)message.Body;
                    Console.WriteLine(msg.id + ", " + msg.body);
                    
                    CBIPAsynMessage cbipMsg = new CBIPAsynMessage() //转换成NServiceBus能够识别的消息
                    {
                        id = msg.id,
                        body = msg.body
                    };
                    Bus.Send("CBIP.Server@localhost", cbipMsg);	//发送到本机NServiceBus节点
                    Console.WriteLine("发送到ESB完成");
                    
                }
                else
                {
                    Console.WriteLine("没有找到消息!");
                }                
            }
        }

        public void Stop()
        {

        }

    }
}

就这样在NServiceBus的消息处理者中就能正确收到消息了:

namespace CBIP.Server
{
    public class AsynMessageHandler : IHandleMessages<CBIPAsynMessage>
    {
        public void Handle(CBIPAsynMessage message)
        {
            Console.WriteLine("AsynMessageHandler收到一个消息");
        }
    }
}


直接以进程的方式启动NServiceBus终结点,上面的代码是可以正确工作的,但是当我将我的NServiceBus终结点安装位Windows NT服务之后,问题来了,轮询线程无论如何也收不到消息,然后我去查看消息队列NServiceBus.EndPoint,发现里面是有消息的,那就是轮询线程没办法取出来。


后来调试才发现时没有访问权限,错误是“Access to Message Queuing System id denied”

网上查了一下是需要在创建消息队列的时候,给指定用户配置访问权限的,NServiceBus中创建队列的代码改为下面这样就可以了:

if (!MessageQueue.Exists(@".\Private$\NServiceBus.EndPoint"))
{
        MessageQueue mq = System.Messaging.MessageQueue.Create(@".\Private$\NServiceBus.EndPoint");
	mq.SetPermissions("Administrator", MessageQueueAccessRights.FullControl);
}



======================================= 华丽的分割线 ==============================

另外,正常来说,NserviceBus的Master终结点安装了NServiceBus的软件,有实现了IConfigureThisEndpoint, AsA_Publisher { }这两个接口的类,有消息处理者,寄宿到NServiceBus.Host.exe,填写好命令行参数NServiceBus.Integration和NServiceBus.Master,并且电脑安装了消息队列之后,是可以没有问题地启动起来,并自动创建消息队列的,我的项目中开始也是这样,但是不知道为什么一段时间之后启动报错:“无法创建消息队列,或者没有对应权限”。


这个问题还没找到原因,目前采用的办法只能是显示判断是否存在消息队列,不存在就显示创建一个。

分享到:
评论

相关推荐

    MSMQ微软消息队列收发信息源码

    它与电子邮件很相似,它们都包含多个属性,用于保存消息,消息类型中都指出发送者和接收者的地址,而且它们都不需要发送者和接收者同时在场,就可以将其信息存储在消息队列或邮件服务器中。但是它们是大有区别,其...

    msmq消息队列相关

    msmq消息队列相关

    MSMQ使用总结(事务性、线程监控、文件发送接收和远程信息交互)

    打包文件包含:MQ事务性消息传输,MSMQ单线程监控, MSMQ基类,数据库数据传输, MSMQ文件发送接收,MSMQ远程信息交互和 MSMQ应用文档.doc

    C#MSMQ 消息队列工具

    C#MSMQ 消息队列工具,MSMQ(消息队列) 前段时间研究WCF接触到了MSMQ,弄了一个小工具 分享一下

    WPF实现MSMQ的接收和发送

    使用MSMQ开发分布式应用  首先,实现服务器端。创建一个控制台项目,添加System.Messaging引用,因为消息队列的类全部封装在System.Messaging.dll程序集里。 界面采用WPF开发

    msmq消息队列相关demo

    msmq消息队列相关

    java msmq 接收和发送消息(支持64位)

    本示例主要讲述java向msmq中发送消息,代码非常简单,支持64位操作系统(包含代码示例和MsmqJava.dll、MsmqJava64.jar)

    消息队列MSMQ创建,同步异步接受消息

    消息队列MSMQ创建,同步异步接受消息,适合初学者,代码详细~

    MSMQ消息队列演示程序

    VS2005环境下用C#编写的演示消息队列发送与接收的程序,含全部源码及一个包装的类,调用简单.放到这里的目的是给初学者一个学习的例子.

    .Net最流行的开源企业服务总线 NServiceBus.zip

     NServiceBus的核心并不依赖于MSMQ。NServiceBus可扩展性允许我们插入自行编写的通信传送器,、订阅存储器和工作流的实现。 NServiceBus的特性1、高性能和可扩展性可以广泛应用于许多业务领域,可扩展性和性能都经过...

    MSMQ消息队列

    MSMQ,消息队列

    VC下的消息队列MSMQ编程

    利用 MSMQ(Microsoft Message Queue),应用程序开发人员可以通过发送和接收消息方便地与应用程序进行快速可靠的通信。消息处理为您提供了有保障的消息传递和执行许多业务处理的可靠的防故障方法。

    MSMQ应用--传命令

    An Sample show how to send command from an application to another one by using MSMQ

    基于消息队列-MSMQ 的开发

    asp.net 基于消息队列的开发,实现异步消息传递处理

    Msmq.rar_msmq_service_消息队列

    介绍了MSMQ的基本编程(如存储和接收消息)和基本的管理功能(如创建和删除队列)。虽然使用.Net API来是非常方便和简单的,但是在实际的MSMQ项目中,需要了解消息队列作为架构的概念。通过使用MSMQ,系统会更加松散...

    微软MSMQ消息件研究

    微软MSMQ消息件研究 消息队列微软MSMQ消息件研究 消息队列微软MSMQ消息件研究 消息队列

    net的MSMQ(消息队列)的使用和配置方法详细讲解

    大神的资料--此文件有详细的范例,涉及到了MSMQ的调用、权限等关键技术和方法。附赠的文档非常详细的讲解了范例的说明,详细讲解了MSMQ的相关知识等技术应用。通过本资料就可以完全掌握MSMQ消息队列的技术

    MSMQ_Demo.rar

    MicroSoft Message Queuing(微软消息队列)是在多个不同的应用之间实现相互通信的一种异步传输模式,相互通信的应用可以分布于同一台机器上,也可以分布于...本实例采用c#实现了MSMQ实现了消息生成段和消费端的示例。

    消息队列(MSMQ)

    它的实现原理是:消息的发送者把自己想要发送的信息放入一个容器中(我们称之为Message),然后把它保存至一个系统公用空间的消息队列(Message Queue)中;本地或者是异地的消息接收程序再从该队列中取出发给它的消息...

Global site tag (gtag.js) - Google Analytics