本文的目的是在 broker 端实现消息的路由分发,通俗点讲就是,根据消息的特征将消息分发到不同的 queue 或者 topic 上。要实现消息路由,最简单的方式是在 activemq 提供的 xml 配置文件下面构建路由规则。
所使用的版本:
- ActiveMQ 5.6.0
- Camel 2.9.2
在 ActiveMQ 的每个发行版的 conf 目录下包含了很多的示例 xml 配置文件,它们是对 ActiveMQ 重要 feature 的配置举例,比如使用jdbc作持久化机制时针对不同数据库的配置、如何进行权限配置、如何配置 network broker 等等。在研究ActiveMQ的某项特征时,参考下这里面的配置会节省很多时间。本文将要实现的功能用到了 conf 目录下的 activemq.xml 和 camel.xml 两个文件。
activemq.xml 是启动 broker 时缺省的配置文件。如果要使用 camel,我们首先需要将 activemq.xml 中的 “import camel.xml” 的那行注释去掉。
<import resource="camel.xml"/>
camel.xml 中的 camelContext 中包含了一个简单的路由规则:
<route> <description>Example Camel Route</description> <from uri="activemq:example.A"/> <to uri="activemq:example.B"/> </route>
其中 uri 的格式为 activemq:[queue:|topic:]destinationName 在不指定 queue/topic 的情况下,默认为 queue。
它实现的功能很简单:所有发送到队列 example.A 上的消息都将转发到队列 example.B。
camel允许我们在activemq的broker端根据Message的特征实现路由分发。为此,我们需要一种可以描述路由规则的语言,而camel支持很多这样的语言(http://camel.apache.org/languages.html)。我选用了 Bean language,这种语言允许我们通过在bean中定义的过滤方法(返回类型为boolean)对消息进行过滤。并且,可以用spring下bean的定义方式,在xml文件下注册这个bean。
import javax.jms.JMSException; import org.apache.camel.Body; import org.apache.camel.Header; public class CamelBean { public boolean isEven(@Header("JMSType")String jmsType) throws JMSException { return "even".equals(jmsType); } public boolean isOdd(@Header("JMSType")String jmsType) throws JMSException { return "odd".equals(jmsType); } public boolean matchBody(@Body String body) { if (body.contains("aaa")) return true; return false; } }
我在CamelBean中示例了三种过滤方法。 isEven和isOdd都是通过消息头的JMSType作判断的,而matchBody方法则是检查实际的消息体中是否包含“aaa”关键字。
为了使用这个bean,我需要将该类打到jar包里,然后放到activemq主目录的lib路径下面。
下一步要做的就是在camel.xml文件下注册这个bean。
<bean id="camelBean" class="com.example.activemq.CamelBean"></bean>
最后,也是最关键的部分,即将这个bean与camel的路由机制关联。为此我们需要在 route下面使用 filter。配置如下:
<route> <description>Example Camel Route</description> <from uri="activemq:example.A"/> <filter> <method ref="camelBean" method="isEven" /> <to uri="activemq:example.even"/> </filter> <filter> <method ref="camelBean" method="isOdd" /> <to uri="activemq:example.odd"/> </filter> <filter> <method ref="camelBean" method="matchBody" /> <to uri="activemq:example.aaa"/> </filter> </route>
这段配置实现的功能是:
检查消息头的JMSType值,如果是“even”就将消息的一个副本发送到example.even队列(odd,情况同理);如果检查消息内容发现“aaa”关键字,则发送一个消息副本到example.aaa队列。注意,同一份消息是可能会被分发到多个队列的。
这种配置下,每个消息到达broker后,broker端要做多次检查,次数取决于filter的个数。因此,如果filter很多,而且broker接收到的消息量很大的话,性能上会有一定的影响。
附:创建并发送消息的部分示例代码:
private void sendMessage() { try { Queue queue = session.createQueue("example.A"); final MessageProducer producer = session.createProducer(queue); producer.setDeliveryMode(DeliveryMode.PERSISTENT); TextMessage message = session.createTextMessage("aaabbbbb"); if (ai.incrementAndGet() %2 == 0) { message.setJMSType("even"); } else { message.setJMSType("odd"); } producer.send(message); producer.close(); // session.close(); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } }
相关推荐
使用Camel配置ActiveMQ路由的实践方式
Apache Camel是一个基于规则路由和中介引擎,提供企业集成模式的Java对象(POJO)的实现,通过应用程序接口(或称为陈述式的Java领域特定语言(DSL))来配置路由和中介的规则。领域特定语言意味着Apache Camel支持你...
Camel IBM Websphere MQ 到 Active MQ 桥接路由 先决条件 IBM 为安装在 Fuse 上的 MQ 客户端提供了 OSGi jar 文件IBM_MQ_INSTALL_DIR/java/lib/OSGi 运行 AMQ 代理 带有填充属性的 JBOSS_FUSE_INSTALL_DIR/etc/ 中...
Apache Camel 是一个非常强大的基于规则的路由以及媒介引擎,该引擎提供了一个基于POJO的 企业应用模式(Enterprise Integration Patterns)的实现,你可以采用其异常强大且十分易用的API (可以说是一种Java的领域定义...
消耗来自CometD生产者的消息终端窗口-运行Java客户端应用程序-它将在骆驼服务器上触发JMS消息端到端地运行演示,您将看到Java应用程序将消息发送到ActiveMQ,ActiveMQ服务器上的Camel路由将提取消息并将其推送到...
阿帕奇骆驼展 一个展示 Apache Camel 的演示项目。 2015 年 3 月在惠灵顿 Java 用户组的演讲中使用。 有用信息 所有示例都应该在运行,但是您需要手动... 功能:安装activemq-camel 对于基于电子邮件的路由也很有用。
Apache Camel - JMS/ActiveMQ组件 Apache Camel - Jetty组件 Apache Camel - Timer组件 Apache Camel - JDBC组件 Apache Camel - Dynamic Control Route(动态控制) Camel 各种路由... 学习期间,参考的PDF资料,都...
骆驼路由从三个带有“CountryCode”消息头选择器的 activemq 队列(foo、bar 和)消费,然后将它们发送到“direct:start”生产者端点。 相应的“direct:start”消费者然后根据名称为“aggregatorid”的标头聚合与...
为了实现这种智能性,在路由包下构建了路由机制,您可以在其中找到扩展RouteBuilder的抽象路由器类和扩展此抽象类的两个不同的路由器。 您可以将这两个扩展类视为单个独立的微服务。一旦路由器将消息定向到相关的...
Camel使用URI来简化与所有传输或消息传递模型(包括HTTP,ActiveMQ,JMS,JBI,SCA,MINA或CXF)的集成,并使用可插拔数据格式选项。 Apache Camel是一个小型库,具有最小的依赖关系,可轻松嵌入任何Java应用程序中...
在此示例中,我们将使用两个容器,一个容器作为EnMasse实例运行,另一个容器作为运行Camel路由的代理的客户端。 此快速入门要求首先部署和运行EnMasse。 要将EnMasse安装到OpenShift或Kubernetes中,请遵循。 该...
: Apache Camel 路由使用 ActiveMQ 组件和JmsSSLConnectionFactoy进行 AMQ Online TLS 连接 :Spring 应用程序显示具有 TLS 连接的 AMQ JMS 池库 :使用 AMQ JMS Pool 和 TLS 连接的 Apache Camel 生产者和消费者...
Ehcache - Camel 缓存实现; Maven 的生命周期支持; 单元测试( jUnit启动嵌入式Jetty进行测试) 环境 应用程序是通过以下方式创建的: JDK 8u40 Tomcat 8.0.* 技术栈 阿帕奇骆驼 2.15 Spring 4.1(上下文) ...
然后,消费者将根据其公司名称(文件名)将该消息队列中的消息路由到单独的消息通道。 在这些渠道的每一个中,将解析csv文件内容的所有价格和金额,并将其存储在StockStatData中。 然后,它将使用Stat
介绍Camel允许您创建企业集成模式,以通过基于Spring或Blueprint的Xml配置文件或基于Scala DSL,以基于Java的特定于域的语言(或Fluent API)来实现路由和中介规则。 这意味着无论是在Java,Scala还是XML编辑器中,...
保险丝示例使用 Gradle 作为构建工具的 Jboss Fuse 结构的 JAXWS、JAXRS、JPA、... Fuse-Camel-ActiveMQ:本项目包含以下两条camel路由(i)ProducerRoute:此路由有一个 SOAP webservices 作为 From 端点。此服务将 Pers