`
liyixing1
  • 浏览: 939806 次
  • 性别: Icon_minigender_1
  • 来自: 江西上饶
社区版块
存档分类
最新评论

jms-spring

    博客分类:
  • jms
阅读更多
同步接收和同步发送的情况(使用JNDI)



异步接收消息




JmsTemplete是用来发送和接收消息的(同步),另外有一个JmsTemplete102类,是支持更老的版本JMS1.02
它可以自动将对象转换成需要的JMS的Message类型。
public class SimpleJMSSender {
public static void main(String[] args) {
try {
ApplicationContext ctx =
new ClassPathXmlApplicationContext("app-context.xml");
JmsTemplate jmsTemplate =
(JmsTemplate)ctx.getBean("jmsTemplate");
//发送消息
jmsTemplate.convertAndSend("This is easy!");
}
}
}



常用方法
send 参数MessageCreator是用来创建消息的
convertAndSend 会自动调用spring的SimpleMessageConverter类进行消息转化。通过参数MessagePostProcessor,可以用来设置属性,头。
receive 接收消息
receiveSelected 根据选择器接收
receiveAndConvert 接收,并转化成对应的java对象。SimpleMessageConverter使用该转化器,通过jmsTemplate的messageConverter属性可以指定转化器实现。


JNDI方式


<!--设置连接属性,通过jndiTemplate可以初始化jndi,已经通过temp获取factory,相当于执行
InitialContext initialContext = new InitialContext();
		ConnectionFactory factory = (ConnectionFactory) initialContext
				.lookup(factoryName);

-->
<bean id="jndiTemplate" class="org.springframework.jndi.JndiTemplate">
<property name="environment">
<props>
<!--ActiveMQInitialContextFactory生产出来的factory应该是同时实现了TopicConnectionFactory和QueueConnectionFactory两个接口的-->
<prop key="java.naming.factory.initial">
org.apache.activemq.jndi.ActiveMQInitialContextFactory</prop>
<prop key="java.naming.provider.url">tcp://localhost:61616</prop>
<prop key="java.naming.security.principal">system</prop>
<prop key="java.naming.security.credentials">manager</prop>
</props>
</property>
</bean>

<!---->
<bean id="jndiQueueConnectionFactory"
class="org.springframework.jndi.JndiObjectFactoryBean">
<property name="jndiTemplate" ref="jndiTemplate"/>
<property name="jndiName" value="QueueCF"/>
</bean>

<!--CachingConnectionFactory类扩展自SingleConnectionFactory,主要用于提供缓存JMS资源功能。具体包括MessageProducer、MessageConsumer和Session的缓存功能。
Spring中发送消息的核心是JmsTemplate,然而Jmstemplate的问题是在每次调用时都要打开/关闭session和producter,效率很低,刚开始的解决方案是PooledConnectionFactory连接池,用于缓存session和producter。然而这还不是最好的。从spring2.5.3版本后,Spring又提供了CachingConnectionFactory,这才是首选的方案。然而CachingConnectionFactory有一个问题必须指出,默认情况下,CachingConnectionFactory只缓存一个session,它声明对于低并发情况下这是足够的。与之相反,PooledConnectionFactory的默认值是500。这些设置,在很多情况下,需要亲自去测试并验证。CachingConnectionFactory的缓存大小,要根据实际情况设置
-->
<bean id="queueConnectionFactory"
class="org.springframework.jms.connection.CachingConnectionFactory">
<property name="targetConnectionFactory" ref="jndiQueueConnectionFactory"/>
<property name="sessionCacheSize" value="1"/>
</bean>

<!--目的地解析器,可以简单的认为是对目的地的简单转化功能,如将string转化成queue-->
<bean id="destinationResolver"
class="org.springframework.jms.support.destination.JndiDestinationResolver">
<property name="jndiTemplate" ref="jndiTemplate"/>
<property name="cache" value="true"/>
<property name="fallbackToDynamicDestination" value="false"/>
</bean>
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="queueConnectionFactory"/>
<property name="destinationResolver" ref="destinationResolver"/>
<property name="defaultDestinationName" value="queue1"/>
<property name="pubSubDomain" value="false"/>
</bean>


消息驱动模式
实现MessageListener接口
实现SessionAwareMessageListener 接口
MessageListenerAdapter

SessionAwareMessageListener 和MessageListener的接口很像,但是支持访问session
public interface SessionAwareMessageListener<M extends Message> {

	void onMessage(M message, Session session) throws JMSException;

}


MessageListenerAdapter模式,可以对任何POJO对象使用监听器模式,而不用实现MessageLister接口,在收到消息后,该模式会自动在POJO中查找如下方法
public void handleMessage(String message) {...}
//receive a converted BytesMessage
public void handleMessage(byte[ ] message) {...}
//receive a converted MapMessage
public void handleMessage(Map message) {...}
//receive a converted ObjectMessage
public void handleMessage(Object message) {...}
这种情况下,MessageListenerAdapter已经对Message进行过转化,只能访问消息内容,而要访问消息本身,需要
<bean id="messageListener"
class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<constructor-arg>
<bean class="SimpleJMSReceiver"/>
</constructor-arg>
<!--通过设置消息转化器为null,防止转化。
<property name="messageConverter"><null/></property>
</bean>

并且MessageListenerAdapter在查找处理方法查找的就是
//receive a JMS TextMessage
public void handleMessage(TextMessage message) {...}
//receive a JMS BytesMessage
public void handleMessage(BytesMessage message) {...}
//receive a JMS MapMessage
public void handleMessage(MapMessage message) {...}
//receive a JMS ObjectMessage
public void handleMessage(ObjectMessage message) {...}
//receive a JMS StreamMessage
public void handleMessage(StreamMessage message) {...}

,假如POJO无法找到匹配的方法,就会造成NoSuchMethodException
<bean id="messageListener"
class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<constructor-arg>
<!--这里可以是任何的JAVABEAN-->
<bean class="SimpleJMSReceiver"/>
</constructor-arg>
</bean>
<bean id="jmsContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="queueConnectionFactory"/>
<property name="destinationResolver" ref="destinationResolver"/>
<property name="concurrentConsumers" value="3" />
<property name="destinationName" value="queue1"/>
<property name="messageListener" ref="messageListener" />
</bean>



如果不想使用handleMessage作为方法名,可以通过设置defaultListenerMethod属性
<bean id="messageListener"
class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<constructor-arg>
<bean class="TradeOrderManager"/>
</constructor-arg>
<property name="defaultListenerMethod" value="createTradeOrder"/>
</bean>


三种方式都有一个监听容器,类似于JmsTemplete,需要创建一个监听容器的上下文bean,容器有两个常用的
DefaultMessageListenerContainer 可以动态调整线程数,支持XA外部事物
SimpleMessageListenerContainer 不可以动态调整线程数

<!--一个MessageListener的实现-->
<bean id="messageListener" class="SimpleJMSReceiver"/>

<bean id="jmsContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="queueConnectionFactory"/>
<property name="destinationResolver" ref="destinationResolver"/>
<property name="concurrentConsumers" value="3" />
<property name="destinationName" value="queue1"/>
<property name="messageListener" ref="messageListener" />
</bean>


事物管理
Spring为JMS提供了本地事务管理的能力,JMS事务管理器和数据库事务管理器都是PlatformTransactionManager接口的实现类,Spring的org.springframework.jms.connection包提供了用于管理JMS本地事务JmsTransactionManager事务管理器

<bean id="transactionManager"
class="org.springframework.jms.connection.JmsTransactionManager">
       <property name=”connectionFactory" ref="internalJmsQueueConnectionFactory"/>
</bean>

对于那些未基于JmsTemplate编写的JMS处理类,可以让消息监听器容器对它们进行事务管理。DefaultMessageListenerContainer和ServerSessionMessageListenerContainer都支持通过消息监听器使用JMS事务,不过必须为他们提供一个事务管理器,如下配置:
<!— 事务管理器 -->
<bean id="transactionManager"
class="org.springframework.jms.connection.JmsTransactionManager">
       <property name="connectionFactory" ref=" internalJmsQueueConnectionFactory"/>
</bean>


<!—消息监听器容器 -->
<bean id="listenerContainer"
class="org.springframeword.jms.listener.DefaultMessageListenerContainer">
<property name="transactionManager" ref="transactionManager"/>
</bean>



命名空间
常用的命名空间元素是jms:listener和jms:listener-container

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:jms="http://www.springframework.org/schema/jms"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
http://www.springframework.org/schema/jms
http://www.springframework.org/schema/jms/spring-jms-2.5.xsd">

命名空间前的监听器配置,假设有两个监听器
<bean id="messageListener1"
class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<constructor-arg>
<bean class="SimpleJMSReceiver1"/>
</constructor-arg>
<property name="defaultListenerMethod" value="processRequest"/>
</bean>
<bean id="messageListener2"
class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<constructor-arg>
<bean class="SimpleJMSReceiver2"/>
</constructor-arg>
<property name="defaultListenerMethod" value="processRequest"/>
</bean>
<bean id="jmsContainer1"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="queueConnectionFactory"/>

使用命名空间,对于监听器部分的配置,可以简化为
<bean id="messageListener1" class="SimpleJMSReceiver1"/>
<bean id="messageListener2" class="SimpleJMSReceiver2"/>
<jms:listener-container connection-factory="queueConnectionFactory"
destination-resolver="destinationResolver"
concurrency="3">
<jms:listener destination="queue1" ref="messageListener1" />
<jms:listener destination="queue2" ref="messageListener2" />
</jms:listener-container>
...
  • 大小: 160.6 KB
  • 大小: 154.7 KB
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics