公司有新的项目-智慧公交服务平台。要求实现公交GPS定位,在地图上动态显示订阅的公交车行车轨迹、轨迹回放等等一些功能。这就要用到消息推送服务中间 件ActiveMQ。采用UDP的方式推送消息。先简单介绍一下Spring整合ActiveMQ,后续将介绍Spring+activeMQ+Flex 消息推送的实现。
一.消息监听
Spring提供了三种 AbstractMessageListenerContainer 的子类,每种各有其特点。
第一种:SimpleMessageListenerContainer
这个消息侦听容器是三种中最简单的。它在启动时创建固定数量的JMS session并在容器的整个生命周期中使用它们。这个类不能动态的适应运行时的要求或参与消息接收的事务处理。然而它对JMS提供者的要求也最低。它只需要简单的JMS API。
第二种:DefaultMessageListenerContainer
这个消息侦听器使用的最多。和 SimpleMessageListenerContainer 相反,这个子类可以动态适应运行时侯的要求,也可以参与事务管理。每个收到的消息都注册到一个XA事务中(如果使用 JtaTransactionManager 配置过),这样就可以利用XA事务语义的优势了。这个类在对JMS提供者的低要求和提供包括事务参于等的强大功能上取得了很好的平衡。
第三种:ServerSessionMessageListenerContainer
这个监听器容器利用JMS ServerSessionPool SPI动态管理JMS Session。 使用者各种消息监听器可以获得运行时动态调优功能,但是这也要求JMS提供者支持ServerSessionPool SPI。如果不需要运行时性能调整,请使用 DefaultMessageListenerContainer 或 SimpleMessageListenerContainer。
二.自动将消息转化为Java对象
转化器在很多组件中都是必不缺少的东西。Spring挺过MessageConverter接口提供了对消息转换的支持。
三.代码
1.修改activeMQ conf文件夹下activemq.xml配置文件,加入UDP传输方式(紫的部分)
[html]
<!-- Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
- <!-- START SNIPPET: example
-->
- <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
- <!-- Allows us to use system properties as variables in this configuration file
-->
- <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
- <property name="locations">
<value>file:${activemq.conf}/credentials.properties</value>
</property>
</bean>
- <!-- Allows log searching in hawtio console
-->
<bean id="logQuery" class="org.fusesource.insight.log.log4j.Log4jLogQuery" lazy-init="false" scope="singleton" init-method="start" destroy-method="stop" />
- <!-- The <broker> element is used to configure the ActiveMQ broker.
-->
- <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}">
- <destinationPolicy>
- <policyMap>
- <policyEntries>
- <policyEntry topic=">">
- <!-- The constantPendingMessageLimitStrategy is used to prevent
slow topic consumers to block producers and affect other consumers
by limiting the number of messages that are retained
For more information, see:
http://activemq.apache.org/slow-consumer-handling.html
-->
- <pendingMessageLimitStrategy>
<constantPendingMessageLimitStrategy limit="1000" />
</pendingMessageLimitStrategy>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
- <!-- The managementContext is used to configure how ActiveMQ is exposed in
JMX. By default, ActiveMQ uses the MBean server that is started by
the JVM. For more information, see:
http://activemq.apache.org/jmx.html
-->
- <managementContext>
<managementContext createConnector="false" />
</managementContext>
- <!-- Configure message persistence for the broker. The default persistence
mechanism is the KahaDB store (identified by the kahaDB tag).
For more information, see:
http://activemq.apache.org/persistence.html
-->
- <persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb" />
</persistenceAdapter>
- <!-- The systemUsage controls the maximum amount of space the broker will
use before disabling caching and/or slowing down producers. For more information, see:
http://activemq.apache.org/producer-flow-control.html
-->
- <systemUsage>
- <systemUsage>
- <memoryUsage>
<memoryUsage percentOfJvmHeap="70" />
</memoryUsage>
- <storeUsage>
<storeUsage limit="100 gb" />
</storeUsage>
- <tempUsage>
<tempUsage limit="50 gb" />
</tempUsage>
</systemUsage>
</systemUsage>
- <!-- The transport connectors expose ActiveMQ over a given protocol to
clients and other brokers. For more information, see:
http://activemq.apache.org/configuring-transports.html
-->
- <transportConnectors>
- <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB
-->
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600" />
<strong><span style="color:#cc33cc;"><transportConnector name="udp" uri="udp://0.0.0.0:8123" /> </span></strong>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600" />
<transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600" />
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600" />
<transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600" />
</transportConnectors>
- <!-- destroy the spring context on shutdown to stop jetty
-->
- <shutdownHooks>
<bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />
</shutdownHooks>
</broker>
- <!-- Enable web consoles, REST and Ajax APIs and demos
The web consoles requires by default login, you can disable this in the jetty.xml file
Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more details
-->
<import resource="jetty.xml" />
</beans>
- <!-- END SNIPPET: example
-->
2. applicationContext.xml
[html]
<?xml version="1.0" encoding="UTF-8" ?>
- <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:context="http://www.springframework.org/schema/context" xmlns:flex="http://www.springframework.org/schema/flex" xmlns:amq="http://activemq.apache.org/schema/core" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd http://www.springframework.org/schema/flex http://www.springframework.org/schema/flex/spring-flex-1.0.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.2.0.xsd">
- <!-- %%%%%%%%%%%%%%%%%%*********************消息处理 ACTIVEMQ***************************%%%%%%%%%%%%%
-->
- <!-- JMS TOPIC MODEL
-->
- <!-- TOPIC链接工厂
-->
- <bean id="topicSendConnectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">
<property name="brokerURL" value="udp://localhost:8123" />
- <!-- UDP传输方式
-->
- <!-- <property name="brokerURL" value="tcp://localhost:61616" />
-->
- <!-- TCP传输方式
-->
<property name="useAsyncSend" value="true" />
</bean>
- <bean id="topicListenConnectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">
<property name="brokerURL" value="udp://localhost:8123" />
- <!-- UDP传输方式
-->
- <!-- <property name="brokerURL" value="tcp://localhost:61616" />
-->
- <!-- TCP传输方式
-->
</bean>
- <!-- 定义主题
-->
- <bean id="myTopic" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg value="normandy.topic" />
</bean>
<bean id="messageConvertForSys" class="com.tech.gps.util.MessageConvertForSys" />
- <!-- TOPIC send jms模板
-->
- <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="topicSendConnectionFactory" />
<property name="defaultDestination" ref="myTopic" />
<property name="messageConverter" ref="messageConvertForSys" />
- <!-- 发送模式 DeliveryMode.NON_PERSISTENT=1:非持久 ; DeliveryMode.PERSISTENT=2:持久
-->
<property name="deliveryMode" value="1" />
<property name="pubSubDomain" value="true" />
- <!-- 开启订阅模式
-->
</bean>
- <!-- 消息发送方
-->
- <bean id="topicSender" class="com.tech.gps.util.MessageSender">
<property name="jmsTemplate" ref="jmsTemplate" />
</bean>
- <!-- 消息接收方
-->
<bean id="topicReceiver" class="com.tech.gps.util.MessageReceiver" />
- <!-- 主题消息监听容器
-->
- <bean id="listenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="topicListenConnectionFactory" />
<property name="pubSubDomain" value="true" />
- <!-- true 订阅模式
-->
<property name="destination" ref="myTopic" />
- <!-- 目的地 myTopic
-->
<property name="subscriptionDurable" value="true" />
- <!-- -这里是设置接收客户端的ID,在持久化时,但这个客户端不在线时,消息就存在数据库里,知道被这个ID的客户端消费掉
-->
<property name="clientId" value="clientId_1" />
<property name="messageListener" ref="topicReceiver" />
</bean>
- <!-- Servlet
-->
- <bean id="ControlServlet1" class="com.tech.gps.servlet.ControlServlet1">
<property name="topicSender" ref="topicSender" />
</bean>
</beans>
3. web.xml
[html]
<?xml version="1.0" encoding="UTF-8" ?>
- <web-app version="2.4" xmlns="http://java.sun.com/xml/ns/j2ee" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://java.sun.com/xml/ns/j2ee http://java.sun.com/xml/ns/j2ee/web-app_2_4.xsd">
- <!-- 加载spring配置文件applicationContext.xml
-->
- <listener>
<listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
</listener>
- <!-- 指明spring配置文件在何处
-->
- <context-param>
<param-name>contextConfigLocation</param-name>
<param-value>classpath*:applicationContext.xml</param-value>
</context-param>
- <servlet>
<servlet-name>ControlServlet1</servlet-name>
<servlet-class>com.tech.gps.servlet.DelegatingServletProxy</servlet-class>
</servlet>
- <servlet-mapping>
<servlet-name>ControlServlet1</servlet-name>
<url-pattern>/ControlServlet1</url-pattern>
</servlet-mapping>
- <welcome-file-list>
<welcome-file>index11.jsp</welcome-file>
</welcome-file-list>
</web-app>
4. 消息发送
[html]
package com.tech.gps.util;
import org.springframework.jms.core.JmsTemplate;
public class MessageSender {
private JmsTemplate jmsTemplate;
public void sendMessage(String msg){
jmsTemplate.convertAndSend(msg);
}
public JmsTemplate getJmsTemplate() {
return jmsTemplate;
}
public void setJmsTemplate(JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
}
}
5. 消息转换
[html]
package com.tech.gps.util;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.TopicPublisher;
import org.springframework.jms.support.converter.MessageConversionException;
import org.springframework.jms.support.converter.MessageConverter;
public class MessageConvertForSys implements MessageConverter {
public Message toMessage(Object object, Session session)
throws JMSException, MessageConversionException {
System.out.println("sendMessage:"+object.toString());
ObjectMessage objectMessage = session.createObjectMessage();
objectMessage.setStringProperty("key",object.toString());
return objectMessage;
}
public Object fromMessage(Message message) throws JMSException,
MessageConversionException {
ObjectMessage objectMessage = (ObjectMessage) message;
return objectMessage.getObjectProperty("key");
}
}
6. 消息接收
[html]
package com.tech.gps.util;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
public class MessageReceiver implements MessageListener {
public void onMessage(Message m) {
ObjectMessage om = (ObjectMessage) m;
try {
String key = om.getStringProperty("key");
System.out.println(" ");
System.out.println("receiveMessage:"+key);
} catch (JMSException e) {
e.printStackTrace();
}
}
}
7. servlet控制器
[html]
package com.tech.gps.servlet;
import java.io.IOException;
import java.io.PrintWriter;
import javax.servlet.ServletContext;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.springframework.context.ApplicationContext;
import org.springframework.web.context.WebApplicationContext;
import org.springframework.web.context.support.WebApplicationContextUtils;
import com.tech.gps.util.MessageSender;
public class ControlServlet1 extends HttpServlet {
private MessageSender topicSender;
public MessageSender getTopicSender() {
return topicSender;
}
public void setTopicSender(MessageSender topicSender) {
this.topicSender = topicSender;
}
public void init() throws ServletException {
}
public void doGet(HttpServletRequest request, HttpServletResponse response)
throws ServletException, IOException {
doPost(request,response);
}
public void doPost(HttpServletRequest request, HttpServletResponse response)
throws ServletException, IOException {
request.setCharacterEncoding("utf-8");
for(int i =0;i<10;i++){
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
topicSender.sendMessage("坐标:118.36582,37.2569812");
}
}
}
8.Spring整合Servlet
[html]
package com.tech.gps.servlet;
import java.io.IOException;
import javax.servlet.GenericServlet;
import javax.servlet.Servlet;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import org.springframework.web.context.WebApplicationContext;
import org.springframework.web.context.support.WebApplicationContextUtils;
public class DelegatingServletProxy extends GenericServlet{
private String targetBean;
private Servlet proxy;
public void service(ServletRequest req, ServletResponse res)
throws ServletException, IOException {
proxy.service(req, res);
}
public void init() throws ServletException {
this.targetBean = getServletName();
getServletBean();
proxy.init(getServletConfig());
}
private void getServletBean() {
WebApplicationContext wac = WebApplicationContextUtils.getRequiredWebApplicationContext(getServletContext());
this.proxy = (Servlet) wac.getBean(targetBean);
}
}
9. 输出
sendMessage:坐标:128.36582,32.2569812
receiveMessage:坐标:128.36582,32.2569812
相关推荐
Spring集成ActiveMQ配置
Spring整合ActiveMQ实现队列和主题发布订阅通信、一个完整的DEMO
Spring整合activemq。。。。。。。。。。。。。。。。。。。。
Spring和ActiveMQ整合的完整实例
spring整合activemq的maven工程 activemq+spring+maven project
百度spring整合activemq 发现几乎都只是在xml文件配置固定的消息队列而且太麻烦。并没有根据需求进行动态生成主题和队列。本文档参考了纯粹的activemq java代码和百度上的demo,很简洁的实现了动态消息队列的生成和...
spring集成activemq例子demo
博客http://blog.csdn.net/poorCoder_/article/details/61192791的代码。Spring整合ActiveMQ的简单实例
Spring集成ActiveMQ配置
源码为web工程,整合了Spring,SpringMVC、ActiveMQ,启动后,index页面输入消息,控制台输出消息。
Spring整合ActiveMQ实现点对点与主题发布订阅通信的一个DEMO
支持持久化的采用Spring整合activeMQ与quartz的JMS数据同步实例,包含依赖的jar包
Spring整合ActiveMQ超级详细实例,基于Spring+JMS+ActiveMQ+Tomcat,注解的完整实例,包含jar包,可以参考学习
spring集成ActiveMQ的配置过程
这是一个spring整合activeMQ的简单案例,主要是有相关的一些配置文件可以借鉴!