注:本文以ActiveMQ5.10版本为基础。
我们知道,ActiveMQ中的TransportConnector(传输连接器)主要用于配置ActiveMQ服务端和客户端之间的通信方式,NetworkConnector(网络连接器)则主要用来配置ActiveMQ服务端与服务端之间的通信。在某些场景,网络拓扑中我们可能会需要大量的生产者和消费者,也就是说我们会有大量的ActiveMQ客户端,这样,我们就需要在网络环境中有多个MQ服务器,服务器之间是可以透传消息的,这是,我们就需要用到NetworkConnector,我们先从简单的分析起,假设我们需要部署两台MQ服务器,如下图:
使用NetworkConnector的简单场景
如上图所示,服务器S1和S2通过NewworkConnector相连,则生产者P1发送消息,消费者C3和C4都可以接收到,而生产者P3发送的消息,消费者C1和C2同样也可以接收到,要使用NewworkConnector的功能,需要在服务器S1的activemq.xml中的broker节点下添加如下配置(注:10.79.11.172:61617为S2的地址):
<networkConnectors>
<networkConnector uri="
static:(tcp://10.79.11.172:61617)
"/>
</networkConnectors>
<networkConnectors>
<networkConnector uri="
static:(tcp://10.79.11.171
:61617)
"/>
</networkConnectors>
<networkConnectors>
<networkConnector uri="
static:(tcp://10.79.11.171
:61617,tcp://10.79.11.173:61617,tcp://10.79.11.174:61617
)
"/>
</networkConnectors>
<networkConnector uri="multicast://default"/>
</networkConnectors>
decreaseNetworkConsumerPriority:默认值为false,如果该值为false,网络上的消费者和本地的消费者有相同的优先级,就是说一个消息发送到本地的消费者和其他的网络消费者有相同的概率。如果该值为true,则其他网络消费者的优先级从-5开始算起,根据网络跳数(network hops)来算优先级,当然是越“远”优先级越低。
networkTTL:默认值为1,该值表示消息或订阅可以通过的消息提供者(brokers)的数量。拿上图来说,假设P1向S1发了条消息,另外有个ActiveMQ服务器S3(上图未画出)与S2相互桥接,但不直接与S1桥接,因为该值为1,即使S3上有该队列的消费者或主题的订阅者,但对于S1来说S3这些网络消费者和订阅者是不可见的,所以P1发的这条消息就不能被S3上的消费者给消费,如果想要让S3上的消费者也有机会消费,可以让S3和S1桥接,或者将S1的networkTTL参数配置为2.
messageTTL:默认为1,从5.9版本开始有效,表示消息能通过的消息提供者数量,该值可以覆盖掉networkTTL中对消息通过数量的限制。
consumerTTL:默认为1,从5.9版本开始有效,表示订阅能通过的消息提供者数量,该值可以覆盖掉networkTTL中对订阅通过数量的限制。
conduitSubscriptions:默认为true。该值表示多个消费者订阅一个相同的目的地是否在网络上被对待为一个消费者。以上图为例,如果采用默认的分配策略,P1连续发120条消息,结果应该是C1和C2都收到40条,而C3和C4共收到40条(C3和C4平分这40条),为什么不是C1、C2、C3和C4都收到大概30条呢?因为该值为true,C3和C4对于S1来说只是一个消费者,而不是两个。注意,如果你的多个网络提供者中的消费者使用了消息选择器(selector),你又设置了conduitSubscriptions为true,则有可能会导致消息不会被发送到正确的网络消费者从而消息不被消费,因为网络消费者中的消息选择器对生产者的提供者来说是透明的,所以此种情况下,请将conduitSubscriptions设置为false。
excludedDestinations:默认为空,用来配置不会在网络中转发的目的地(Destination)。
dynamicallyIncludedDestinations:默认为空,用来配置可以在网络中转发的目的地,和上面的excludedDestinations的功能相反。例如:
<networkConnector uri="static:(tcp://host)">
<dynamicallyIncludedDestinations>
<queue physicalName="queue1"/>
<topic physicalName="topic1"/>
</dynamicallyIncludedDestinations>
</networkConnector>
staticallyIncludedDestinations:默认为空,用来配置可以在网络中转发的目的地,但它和dynamicallyIncludedDestinations不同的是dynamicallyIncludedDestinations只在对方有该目的地的消费者时才将消息转发给对方,staticallyIncludedDestinations则不管对方有没有该目的地都将消息转发给对方。
举例配置如下:
<networkConnectors>
<networkConnector uri="static:(tcp://0.0.0.0:61616,tcp://0.0.0.0:61618)" >
<staticallyIncludedDestinations>
<queue physicalName="TopicTestQueue1"/>
</staticallyIncludedDestinations>
</networkConnector>
</networkConnectors>
duplex:默认值为false,默认情况下,在两个提供者之间的连接上的消息流动方向是单向(单工连接),如果该值设置为true,则一个连接上可以双向流动消息(双工连接)。可以想象,单工连接比双工连接吞吐量要高,但增加了连接数量方面的开销。如果在使用双工连接时为了增加吞吐量,可以建立多个双工连接,但每个连接必须起不同的名字,例如:
<networkConnectors>
<networkConnector name="SYSTEM1" duplex="true" uri="static:(tcp://10.79.6.56:61616)">
<dynamicallyIncludedDestinations>
<topic physicalName="outgoing.System1" />
</dynamicallyIncludedDestinations>
</networkConnector>
<networkConnector name="SYSTEM2" duplex="true" uri="static:(tcp://10.79.6.56:61616)">
<dynamicallyIncludedDestinations>
<topic physicalName="outgoing.System2"/>
</dynamicallyIncludedDestinations>
</networkConnector>
</networkConnectors>
prefetchSize:默认是1000,指网络连接中的消费者的预读数量,该值必须大于0,因为网络消费者不能采用poll来获取消息(轮询消息)。
suppressDuplicateQueueSubscriptions:(从5.3版本开始有效)默认为false,如果该参数为true,会防止提供者之间相互桥接时的队列重复订阅的问题,其实该参数为true和false只是ActiveMQ代码中的微妙的性能变化,对实际的集群结果并不会带来什么影响。
bridgeTempDestinations:默认值为true,该值用来指定在MQ服务器在网络中创建临时目的地(temp destinations)时是否需要广播公告消息(broadcast advisory messages )。如果设置为false,表示禁用此功能,则当生产者和消费者不是连接到同一个MQ服务器时,可以减少网络开销。
alwaysSyncSend:(从5.6版本开始有效)默认为false,表示是否总是异步发送。默认时,非持久化消息被发送到远程的MQ服务器时将采用请求/应答方式而不是oneway方式。此参数对待持久化和非持久化消息相同。
staticBridge:(从5.6版本开始有效)默认为false,如果设置为true,MQ服务器将不会动态应答新的消费者,也意味着该提供者将对其他远程提供者的主题不感兴趣。当设置为true是,我们可以使用staticallyIncludedDestinations来创建配置需要订阅的主题,例如:
<networkConnector uri="static:(tcp://host)" staticBridge="true">
<staticallyIncludedDestinations>
<queue physicalName="always.include.queue"/>
</staticallyIncludedDestinations>
</networkConnector>
消息如果一开始就是持久化的化的或者是被持久订阅的,则在网络经纪人中传播时也会保持这种特性,然而,如果消息一开始就不是持久化的或者不是持久订阅的,则在网络的MQ服务器中传播时也一定不会有这些特性。
总的消息顺序在网络经纪人中是不保证的,当然,即使不是网络经纪人,在单个经纪人环境下,有多个消费者时,消息被消费的顺序也是不保证的。
如果你希望提供者不受任何远程提供者上消费者的影响,或者希望不管远程提供者上是否有消费者都将所有消息转发到远程提供者,那么你可以考虑使用静态网络(static networks)。
就如前面你说看到的,如果你想增加吞吐量,或者想采用不同的通信方式(如tcp或者nio),又或者你想采用更灵活的配置,两个提供者之间可以由多个NetworkConnector相连, 例如,如果使用分布式队列(distributed queues),你可能希望在通过网络对队列接收者能采用等效加权(equivalent weighting),但只针对活跃的接收者,你可以如下配置:
<networkConnectors>
<networkConnector uri="static:(tcp://localhost:61617)" name="queues_only" conduitSubscriptions="false" decreaseNetworkConsumerPriority="false">
<excludedDestinations>
<topic physicalName=">"/>
</excludedDestinations>
</networkConnector>
</networkConnectors>
<policyMap>
<policyEntries>
<policyEntry queue="TEST.>" enableAudit="false">
<networkBridgeFilterFactory>
<conditionalNetworkBridgeFilterFactory replayWhenNoConsumers="true"/>
</networkBridgeFilterFactory>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
感兴趣的童鞋可以去看看官方文档:http://activemq.apache.org/networks-of-brokers.html
相关推荐
高可用之ActiveMQ集群:网络连接模式(network connector)详解
ActiveMQ集群:网络连接模式(network connector)详解
前言1、ActiveMQ的networkConnector是什么在某些情况下,需要多个ActiveMQ的Broker做集群,那么就涉及到Broker到Broke
使用activemq依赖库连接, 该项目为java工程,内有ssl证书生成方式链接,不清楚可私信
ActiveMQ环境搭建及实例详解的源码 ActiveMQ环境搭建及实例详解的源码
ActiveMQ如何整合Spring并使用连接池的方式,还有一些必须基础jar包
activemq5.3.1整合应用服务器详解,此文档适合刚开始接触activem嵌入其他容器的开发模式的开发人员参考的。
ActiveMQ spring 配置方案详解。
001-ActiveMQ基础;002-安全机制+签收模式+发送模式+MessageProducer;...ActiveMQ集群:网络连接模式(network connector)详解.docx;ActiveMQ集群:网络连接模式(network connector)详解.docx;示例;
自己实现的ActiveMQ连接池和新版本ActiveMQ自带的连接池,封装好的工具类,可直接使用
ActiveMQ集群:网络连接模式(network connector)详解.docx
Apache ActiveMQ Queue Topic 详解 教程 加入代码解释说明
自己实现的ActiveMQ连接池和新版本ActiveMQ自带的连接池,封装好的工具类,可直接使用,自己在网上找的时候好多都是纯Demo性质的,没有好的思想可以学习,希望我的劳动成果可以供你学习借鉴
NULL 博文链接:https://gong1208.iteye.com/blog/1555582
activeMq是一个开源的支持JMS的框架:(以下为考录他人的信息,如有版权问题,请联系) 一、特性及优势 1、实现JMS1.1规范,支持J2EE1.4以上 2、可运行于任何jvm和大部分web容器(ActiveMQ works great in any ...
赠送jar包:activemq-protobuf-1.1.jar; 赠送原API文档:activemq-protobuf-1.1-javadoc.jar; 赠送源代码:activemq-protobuf-1.1-sources.jar; 包含翻译后的API文档:activemq-protobuf-1.1-javadoc-API文档-...
JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的...
activemq消息测试工具
ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。 下载本压缩包后解压运行里面的activemq.bat即可。 http://192.168.0.61:8161/ 是管理...如需在JAVA中使用,请下载本人的 ActiveMQ简单JAVA项目案例。
下载后可直接导入Maven项目运行,包含ActiveMQ的连接代码以及数据插入和读取测试