随 着互联网企业业务量的不断扩大,企业信息网络系统的愈加复杂,性能问题也就越来越凸显出来,串行的业务处理方式显然已经成为主要的瓶颈,我们需要更多 异 步的并行处理来提高企业信息系统的业务处理能力,因此独立的消息处理系统也就应运而生,ActiveMQ 就是诸多开源消息系统的佼佼者。对于我们 的技术选型来说,稳定和适应性是最重要的考虑因素,因此由 Apache 组织背景而且支持发布/订阅(Pub/Sub)模式以及异步 Stomp 协 议(Streaming Text Orientated Messaging Protocol)和 REST 方式的 ActiveMQ 就成为 了首选的消息处理器,经测试在异步模式下,整个系统的信息处理吞吐量还是很理想的(一台普通服务器能达到每秒收发 4000 个消息,每个消 息 4K 字节这样的速度)。
下面我们来看看 ActiveMQ 的基本配置与使用,由于我们公司大部分的内部服务都是使用脚本语言写 的,比如 PHP/Python 等,所以我们使用 Stomp 协议来处理;另外,为了尽可能提高消息“生产者”的效率,我们采用了异 步 nio 模式;还有关于并发处理以及负载均衡方面还有很多需要注意的配置和选项,我们后面会另找时间补充。
1、下载 ActiveMQ(最新版本 5.4.1)
2、解压安装到 /usr/local/activemq 目录下
3、安装初始化配置文件 ./bin/activemq setup /etc/default/activemq (保存至默认位置,以后可微调启动参数)
4、编辑 ./conf/activemq.xml 加入如下段(authentication / stomp+nio):
...
<plugins>
<!-- Add By James ; Configure authentication; Username, passwords and groups -->
<simpleAuthenticationPlugin>
<users>
<authenticationUser username="system" password="${activemq.password}" groups="users,admins"/>
<authenticationUser username="user" password="${guest.password}" groups="users"/>
<authenticationUser username="guest" password="${guest.password}" groups="guests"/>
</users>
</simpleAuthenticationPlugin>
</plugins>
...
<!-- Add by James for support stomp protocol -->
<transportConnector name="stomp+nio" uri="stomp+nio://0.0.0.0:61613?transport.closeAsync=false"/>
...
5、使用 Stomp 库对 ActiveMQ 进行收发消息测试(这里不提供 Java 的例子,因为 Google 上已经有太多了,我们以 PHP 作为脚本语言的程序范例)。
a> 以下是 Stomp 协议的简单命令:
* SEND
* SUBSCRIBE
* UNSUBSCRIBE
* BEGIN
* COMMIT
* ABORT
* ACK
* DISCONNECT
b> PHP 实例(transaction / persistent / ack)
关于 Stomp 的 PHP 库可以到 Google Code 上下载,使用以下测试用例的时候可以配合 http://activemq-hostname:8161/admin/ 查看消息的数量和状态。
c> amq_sent.php(生产者示例)
...
// include a library
require_once("Stomp.php");
// make a connection
$con = new Stomp("tcp://192.168.1.11:61613");
// connect
$con->connect("guest", "password");
// begin transaction
$con->begin("tx1");
try {
// build a message by map
require_once("Stomp/Message/Map.php");
$msgBody = array("city"=>"Belgrade", "name"=>"Dejan");
$msgHeader["persistent"] = "true"; // VERY IMPORTANT : Because By default, Stomp produced messages are set to non-persistent.
$msgHeader["transformation"] = "jms-map-json";
$mapMessage = new StompMessageMap($msgBody, $msgHeader);
// send the message to the queue
$con->send("/queue/test", $mapMessage, array("transaction" => "tx1"));
// test rollback logic
// throw new Exception("Sending failed ...");
// commit sending message
$con->commit("tx1");
// print message
echo "Sent message : ";
print_r($msgBody);
} catch (Exception $e) {
// rollback sending
$con->abort("tx1");
// print message
echo $e->getMessage();
}
// disconnect
$con->disconnect();
...
d> amq_recv.php(消费者示例)
...
// include a library
require_once("Stomp.php");
// make a connection
$con = new Stomp("tcp://192.168.1.11:61613");
// connect with authentication
$con->connect("guest", "password");
// set read timeout
$con->setReadTimeout(1);
// subscribe to the queue
$con->subscribe("/queue/test", array("transformation" => "jms-map-json"));
// receive a message from the queue
$msg = $con->readFrame();
// do what you want with the message
if ( $msg != null) {
echo "Received message : ";
print_r($msg);
// mark the message as received in the queue
$con->ack($msg);
} else {
echo "Failed to receive a message/n";
}
// disconnect
$con->disconnect();
...
6、 目前 ActiveMQ 最新版的消息数据是保存在一个快速的文本数据库 kahadb 里面的,使用虽然速度很快但是一旦出错恢复起来总是有很多的问 题。所以我建议大家还是把主流数据库的持久化选项打开,虽然慢一点但是数据至少恢复起来要简 单不少,我们使用 Mysql 来保存持久化的消息数据,数 据库是 activemq,配置在 ./conf/activemq.xml 中,如下:
...
<persistenceFactory>
<journalPersistenceAdapterFactory dataDirectory="${activemq.base}/data" dataSource="#mysql-ds"/>
</persistenceFactory>
...
<!-- MySql DataSource Sample Setup -->
<bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://192.168.1.10:11811/activemq?relaxAutoCommit=true"/>
<property name="username" value="admin"/>
<property name="password" value="ihush2010"/>
<property name="maxActive" value="200"/>
<property name="poolPreparedStatements" value="true"/>
</bean>
...
总结&注意事项:
1、Stomp 协议构造的消息默认是非持久化的,如果要让消息持久化必须加上 "persistent:true" 的消息头,这个搞了我半天时间~
2、编程时候还是需要注意“消费者”的运行效率,因为大量“慢消费者”会在非持久的 Topics 上出现问题,特别在多消费者的情况下,容易造成 ActiveMQ 的反应变慢~
3、如果不是使用 Stomp 协议和 ActiveMQ 建立通讯的情况下尽量使用长连接,Connection 的 start() 和 stop() 方法代价很高。
4、如果使用集群 master-slave 也是可以的,但是总是不够稳定,建议还是使用单台服务器的模式。
相关推荐
php 版本 activemq+stomp.php,连接信息源代码+xml解析实例+实例代码demo
java中使用消息中间件ActiveMQ的MQTT协议发布消息使用fusesource,fusesource提供三种方式实现发布消息的方式,分别是阻塞式(BlockingConnection)、回调式(CallbackConnection)和Future样式(FutureConnection)
标签:activemq-stomp-5.10.0.jar,activemq,stomp,5.10.0,jar包下载,依赖包
activeMQ的简单demo,直接导入项目后运行即可使用,非常简单易懂
该demo主要用于activeMQ初学,队列消息监听和Topic消息监听
go语言实现的读取、发送消息到activemq,使用的stomp协议。
一个用Spring+Activemq实现的消息平台
springboot集成activemq实现消息接收demo
标签:activemq-stomp-5.10.0-sources.jar,activemq,stomp,5.10.0,sources,jar包下载,依赖包
前面介绍了php ActiveMQ的安装与使用,这里再来讲述一下php通过stomp协议连接ActiveMQ。 一、安装php的stomp扩展 http://pecl.php.net/package/stomp 如:stomp-2.0.0.tgz > tar xf stomp-1.0.9.tgz > cd stomp-...
activemq消息中间件的使用demo,以及spring集合jms实现消息发送和处理。
网上关于在LINUX下用C连接ACTIVEMQ的资料实在太少了,在自己搜索几天资料后,将这些内容整合在一起,有关ACTIVEMQ,APR,STOMP一起使用的内容,资料还不够详细,只希望给大家带来帮助!
基于Spring+JMS+ActiveMQ+Tomcat,我使用的版本情况如下所示:Spring 3.2.0,ActiveMQ 5.4.3,Tomcat 6.0.43。本例通过详细的说明和注释,实现消息服务的基本功能:发送与接收。Spring对JMS提供了很好的支持,可以...
ActiveMQ Demo程序,包括发送和接收程序,WinForm开发 关于ActiveMQ的介绍,请看我的这篇文章:http://blog.csdn.net/bodybo/article/details/5647968
spring boot activemq集成示例,包含queue和topic消息的发送、接收,连接池的支持。
本文档参考了纯粹的activemq java代码和百度上的demo,很简洁的实现了动态消息队列的生成和获取,但是没有自定义监听(当前项目不需要),本文档只有功能实现类 即业务层。若要调用和推送 则需要自己根据需求编写。...
此demo结合了网上纷乱的资源进行讲解。少量操作即可直接跑通,适合初学者进行学习。所需jar包在demo中也都具有。
ActiveMQ允许客户端使用多种协议接收消息,其中WebSocket协议的URL使用ws://开头,默认端口号是61614
ActiveMQ实例Demo