`
7wolfs
  • 浏览: 178148 次
  • 性别: Icon_minigender_1
  • 来自: 济南
社区版块
存档分类
最新评论

ActiveMQ 开发

 
阅读更多
1, 配置activemq.xml时,修改<broker>子节点内容时,子节点需要按照字母顺序排在<broker>中,按照xsd定义要求对activemq.xml内容进行编排。

2, 使用Ajax与ActiveMQ交互时,如果一个浏览器窗口打开多个tab,每个tab使用amq.init初始化了一个amq 对象,那只有一个amq对象会生效。
为了解决这个问题,从ActiveMQ 5.4.2开始, 在调用 amq.init 时,指定 a unique clientId,此时一个浏览器窗口里的多个tab的amq对象就可以共存生效,每个tab里的amq对象都可以独立与broker交互。

3, ActiveMQ利用Ajax使用轮询机制去获取服务器的信息,而不是使用服务器推送方式。
理由参见:Comparison to Pushlets in http://activemq.apache.org/ajax.html

4, 往activeMQ的Ajax请求加入用户验证信息时, 需要修改amq.js中的buildParams方法和init方法.

5, 启动connection的类如果不关闭connection,该类将处于活动状态,与activeMQ也将处于连接状态.

6, MessageConsumer.receive方法用于同步接收消息,如果topic、queue里没有消息,就处于等待状态,一旦获取过一次消息,则该consumer.receive就不再接收消息。

7, ActiveMQ 与 web server集成
7.1 在web.xml中加入如下配置,
<context-param>
		<param-name>brokerURI</param-name>
		<param-value>/WEB-INF/activemq-simple.xml</param-value>
	</context-param>

<listener>
		<listener-class>org.apache.activemq.web.SpringBrokerContextListener</listener-class>
	</listener>



7.2 在项目的lib中加入依赖的jars, 如: spring-web-3.0.3.RELEASE.jar, camel等jar.

8, 控制消息级别的安全过滤
8.1 实现MessageAuthorizationPolicy接口
8.2 在activemq.xml中加入如下配置信息,
<messageAuthorizationPolicy>
<bean class="cn.com.*.messageagent.utils.AuthorizationPolicy" xmlns="http://www.springframework.org/schema/beans"/>
</messageAuthorizationPolicy>


9. 使用jconsole监控ActiveMQ
9.1 修改activemq.xml, 在broker元素中加入 useJmx="true" 属性。
9.2 修改activemq.bat, 加入SUNJMX=-Dcom.sun.management.jmxremote.port=1099 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false
9.3 在dos窗口中输入jconsole,在“远程连接”中输入:service:jmx:rmi:///jndi/rmi://127.0.0.1:1099/jmxrmi,可以在jconsole的控制台中查看ActiveMQ的使用情况。

10. ActiveMQ 4.0 的bug
问题: 对于auto-acknowledge的queue有bug,会从queue重复发送同一条messge给consumer。
解决方案:将OptimizeAcknowledge 设为false。设置的途径有三种:
1) cf = new ActiveMQConnectionFactory("tcp://locahost:61616?jms.optimizeAcknowledge=true");
2) ((ActiveMQConnectionFactory)connectionFactory).setOptimizeAcknowledge(true);
3) ((ActiveMQConnection)connection).setOptimizeAcknowledge(true);

11. ActiveMQ 4.0 的JAAS在JDK 1.4下的实现
11.1 首先获取ActiveMQ 的activemq-jaas-4.0-M1.jar (for JDK 1.5),该包不在4.0的发布包里,需要单独下载
11.2 使用retrotranslator将jaas jar转成符合jdk 1.4要求的jar。
java -jar retrotranslator-transformer-1.2.9.jar  -srcjar activemq-jaas-5.5.0.jar -destjar activemq-jaas-5.5.0-for4.jar

11.3 在启动ActiveMQ的启动脚本activemq.bat中加入JAAS配置文件路径
-Djava.security.auth.login.config=login.config

11.4 配置ActiveMQ,修改activemq.xml、login.config、users.properties、groups.properties.
其中对于login.config, 里面的java package要与jaas jar里的package路径一致。如:org.activemq.jaas.PropertiesLoginModule,org.activemq.jaas.properties.user, org.activemq.jaas.properties.group
activemq-domain {
    org.activemq.jaas.PropertiesLoginModule required
        debug=true
        org.activemq.jaas.properties.user="users.properties"
        org.activemq.jaas.properties.group="groups.properties";
};


12. 使用JMX与ActiveMQ 4.0 通信,获取topic、queue等信息
12.1 使用retrotranslator 将activemq-web-5.5.0.jar转成符合jdk 1.4的jar
12.2 引入jar:mx4j-remote-3.0.2.jar,activemq-web-5.5.0.jar-for4.jar,slf4j-log4j12-1.5.11.jar
12.3 在activemq.xml文件中的broker加入 useJmx="true" 属性内容。
12.4 Java程序代码如下:
public static void getStaticsFromAMQ() {
		RemoteJMXBrokerFacade createConnector = new RemoteJMXBrokerFacade();   
        System.setProperty("webconsole.jmx.url", "service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi");   
        //System.setProperty("webconsole.jmx.user","controlRole");   
        //System.setProperty("webconsole.jmx.password","abcd1234");   
        SystemPropertiesConfiguration configuration = new SystemPropertiesConfiguration();   
        createConnector.setConfiguration(configuration);   
        try {   
            BrokerViewMBean brokerAdmin = createConnector.getBrokerAdmin();   
//	            String brokerName =(brokerAdmin).getBrokerName();  
//	            System.out.println("BrokerName ="+brokerName );  
            long messages =brokerAdmin.getTotalMessageCount();  
            System.out.println("messages ="+messages );  
            long consumerCount=brokerAdmin.getTotalConsumerCount();  
            System.out.println("consumerCount ="+consumerCount );  
            long dequeueCount=brokerAdmin.getTotalDequeueCount();  
            System.out.println("dequeueCount ="+dequeueCount );  
            long enqueueCount=brokerAdmin.getTotalEnqueueCount();  
            System.out.println("enqueueCount ="+enqueueCount );  
              
//	            System.out.println(((ActiveMQConnection) brokerAdmin).getBrokerName());   
            //获取Topic相关的ObjectName  
            ObjectName[] topicList=brokerAdmin.getTopics();  
            List answer = (List) createConnector.getTopics(); //getManagedObjects(topicList, TopicViewMBean.class);
            for (int i = 0; i < answer.size(); i++) {
            	TopicViewMBean value = (TopicViewMBean) answer.get(i);
            	System.out.println("topic ="+value.getName() + ";enqueued:" + value.getEnqueueCount()
            			+ ";dequeued:"+value.getDequeueCount() + ";queueSize:" + value.getQueueSize());
            }
            
            //获取Queue相关的ObjectName  
//	            ObjectName[] queueList=brokerAdmin.getQueues();
            answer = (List) createConnector.getQueues();
            for (int i = 0; i < answer.size(); i++) {
            	QueueViewMBean value = (QueueViewMBean) answer.get(i);
            	System.out.println("queue ="+value.getName() + ";enqueued:" + value.getEnqueueCount()
            			+ ";dequeued:"+value.getDequeueCount() + ";queueSize:" + value.getQueueSize());
            }
            
            //根据ObjectName创建相关的JMX对象获取相关的信息。  
        } catch (Exception e) {   
            e.printStackTrace();   
        }   
	}



13. 获取queue类型队列中剩余消息条数
public static void getNumOfMsg() throws JMSException, VSJMSException {
		Connection connection = EndPointManager.getInstance().getConnection();
		connection.start();
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		Queue queue = session.createQueue("QUEUES." + "test");
		QueueBrowser browser = session.createBrowser(queue);

		int maxMessages = 100;
		Enumeration iter = browser.getEnumeration();
		int counter = 0;
        for (; iter.hasMoreElements() && (maxMessages <= 0 || counter < maxMessages); counter++) {
            Message message = (Message) iter.nextElement();
        }
        log.info("count:" + counter);
	}
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics