`

消息中间件:ActiveMQ

    博客分类:
  • Java
 
阅读更多

ActiveMQ(Message Queue) 来自apache, 开源的消息总线.

完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,非常快速

官方网站:http://activemq.apache.org/ 

 

一, 特性

  • 多种语言和协议编写客户端。语言: Java, C, C++, C#, Ruby, Perl, Python, PHP。应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP
  • 完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)
  • 对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性
  • 通过了常见J2EE服务器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的测试,其中通过JCA 1.5 resource adaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE 1.4 商业服务器上
  • 支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
  • 支持通过JDBC和journal提供高速的消息持久化
  • 从设计上保证了高性能的集群,客户端-服务器,点对点
  • 支持Ajax
  • 支持与Axis的整合
  • 可以很容易得调用内嵌JMS provider,进行测试

二, 什么情况使用

  • 多项目之间集成 
  • 降低系统间模块的耦合度,解耦 
  • 系统前后端隔离

三, 启动

   windows: 双击bin目录下activemq.bat脚本

   linux: ./activemq start    ./activemq stop

 

四, 测试

      ActiveMQ默认连接端口是61616,

     可通过查看该端口的信息可以测试ActiveMQ是否成功启动 netstat -an|find "61616"

 

五, 监控

   ActiveMQ默认启动时,启动了内置的jetty服务器,提供一个用于监控ActiveMQ的admin应用。 

 

   admin:http://127.0.0.1:8161/admin/             user/pwd: admin/admin

 

六, 通信方式

  1, publish-subscribe:  发布订阅模式,一对多的关系。

   2, P2P

       在p2p的场景里,相互通信的双方是通过一个类似于队列的方式来进行交流。

       和ub-sub的区别在于一个topic有一个发送者和多个接收者,

         而在p2p里一个queue只有一个发送者和一个接收者。

   3, request-response

      需要双方都能给对方发送消息

  备注: 参考: http://shmilyaw-hotmail-com.iteye.com/blog/1897635

                    https://www.cnblogs.com/zhuxiaojie/p/5564187.html

 

七, 安全配置

activemq/conf/jetty.xml:

   <pre name="code" class="html"> <bean id="securityConstraint" class="org.eclipse.jetty.util.security.Constraint">
        <property name="name" value="BASIC" />
        <property name="roles" value="admin" />
         <!-- 把这个改为true,当然,高版本的已经改为了true -->
        <property name="authenticate" value="true" />
  </bean>

 找到activemq/conf/activemq.xml,并打开

<plugins>
             <simpleAuthenticationPlugin>
                 <users>
                     <authenticationUser username="${activemq.username}" password="${activemq.password}" groups="users,admins"/>
                 </users>
             </simpleAuthenticationPlugin>
</plugins>

 然后账号密码的配置在activemq/conf/credentials.properties文件中

#账号
activemq.username=admin
#密码
activemq.password=123456
guest.password=password

 

八, 示例

流程:

     1. 获得JMS connection factory. 通过我们提供特定环境的连接信息来构造factory。

     2. 利用factory构造JMS connection

     3. 启动connection

     4. 通过connection创建JMS session.

     5. 指定JMS destination.

     6. 创建JMS producer或者创建JMS message并提供destination.

     7. 创建JMS consumer或注册JMS message listener.

     8. 发送和接收JMS message.

     9. 关闭所有JMS资源,包括connection, session, producer, consumer等。

 

public class Producter {

    //ActiveMq 的默认用户名
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    //ActiveMq 的默认登录密码
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    //ActiveMQ 的链接地址
    private static final String BROKEN_URL = ActiveMQConnection.DEFAULT_BROKER_URL;

    AtomicInteger count = new AtomicInteger(0);
    //链接工厂
    ConnectionFactory connectionFactory;
    //链接对象
    Connection connection;
    //事务管理
    Session session;
    ThreadLocal<MessageProducer> threadLocal = new ThreadLocal<>();

    public void init(){
        try {
            //创建一个链接工厂
            connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEN_URL);
            //从工厂中创建一个链接
            connection  = connectionFactory.createConnection();
            //开启链接
            connection.start();
            //创建一个事务(这里通过参数可以设置事务的级别)
            session = connection.createSession(true,Session.SESSION_TRANSACTED);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    public void sendMessage(String disname){
        try {
            //创建一个消息队列
            Queue queue = session.createQueue(disname);
            //消息生产者
            MessageProducer messageProducer = null;
            if(threadLocal.get()!=null){
                messageProducer = threadLocal.get();
            }else{
                messageProducer = session.createProducer(queue);
                threadLocal.set(messageProducer);
            }
           while(true){
                Thread.sleep(1000);
                int num = count.getAndIncrement();
                //创建一条消息
                TextMessage msg = session.createTextMessage(Thread.currentThread().getName()+
                        "productor:我是大帅哥,我现在正在生产东西!,count:"+num);
                System.out.println(Thread.currentThread().getName()+
                        "productor:我是大帅哥,我现在正在生产东西!,count:"+num);
                //发送消息
                messageProducer.send(msg);
                //提交事务
                session.commit();
            }
        } catch (JMSException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

 

public class Comsumer {

    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;

    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;

    private static final String BROKEN_URL = ActiveMQConnection.DEFAULT_BROKER_URL;

    ConnectionFactory connectionFactory;

    Connection connection;

    Session session;

    ThreadLocal<MessageConsumer> threadLocal = new ThreadLocal<>();
    AtomicInteger count = new AtomicInteger();

    public void init(){
        try {
            connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEN_URL);
            connection  = connectionFactory.createConnection();
            connection.start();
            session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }


    public void getMessage(String disname){
        try {
            Queue queue = session.createQueue(disname);
            MessageConsumer consumer = null;

            if(threadLocal.get()!=null){
                consumer = threadLocal.get();
            }else{
                consumer = session.createConsumer(queue);
                threadLocal.set(consumer);
            }
            while(true){
                Thread.sleep(1000);
                TextMessage msg = (TextMessage) consumer.receive();
                if(msg!=null) {
                    msg.acknowledge();
                    System.out.println(Thread.currentThread().getName()+": Consumer:我是消费者,我正在消费Msg"+msg.getText()+"--->"+count.getAndIncrement());
                }else {
                    break;
                }
            }
        } catch (JMSException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

 

 

分享到:
评论

相关推荐

    实验三 消息中间件应用开发:ActiveMQ实现单线程多队列

    中间件技术 实验三 消息中间件应用开发: - CSDN博客 https://blog.csdn.net/lly1122334/article/details/80139790

    消息中间件应用开发: ActiveMQ实现单 线程多队列-Java代码类资源

    消息中间件应用开发: ActiveMQ实现单 线程多队列-Java代码类资源 中间件技术 消息中间件应用开发

    2019品优购.txt

    前端:angularJS + Bootstrap 后台:SSM( springmvc+spring+mybatis) 数据库:mysql,使用mycat读写分离 开发模式:SOA 服务中间件:dubbox,需要和zookeeper配合使用 注册中心:zookeeper 消息中间件:Activemq,...

    黑马品优购电商项目全套资源

    消息中间件:Activemq,使用弹簧JMS 负载均衡:nginx的的 搜索:Solr中的集群(solrCloud),配合动物园管理员搭建,使用弹簧-数据-索洛 缓存:Redis的的集群,使用弹簧数据redis的的 图片存储:fastDFS集群 | |...

    消息中间件-ActiveMQ.zip

    消息中间件-ActiveMQ

    消息中间件-activeMQ.zip

    消息中间件-activeMQ

    黑马49期全系列包括品优购

    zookeeper 消息中间件:Activemq,使用spring-jms 负载均衡:nginx 搜索:solr集群(solrCloud),配合zookeeper搭建, 使用spring-data-solor 缓存:redis集群,使用spring-data-redis 图片存储:fastDFS集群 网页...

    Springmvc整合Dubbo

    摘要: 开发工具 1.Eclipse IDE:采用Maven项目管理... 技术选型(只列了一部分技术) 1、后端 服务框架:Dubbo、zookeeper、Rest服务 缓存:Redis、ehcache 消息中间件:ActiveMQ 负载均衡:Nginx 分布式文件:FastDFS

    分布式框架简介SSM组合+ springmvc+mybatis+shiro+restful+bootstrap

    开发工具 1.Eclipse IDE:采用Maven项目管理,模块化。 2.代码生成:通过界面方式简单配置,自动生成相应代码,...消息中间件:ActiveMQ 负载均衡:Nginx 分布式文件:FastDFS 数据库连接池:Alibaba Druid 1.0

    消息中间件之ActiveMQ视频课程

    当前使用较多的消息中间件有RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMQ等。本套视频以Apache的ActiveMQ作为切入点,分为基础/实战/面试上中下三大部分,带你从零基础入门到熟练掌握ActiveMQ,能够结合...

    38 4 ActiveMQ消息中间件视频教程

    教程视频:Apache ActiveMQ是Apache软件基金会所研发的开放源代码消息中间件

    消息中间件之ActiveMQ.mmap

    消息中间件是一门极为重要的技术。在项目开发中使用到的频率很高。该技术能够很好的解决我们项目之间数据传输的问题。相比于dubbo等RPC远程调用技术,消息中间件更灵活,能够解耦项目之间的依赖,能够消峰、异步等。...

    Hyperic HQ 系统安装指南

    •消息中间件: ActiveMQ,Weblogic MQ •微软的产品: MS Exchange,MS ActiveDirectory,.NET •虚拟产品: VMWare,Citrix Metaframe •应用平台: LAMP,LAM-J,J2EE,MX4J •其他:网络设备交换机,路由器,...

    java中间件之activemq

    ActiveMQ 是由 Apache 出品的一款开源消息中间件,旨在为应用程序提供高效、可扩展、稳定、安全的企业级消息通信。它的设计目标是提供标准的、面向消息的、多语言的应用集成消息通信中间件。ActiveMQ 实现了 JMS 1.1...

    ActiveMQ消息中间件面试专题.pdf

    Apache ActiveMQ是Apache软件基金会所研发的开放源代码消息中间件;由于ActiveMQ是一个纯Java程序,因此只需要操作系统支持Java虚拟机,ActiveMQ便可执行。

    ActiveMQ.rar

    包括:消息属性、 Advisory Message、延迟和定时消息投递、Blob消息、消息 转换等 n 十二: Consumer高级特性 包括:独有消费者、消息异步分发、消息优先级、管理持久化消息、消息分组、 消息选择器、消息重递策略、...

    基于SSM框架开发的一个博客系统项目+源代码+文档说明

    一、项目主要功能 使用SSM框架开发的一个博客系统,包含的功能大致有: ... ... ... 4.首页展示及分页,主要展示文章内容,可进行搜索,将搜索结果高亮显示 ...5.消息中间件:ActiveMQ 6.搜索引擎:solr 7.富文本

    ActiveMQ消息中间件

    ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。 下载本压缩包后解压运行里面的activemq.bat即可。 http://192.168.0.61:8161/ 是管理ActiveMQ的后台管理系统入口, 如需在JAVA中使用,请下载本人的 ...

    Java消息中间件ActiveMQ学习资料

    资源内容:ActiveMQ(中文)参考手册;ActiveMQ集群:网络连接模式(network connector)详解;生产者消费者模式实现代码;activemq集群配置文档;

Global site tag (gtag.js) - Google Analytics