`

ActiveMQ发送与接收消息

阅读更多
package com.yl.common.activemq;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType;
import org.springframework.jms.support.converter.MessageType;

import com.yl.common.utils.LogUtil;

/**
 * ActiveMq消息相关操作
 * @author huangzp
 * @date 2015-3-30
 */
public class ActiveMqMessage {
	
	/** activemq 消息队列名 */
	public static final String QUEUE_NAME = "im_queue";
	
	/** activemq 主题队列名 */
	public static final String TOPIC_NAME = "im_topic";
	
	private static final String MESSAGE_HEAD = "uid";
	
	/**
	 * 发送消息到activemq
	 * @param connection activemq长连接
	 * @param destinationType 队列消息或主题消息
	 * @param destinationName 消息队列名称
	 * @param msgType 消息类型,如文本
	 * @param msg 消息
	 * @param toUserId 接收的用户ID
	 * @param priority 消息优先级 0-9,数字越大越优先
	 * @return isSuccess 是否发送成功
	 */
	protected boolean send(Connection connection, DestinationType destinationType, MessageType msgType, String msg, String toUserId, Integer priority) {
		
		boolean isSuccess = true;
		
		// Session: 一个发送或接收消息的线程
		Session session = null;
		// Destination :消息的目的地;消息发送给谁.
		Destination destination = null;
		// MessageProducer:消息发送者
		MessageProducer producer = null;

		try {
			// 启动
			connection.start();
			session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
			
			if(DestinationType.QUEUE.equals(destinationType)){ //队列消息
				destination = session.createQueue(QUEUE_NAME);
			}else if(DestinationType.TOPIC.equals(destinationType)){ //主题消息
				destination = session.createTopic(TOPIC_NAME);
			}else{
				throw new RuntimeException("DestinationType非法,不是队列消息或主题消息");
			}
			producer = session.createProducer(destination);
			producer.setDeliveryMode(DeliveryMode.PERSISTENT);
			if(null != priority){
				producer.setPriority(priority);
			}
			// 发送消息
			sendMessage(session, producer, msgType, msg, toUserId);
			session.commit();
		} catch (Exception e) {
			LogUtil.log.error(e.getCause(), e);
			isSuccess = false;
		} 
		
		return isSuccess;
	}
	
	private void sendMessage(Session session, MessageProducer producer, MessageType msgType, String msg, String toUserId) throws Exception {
		Message message = null;
		if(MessageType.TEXT.equals(msgType)){ //文本消息
			message = session.createTextMessage(msg);
		}else{ //默认以文本消息处理
			message = session.createTextMessage(msg);
		}
		message.setStringProperty(MESSAGE_HEAD, toUserId);
//		message.setJMSExpiration(3*30*24*60*60*1000); //设置消息过期时间
		producer.send(message);
	}
	
	
	/**
	 * 从activemq接收消息
	 * @param connection
	 * @param destinationType 队列消息或主题消息
	 * @param userId 接收指定用户ID的消息
	 * @param client 当前用户的socket连接
	 * @throws Exception 
	 */
	protected MessageConsumer receveive(Connection connection, DestinationType destinationType, String userId) throws Exception {

		// Destination :消息的目的地;消息发送给谁.
		Destination destination = null;
		// 消费者,消息接收者
		MessageConsumer consumer = null;

		Session session = connection.createSession(Boolean.FALSE, Session.CLIENT_ACKNOWLEDGE); // 客户端手动签收消息

		if (DestinationType.QUEUE.equals(destinationType)) { // 队列消息
			destination = session.createQueue(QUEUE_NAME);
		} else if (DestinationType.TOPIC.equals(destinationType)) { // 主题消息
			destination = session.createTopic(TOPIC_NAME);
		} else {
			throw new RuntimeException("DestinationType非法,不是队列消息或主题消息");
		}

		String messageSelector = MESSAGE_HEAD + "='" + userId + "'"; // 消息过滤器
		consumer = session.createConsumer(destination, messageSelector);

		return consumer;
	}

}

 

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics