- 浏览: 17261 次
- 性别:
- 来自: 杭州
最新评论
mule的消息路由
- 博客分类:
- java
这个是从infoq载录下的文章,具体链接为http://www.infoq.com/cn/articles/mule-message-routing
mule 消息路由的几个概念
- 端点(Endpoints)定义了发送和接收消息的通道(channel)。例如,一个购物组件可能会接收到一个HTTP订单请求。一旦该组件处理完订单请求,它可能会发送一个 JMS消息到一个主题(topic)上,以通知审计系统,并返回一个HTTP响应。可以通过端点监听JMS消息、发送email、调用web services等。
- 入站路由器(Inbound router)控制服务如何处理入站(incoming)消息,比如,有选择地只消费符合特定条件的消息,或者在将消息转发给服务处理之前,将拥有同一group ID的消息聚合(group)在一起。
- 出站路由器(Outbound router)控制如何分发经服务处理过的消息,比如,将消息发送到一个接受者列表,或者将消息分解成多个部分,并将它们分别发送至不同的端点。
- 异步回复路由器(Asynchronous reply router)常用于request/response场景。在这些场景中,发送一个请求会触发一个或者更多的请求,并且在返回响应之前,需要考虑这些请求的结果。典型的例子是请求发送后,会并行执行任务(task)。在返回响应之前,必须执行完每个任务,处理完结果。
- Catch-all策略在当前消息找不到路由路径时才被调用。入站和出站端点都可以配置catch-all策略,因此可以捕获到任何孤立的消息,并将这些消息路由到一个共同的位置。
- 过滤器提供用于调用特定路由器的逻辑。通过逻辑过滤器AndFilter、OrFilter和NotFilter可以将过滤器组合在一起使用。并非所有的路由器都需要使用过滤器,但是所有的路由器都支持过滤器。
mule的消息类型
异步
如果只想将消息以“即发即弃(fire and forget)”的方式发送给一个服务,(并不需要给调用者返回响应),那么可使用异步消息类型。如果将入站端点的synchronous属性设置为false,它就不会给调用者返回响应。
<model name="Asynchronous_Message_Pattern">
<service name="AsynchronousService">
<inbound>
<jms:inbound-endpoint queue="test.in" synchronous="false"/>
</inbound>
<component class="org.myorg.WidgetHandler"/>
<outbound>
<pass-through-router>
<jms:outbound-endpoint queue="test.out">
</pass-through-router>
</outbound>
</service>
</model>
Request-Response
在简单的Request-Response场景中,服务在一个同步的入口端点上接收请求,并处理该请求,然后将它作为回复发送给调用者。例如,如果用户在 HTML表单中输入一个值,想转换该值并将其结果显示在同一个页面上,那么可以在该服务上简单地配置一个同步入站端点,由该服务完成数据转换。这种场景并不需要使用出站端点。这就是request-response消息类型。
<model name="Request-Response_Message_Pattern">
<service name="SynchronousService">
<!-- 为了返回response将synchronous的值设置为“true”--> <inbound> <http:inbound-endpoint host="localhost" port="8080"
path="/mule/services" synchronous="true"/>
</inbound>
<!-- 指定处理该请求的组件 -->
<component class="org.myorg.WidgetHandler"/>
</service>
</model>
同步
如果为了进一步处理消息,需要将消息传递给第二个服务,那么需要在第一个服务上配置一个出站路由器将该消息传递给第二个服务。在第二个服务处理完消息后,第一个服务将它作为回复发送给调用者。值得注意的是将第一个服务设置为同步入口端点就意味着之后的所有服务都会以同步的方式处理该消息,所以无需在第二个服务上设置synchronous属性的值。这就是同步消息类型。
<model name="Synchronous_Message_Pattern">
<service name="SynchronousService">
<inbound>
<!-- 为了返回response将synchronous的值设置为“true” -->
<jms:inbound-endpoint queue="test.in" synchronous="true"/>
</inbound>
<component class="org.myorg.WidgetHandler"/>
<outbound>
<!-- 使用pass-through路由器时,如果想返回response必须将synchronous的值设置为“true”-->
<pass-through-router>
<!-- 设置出站端点 -->
<jms:outbound-endpoint queue="test.out" synchronous="true"/>
</pass-through-router>
</outbound>
</service>
<!-- 配置第二个服务,并将它的入站端点设置为上一个服务的出站端点的路径。
值得注意的是无需设置synchronous的值,因为在第一个服务中已经将消息设置为synchronous了。
-->
<service>
<inbound>
<jms:inbound-endpoint queue="test.out"/>
</inbound>
<component class="org.myorg.WidgetProcesser"/>
</service>
</model>
异步Request-Response
在大多数复杂的场景中,可以使用request-response消息,并使用后端(back-end)流程调用其它的服务,并基于多个服务调用的结果异步地返回一个回复。你可以将入站端点的synchronous属性设置为false,因为异步回复路由器会处理该回复,除非你想给调用者发送响应。这就是异步request-response消息类型。
在下面的例子中,HTTP端点接收一个请求,并使用Multicast路由器将该请求广播到两个端点,再将这些结果以异步的方式发送到一个JMS端点。
<model name="Async_Request-Response_Message_Pattern">
<service name="AsyncRequestResponseService">
<inbound>
<!--
将synchronous设置为“false”,因为response将由异步回复路由器处理 -->
<http:inbound-endpoint host="localhost" port="8080"
path="/mule/services" synchronoussynchronous="false"/>
</inbound>
<component class="org.myorg.WidgetHandler"/>
<!-- 配置异步回复的设置。这个例子使用了收集异步回复路由器,
在发送回复信息之前,它将所有的响应信息收集在一起。 -->
<async-reply timeout="5000>
<collection-async-reply-router/>
<jms:inbound-endpoint queue="reply.queue"/>
</async-reply>
<!--设置负责接收和处理消息的端点以及回复消息的端点 -->
<outbound>
<multicasting-router>
<reply-to address="jms://reply.queue"/>
<jms:outbound-endpoint queue="service1" synchronous="false"/>
<jms:outbound-endpoint queue="service2" synchronous="false"/> </multicasting-router> </outbound> </service> </model>
将消息传递到另一个端点
pass-through路由器是为简化端点间的消息传递而设计的。比如,它对分发消息给一个队列非常有用。
也可以使用pass-through路由器将协议桥接到其它的出站端点。例如:
<service name="HttpProxyService">
<inbound>
<inbound-endpoint address="http://localhost:8888" synchronous="true"/>
</inbound>
<outbound>
<pass-through-router>
<outbound-endpoint
address="http://www.webservicex.net#[header:http.request]"
synchronous="true"/>
</pass-through-router>
</outbound>
</service>
当使用pass-through路由器时,如果想返回一个响应,必须将出站端点的synchronous属性设置为true。其它的路由器,比如 chaining路由器并不需将出站端点的synchronous属性设置为true,该路由器总会在同步的场景中返回一个响应。因此,如果将消费发送给多个服务,可能会用chaining路由器代替pass-through路由器,因为chaining路由器中不需要将每个端点的synchronous 设置为true。
过滤消息
使用过滤器可以控制服务处理哪些消息。选择性消费者路由器(Selective Consumer Router)用于入站端点,它可以控制服务处理哪些消息。过滤路由器(Filtering Router)用于出站端点,可以控制哪些消息发送到下一个服务上。可以组合使用这些过滤器来控制消息流。
例如,如果只想处理不包含错误的消息,那么可以使用选择性消费者以确保只处理结果代码为success的消息。并使用Catch-all策略将其它的消息转发到另外端点上作为错误处理:
<inbound> <selective-consumer-router> <mulexml:jxpath-filter expression="msg/header/resultcode = 'success'"/> </selective-consumer-router> <forwarding-catch-all-strategy> <jms:endpoint topic="error.topic"/> </forwarding-catch-all-strategy> </inbound>
在服务处理消息时,如果想通过指定的标准决定将消息发送到哪个端点,那么可以在出站端点上使用过滤路由器。在下面的示例中,将包含异常信息的消息发送到系统管理员的email邮箱,将包含特定字符串的消息发送到名为string.queue的队列,并使用forwarding catch-all路由器接收余下的所有消息,并将它们发送到名为error.queue的死信队列:
<outbound> <filtering-router> <smtp:outbound-endpoint to="ross@muleumo.org"/> <payload-type-filter expectedTypeexpectedType="java.lang.Exception"/> </filtering-router> <filtering-router> <jms:outbound-endpoint to="string.queue"/> <and-filter> <payload-type-filter expectedType="java.lang.String"/> <regex-filter pattern="the quick brown (.*)"/> </and-filter> </filtering-router> <forwarding-catch-all-strategy> <jms:outbound-endpoint queue="error.queue"/> </forwarding-catch-all-strategy> </outbound>
与过滤路由器(filtering router)相似的路由器有转发路由器(forwarding router),它可以处理一些消息并可以选择性地将消息转发到其它路由器,还有wiretap router,这种路由器可以处理所有的消息,并将它们发送到端点上,同时也将消息的副本发送到另外一个端点。更多信息可以参看Mule用户指南中的入站路由器(Inbound Routers)。
将多个出站端点链接在一起
假设我们有一个验证服务,当消息没有通过验证时,想将该消息以及验证异常转发到另一个服务,并将消息和验证异常返回给调用者。那么可以使用链接路由器(chaining router),它是一个高速的、轻量级的可配置路由器,可用于将消息发送到端点,然后将该端点的输出结果发送到另一个端点。例如:
<chaining-router> <!-- 首先,将消息发送到这个端点,用于验证。 --> <vm:outbound-endpoint path="ValidationService" synchronous="true"/> <!-- 接着将包含表达式的消息发送到这个端点上 --> <vm:outbound-endpoint path="ValidationError" synchronous="true"> <exception-type-filter expectedType="java.lang.Exception"/> </vm:outbound-endpoint> </chaining-router>
消息分解
消息分解器(message splitter)可用于将输出消息(outgoing message)分解成多个部分,再将他们分发到配置在路由器(router)上的不同端点。例如,在订单处理应用中,如果想将经消息分解后的不同部分分发给不同的服务去处理,那么可以使用下面的路由器:
列表消息分解器(List Message Splitter):接收一个对象列表,这些对象将被路由到不同的端点。例如:
<outbound> <list-message-splitter-router"> <!-- 将order路由到队列order.queue --> <jms:outbound-endpoint queue="order.queue"> <payload-type-filter expectedType="com.foo.Order"/> </jms:outbound-endpoint> <!-- 将items路由到队列item.queue --> <jms:outbound-endpoint queue="item.queue"> <payload-type-filter expectedType="com.foo.Item"/> </jms:outbound-endpoint> </list-message-splitter-router> </outbound>
表达式分解路由器(Expression Splitter Router):它与列表消息分解器相似,只是它是基于表达式分解消息,将消息分解成一个或者多个部分。例如:
<outbound> <expression-splitter-router evaluator="xpath" expression="/mule:mule/mule:model/mule:service" disableRoundRobin="true" failIfNoMatch="false"> <outbound-endpoint ref="service1"> <expression-filter evaluator="xpath" expression="/mule:service/@name = 'service splitter'"/> </outbound-endpoint> <outbound-endpoint ref="service2"> <expression-filter evaluator="xpath" expression="/mule:service/@name = 'round robin deterministic'"/> </outbound-endpoint> </expression-splitter-router> </outbound>
关于如何配置表达式分解路由器的更多信息,可以参看Mule的用户手册中的表达式配置参考(Expressions Configuration Reference)。
为了提高性能也可以将消息分解成多个部分。轮叫(Round Robin)消息分解器将消息分解成多个部分,并以轮叫(round-robin)的方式将它们发送到端点。Message Chunking Router将消息按固定长度分解成多个部分,并将它们路由到同一个端点。
消息分解之后,可以使用Message Chunking Aggregator重新将消息块聚合在一起。该聚合器(aggregator)通过关联ID(correlation ID)来识别哪些消息块属于同一个消息,关联ID(correlation ID)在出站路由器(outbound router)上设置。
<inbound> <message-chunking-aggregator-router> <expression-message-info-mapping correlationIdExpression="#[header:correlation]"/> <payload-type-filter expectedType="org.foo.some.Object"/> </message-chunking-aggregator-router> </inbound>
处理消息仅有一次
幂等接收器(Idempotent Receiver)通过核对输入消息的唯一消息ID来保证只有拥有唯一ID的消息才能被服务所接收。消息ID可以通过使用一个表达式从消息中产生,该表达式在 idExpression属性中定义。#[message:id]是默认的表达式,也就是说如果要实现该功能,端点必须支持唯一性消息ID。在下面的例子中,唯一性ID是由消息ID和消息标头中标签的内容组合而成。所有的消息ID都被记录到一个简单的文本文件中,用于追踪哪些消息已经处理过。
<inbound> <idempotent-receiver-router idExpression="#[message:id]-#[header:label]"> <simple-text-file-store directory="./idempotent"/> </idempotent-receiver-router> </inbound>
通过组件绑定调用外部服务
在这个方法中,可以将Mule的端点绑定到Java接口方法。该方法的优势在于,在组件仍在处理消息时,你可以使用外部服务,而无需使用Mule的API 或者修改组件的代码。相反,只需要在XML配置文件中配置组件绑定,从而指定外部服务的端点。例如,在下面的绑定例子中,当sayHello方法被调用时,HelloInterface中的sayHello方法会调用外部的HelloWeb服务。
<component class="org.mule.examples.bindings.InvokerComponent"> <binding interface="org.mule.examples.bindings.HelloInterface" method="sayHello"> <cxf:outbound-endpoint address="http://myhost.com:81/services/HelloWeb?method=helloMethod" synchronous="true"/> </binding> </component>
更多信息,可以参看Mule用户指南中的组件绑定(Component Bindings)。
发表评论
-
从死锁问题说起
2013-07-13 16:24 797前几天发生了一个枚举死锁问题,下面分析下,同时将java初 ... -
java 统计文件夹大小
2013-07-06 17:48 653今天群里 有人发了个比赛的题目,就是实现类似linux 下 ... -
oscache 的问题
2013-06-29 09:42 768一:现象: 系统页面无法打开,数据显示异常 二:原因分 ... -
java序列化3
2013-06-29 09:41 931上面的java,hessian和fastjson的序列化, ... -
java序列化2
2013-06-29 09:40 826Java序列化有两个重要的问题,第一个是冗余数据太多,也就 ... -
java序列化1
2013-06-27 09:39 582前言: 如果仔细 ... -
动态代理的简单分析(2)
2013-06-27 09:31 590下面我们再来看下cglib的实现,首先看下测试类吧,上面已经 ... -
动态代理的简单分析
2013-06-27 09:28 572动态代理的东东 听起来很牛,用起别人提供的现成的工具,也很方 ... -
设计模式的简单总结
2013-06-26 18:33 4911、单例模式 单例,延迟加载,静态初始化,以及 ... -
独立部署 需要注意的点
2013-06-24 12:04 530独立部署 之前先将所要得到的东西准备好,可以节省很多时间的。 ... -
mule的链接
2011-07-12 21:18 727ESB架构之企业实施案例:http://www.infoq. ... -
今天决定写技术blog 了
2011-07-11 19:15 7861、今天决定写博客了,每天10点回家吧 2、每天一篇,不能懈 ... -
每周一点
2011-03-03 14:17 732业界动态2011年智能手机:Android继续闪耀或暗淡? ... -
技术生产
2010-11-29 14:00 649学习的模式 1、在项目中学习 2、随时记录,分门别类的记录 3 ... -
Java编程 的动态性,第 2部分: 引入反射
2010-11-15 21:30 493反射使您的程序代码能 ... -
Java编程 的动态性,第 2部分: 引入反射
2010-11-15 21:29 511反射使您的程序代码能 ... -
Java 编程的动态性,第3部分: 应用反射
2010-11-15 21:28 569命令行参数处理是一项令人厌烦的零碎工作,不管您过去已经处理过多 ... -
Java 编程的动态性, 第四部分: 用 Javassist 进行类转换
2010-11-15 21:27 558厌倦了只能按编写好源代码的方式执行的 Java 类了吗?那么打 ...
相关推荐
mule 根据JAVA 代码实现路由分发。
muleide-2.1.1,在eclipse中的插件,支持mule项目的动态发布 mule是开源的ESB项目,支持消息路由和转发,是比较全面的企业服务总线工具。。
Mule是一个企业服务总线(ESB)消息框架.它的主要特性包括: 1.基于J2EE1.4的企业消息总线(ESB)和消息代理(broker). 2.可插入的连接性:比如Jms,jdbc,tcp,udp,multicast,http,servlet,smtp,pop3, file,xmpp等. 3.支持...
Mule是一个企业服务总线(ESB)消息框架.它的主要特性包括: 1.基于J2EE1.4的企业消息总线(ESB)和消息代理(broker). 2.可插入的连接性:比如Jms,jdbc,tcp,udp,multicast,http,servlet,smtp,pop3, file,xmpp等. 3.支持...
图整体结构从上图可见,Mule通过Transports/Connectors与外围的异构系统连接,提供Routing(路由)、TransactionManagement(事务管理)、Transformation(转换)、MessageBroker(消息代理)、...
您可以使用Mule来智能地管理节点之间的消息路由,数据映射,编排,可靠性,安全性和可伸缩性,而不是在系统,服务,API和设备之间创建点对点集成。 将其他系统和应用程序插入Mule,并使其处理系统之间的所有通信,...
您可以使用 Mule 智能地管理节点之间的消息路由、数据映射、编排、可靠性、安全性和可扩展性,而不是在系统、服务、API 和设备之间创建点对点集成。 将其他系统和应用程序插入 Mule 并让它处理系统之间的所有通信,...
然后,一系列自定义过滤器会捕获 JSON 数据中的异常并将消息路由到错误处理流。 输入数据经过验证,如果数据丢失或无效,则会引发适当的异常。设置并运行示例将示例项目作为 mule 应用程序运行使用 Postman 发出...
介绍企业服务总线Mule ESB。使用Mule ESB的过程中,体会到其快速...日常使用中esb主要还是路由功能,把一些系统的借口接入esb。本课主要介绍http接入,数据库操作,mq操作。通过学习,可以轻松使用mule esb,避免编码。
梅西JavaScript中的消息集成,受Mule,Apache Camel,SimpleMule,WIP的启发。 它具有消息生产者,消费者和路线。 路由对传入消息实施一系列转换。安装通过Node上的npm: npm install messi用法程序中的参考var ...