`

Mule的消息路由

    博客分类:
  • mule
 
阅读更多

摘选:http://www.infoq.com/cn/articles/mule-message-routing

   

     当谈论整合应用时,消息路由备受关注。当我们确定了各个应用,并选择Mule作为整合平台,也知道在Mule的服务中可以使用哪些Java类和 web services处理消息,那么为了让消息正确地在服务间流转,该如何将所有的事情整合在一起,从而确保获得我们所需要的的结果呢?

Mule为您的Mule应用中的服务间的路由消息提供了强大而灵活的可选项。本文描述了Mule的常用消息类型和几种可用的特殊路由器。

下面介绍消息路由器的相关核心概念:

  • 端点(Endpoints) 定义了发送和接收消息的通道(channel)。例如,一个购物组件可能会接收到一个HTTP 订单请求。一旦该组件处理完订单请求,它可能会发送一个 JMS消息到一个主题(topic)上,以通知审计系统,并返回一个HTTP响应。可以通过端点监听JMS消息、发送email、调用web services等。
  • 入站路由器(Inbound router) 控制服务如何处理入站(incoming)消息,比如,有选择地只消费符合特定条件的消息,或者在将消息转发给服务处理之前,将拥有同一group ID的消息聚合(group)在一起。
  • 出站路由器(Outbound router) 控制如何分发经服务处理过的消息,比如,将消息发送到一个接受者列表,或者将消息分解成多个部分,并将它们分别发送至不同的端点。
  • 异步回复路由器(Asynchronous reply router) 常用于request/response场景。在这些场景中,发送一个请求会触发一个或者更多的请求,并且在返回响应之前,需要考虑这些请求的结果。典型的例子是请求发送后,会并行执行任务(task)。在返回响应之前,必须执行完每个任务,处理完结果。
  • Catch-all策略 在当前消息找不到路由路径时才被调用。入站和出站端点都可以配置catch-all策略,因此可以捕获到任何孤立的消息,并将这些消息路由到一个共同的位置。
  • 过滤器 提供用于调用特定路由器的逻辑。通过逻辑过滤器AndFilter、OrFilter和NotFilter可以将过滤器组合在一起使用。并非所有的路由器都需要使用过滤器,但是所有的路由器都支持过滤器。

选择消息类型

当将Mule的服务结合在一起时,初用Mule的人有时会感到困惑,他们不知何时该使用出站路由器,何时可以最大程度地简化获得回复信息。下面介绍 Mule中可使用的消息类型,可以通过一个列表查看各个传输(transport)所支持的消息类型,详细内容可以查看Mule用户指南中的传输特征矩阵(Transports Feature Matrix) (查看前需要先登录,但注册是免费的,只需花费一点时间就可以完成注册。)

异步

如果只想将消息以“即发即弃(fire and forget)”的方式发送给一个服务,(并不需要给调用者返回响应),那么可使用异步消息类型。如果将入站端点的synchronous属性设置为false,它就不会给调用者返回响应。

例如:

<model 
name
="
Asynchronous_Message_Pattern
">
  <service 
name
="
AsynchronousService
">
    <inbound>
      <jms:inbound-endpoint 
queue
="
test.in
" 
synchronous
="
false
"/>
    </inbound>

    <component 
class
="
org.myorg.WidgetHandler
"/>

    <outbound>
      <pass-through-router>
        <jms:outbound-endpoint 
queue
="
test.out
">
      </pass-through-router>
    </outbound>
  </service>
</model>

Request-Response

在简单的Request-Response场景中,服务在一个同步的入口端点上接收请求,并处理该请求,然后将它作为回复发送给调用者。例如,如果 用户在 HTML表单中输入一个值,想转换该值并将其结果显示在同一个页面上,那么可以在该服务上简单地配置一个同步入站端点,由该服务完成数据转换。这种场景并 不需要使用出站端点。这就是request-response消息类型。

例如:

<model 
name
="
Request-Response_Message_Pattern
">
  <service 
name
="
SynchronousService
">

    
<!-- 为了返回response将synchronous的值设置为“true”-->

    <inbound>
      <http:inbound-endpoint 
host
="
localhost
" 
port
="
8080
"
               
path
="
/mule/services
" 
synchronous
="
true
"/>
    </inbound>

    
<!-- 指定处理该请求的组件 -->

    <component 
class
="
org.myorg.WidgetHandler
"/>

  </service>
</model>

同步

如果为了进一步处理消息,需要将消息传递给第二个服务,那么需要在第一个服务上配置一个出站路由器将该消息传递给第二个服务。在第二个服务处理完消 息后,第一个服务将它作为回复发送给调用者。值得注意的是将第一个服务设置为同步入口端点就意味着之后的所有服务都会以同步的方式处理该消息,所以无需在 第二个服务上设置synchronous属性的值。这就是同步消息类型。

例如:

<model 
name
="
Synchronous_Message_Pattern
">
  <service 
name
="
SynchronousService
">
    <inbound>
      
<!-- 为了返回response将synchronous的值设置为“true” -->


      <jms:inbound-endpoint 
queue
="
test.in
" 
synchronous
="
true
"/>
    </inbound>

    <component 
class
="
org.myorg.WidgetHandler
"/>

    <outbound>
      
<!-- 使用pass-through路由器时,如果想返回response必须将synchronous的值设置为“true”-->


      <pass-through-router>
        <!-- 设置出站端点 -->

        <jms:outbound-endpoint 
queue
="
test.out
" 
synchronous
="
true
"/>
      </pass-through-router>
    </outbound>
  </service>

  
<!-- 配置第二个服务,并将它的入站端点设置为上一个服务的出站端点的路径。
   值得注意的是无需设置synchronous的值,因为在第一个服务中已经将消息设置为synchronous了。
  -->


  <service>
    <inbound>
      <jms:inbound-endpoint 
queue
="
test.out
"/>
    </inbound>

    <component 
class
="
org.myorg.WidgetProcesser
"/>

  </service>
</model>

注意: 在Mule的以往版本中,远程服务需要设置remoteSync属性的值为true用于配置同步处理。从Mule 2.2开始,remoteSync属性已经被删除,只需要设置synchronous属性的值为true就可以创建同步流。

异步Request-Response

在大多数复杂的场景中,可以使用request-response消息,并使用后端(back-end)流程调用其它的服务,并基于多个服务调用的 结果异步地返回一个回复。你可以将入站端点的synchronous属性设置为false,因为异步回复路由器会处理该回复,除非你想给调用者发送响应。 这就是异步request-response消息类型。

在下面的例子中,HTTP端点接收一个请求,并使用Multicast路由器将该请求广播到两个端点,再将这些结果以异步的方式发送到一个JMS端点。

<model 
name
="
Async_Request-Response_Message_Pattern
">
  <service 
name
="
AsyncRequestResponseService
">
    <inbound>
      
<!-- 
      	将synchronous设置为“false”,因为response将由异步回复路由器处理  -->


      <http:inbound-endpoint 
host
="
localhost
" 
port
="
8080
"
               
path
="
/mule/services
" 
synchronous
synchronous="
false
"/>
    </inbound>

    <component 
class
="
org.myorg.WidgetHandler
"/>

    
<!-- 配置异步回复的设置。这个例子使用了收集异步回复路由器,
    	在发送回复信息之前,它将所有的响应信息收集在一起。 -->


    <async-reply 
timeout
="
5000
>
      <collection-async-reply-router/>
      <jms:inbound-endpoint 
queue
="
reply.queue
"/>
    </async-reply>

    
<!--设置负责接收和处理消息的端点以及回复消息的端点    -->


    <outbound>
      <multicasting-router>
        <reply-to 
address
="
jms://reply.queue
"/>
        <jms:outbound-endpoint 
queue
="
service1
" 
synchronous
="
false
"/>
        <jms:outbound-endpoint 
queue
="
service2
" 
synchronous
="
false
"/>
      </multicasting-router>
    </outbound>
  </service>
</model>

关于消息类型的全部内容可以参看Mule用户指南中的Mule的消息类型

现在我们已经理解了在不同的场景中可以使用哪种消息类型路由消息,下面让我们看看哪些路由器可以很好地控制消息路由。更多消息路由的信息可以参看Mule用户指南中的使用消息路由器

将消息传递到另一个端点

pass-through路由器是为简化端点间的消息传递而设计的。比如,它对分发消息给一个队列非常有用。

也可以使用pass-through路由器将协议桥接到其它的出站端点。例如:

<service 
name
="
HttpProxyService
">
  <inbound>
    <inbound-endpoint 
address
="
http://localhost:8888
" 
synchronous
="
true
"/>
  </inbound>

  <outbound>
    <pass-through-router>
      <outbound-endpoint
         
address
="
http://www.webservicex.net#[header:http.request]
"
         
synchronous
="
true
"/>
    </pass-through-router>
  </outbound>
</service>

当使用pass-through路由器时,如果想返回一个响应,必须将出站端点的synchronous属性设置为true。其它的路由器,比如 chaining路由器并不需将出站端点的synchronous属性设置为true,该路由器总会在同步的场景中返回一个响应。因此,如果将消费发送给 多个服务,可能会用chaining路由器代替pass-through路由器,因为chaining路由器中不需要将每个端点的synchronous 设置为true。

过滤消息

使用过滤器可以控制服务处理哪些消息。选择性消费者路由器(Selective Consumer Router)用于入站端点,它可以控制服务处理哪些消息。过滤路由器(Filtering Router)用于出站端点,可以控制哪些消息发送到下一个服务上。可以组合使用这些过滤器来控制消息流。

例如,如果只想处理不包含错误的消息,那么可以使用选择性消费者以确保只处理结果代码为success的消息。并使用Catch-all策略将其它的消息转发到另外端点上作为错误处理:

<inbound>
  <selective-consumer-router>
    <mulexml:jxpath-filter 
expression
="
msg/header/resultcode = 'success'
"/>
  </selective-consumer-router>

  <forwarding-catch-all-strategy>
    <jms:endpoint 
topic
="
error.topic
"/>
  </forwarding-catch-all-strategy>
</inbound>

在服务处理消息时,如果想通过指定的标准决定将消息发送到哪个端点,那么可以在出站端点上使用过滤路由器。在下面的示例中,将包含异常信息的消息发 送到系统管理员的email邮箱,将包含特定字符串的消息发送到名为string.queue的队列,并使用forwarding catch-all路由器接收余下的所有消息,并将它们发送到名为error.queue的死信队列:

<outbound>

  <filtering-router>
    <smtp:outbound-endpoint 
to
="
ross@muleumo.org
"/>
      <payload-type-filter 
expectedType
expectedType="
java.lang.Exception
"/>
  </filtering-router>

  <filtering-router>
    <jms:outbound-endpoint 
to
="
string.queue
"/>
    <and-filter>
      <payload-type-filter 
expectedType
="
java.lang.String
"/>
      <regex-filter 
pattern
="
the quick brown (.*)
"/>
    </and-filter>
  </filtering-router>

  <forwarding-catch-all-strategy>
    <jms:outbound-endpoint 
queue
="
error.queue
"/>
  </forwarding-catch-all-strategy>

</outbound>

与过滤路由器(filtering router)相似的路由器有转发路由器(forwarding router),它可以处理一些消息并可以选择性地将消息转发到其它路由器,还有wiretap router,这种路由器可以处理所有的消息,并将它们发送到端点上,同时也将消息的副本发送到另外一个端点。更多信息可以参看Mule用户指南中的入站路由器(Inbound Routers)

将多个出站端点链接在一起

假设我们有一个验证服务,当消息没有通过验证时,想将该消息以及验证异常转发到另一个服务,并将消息和验证异常返回给调用者。那么可以使用链接路由 器(chaining router),它是一个高速的、轻量级的可配置路由器,可用于将消息发送到端点,然后将该端点的输出结果发送到另一个端点。例如:

<chaining-router>
  
<!-- 首先,将消息发送到这个端点,用于验证。 -->


  <vm:outbound-endpoint path="
ValidationService
" synchronous="true"/>

  
<!-- 接着将包含表达式的消息发送到这个端点上 -->


  <vm:outbound-endpoint path="
ValidationError
" synchronous="
true
">
    <exception-type-filter expectedType="
java.lang.Exception
"/>
  </vm:outbound-endpoint>

</chaining-router>

消息分解

消息分解器(message splitter)可用于将输出消息(outgoing message)分解成多个部分,再将他们分发到配置在路由器(router)上的不同端点。例如,在订单处理应用中,如果想将经消息分解后的不同部分分 发给不同的服务去处理,那么可以使用下面的路由器:

列表消息分解器(List Message Splitter) :接收一个对象列表,这些对象将被路由到不同的端点。例如:

<outbound>
  <list-message-splitter-router">
    
<!-- 将order路由到队列order.queue -->


    <jms:outbound-endpoint 
queue
="
order.queue
">
      <payload-type-filter 
expectedType
="
com.foo.Order
"/>
    </jms:outbound-endpoint>

    
<!-- 将items路由到队列item.queue -->


    <jms:outbound-endpoint 
queue
="
item.queue
">
      <payload-type-filter 
expectedType
="
com.foo.Item
"/>
    </jms:outbound-endpoint>

  </list-message-splitter-router>
</outbound>

表达式分解路由器(Expression Splitter Router) :它与列表消息分解器相似,只是它是基于表达式分解消息,将消息分解成一个或者多个部分。例如:

<outbound>
  <expression-splitter-router
          
evaluator
="
xpath
"
          
expression
="
/mule:mule/mule:model/mule:service
"
          
disableRoundRobin
="
true
"
          
failIfNoMatch
="
false
">

    <outbound-endpoint 
ref
="
service1
">
      <expression-filter
          
evaluator
="
xpath
"
          
expression
="
/mule:service/@name = 'service splitter'
"/>
    </outbound-endpoint>

    <outbound-endpoint 
ref
="
service2
">
      <expression-filter
          
evaluator
="
xpath
"
          
expression
="
/mule:service/@name = 'round robin deterministic'
"/>
    </outbound-endpoint>

  </expression-splitter-router>
</outbound>

关于如何配置表达式分解路由器的更多信息,可以参看Mule的用户手册中的表达式配置参考(Expressions Configuration Reference)

为了提高性能也可以将消息分解成多个部分。轮叫(Round Robin)消息分解器将消息分解成多个部分,并以轮叫(round-robin)的方式将它们发送到端点。Message Chunking Router将消息按固定长度分解成多个部分,并将它们路由到同一个端点。

消息分解之后,可以使用Message Chunking Aggregator重新将消息块聚合在一起。该聚合器(aggregator)通过关联ID(correlation ID)来识别哪些消息块属于同一个消息,关联ID(correlation ID)在出站路由器(outbound router)上设置。

<inbound>
  <message-chunking-aggregator-router>

    <expression-message-info-mapping
          
correlationIdExpression
="
#[header:correlation]
"/>
    <payload-type-filter 
expectedType
="
org.foo.some.Object
"/>

  </message-chunking-aggregator-router>
</inbound>

处理消息仅有一次

幂等接收器(Idempotent Receiver) 通过核对输入消息的唯一消息ID来保证只有拥有唯一ID的消息才能被服务所接 收。消息ID可以通过使用一个表达式从消息中产生,该表达式在 idExpression属性中定义。#[message:id]是默认的表达式,也就是说如果要实现该功能,端点必须支持唯一性消息ID。在下面的例子 中,唯一性ID是由消息ID和消息标头中标签的内容组合而成。所有的消息ID都被记录到一个简单的文本文件中,用于追踪哪些消息已经处理过。

<inbound>
  <idempotent-receiver-router 
idExpression
="
#[message:id]-#[header:label]
">
    <simple-text-file-store 
directory
="
./idempotent"/
>
  </idempotent-receiver-router>
</inbound>

通过组件绑定调用外部服务

除了使用消息路由器控制服务间的消息流之外,也可以通过组件绑定(Component Bindings)调用处理消息的外部服务(External Service)。

在这个方法中,可以将Mule的端点绑定到Java接口方法。该方法的优势在于,在组件仍在处理消息时,你可以使用外部服务,而无需使用Mule的 API 或者修改组件的代码。相反,只需要在XML配置文件中配置组件绑定,从而指定外部服务的端点。例如,在下面的绑定例子中,当sayHello方法被调用 时,HelloInterface中的sayHello方法会调用外部的HelloWeb服务。

<component 
class
="
org.mule.examples.bindings.InvokerComponent
">
    <binding 
interface
="
org.mule.examples.bindings.HelloInterface
"
             
method
="
sayHello
">
        <cxf:outbound-endpoint
          
address
="
http://myhost.com:81/services/HelloWeb?method=helloMethod
"
          
synchronous
="
true
"/>
    </binding>
</component>

更多信息,可以参看Mule用户指南中的组件绑定(Component Bindings)

总结

Mule为控制应用中的消息如何交换提供了多种方法。本文总体上介绍了Mule的消息路由,以及它所支持的消息类型,一些常用的路由器和组件绑定。关于消息路由的完整信息,可以参看Mule 的用户手册

分享到:
评论

相关推荐

    Mule 实现路由分发

    以下是一些关于Mule路由分发的关键知识点: 1. **Mule Flow**: Mule的工作流程基于Flow的概念,Flow是Mule应用中的基本构建块,用于定义消息处理的序列。在路由分发中,Flow可以包含多个路由器,每个路由器决定消息...

    mule esb 3 tutorial.pdf

    2. **自定义路由逻辑**:了解如何通过编写自定义代码来扩展 Mule 的路由功能,实现更加灵活的消息处理。 3. **优化性能**:讨论如何通过调整路由逻辑来提高应用程序的整体性能。 通过以上内容的学习,开发者不仅...

    Mule3.6 Expression Language By 火花

    MEL 在几乎所有的 Mule 消息处理器中都可以使用,使得用户能够快速优雅地过滤、路由或处理 Mule 消息对象的不同部分。 #### 二、MEL 的优势 在 Mule 3.6 版本之前,开发者在处理消息内容时通常依赖于脚本语言(如 ...

    mule文档详解 mule

    Mule的工作流程通常包括消息的接收、转换、路由和发送。 **5. 安全性** Mule ESB提供了多种安全特性,如SSL/TLS加密、认证和授权、API网关管理和访问控制,以确保数据传输的安全性。 **6. 监控与管理** Mule ESB...

    Mule是一个企业服务总线(ESB)消息框架

    Mule是一个企业服务总线(ESB)消息框架.它的主要特性包括: 1.基于J2EE1.4的企业消息总线(ESB)和消息代理(broker). 2.可插入的连接性:比如Jms,jdbc,tcp,udp,multicast,http,servlet,smtp,pop3, file,xmpp等. 3.支持...

    Mule ESB手册-中文版

    Mule ESB支持多种传输协议、消息格式和数据转换,可以简化服务集成的复杂性,并提供强大的消息路由、转换和分发能力。 2. Mule Studio 使用 Mule Studio是基于Eclipse的集成开发环境(IDE),它为开发人员提供了一...

    Mule stdio 安装过程

    3. 消息路由:根据消息内容和复杂规则进行消息路由,具备过滤、聚合和重新排序的能力。 4. 服务创建和托管:可以将端点、EJB、Spring Bean及POJO暴露为服务,作为一个轻量级的服务容器来托管这些服务。 对于安装...

    MuleEsb开源框架简介.pdf

    消息路由基于消息内容和复杂规则路由消息,消息的过滤、聚合以及重新排列序号。 d. 服务创建和托管暴露端点、EJB、Spring Bean 以及 POJO 作为服务作为轻量级的服务容器进行服务托管。 Mule ESB 中的一些基本概念...

    mule 详细介绍 (soa esb)

    例如,当一个服务接收到一个请求时,Mule会根据预设的规则对消息进行解析、转换和路由,最终将处理后的消息传递给目标服务。 ### SOA与Mule的结合 在SOA环境中,Mule扮演着重要的角色。首先,Mule作为一个ESB,...

    mule in action 第二版英文正式版

    - Mule支持多种路由策略,如内容路由、条件路由等。 - **Components and Patterns (组件和模式)** - Mule提供了一系列组件,可用于构建复杂的集成流程。 - 设计模式对于提高集成系统的灵活性和可维护性至关重要...

    mule(java)开发简介

    Mule 是一个企业服务总线(Enterprise Service Bus, ESB)的消息框架,它被设计为轻量级且高度可扩展的ESB解决方案。Mule 的设计目标是简化企业应用程序之间的集成过程,同时确保系统的可维护性和可扩展性。 #### ...

    MuleESB帮助文档

    4. **智能路由和转换**:Mule ESB内置了强大的消息路由和转换机制,可以对数据进行处理和格式转换,确保不同系统间的数据一致性。 5. **安全性和监控**:提供全面的安全策略和监控工具,保障数据安全并确保系统的...

    mule-2.1.1-getting-started

    - **Mule的架构**:Mule采用了基于事件驱动的架构设计,主要由消息代理(Message Broker)、服务组件(Service Components)、路由策略(Routing Policies)等核心部分组成。 - **消息传递框架**:Mule提供了一个...

    MuleEsb开源框架简介

    Mule ESB的架构设计围绕着核心模块构建,包括Routing(路由)、Transaction Management(事务管理)、Transformation(转换)、Message Broker(消息代理)、Transportation Management(传输管理)和Security(安全...

    Mule in Action, Second Edition

    消息处理部分则深入讨论了如何使用Mule对消息进行处理,包括消息的路由、过滤和转换。 2. 连接器的使用 在企业集成中,连接器(Connectors)扮演了重要角色,它们是Mule ESB与外部系统通信的桥梁。第二版中会介绍...

    mule基本节点解释

    - 示例中的 `&lt;filtering-router&gt;` 表示过滤路由器,用于根据消息的不同属性将消息路由到不同的路径。 5. **`&lt;outbound&gt;`** - `&lt;outbound&gt;` 标签配置了服务的出站路由,即消息如何离开服务。这通常包含了处理完...

    muleide-2.1.1

    它主要用于实现不同应用之间的通信和数据交换,通过提供消息路由和转发服务,帮助构建松散耦合、模块化的系统。Mule ESB的核心特性包括: 1. **消息传输**:Mule ESB支持多种消息协议,如JMS、HTTP、FTP、TCP等,...

Global site tag (gtag.js) - Google Analytics