`
hekuilove
  • 浏览: 156523 次
  • 性别: Icon_minigender_1
  • 来自: 魔都
社区版块
存档分类
最新评论

ActiveMQ发送接收TextMessage、BytesMessage

阅读更多
1、TextMessage

发送部分
package org.quinn.activemq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class SendMessage {

	public static void main(String[] args) {
		ConnectionFactory factory; //连接工厂
		Connection connection;//jms连接
		Session session;//发送、接收线程
		Destination destination;//消息目的地
		MessageProducer producer;//消息发送者

		factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD,
				"tcp://localhost:61616");

		try {
			connection = factory.createConnection();
			connection.start();
			session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
			destination = session.createQueue("FirstQueue");
			producer = session.createProducer(destination);
			producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);  //是否为持久化 ,NON_PERSISTENT非持久化 ,PERSISTENT持久化

			for (int i = 11; i < 13; i++) {
				String msg = "第" + i + "次发送消息";

				TextMessage textMessage = session.createTextMessage(msg);
				producer.send(textMessage);
			}
			session.commit();
		} catch (JMSException e) {
			e.printStackTrace();
		}

	}
}



接收部分
package org.quinn.activemq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;

public class ReceivedMessage {

	/**
	 * @param args
	 */
	public static void main(String[] args) {

		ConnectionFactory factory;
		Connection connection = null;
		Session session;
		Destination destination;
		MessageConsumer consumer;
		factory = new org.apache.activemq.ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
				ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");
		try {
			connection = factory.createConnection();
			connection.start();
			session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
			destination = session.createQueue("FirstQueue");
			consumer = session.createConsumer(destination);
			while (true) {
				TextMessage textMessage = (TextMessage) consumer.receive();
				if (textMessage != null){
					System.out.println(textMessage.getText());
				}
				Thread.sleep(1000);
			}
		} catch (JMSException e) {
			e.printStackTrace();
		} catch (InterruptedException e) {
			e.printStackTrace();
		} finally {
			try {
				connection.close();
			} catch (JMSException e) {
				e.printStackTrace();
			}
		}
	}

}



2、BytesMessage

发送部分
package org.quinn.activemq;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;

import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class SendBytesMessage {

	public static void main(String[] args) {
		ConnectionFactory factory; //连接工厂
		Connection connection;//jms连接
		Session session;//发送、接收线程
		Destination destination;//消息目的地
		MessageProducer producer;//消息发送者

		factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD,
				"tcp://localhost:61616");

		try {
			connection = factory.createConnection();
			connection.start();
			session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
			destination = session.createQueue("FirstQueue");
			producer = session.createProducer(destination);
			producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

			BytesMessage bytesMessage = session.createBytesMessage();

			File fil = new File("C:\\Users\\WS-SH-L1051\\Desktop\\a.txt");

			InputStream is = new FileInputStream(fil);

			byte by[] = new byte[is.available()];

			is.read(by);
			
			bytesMessage.writeBytes(by);

			producer.send(destination, bytesMessage);
			
			is.close();
			session.commit();
		} catch (JMSException e) {
			e.printStackTrace();
		} catch (FileNotFoundException e) {
			e.printStackTrace();
		} catch (IOException e) {
			e.printStackTrace();
		}

	}
}


接收部分
package org.quinn.activemq;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.command.ActiveMQBytesMessage;

public class ReceivedBytesMessage {

	/**
	 * @param args
	 */
	public static void main(String[] args) {

		ConnectionFactory factory;
		Connection connection = null;
		Session session;
		Destination destination;
		MessageConsumer consumer;
		factory = new org.apache.activemq.ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
				ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");
		try {
			connection = factory.createConnection();
			connection.start();
			session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
			destination = session.createQueue("FirstQueue");
			consumer = session.createConsumer(destination);
			while (true) {
				
				Message msg = consumer.receive();
				if(msg instanceof ActiveMQBytesMessage){
					ActiveMQBytesMessage bytesMessage = (ActiveMQBytesMessage) msg;
					if (bytesMessage != null){
						byte []bt = new byte[(int) bytesMessage.getBodyLength()];
						bytesMessage.readBytes(bt);
						File fil =new File("C:\\Users\\WS-SH-L1051\\Desktop\\b.txt");
						if(!fil.exists()){
							fil.createNewFile();
						}
						OutputStream os = new FileOutputStream(fil);
						os.write(bt);
						os.close();
					}
				}
				Thread.sleep(1000);
			}
		} catch (JMSException e) {
			e.printStackTrace();
		} catch (InterruptedException e) {
			e.printStackTrace();
		} catch (IOException e) {
			e.printStackTrace();
		} finally {
			try {
				connection.close();
			} catch (JMSException e) {
				e.printStackTrace();
			}
		}
	}

}

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics