`

Tomcat7.84 Websocket接消息队列

阅读更多
   前端监控需要和消息队列对接,使用 WebSocket 和 ActiveMQ,整个系统采用的是 Spring + Struts
结构
   按部就班:
1.已定义好消息队列连接器等配置。针对消息监控不需要转换为对象,直接透传;以前队列监听的是主题队列,这里再增加一个监听,使用 SimpleMessageConverter:
 
  <!-- Message converter: referenced by the listener adapter below as its messageConverter -->
	<bean id="simpleMessageConverter" class="org.springframework.jms.support.converter.SimpleMessageConverter"></bean>

  <!-- Front-end WebSocket message listener: the adapter delegates to webSocketConsumerListener.handleMessage -->
	<bean id="webSocketConsumerListener" class="WebSocketConsumerListener"></bean>
	<bean id="taskMessageWebSocketListenerAdapter" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
		<property name="delegate" ref="webSocketConsumerListener"/> 
		<property name="defaultListenerMethod" value="handleMessage" />
		<property name="messageConverter" ref="simpleMessageConverter"/>
	</bean>
	<!-- Listener container that subscribes the adapter above to taskMessageTopic.
	     taskConnectionFactory, taskMessageTopic and messageExceptionListener are defined outside this fragment. -->
	<bean id="taskMessageWebSocketListenerAdapterContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="taskConnectionFactory" />
        <property name="destination" ref="taskMessageTopic" />
        <property name="messageListener" ref="taskMessageWebSocketListenerAdapter" />
        <property name="exceptionListener" ref="messageExceptionListener"/>
        <property name="pubSubDomain" value="true" />     <!-- publish/subscribe mode -->
        <!-- <property name="receiveTimeout" value="10000" />  message receive timeout -->
        <!-- NOTE(review): durable subscriptions normally need a JMS client ID on the connection;
             confirm that taskConnectionFactory (or this container) sets one. -->
        <property name="subscriptionDurable" value="true" /> <!-- durable subscriber -->
        <!-- NOTE(review): recoveryInterval is presumably in milliseconds (300000 = 5 minutes); confirm against the Spring version in use. -->
        <property name="recoveryInterval" value="300000" />
        <property name="durableSubscriptionName" value="durableSubscriptionName_webSocket_task" />
	</bean>


import javax.annotation.Resource;

import org.springframework.context.annotation.Lazy;

/**
 * Listener for the task MOM topic.
 *
 * <p>Receives each message as plain text and hands it to the {@link SocketServer}
 * so it can be pushed to the connected WebSocket clients.
 */
public class WebSocketConsumerListener {

	// Resolved lazily: the SocketServer servlet is only created after the
	// Spring container has finished starting.
	@Lazy(value=true)
	@Resource
	private SocketServer socketServer;

	/**
	 * Forwards a received message to the WebSocket server.
	 *
	 * @param message text payload of the message; {@code null} is ignored
	 */
	public void handleMessage(String message) {
		if (message == null) {
			return;
		}
		socketServer.send(message);
	}

}



由于 Tomcat7.84 的 WebSocket 是基于 Servlet 的,无法直接交给 Spring 管理:Servlet 要在 Spring 容器实例化完成之后才会初始化。
所以在监听器里对 SocketServer 使用了延迟加载(@Lazy),后面介绍如何将 WebSocket server 注册到 Spring 容器里去。

代码如下

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;

import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;

import org.apache.catalina.websocket.MessageInbound;
import org.apache.catalina.websocket.StreamInbound;
import org.apache.catalina.websocket.WebSocketServlet;
import org.apache.catalina.websocket.WsOutbound;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.web.context.support.WebApplicationContextUtils;
import org.springframework.web.context.support.XmlWebApplicationContext;

/**
 * Tomcat 7 WebSocket servlet that broadcasts text messages to every connected client.
 *
 * <p>On {@link #init()} the servlet registers itself in the Spring web application context as
 * the singleton bean {@code "socketServer"}, so that Spring-managed beans can look it up and
 * call {@link #send(String)}.
 */
public class SocketServer extends WebSocketServlet {
	private static final long serialVersionUID = 1L;

	/** Currently open connections; shared with every {@link ChatWebSocket} this servlet creates. */
	public final Set<ChatWebSocket> sessions = new CopyOnWriteArraySet<ChatWebSocket>();

	/**
	 * Starts at 1 and is incremented each time a connection is opened; it is never decremented.
	 * Volatile so that readers on other threads see the latest value.
	 */
	public static volatile int USERNUMBER = 1;

	/**
	 * Registers this servlet instance as the Spring singleton {@code "socketServer"} and then
	 * autowires its own dependencies.
	 *
	 * @throws ServletException if no Spring {@code XmlWebApplicationContext} is available in the
	 *         servlet context
	 */
	@Override
	public void init() throws ServletException {
		super.init();

		// getWebApplicationContext returns null when the context has not been started;
		// instanceof covers both that case and an unexpected context implementation.
		Object context = WebApplicationContextUtils.getWebApplicationContext(getServletContext());
		if (!(context instanceof XmlWebApplicationContext)) {
			throw new ServletException(
					"SocketServer requires a Spring XmlWebApplicationContext in the ServletContext, but found: "
							+ context);
		}

		// Bind this servlet into the container so listeners can obtain it as a bean.
		ConfigurableListableBeanFactory beanFactory = ((XmlWebApplicationContext) context).getBeanFactory();
		beanFactory.registerSingleton("socketServer", this);

		// The same factory is the context's autowire-capable factory.
		beanFactory.autowireBean(this);
	}

	/**
	 * Creates the inbound handler for a new connection; it shares this servlet's session set.
	 */
	@Override
	protected StreamInbound createWebSocketInbound(String arg0, HttpServletRequest arg1) {
		return new ChatWebSocket(sessions);
	}

	/**
	 * Sends a text message to every open session.
	 *
	 * <p>A session whose write fails is logged and removed from {@link #sessions} so that later
	 * broadcasts do not keep failing on it.
	 *
	 * @param message text to broadcast; {@code null} is ignored
	 */
	public void send(String message) {
		if (message == null) {
			return;
		}
		for (ChatWebSocket session : sessions) {
			try {
				// Wrap per session: each write gets its own buffer.
				session.getWsOutbound().writeTextMessage(CharBuffer.wrap(message));
			} catch (IOException e) {
				// Safe during iteration: CopyOnWriteArraySet iterates over a snapshot.
				sessions.remove(session);
				log("Failed to push message to a WebSocket client; dropping the session", e);
			}
		}
	}

	/**
	 * Inbound handler for one WebSocket connection. Adds itself to the session set when the
	 * connection opens and removes itself when it closes.
	 */
	public class ChatWebSocket extends MessageInbound {

		private Set<ChatWebSocket> sessions;

		/** Creates a handler with its own, private session set. */
		public ChatWebSocket() {
			sessions = new CopyOnWriteArraySet<ChatWebSocket>();
		}

		/**
		 * Creates a handler that tracks itself in the given session set.
		 *
		 * @param sessions set to join on open and leave on close
		 */
		public ChatWebSocket(Set<ChatWebSocket> sessions) {
			this.sessions = sessions;
		}

		/** Handles an incoming text frame by delegating to {@link #onMessage(String)}. */
		@Override
		protected void onTextMessage(CharBuffer message) throws IOException {
			onMessage(message.toString());
		}

		/**
		 * Hook for client-to-server text messages; intentionally does nothing.
		 *
		 * @param data text received from the client
		 */
		public void onMessage(String data) {
			// Client messages are ignored.
		}

		@Override
		protected void onOpen(WsOutbound outbound) {
			// int++ is not atomic and connections may open on different container threads.
			synchronized (SocketServer.class) {
				USERNUMBER++;
			}
			sessions.add(this);
		}

		@Override
		protected void onClose(int status) {
			sessions.remove(this);
		}

		/** Binary frames are ignored. */
		@Override
		protected void onBinaryMessage(ByteBuffer arg0) throws IOException {
			// Nothing to do.
		}
	}

}


使用 ConfigurableListableBeanFactory 将 server 实例注册到 Spring 容器内:
// Excerpt from SocketServer.init(): registers the servlet instance as the singleton bean "socketServer".
ConfigurableListableBeanFactory beanFactory = webApplicationContext.getBeanFactory();
		beanFactory.registerSingleton("socketServer", this);



前端采用标准的websocket代码
// Fall back to the Mozilla-prefixed constructor when only that one exists.
if (!window.WebSocket && window.MozWebSocket) {
	window.WebSocket = window.MozWebSocket;
}
// Tell the user when the browser offers no WebSocket implementation at all.
if (!window.WebSocket) {
	alert("No Support ");
}

// The single connection to the server; assigned by startWebSocket().
var ws;

// Open the connection once the DOM is ready.
$(document).ready(function () {
	startWebSocket();
});

/**
 * Opens the WebSocket connection to /SocketServer on the current host and stores it in `ws`.
 * Incoming messages are logged and passed to handleData().
 * Does nothing (apart from logging) when the browser has no WebSocket support.
 */
function startWebSocket()
{
	if (!window.WebSocket) {
		console.log("WebSocket is not supported by this browser");
		return;
	}
	// Pages served over HTTPS must use wss://; browsers block plain ws:// there as mixed content.
	var scheme = (location.protocol === "https:") ? "wss://" : "ws://";
	ws = new WebSocket(scheme + location.host + "/SocketServer");
	ws.onopen = function () {
		console.log("success open");
	};
	ws.onmessage = function (event) {
		console.log("RECEIVE:" + event.data);
		handleData(event.data);
	};
	ws.onerror = function (event) {
		console.log("WebSocket error", event);
	};
	ws.onclose = function (event) {
		console.log('Client notified socket has closed', event);
	};
}

/**
 * Parses a message received from the server as JSON and passes the result to the
 * page-provided updateStatus(obj) callback, if the page defines one.
 * Empty messages and messages that are not valid JSON are ignored.
 *
 * @param {string} data raw text payload of the WebSocket message
 */
function handleData(data)
{
	if (!data) {
		return;
	}
	var obj;
	try {
		obj = JSON.parse(data);
	} catch (e) {
		// A malformed frame must not throw out of the onmessage handler.
		console.log("Ignoring non-JSON message:", data, e);
		return;
	}
	// typeof is safe even when updateStatus was never declared; a bare reference would throw.
	if (typeof updateStatus === "function") {
		updateStatus(obj); // update hook supplied by the page
	}
}



分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics