- 浏览: 478766 次
- 性别:
- 来自: 大连
文章分类
最新评论
-
龘龘龘:
TrueBrian 写道有个问题,Sample 1中,为了控制 ...
What's New on Java 7 Phaser -
龘龘龘:
楼主总结的不错。
What's New on Java 7 Phaser -
TrueBrian:
有个问题,Sample 1中,为了控制线程的启动时机,博主实际 ...
What's New on Java 7 Phaser -
liguanqun811:
不知道楼主是否对zookeeper实现的分布式锁进行过性能测试 ...
Distributed Lock -
hobitton:
mysql的get lock有版本限制,否则get lock可 ...
Distributed Lock
2.6 Features
ActiveMQ包含了很多功能强大的特性,下面简要介绍其中的几个。
2.6.1 Exclusive Consumer
Queue中的消息是按照顺序被分发到consumers的。然而,当你有多个consumers同时从相同的queue中提取消息时,你将失去这个保证。因为这些消息是被多个线程并发的处理。有的时候,保证消息按照顺序处理是很重要的。例如,你可能不希望在插入订单操作结束之前执行更新这个订单的操作。
ActiveMQ从4.x版本起开始支持Exclusive Consumer (或者说Exclusive Queues)。 Broker会从多个consumers中挑选一个consumer来处理queue中所有的消息,从而保证了消息的有序处理。如果这个consumer失效,那么broker会自动切换到其它的consumer。
可以通过Destination Options 来创建一个Exclusive Consumer,如下:
queue = new ActiveMQQueue("TEST.QUEUE?consumer.exclusive=true"); consumer = session.createConsumer(queue);
顺便说一下,可以给consumer设置优先级,以便针对网络情况(如network hops)进行优化,如下:
queue = new ActiveMQQueue("TEST.QUEUE?consumer.exclusive=true &consumer.priority=10");
2.6.2 Message Groups
用Apache官方文档的话说,Message Groups rock!它是Exclusive Consumer功能的增强。逻辑上,Message Groups 可以看成是一种并发的Exclusive Consumer。跟所有的消息都由唯一的consumer处理不同,JMS 消息属性JMSXGroupID 被用来区分message group。Message Groups特性保证所有具有相同JMSXGroupID 的消息会被分发到相同的consumer(只要这个consumer保持active)。另外一方面,Message Groups特性也是一种负载均衡的机制。
在一个消息被分发到consumer之前,broker首先检查消息JMSXGroupID属性。如果存在,那么broker 会检查是否有某个consumer拥有这个message group。如果没有,那么broker会选择一个consumer,并将它关联到这个message group。此后,这个consumer会接收这个message group的所有消息,直到:
- Consumer被关闭。
- Message group被关闭。通过发送一个消息,并设置这个消息的JMSXGroupSeq为0。
从4.1版本开始,ActiveMQ支持一个布尔字段JMSXGroupFirstForConsumer 。当某个message group的第一个消息被发送到consumer的时候,这个字段被设置。如果客户使用failover transport连接到broker。在由于网络问题等造成客户重新连接到broker的时候,相同message group的消息可能会被分发到不同与之前的consumer,因此JMSXGroupFirstForConsumer字段也会被重新设置。
以下是使用message groups的例子:
Mesasge message = session.createTextMessage("<foo>hey</foo>"); message.setStringProperty("JMSXGroupID", "IBM_NASDAQ_20/4/05"); ... producer.send(message);
2.6.3 JMS Selectors
JMS Selectors用于在订阅中,基于消息属性对进行消息的过滤。JMS Selectors由SQL92语法定义。以下是个Selectors的例子:
consumer = session.createConsumer(destination, "JMSType = 'car' AND weight > 2500");
在JMS Selectors表达式中,可以使用IN、NOT IN、LIKE等,例如:
LIKE '12%3' ('123' true,'12993' true,'1234' false)
LIKE 'l_se' ('lose' true,'loose' false)
LIKE '\_%' ESCAPE '\' ('_foo' true,'foo' false)
需要注意的是,JMS Selectors表达式中的日期和时间需要使用标准的long型毫秒值。另外表达式中的属性不会自动进行类型转换,例如:
myMessage.setStringProperty("NumberOfOrders", "2");
"NumberOfOrders > 1" 求值结果是false。关于JMS Selectors的详细文档请参考javax.jms.Message的javadoc。
上一小节介绍的Message Groups虽然可以保证具有相同message group的消息被唯一的consumer顺序处理,但是却不能确定被哪个consumer处理。在某些情况下,Message Groups可以和JMS Selector一起工作,例如:
设想有三个consumers分别是A、B和C。你可以在producer中为消息设置三个message groups分别是"A"、"B"和"C"。然后令consumer A使用"JMXGroupID = 'A'"作为selector。B和C也同理。这样就可以保证message group A的消息只被consumer A处理。需要注意的是,这种做法有以下缺点:
- producer必须知道当前正在运行的consumers,也就是说producer和consumer被耦合到一起。
- 如果某个consumer失效,那么应该被这个consumer消费的消息将会一直被积压在broker上。
2.6.4 Pending Message Limit Strategy
首先简要介绍一下prefetch机制。ActiveMQ通过prefetch机制来提高性能,这意味这客户端的内存里可能会缓存一定数量的消息。缓存消息的数量由prefetch limit来控制。当某个consumer的prefetch buffer已经达到上限,那么broker不会再向consumer分发消息,直到consumer向broker发送消息的确认。可以通过在ActiveMQConnectionFactory或者ActiveMQConnection上设置ActiveMQPrefetchPolicy对象来配置prefetch policy。也可以通过connection options或者destination options来配置。例如:
tcp://localhost:61616?jms.prefetchPolicy.all=50
tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=1
queue = new ActiveMQQueue("TEST.QUEUE?consumer.prefetchSize=10");
prefetch size的缺省值如下:
- persistent queues (default value: 1000)
- non-persistent queues (default value: 1000)
- persistent topics (default value: 100)
- non-persistent topics (default value: Short.MAX_VALUE -1)
慢消费者会在非持久的topics上导致问题:一旦消息积压起来,会导致broker把大量消息保存在内存中,broker也会因此而变慢。未来ActiveMQ可能会实现磁盘缓存,但是这也还是会存在性能问题。目前ActiveMQ使用Pending Message Limit Strategy来解决这个问题。除了prefetch buffer之外,你还要配置缓存消息的上限,超过这个上限后,新消息到来时会丢弃旧消息。通过在配置文件的destination map中配置PendingMessageLimitStrategy,可以为不用的topic namespace配置不同的策略。目前有以下两种:
- ConstantPendingMessageLimitStrategy。这个策略使用常量限制。
例如:<constantPendingMessageLimitStrategy limit="50"/> - PrefetchRatePendingMessageLimitStrategy。这个策略使用prefetch size的倍数限制。
例如:<prefetchRatePendingMessageLimitStrategy multiplier="2.5"/>
在以上两种方式中,如果设置0意味着除了prefetch之外不再缓存消息;如果设置-1意味着禁止丢弃消息。
此外,你还可以配置消息的丢弃策略,目前有以下两种:
- oldestMessageEvictionStrategy。这个策略丢弃最旧的消息。
- oldestMessageWithLowestPriorityEvictionStrategy。这个策略丢弃最旧的,而且具有最低优先级的消息。
以下是个ActiveMQ配置文件的例子:
<broker persistent="false" brokerName="${brokername}" xmlns="http://activemq.org/config/1.0"> <destinationPolicy> <policyMap> <policyEntries> <policyEntry topic="PRICES.>"> <!-- 10 seconds worth --> <subscriptionRecoveryPolicy> <timedSubscriptionRecoveryPolicy recoverDuration="10000" /> </subscriptionRecoveryPolicy> <!-- lets force old messages to be discarded for slow consumers --> <pendingMessageLimitStrategy> <constantPendingMessageLimitStrategy limit="10"/> </pendingMessageLimitStrategy> </policyEntry> </policyEntries> </policyMap> </destinationPolicy> ... </broker>
2.6.5 Composite Destinations
从1.1版本起, ActiveMQ支持composite destinations。它允许用一个虚拟的destination 代表多个destinations。例如你可以通过composite destinations在一个操作中同时向12个queue发送消息。在composite destinations中,多个destination之间采用","分割。例如:
Queue queue = new ActiveMQQueue("FOO.A,FOO.B,FOO.C");
如果你希望使用不同类型的destination,那么需要加上前缀如queue:// 或topic://,例如:
Queue queue = new ActiveMQQueue("FOO.A,topic://NOTIFY.FOO.A");
以下是ActiveMQ配置文件进行配置的一个例子:
<destinationInterceptors> <virtualDestinationInterceptor> <virtualDestinations> <compositeQueue name="MY.QUEUE"> <forwardTo> <queue physicalName="FOO" /> <topic physicalName="BAR" /> </forwardTo> </compositeQueue> </virtualDestinations> </virtualDestinationInterceptor> </destinationInterceptors>
可以在转发前,先通过JMS Selector判断一个消息是否需要转发,例如:
<destinationInterceptors> <virtualDestinationInterceptor> <virtualDestinations> <compositeQueue name="MY.QUEUE"> <forwardTo> <filteredDestination selector="odd = 'yes'" queue="FOO"/> <filteredDestination selector="i = 5" topic="BAR"/> </forwardTo> </compositeQueue> </virtualDestinations> </virtualDestinationInterceptor> </destinationInterceptors>
2.6.6 Mirrored Queues
每个queue中的消息只能被一个consumer消费。然而,有时候你可能希望能够监视生产者和消费者之间的消息流。你可以通过使用Virtual Destinations 来建立一个virtual queue 来把消息转发到多个queues中。但是 为系统中每个queue都进行如此的配置可能会很麻烦。
ActiveMQ支持Mirrored Queues。Broker会把发送到某个queue的所有消息转发到一个名称类似的topic,因此监控程序可以订阅这个mirrored queue topic。为了启用Mirrored Queues,首先要将BrokerService的useMirroredQueues属性设置成true,然后可以通过destinationInterceptors设置其它属性,如mirror topic的前缀,缺省是"VirtualTopic.Mirror."。以下是ActiveMQ配置文件的一个例子:
<broker xmlns="http://activemq.org/config/1.0" brokerName="MirroredQueuesBroker1" useMirroredQueues="true"> <transportConnectors> <transportConnector uri="tcp://localhost:61616"/> </transportConnectors> <destinationInterceptors> <mirroredQueue copyMessage = "true" prefix="Mirror.Topic"/> </destinationInterceptors> ... </broker>
假如某个producer向名为Foo.Bar的queue中发送消息,那么你可以通过订阅名为Mirror.Topic.Foo.Bar的topic来获得发送到Foo.Bar中的所有消息。
评论
<policyEntry queue="SNCA_JMS_MSG_queue.>" memoryLimit="1mb">
<messageEvictionStrategy>
<oldestMessageEvictionStrategy />
</messageEvictionStrategy>
<pendingMessageLimitStrategy>
<constantPendingMessageLimitStrategy limit="500" />
</pendingMessageLimitStrategy>
</policyEntry>
不起作用,指点一下。
发表评论
-
Terracotta in Action (3)
2009-03-25 09:20 50903 Inside Terracotta 3.1 Core T ... -
Terracotta in Action (2)
2009-03-21 21:09 46192. Terracotta Eclipse Plugin ... -
Terracotta in Action (1)
2009-03-19 21:52 62441. Introduction Terraco ... -
OpenEJB (4)
2008-05-11 09:05 3137本文部分内容节选自Enterprise JavaBeans 3 ... -
OpenEJB (3)
2008-05-11 09:04 2691本文部分内容节选自Enterprise JavaBeans 3 ... -
OpenEJB (2)
2008-05-11 09:03 3284本文部分内容节选自Enterprise JavaBeans 3 ... -
OpenEJB (1)
2008-05-10 22:39 5069本文部分内容节选自Enterprise JavaBeans 3 ... -
OpenJPA (7)
2008-03-25 21:56 351610 Miscellaneous Features 10 ... -
OpenJPA (6)
2008-03-23 21:33 62558 Object Locking 8.1 Configu ... -
OpenJPA (5)
2008-03-18 22:38 49607 Inheritance 对象使用引用以便关联到其 ... -
OpenJPA (4)
2008-03-11 23:27 70406 Query 6.1 JPQL Queries 6.1. ... -
OpenJPA (3)
2008-03-09 23:09 51944 EntityManagerFactory 4.1 Ove ... -
OpenJPA (2)
2008-03-05 23:59 72513 Metadata 通过javax.persist ... -
OpenJPA (1)
2008-03-04 23:11 68301 Overview Apache OpenJPA是 ... -
ActiveMQ in Action (7)
2008-02-27 14:33 126332.6.7 Wildcards Wil ... -
ActiveMQ in Action (5)
2008-02-26 00:35 135322.5 Clustering ActiveMQ从多种 ... -
ActiveMQ in Action (4)
2008-02-26 00:21 112122.4 Security ActiveMQ ... -
ActiveMQ in Action (3)
2008-02-26 00:16 105912.3 Persistence 2.3.1 AMQ Mess ... -
ActiveMQ in Action (2)
2008-02-25 23:58 132172.2 Transport ActiveMQ目前支持 ... -
ActiveMQ in Action (1)
2008-02-25 23:18 237851 JMS 在介绍ActiveMQ ...
相关推荐
ActiveMQ In Action及其源码 本来想免费提供下载的,但是发现csdn不能选择0积分下载,坑的很
ActiveMQ in Action pdf英文原版加源代码压缩包。 Apache ActiveMQ in Action is a thorough, practical guide to implementing message-oriented systems in Java using ActiveMQ. The book lays out the core of ...
ActiveMQ in Action的开发文档
这是最新版的《ActiveMQ in Action》包括第7、9、11、13等章节
activeMq in action 使用activeMq开发JMS的简单讲述,activeMq in action 使用activeMq开发JMS的简单讲述
ActiveMQ in Action 英文无水印pdf pdf所有页面使用FoxitReader和PDF-XChangeViewer测试都可以打开 本资源转载自网络,如有侵权,请联系上传者或csdn删除 本资源转载自网络,如有侵权,请联系上传者或csdn删除
ActiveMQ In Action翻译笔记-更新版2011
activemq in action PDF 英文版 源代码 合二为一,目前好像只有英文版,源代码一起发布的少。我收集到了,分享给大家。 这本书讲得真是不错,英文看起来也不难,希望对你有帮助 。
ActiveMQ In Action的精简版
ActiveMQ 不仅实现了 JMS 规范中定义的所有特性,也额外提供了一些特有且有用的特性。我们会在 3.1 小节详细列说这些特性,并且书中剩余的章节还会继续讨论这些特性。为了演示这些特性,我们创建了两个基于实际业务...
ActiveMQ_in_Action_中文ActiveMQ_in_Action_中文ActiveMQ_in_Action_中文ActiveMQ_in_Action_中文
ActiveMQ in Action
经典的学习ActiveMQ的书籍。这本书讲得真是不错,英文看起来也不难,希望对你有帮助 。
ActiveMQ In Action.zip
ActiveMQ_in_Action.pdf 教程文档
activemq in action - activemq in action
ActiveMQ_in_Action_中文.zip
ActiveMQ+In+Action翻译笔记-+更新版2011.pdf
ActionMQ In Action 就是网上可以下载的那个版本, 不知道作者是谁,和英文版的书应该不是一回事。在原版的基础上增加了书签,可以看的更方便。这本书对ActiveMQ入门到一般的管理非常有帮助。