`
gtgt1988
  • 浏览: 111541 次
  • 性别: Icon_minigender_1
  • 来自: 合肥
社区版块
存档分类
最新评论

JMS的应用

阅读更多

最近需要使用 JMS 接收消息:在客户端注册一个监听器来实时接收消息,监听器代码如下:

public class JmsReceiverListener extends Thread implements	MessageListener, ExceptionListener {
	private final static Logger logger = LoggerFactory.getLogger(JmsReceiverListener.class); 
	// ConnectionFactory :连接工厂,JMS 用它创建连接
		ConnectionFactory connectionFactory;
		// Connection :JMS 客户端到JMS Provider 的连接
		Connection connection = null;
		// Session: 一个发送或接收消息的线程
		Session session;
		// Destination :消息的目的地
		Destination destination;
		//消息接收者
		MessageConsumer consumer;
		String url;

		public JmsReceiverListener(String queuename) {
			ClassPathResource resource = new ClassPathResource("META-INF/res/profile.properties");
			Properties properties;
			try {
				properties = PropertiesLoaderUtils.loadProperties(resource);
				url = properties.getProperty("activemq.url");
				logger.info("activeMq地址:"+url);
			} catch (IOException e1) {
				// TODO Auto-generated catch block
				e1.printStackTrace();
				logger.info("资源文件不存在");
			}
			
		
			String queue=queuename;
			connectionFactory = new ActiveMQConnectionFactory(
					ActiveMQConnection.DEFAULT_USER,
					ActiveMQConnection.DEFAULT_PASSWORD, url);
			// 构造从工厂得到连接对象
			try {
				connection = connectionFactory.createConnection();
				connection.setExceptionListener(this);// 异常处理
				connection.start();// 连接启动
				//如果为true,则队列里面的消息没有被取走,继续存在
				session = connection.createSession(false,
						Session.AUTO_ACKNOWLEDGE);
				// 获取session, 
				destination = session.createQueue(queue);
				consumer = session.createConsumer(destination);
			} catch (JMSException e) {
				System.err.println("Create fail!");
				e.printStackTrace();
			}
		}

		public void run() {
			try {
				consumer.setMessageListener(this);
			} catch (JMSException e) {
				System.err
						.println(" MessageListener failed...");
				e.printStackTrace();
			}
		}

		public void onMessage(Message message) {
		
			try {
				if (message instanceof MapMessage) {
					MapMessage mapMsg = (MapMessage)message;
					if(!CommonUtil.isNull(mapMsg.getString(AlarmTransferConstant.SUBSCRIBE))){
						String  messages=mapMsg.getString(AlarmTransferConstant.SUBSCRIBE);
						logger.info("告警订阅消息: " + messages);
						new AlarmSub().sendMsg(messages);
					}
					if(!CommonUtil.isNull(mapMsg.getString(AlarmTransferConstant.SYNCHRONIZE))){
						String  messages=mapMsg.getString(AlarmTransferConstant.SYNCHRONIZE);
						logger.info("告警同步消息: " + messages);
						new AlarmSyc().sendMsg(messages);
					}
					if(!CommonUtil.isNull(mapMsg.getString(AlarmTransferConstant.QUERY))){
						String  messages=mapMsg.getString(AlarmTransferConstant.QUERY);
						new AlarmQuery().sendMsg(messages);
						logger.info("告警查询消息: " + messages);
					}
//					TextMessage txtMsg = (TextMessage) message;
//					String msg = txtMsg.getText();
//					System.out.println("Received: " + msg);
//					new AlarmSub().sendMsg(msg);
					
				}
			} catch (JMSException e) {
				System.err.println("The process of getting a message failed...");
				e.printStackTrace();
			}
		}
		// 异步消息异常处理
		public void onException(JMSException arg0) {
			System.err.println("JMS异常!");
		}
		//测试程序
		public static void main(String[] args) {
			JmsReceiverListener jrl = new JmsReceiverListener("HelloWorldQueue");
			jrl.start();
		}

}

 

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics