`

ActiveMQ使用之HelloWord

 
阅读更多
一、下载MQ:
    官网http://activemq.apache.org/下载,选择windows版本学习,项目当中使用linux版本进行集群。

二、解压目录介绍
Bin:启动脚本.bin/activemq.bat是启动MQ的脚本
Conf:mq核心配置。
      activemq.xml核心配置
      Jetty:管控界面
      Users.properties:用户名密码
Data:kahaDB文件
Webapps:管控台使用



三、启动MQ,管控台介绍
Home :查看broker相关信息
Queues:队列相关信息
Topics:主题订阅相关信息
Subscribers:订阅者相关信息
Connections:连接信息
Network:集群连接信息
Send:模拟发送MQ信息





----------------------------------------------------------------------------heloword实例----------------------------------------------------------------------------

   1、生产者端

/**
 * Hello world!
 *
 */
public class Sender {
    public static void main( String[] args ) throws Exception
    {
        //建立connectionFactory工厂对象,填入用户名、密码以及要连接的地址,均使用默认即可,默认端口为"tcp://localhost:61616"
    	ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
    	
		//通过connectionFactory工厂对象创建connection连接,并且调用connection的start方法开启连接,connection默认是关闭的
    	Connection connection = connectionFactory.createConnection(ActiveMQConnectionFactory.DEFAULT_USER, ActiveMQConnectionFactory.DEFAULT_PASSWORD);
    	connection.start();
    	
    	//通过connection对象创建session会话(上下文环境对象),用于接收消息,参一表示是否启用事务,参二表示签收模式,一般我们设置自动签收。
    	Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
    	
    	//通过session创建destination对象,指的是一个客户端用来指定生产消息目标和消费消息的来源的对象。
    	//在p2p模式中,destination被称作Queue即队列,在pub/sub模式中,destination被称作Topic即主题。
    	Destination destination = session.createQueue("first");
    	
    	//通过session对象创建消息的发送/接收对象(生产者/消费者)MessageProducer/MessageConsumer
    	MessageProducer producer = session.createProducer(null);
    	
    	for (int i = 0; i < 50; i++) {
    		//使用JMS规范的TextMessage形式创建数据(通过session对象),并用MessageProducer的send方法发送对象。同理客户端使用receive方法进行接收数据。最后关闭connection连接。
			TextMessage msg = session.createTextMessage("消息内容"+i);
			//一参	目标地址
			//二参	具体数据信息
			//三参	传送数据模式
			//四参	优先级
			//五参	消息过期时间
			producer.send(destination,msg);
		}
    	
    	if(connection !=null){
    		connection.close();
    	}
    	
    	
    }
}



运行消息结果:




2、消费端

public class Receiver {

	public static void main(String[] args) throws Exception {

		ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();

		Connection connection = connectionFactory.createConnection(ActiveMQConnectionFactory.DEFAULT_USER,ActiveMQConnectionFactory.DEFAULT_PASSWORD);
		// Connection connection = connectionFactory.createConnection("fu","fu");

		connection.start();

		Session session = connection.createSession(Boolean.FALSE, Session.CLIENT_ACKNOWLEDGE);

		Destination destination = session.createQueue("first");
		// 通过session对象创建消息的发送/接收对象(生产者/消费者)MessageProducer/MessageConsumer
		MessageConsumer consumer = session.createConsumer(destination);

		while (true) {
			// 使用JMS规范的TextMessage形式创建数据(通过session对象),并用MessageProducer的send方法发送对象。同理客户端使用receive方法进行接收数据。最后关闭connection连接。
			TextMessage msg = (TextMessage) consumer.receive();
			// 手工签收消息
			msg.acknowledge();
			System.out.println("消费数据:" + msg.getText());
		}

	}

}



消费端运行结果:






  • 大小: 29.7 KB
  • 大小: 92.1 KB
  • 大小: 62 KB
  • 大小: 10.1 KB
  • 大小: 64.9 KB
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics