`

ActiveMQ

阅读更多

1.JMS介绍 

    JMS源于企业应用对于消息中间件的需求,使应用程序可以通过消息进行异步处理而互不影响。Sun公司和它的合作伙伴设计的JMS API定义了一组公共的应用程序接口和相应语法,使得Java程序能够和其他消息组件进行通信。JMS有四个组成部分:JMS服务提供者、消息管理对象、消息的生产者消费者和消息本身。 
1)JMS服务提供者实现消息队列和通知,同时实现消息管理的API。JMS已经是J2EE API的一部分,J2EE服务器都提供JMS服务。 
2) 消息管理对象提供对消息进行操作的API。JMS API中有两个消息管理对象:创建jms连接使用的工厂(ConnectionFactory)和目的地(Destination),根据消息的消费方式的不同ConnectionFactory可以分为QueueConnectionFactory和TopicConnectionFactory,目的地(Destination)可以分为队列(Queue)和主题(Topic)两种。 
3)消息的生产者和消费者。消息的产生由JMS的客户端完成,JMS服务提供者负责管理这些消息,消息的消费者可以接收消息。消息的生产者可以分为――点对点消息发布者(P2P)和主题消息发布者(TopicPublisher)。所以,消息的消费者分为两类:主题消息的订阅者(TopicSubscriber)和点对点消息的接收者(queue receiver) 
4)消息。消息是服务提供者和客户端之间传递信息所使用的信息单元。JMS消息由以下三部分组成: 
  消息头(header)――JMS消息头包含了许多字段,它们是消息发送后由JMS提供者或消息发送者产生,用来表示消息、设置优先权和失效时间等等,并且为消息确定路由。

       属性(property)――用来添加删除消息头以外的附加信息。 

    1. 应用需要用到的属性; 

    2. 消息头中原有的一些可选属性; 

     3. JMS Provider 需要用到的属性。

  消息体(body)――JMS中定义了5种消息体:ByteMessage、MapMessage、ObjectMessage、StreamMessage和TextMessage。 

2.Messages 通信方式 
上面提到JMS通信方式分为点对点通信和发布/订阅方式 
1)点对点方式(point-to-point) 
   点对点的消息发送方式主要建立在 Message Queue,Sender,reciever上,Message Queue 存贮消息,Sneder 发送消息,receive接收消息.具体点就是Sender Client发送Message Queue ,而 receiver Cliernt从Queue中接收消息和"发送消息已接受"到Quere,确认消息接收。消息发送客户端与接收客户端没有时间上的依赖,发送客户端可以在任何时刻发送信息到Queue,而不需要知道接收客户端是不是在运行 
2)发布/订阅 方式(publish/subscriber Messaging) 
    发布/订阅方式用于多接收客户端的方式.作为发布订阅的方式,可能存在多个接收客户端,并且接收端客户端与发送客户端存在时间上的依赖。一个接收端只能接收他创建以后发送客户端发送的信息。作为subscriber ,在接收消息时有两种方法,destination的receive方法,和实现message listener 接口的onMessage 方法。 

3.为什么选用ActiveMQ 
   1)ActiveMQ是一个开放源码 
   2)基于Apache 2.0 licenced 发布并实现了JMS 1.1。 
   3)ActiveMQ现在已经和作为很多项目的异步消息通信核心了 
   4)在很多中小型项目中采用ActiveMQ+SPRING+TOMCAT开发模式。 

4.编程模式 
4.1消息产生者向JMS发送消息的步骤 
(1)创建连接使用的工厂类JMS ConnectionFactory 
(2)使用管理对象JMS ConnectionFactory建立连接Connection 
(3)使用连接Connection 建立会话Session 
(4)使用会话Session和管理对象Destination创建消息生产者MessageSender 
(5)使用消息生产者MessageSender发送消息 
4.2消息消费者从JMS接受消息的步骤 
(1)创建连接使用的工厂类JMS ConnectionFactory 
(2)使用管理对象JMS ConnectionFactory建立连接Connection 
(3)使用连接Connection 建立会话Session 
(4)使用会话Session和管理对象Destination创建消息消费者MessageReceiver 
(5)使用消息消费者MessageReceiver接受消息,需要用setMessageListener将MessageListener接口绑定到MessageReceiver 
消息消费者必须实现了MessageListener接口,需要定义onMessage事件方法。 

5.ActiveMQ运行 
ActiveMQ5.0版本默认启动时,启动了内置的jetty服务器,提供一个demo应用和用于监控ActiveMQ的admin应用。运行%activemq_home%bin/目录下的 activemq.bat , 之后你会看见如下一段话表示启动成功。 
打开http://localhost:8161/admin/queues.jsp ,可以查看相应的queue中是否有消息 


6.SendMessage(用于发送消息) 

Java代码  收藏代码
  1. import javax.jms.Connection;  
  2. import javax.jms.Destination;  
  3. import javax.jms.JMSException;  
  4. import javax.jms.MessageProducer;  
  5. import javax.jms.Session;  
  6. import javax.jms.TextMessage;  
  7. import org.apache.activemq.ActiveMQConnectionFactory;  
  8.   
  9. public class SendMessage {  
  10.  private static final String url ="tcp://localhost:61616";  
  11.  private static final String QUEUE_NAME ="choice.queue";  
  12.  protected String expectedBody = "<hello>world!</hello>";  
  13.  public void sendMessage() throws JMSException{  
  14.   Connection connection =null;  
  15.   try{  
  16.    ActiveMQConnectionFactory connectionFactory =new ActiveMQConnectionFactory(url);  
  17.    connection = (Connection)connectionFactory.createConnection();  
  18.    connection.start();  
  19.    Session session = (Session)connection.createSession(false, Session.AUTO_ACKNOWLEDGE);  
  20.    Destination destination = session.createQueue(QUEUE_NAME);  
  21.    MessageProducer producer = session.createProducer(destination);  
  22.    TextMessage message = session.createTextMessage(expectedBody);  
  23.    message.setStringProperty("headname""remoteB");  
  24.    producer.send(message);      
  25.   }catch(Exception e){  
  26.    e.printStackTrace();  
  27.   }  
  28.  }  
  29.    
  30.  public static void main(String[] args){  
  31.   SendMessage sndMsg = new SendMessage();  
  32.   try{  
  33.    sndMsg.sendMessage();  
  34.   }catch(Exception ex){  
  35.    System.out.println(ex.toString());  
  36.   }  
  37.  }  
  38. }  




7.ReceiveMessage(用于接收消息) 

Java代码  收藏代码
  1. import java.io.File;  
  2. import java.io.FileInputStream;  
  3. import java.io.FileOutputStream;  
  4. import java.io.IOException;  
  5. import javax.jms.BytesMessage;  
  6. import javax.jms.Connection;  
  7. import javax.jms.Destination;  
  8. import javax.jms.JMSException;  
  9. import javax.jms.Message;  
  10. import javax.jms.MessageConsumer;  
  11. import javax.jms.Session;  
  12. import javax.jms.TextMessage;  
  13. import org.apache.activemq.ActiveMQConnectionFactory;  
  14.   
  15. public class ReceiveMessage {  
  16.  private static final String url = "tcp://localhost:61616";  
  17.  private static final String QUEUE_NAME = "choice.queue";  
  18.  public void receiveMessage() {  
  19.   Connection connection = null;  
  20.   try {  
  21.    try {  
  22.     ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);  
  23.     connection = connectionFactory.createConnection();  
  24.    } catch (Exception e) {  
  25.                 System.out.println(e.toString());  
  26.    }  
  27.    connection.start();  
  28.    Session session = connection.createSession(false,  
  29.      Session.AUTO_ACKNOWLEDGE);  
  30.    Destination destination = session.createQueue(QUEUE_NAME);  
  31.    MessageConsumer consumer = session.createConsumer(destination);  
  32.    consumeMessagesAndClose(connection, session, consumer);  
  33.   } catch (Exception e) {  
  34.     System.out.println(e.toString());  
  35.   }  
  36.  }  
  37.   
  38.  protected void consumeMessagesAndClose(Connection connection,Session session, MessageConsumer consumer)  
  39.  throws JMSException {  
  40.   for (int i = 0; i < 1;) {  
  41.    Message message = consumer.receive(1000);  
  42.    if (message != null) {  
  43.     i++;  
  44.     onMessage(message);  
  45.    }  
  46.   }  
  47.   System.out.println("Closing connection");  
  48.   consumer.close();  
  49.   session.close();  
  50.   connection.close();  
  51.  }  
  52.   
  53.  public void onMessage(Message message) {  
  54.   try {  
  55.    if (message instanceof TextMessage) {  
  56.     TextMessage txtMsg = (TextMessage) message;  
  57.     String msg = txtMsg.getText();  
  58.     System.out.println("Received: " + msg);  
  59.    }  
  60.   } catch (Exception e) {  
  61.    e.printStackTrace();  
  62.   }  
  63.   
  64.  }  
  65.   
  66.  public static void main(String args[]) {  
  67.   ReceiveMessage rm = new ReceiveMessage();  
  68.   rm.receiveMessage();  
  69.  }  
  70.   
  71. }  
分享到:
评论

相关推荐

    springboot-nettysocketio +netty+activeMq在线客服系统

    springboot +netty+activeMq在线客服系统springboot +netty+activeMq在线客服系统springboot +netty+activeMq在线客服系统springboot +netty+activeMq在线客服系统springboot +netty+activeMq在线客服系统springboot...

    spring 整合activemq实现自定义动态消息队列

    百度spring整合activemq 发现几乎都只是在xml文件配置固定的消息队列而且太麻烦。并没有根据需求进行动态生成主题和队列。本文档参考了纯粹的activemq java代码和百度上的demo,很简洁的实现了动态消息队列的生成和...

    zabbix 3.4 监控 Activemq 自动发现模板

    用zabbix 自动发现实现activemq 监控pending consumers activemq_scan.sh #!/bin/bash activemq() { MQ_IP=(10.10.11.208:8161) for g in ${MQ_IP[@]} do port=($(curl -uadmin:admin http://${g}/admin/queues.jsp...

    activemq-core-5.7.0-API文档-中文版.zip

    赠送jar包:activemq-core-5.7.0.jar; 赠送原API文档:activemq-core-5.7.0-javadoc.jar; 赠送源代码:activemq-core-5.7.0-sources.jar; 包含翻译后的API文档:activemq-core-5.7.0-javadoc-API文档-中文...

    apache-activemq-5.15.0-bin.tar.7z

    MQ是消息中间件,是一种在分布式系统中应用程序借以传递消息的媒介,常用的有ActiveMQ,RabbitMQ,kafka。ActiveMQ是Apache下的开源项目,完全支持JMS1.1和J2EE1.4规范的JMS Provider实现。 特点: 1、支持多种...

    ActiveMQ高并发处理方案

    ActiveMQ高并发处理方案ActiveMQ高并发处理方案 超级字数补丁超级字数补丁

    activeMQ示例 activeMQ demo,java分布式技术

    本教程旨在帮助activeMQ初学者入门,通过本示例,能完全理解activeMQ的基本概念,为分布式应用打下基础。 本示例中,使用maven管理,完美解决各种依赖问题,不需要自行配置,导入项目等待eclipse自行下载jar包后即可...

    activeMQ收发工具.rar

    activeMQ的测试工具,用于发送和接收activeMQ消息,jar包形式的,安装完jdk之后用java -jar xxx.jar命令运行

    MQ之ActiveMQ.mmap

    自己做的尚硅谷周阳老师ActiveMQ课程脑图,其中自己所用做案例的环境搭建都是基于docker与老师课程不一样。脑图内容涵盖视频的99%的笔记,含有自己编写的代码文件,外加了自己对一些问题的测试与回答。 消息中间件...

    ActiveMQ 之Spring结合实例

    包括1、ActiveMQ java实例 2、ActiveMQ Spring结合实例 3、代码亲测,无问题。 4、资源分5分绝对值 注意:请先安装ActiveMQ 服务。

    activemq-protobuf-1.1-API文档-中文版.zip

    赠送jar包:activemq-protobuf-1.1.jar; 赠送原API文档:activemq-protobuf-1.1-javadoc.jar; 赠送源代码:activemq-protobuf-1.1-sources.jar; 包含翻译后的API文档:activemq-protobuf-1.1-javadoc-API文档-...

    apache-activemq-5.11.2

    ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的...

    apache-activemq Linux版本

    apache-activemq Linux版本

    ActiveMQ部署方案分析对比

    构建高可用的ActiveMQ系统在生产环境中是非常重要的,单点的ActiveMQ作为企业应用无法满足高可用和集群的需求,所以ActiveMQ提供 了master-slave、broker cluster等多种部署方式,但通过分析多种部署方式之后我认为...

    ActiveMQ in Action pdf英文版+源代码

    ActiveMQ in Action pdf英文原版加源代码压缩包。 Apache ActiveMQ in Action is a thorough, practical guide to implementing message-oriented systems in Java using ActiveMQ. The book lays out the core of ...

    activemq, Apache ActiveMQ镜像.zip

    activemq, Apache ActiveMQ镜像 欢迎来到 Apache ActiveMQis是一个高性能的Apache 2.0许可以消息代理和 JMS 1.1实现。正在启动要帮助你入门,请尝试以下链接:入门http://activemq.apache.org/version-

    Spring集成ActiveMQ配置

    Spring 集 成ActiveMQ 配置 异步RPC框架 Missian ActiveMq-JMS简单实例使用tomcat

    activemq-protobuf-1.1-API文档-中英对照版.zip

    赠送jar包:activemq-protobuf-1.1.jar; 赠送原API文档:activemq-protobuf-1.1-javadoc.jar; 赠送源代码:activemq-protobuf-1.1-sources.jar; 包含翻译后的API文档:activemq-protobuf-1.1-javadoc-API文档-...

    springboot集成activemq实现消息接收demo

    springboot集成activemq实现消息接收demo

Global site tag (gtag.js) - Google Analytics