`
chenhua_1984
  • 浏览: 1234950 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
文章分类
社区版块
存档分类
最新评论

activemq

阅读更多

1activeMq是apache的一个子项目,支持jms1.1和j2ee1.4,主要实现了一个队列,用于网络间消息传输

 

2下载apache actionmq  

  http://www.apache.org/dyn/closer.cgi?path=%2Factivemq%2Fapache-activemq%2F5.5.0%2Fapache-activemq-5.5.0-bin.zip

 

3下载后的目录

 

 activemq-all-5.5.1.jar

 bin

 conf

 data

 docs

 example

 lib

 LICENSE

 NOTICE

 README.txt

 user-guide.html

 webapps

 WebConsole-README.txt

 

启动 mq,进入 bin目录,运行activemq.bat 文件,可以看到绑定的端口和侦听的端口,启动后访问地址

 

http://localhost:8161/

 

可以查看一些例子和查看控制台。这个时候,MQ的队列服务功能已经开启了。接下来需要编码实现

 

package com.mchz.mq.home;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class SendMessage {
	private static final String url = "tcp://localhost:61616";;
	private static final String QUEUE_NAME = "send.queue";
	protected String expectedBody = "hello,word,172.16.4.106,hello,word,"
			+ "172.16.4.106,hello,word,172.16.4.106,hello,word,172.16.4.106,"
			+ "hello,word,172.16.4.106,hello,word,172.16.4.106,hello,word,"
			+ "172.16.4.106,hello,word,172.16.4.106,hello,word,172.16.4.106,"
			+ "hello,word,172.16.4.106,hello,word,172.16.4.106,hello,word,"
			+ "172.16.4.106,hello,word,172.16.4.106,hello,word,172.16.4.106,"
			+ "hello,word,172.16.4.106,hello,word,172.16.4.106,hello,word,"
			+ "172.16.4.106,hello,word,172.16.4.106,hello,word,172.16.4.106,"
			+ "hello,word,172.16.4.106,hello,word,172.16.4.106,";

	public void sendMessage() throws JMSException {

		Connection connection = null;
		Session session = null;
		try {
			ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
					url);
			connection = connectionFactory.createConnection();

			connection.start();
			session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
			Destination destination = session.createQueue(QUEUE_NAME);
			MessageProducer producer = session.createProducer(destination);
			TextMessage message = session.createTextMessage(expectedBody);
			message.setStringProperty("ipaddress", "172.16.4.106");
			message.setStringProperty("address", "zj.hz.mc.219");
			producer.send(message);
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			session.close();
			connection.close();
		}
	}

	public static void main(String[] args) {

		SendMessage send = new SendMessage();
		long startTime=System.currentTimeMillis();
				
		try {
			for (int i = 0; i < 100000; i++) {
				send.sendMessage();
//				System.out.println(i);
			}
		} catch (JMSException e) {
			e.printStackTrace();
			long endTime=System.currentTimeMillis();
			System.out.println(endTime-startTime);
		}
		long endTime=System.currentTimeMillis();
		System.out.println(endTime-startTime);
	}

}
 

 

 

package com.mchz.mq.home;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.Message;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class ReceiveMessage {
	private static final String url = "tcp://localhost:61616";
	private static final String QUEUE_NAME = "send.queue";

	public void receiveMessage() {
		Connection connection = null;
		try {
			try {
				ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
						url);
				connection = connectionFactory.createConnection();
			} catch (Exception e) {
				e.printStackTrace();
			}
			connection.start();
			Session session = connection.createSession(false,
					Session.AUTO_ACKNOWLEDGE);
			Destination destination = session.createQueue(QUEUE_NAME);
			MessageConsumer consumer = session.createConsumer(destination);
			consumeMessagesAndClose(connection, session, consumer);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	protected void consumeMessagesAndClose(Connection connection,
			Session session, MessageConsumer consumer) throws JMSException {
		for (int i = 0; i < 16349;i++) {
			Message message = consumer.receive(1000);
			if (message != null) {
				onMessage(message);
			}
		}
		System.out.println("Closing connection");
		consumer.close();
		session.close();
		connection.close();
	}

	public void onMessage(Message message) {
		System.out.println("come in onMessage.....");

		try {
			if (message instanceof TextMessage) {
				TextMessage txtMsg = (TextMessage) message;
				String msg = txtMsg.getText();
				System.out.println("Received: " + msg);
			}
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	public static void main(String args[]) {
		ReceiveMessage rm = new ReceiveMessage();
		rm.receiveMessage();
	}
}
 

 

一个用与发送消息,一个用于接收消息,先运行发送消息的类SendMessage ,然后查看MQ的控制 http://localhost:8161/admin/queues.jsp

可以看到自己往队列里面放的消息,接下来运行接收的类ReceiveMessage ,在刷新控制台,可以看到消息被接收。

 

另外,MQ还可以把一个队列里面的消息发送到另一个队列。

 

 

 

 

分享到:
评论

相关推荐

    springboot-nettysocketio +netty+activeMq在线客服系统

    springboot +netty+activeMq在线客服系统springboot +netty+activeMq在线客服系统springboot +netty+activeMq在线客服系统springboot +netty+activeMq在线客服系统springboot +netty+activeMq在线客服系统springboot...

    spring 整合activemq实现自定义动态消息队列

    百度spring整合activemq 发现几乎都只是在xml文件配置固定的消息队列而且太麻烦。并没有根据需求进行动态生成主题和队列。本文档参考了纯粹的activemq java代码和百度上的demo,很简洁的实现了动态消息队列的生成和...

    zabbix 3.4 监控 Activemq 自动发现模板

    用zabbix 自动发现实现activemq 监控pending consumers activemq_scan.sh #!/bin/bash activemq() { MQ_IP=(10.10.11.208:8161) for g in ${MQ_IP[@]} do port=($(curl -uadmin:admin http://${g}/admin/queues.jsp...

    activemq-core-5.7.0-API文档-中文版.zip

    赠送jar包:activemq-core-5.7.0.jar; 赠送原API文档:activemq-core-5.7.0-javadoc.jar; 赠送源代码:activemq-core-5.7.0-sources.jar; 包含翻译后的API文档:activemq-core-5.7.0-javadoc-API文档-中文...

    apache-activemq-5.15.0-bin.tar.7z

    MQ是消息中间件,是一种在分布式系统中应用程序借以传递消息的媒介,常用的有ActiveMQ,RabbitMQ,kafka。ActiveMQ是Apache下的开源项目,完全支持JMS1.1和J2EE1.4规范的JMS Provider实现。 特点: 1、支持多种...

    ActiveMQ高并发处理方案

    ActiveMQ高并发处理方案ActiveMQ高并发处理方案 超级字数补丁超级字数补丁

    activeMQ示例 activeMQ demo,java分布式技术

    本教程旨在帮助activeMQ初学者入门,通过本示例,能完全理解activeMQ的基本概念,为分布式应用打下基础。 本示例中,使用maven管理,完美解决各种依赖问题,不需要自行配置,导入项目等待eclipse自行下载jar包后即可...

    activeMQ收发工具.rar

    activeMQ的测试工具,用于发送和接收activeMQ消息,jar包形式的,安装完jdk之后用java -jar xxx.jar命令运行

    MQ之ActiveMQ.mmap

    自己做的尚硅谷周阳老师ActiveMQ课程脑图,其中自己所用做案例的环境搭建都是基于docker与老师课程不一样。脑图内容涵盖视频的99%的笔记,含有自己编写的代码文件,外加了自己对一些问题的测试与回答。 消息中间件...

    ActiveMQ 之Spring结合实例

    包括1、ActiveMQ java实例 2、ActiveMQ Spring结合实例 3、代码亲测,无问题。 4、资源分5分绝对值 注意:请先安装ActiveMQ 服务。

    activemq-protobuf-1.1-API文档-中文版.zip

    赠送jar包:activemq-protobuf-1.1.jar; 赠送原API文档:activemq-protobuf-1.1-javadoc.jar; 赠送源代码:activemq-protobuf-1.1-sources.jar; 包含翻译后的API文档:activemq-protobuf-1.1-javadoc-API文档-...

    apache-activemq-5.11.2

    ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的...

    apache-activemq Linux版本

    apache-activemq Linux版本

    ActiveMQ部署方案分析对比

    构建高可用的ActiveMQ系统在生产环境中是非常重要的,单点的ActiveMQ作为企业应用无法满足高可用和集群的需求,所以ActiveMQ提供 了master-slave、broker cluster等多种部署方式,但通过分析多种部署方式之后我认为...

    ActiveMQ in Action pdf英文版+源代码

    ActiveMQ in Action pdf英文原版加源代码压缩包。 Apache ActiveMQ in Action is a thorough, practical guide to implementing message-oriented systems in Java using ActiveMQ. The book lays out the core of ...

    activemq, Apache ActiveMQ镜像.zip

    activemq, Apache ActiveMQ镜像 欢迎来到 Apache ActiveMQis是一个高性能的Apache 2.0许可以消息代理和 JMS 1.1实现。正在启动要帮助你入门,请尝试以下链接:入门http://activemq.apache.org/version-

    Spring集成ActiveMQ配置

    Spring 集 成ActiveMQ 配置 异步RPC框架 Missian ActiveMq-JMS简单实例使用tomcat

    activemq-protobuf-1.1-API文档-中英对照版.zip

    赠送jar包:activemq-protobuf-1.1.jar; 赠送原API文档:activemq-protobuf-1.1-javadoc.jar; 赠送源代码:activemq-protobuf-1.1-sources.jar; 包含翻译后的API文档:activemq-protobuf-1.1-javadoc-API文档-...

    springboot集成activemq实现消息接收demo

    springboot集成activemq实现消息接收demo

Global site tag (gtag.js) - Google Analytics