`
sbl2255
  • 浏览: 211836 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

ActiveMQ 初学

    博客分类:
  • jdk
 
阅读更多

直接上代码:

  发送:

 public static void main(String[] args) {  
        // ConnectionFactory :连接工厂,JMS 用它创建连接  
        ConnectionFactory connectionFactory; // Connection :JMS 客户端到JMS  
        // Provider 的连接  
        Connection connection = null; // Session: 一个发送或接收消息的线程  
        Session session; // Destination :消息的目的地;消息发送给谁.  
        Destination destination; // MessageProducer:消息发送者  
        MessageProducer producer; // TextMessage message;  
        // 构造ConnectionFactory实例对象,此处采用ActiveMq的实现jar  
        connectionFactory = new ActiveMQConnectionFactory(  
                ActiveMQConnection.DEFAULT_USER,  
                ActiveMQConnection.DEFAULT_PASSWORD, "failover:(tcp://localhost:61616,tcp://10.1.1.123:61616)");  
        try { // 构造从工厂得到连接对象  
            connection = connectionFactory.createConnection();  
            // 启动  
            connection.start();  
            // 获取操作连接  
            session = connection.createSession(Boolean.TRUE,  Session.AUTO_ACKNOWLEDGE);  
            // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置  
            destination = session.createQueue("FirstQueue");  
            // 得到消息生成者【发送者】  
            producer = session.createProducer(destination);  
            // 设置不持久化,此处学习,实际根据项目决定  
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);  
            // 构造消息,此处写死,项目就是参数,或者方法获取  
            sendMessage(session, producer);  
            session.commit();  
        } catch (Exception e) {  
            e.printStackTrace();  
        } finally {  
            try {  
                if (null != connection)  
                    connection.close();  
            } catch (Throwable ignore) {  
            }  
        }  
    }  
  
    public static void sendMessage(Session session, MessageProducer producer)  
            throws Exception {  
        for (int i = 1; i <= SEND_NUMBER; i++) {  
            TextMessage message = session.createTextMessage("ActiveMq 发送的消息"  
                    + i);  
            // 发送消息到目的地方  
            System.out.println("发送消息:" + "ActiveMq 发送的消息" + i);  
            producer.send(message);  
        }  
    }  

  接收:

 public static void main(String[] args) {  
        // ConnectionFactory :连接工厂,JMS 用它创建连接  
        ConnectionFactory connectionFactory;  
        // Connection :JMS 客户端到JMS Provider 的连接  
        Connection connection = null;  
        // Session: 一个发送或接收消息的线程  
        Session session;  
        // Destination :消息的目的地;消息发送给谁.  
        Destination destination;  
        // 消费者,消息接收者  
        MessageConsumer consumer;  
        connectionFactory = new ActiveMQConnectionFactory(  
                ActiveMQConnection.DEFAULT_USER,  
                ActiveMQConnection.DEFAULT_PASSWORD, "failover:(tcp://localhost:61616,tcp://10.1.1.123:61616)");  
        try {  
            // 构造从工厂得到连接对象  
            connection = connectionFactory.createConnection();  
            // 启动  
            connection.start();  
            // 获取操作连接  
            session = connection.createSession(Boolean.FALSE,  Session.AUTO_ACKNOWLEDGE);  
            // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置  
            destination = session.createQueue("FirstQueue");  
            consumer = session.createConsumer(destination);  
            while (true) {  
                // 设置接收者接收消息的时间,为了便于测试,这里谁定为100s  
            	
                TextMessage message = (TextMessage) consumer.receive(100000);  
                if (null != message) {  
                    System.out.println("收到消息" + message.getText());  
                } else {  
                    break;  
                }  
            }  
        } catch (Exception e) {  
            e.printStackTrace();  
        } finally {  
            try {  
                if (null != connection)  
                    connection.close();  
            } catch (Throwable ignore) {  
            }  
        }  
    }  

 

基于Spring的ActiveMQ:

  XML文件:

<bean id="topicListenConnectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">
		<property name="brokerURL" value="tcp://127.0.0.1:61616" />
		<property name="clientID" value="clientId_007" />
	</bean>


<bean id="deviceListener" class="net.coc.activemq.listener.DeviceMessageListener">
		<property name="deviceAdapter" ref="deviceAdapter"></property>
	</bean>

<bean id="deviceTopic" class="org.apache.activemq.command.ActiveMQTopic">
		<constructor-arg value="Assets.Device.Topic" />
	</bean>

<bean
		class="org.springframework.jms.listener.DefaultMessageListenerContainer">
		<property name="connectionFactory" ref="topicListenConnectionFactory" />
		<property name="pubSubDomain" value="true" />
		<property name="receiveTimeout" value="10000" />
		<property name="destination" ref="deviceTopic" />
		<property name="subscriptionDurable" value="true" />
		<property name="clientId" value="deviceListenerclientId_007" />
		<property name="durableSubscriptionName" value="deviceListenerclientId_007" />
		<property name="messageListener" ref="deviceListener" />
	</bean>

 

Java:

public class DeviceMessageListener implements MessageListener {

	private DeviceAdapter deviceAdapter;

	public DeviceAdapter getDeviceAdapter() {
		return deviceAdapter;
	}

	public void setDeviceAdapter(DeviceAdapter deviceAdapter) {
		this.deviceAdapter = deviceAdapter;
	}

	@Override
	public void onMessage(Message message) {
		TextMessage textMsg = (TextMessage) message;
		try {
			JSONObject msg = JSONObject.fromObject(textMsg.getText());
			deviceAdapter.execute(msg);
			deviceAdapter.finish();
		} catch (JMSException e) {
			e.printStackTrace();
			throw new RuntimeException(e);
		} catch (Exception e) {
			e.printStackTrace();
			throw new RuntimeException(e);
		}
	}
}

 

 

deviceAdapter 是具体解析消息的处理类,这个需要根据各自的需求自己定制;

 

 

 

分享到:
评论

相关推荐

    activeMQ初学使用demo

    该demo主要用于activeMQ初学,队列消息监听和Topic消息监听

    activeMQ示例 activeMQ demo,java分布式技术

    本教程旨在帮助activeMQ初学者入门,通过本示例,能完全理解activeMQ的基本概念,为分布式应用打下基础。 本示例中,使用maven管理,完美解决各种依赖问题,不需要自行配置,导入项目等待eclipse自行下载jar包后即可...

    消息队列学习(springboot+kafka+activemq)

    基于springboot构建消息队列通信demo,针对kafka、activemq初学者,安装部署好activemq和kafka后,修改application.yml 。启动应用即可测试,可帮助快速了解kafka、activemq 两者在 Queue topic producer consumer ...

    ActiveMQ基础讲解.ppt

    此文档专门针对ActiveMQ初学者,内容清晰易懂,适合所有开发人员用

    ActiveMQ的点对点与发布/订阅模式小demo

    ActiveMQ初学时写的小demo,可以直接导入eclipse运行,压缩包内包含window版的ActiveMQ(内含有ActiveMQ完整包activemq-all-5.12.1.jar)

    ActiveMQ入门

    ActiveMQ是Apache开发的开源消息中间件,相比重量级收费的IBM 的消息中间件MQ, ActiveMQ更适合于初学者!

    ActiveMQ 开发例子

    ActiveMQ 开发例子初学者可以看看

    ActiveMQ配置参考手册

    用于ACtiveMq 配置插件配置使用,配置介绍等,适合初学者

    jms之activeMQ 队列和广播模式例子(主要给初学者提供入门知识)

    NULL 博文链接:https://zhaoshijie.iteye.com/blog/2090954

    activemq与spring整合源代码

    该资源是spring与activemq的基础整合,适合消息中间件的初学者

    Centos 7 服务器Apache-ActiveMQ安装指南

    虽然ActiveMQ目前已经不是开发时的主要消息中间件, 但是对于简单使用JMS的场景而言, ActiveMQ仍然是一个比较成熟、稳定的框架,可以供初学者、小微企业快速上手。 虽然Apache ActiveMQ目前有多种安装方式, 使用...

    activemq demo

    此代码为自己找的一个activemq的小实例,集成spring直接以单元测试的形式展示各个调用情景,初学好帮手

    ActiveMQ多客户端

    使用c# winfrom 做了一个ActiveMQ一个服务端多个客户端的小程序,希望可以帮到初学者

    ActiveMQ生产者

    使用ActiveMQ类使用官方例程编写简单的例子,带有头文件和类库,方便初学者

    ActiveMQ消息中间件面试专题.zip

    适合不同的应聘者:这份面试专题资料既适合有一定经验的ActiveMQ开发人员,也适合初学者和想要了解ActiveMQ的人使用。初学者可以从基础知识开始学习,逐渐掌握更多的知识,而有经验的开发人员可以通过这份面试专题...

    ActiveMQ视频教程

    本资源为百度云盘的链接地址,包括ActiveMQ从入门到精通的视频教程,资料文档及实例代码。适合初学者入门上手。

    ActiveMQ使用SSL加密文件Demo

    此demo结合了网上纷乱的资源进行讲解。少量操作即可直接跑通,适合初学者进行学习。所需jar包在demo中也都具有。

    ActiveMQ单客户端

    使用C# winfrom做了一个ActiveMQ一个服务端对应一个客户端的即时收发消息的小程序,注释写得很详细,希望能帮到初学者。

    ActiveMQ的队列模式

    ActiveMQ的队列模式,初学ActiveMQ的,一个很好的体验、入门,代码注释很清楚,JMS的java使用步骤

    消息队列activemq的实际应用

    这是我学习消息队列的总结而编写的activemq的使用代码,使用SpringBoot集成activemq以及简化使用代码,让初学者更容易接受,同时让高级开发看起来更加舒服

Global site tag (gtag.js) - Google Analytics