`
不平凡的人
  • 浏览: 33074 次
  • 性别: Icon_minigender_1
  • 来自: 嘉峪关
社区版块
存档分类
最新评论

ActiveMQ+Mysql持久化存储

阅读更多

本文简单介绍ActiveMQ使用Mysql数据库实现消息的持久化存储

 

一、ActiveMQ配置修改

二、代码示例

 

 

一、ActiveMQ配置修改

对于ActiveMQ需要保证消息的可靠性,需要持久化进行存储,默认情况下使用kahadb进行数据的默认持久化存储技术,同时也可以使用leveldb、mysql、oracle

此次,使用mysql对消息进行持久化操作。

 

1、active.xml文件的修改

(1)数据源配置信息

  

        <persistenceAdapter>
		    <!-- 默认使用kahadb进行持久化操作,保证消息的可靠性 -->
            <kahaDB directory="${activemq.data}/kahadb"/> 
			-->
			
		    <!-- 当前使用mysql进行数据的持久化操作 -->
			<jdbcPersistenceAdapter dataSource="#mysql-ds" />
			
        </persistenceAdapter>

 

<!-- 设置mysql数据源的配置信息 -->
	<bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
	      <property name="driverClassName" value="com.mysql.jdbc.Driver" />
		  <property name="url" value="jdbc:mysql://localhost:3306/test" />
		  <property name="username" value="root" />
		  <property name="password" value="***"/>
		  <property name="maxActive" value="200" />
		  <property name="poolPreparedStatements" value="true" />
	</bean>

 

 

(2)消息队列中消息的优先级

              <policyEntries>
                <policyEntry topic=">" >
                    <!-- The constantPendingMessageLimitStrategy is used to prevent
                         slow topic consumers to block producers and affect other consumers
                         by limiting the number of messages that are retained
                         For more information, see:

                         http://activemq.apache.org/slow-consumer-handling.html

                    -->
                  <pendingMessageLimitStrategy>
                    <constantPendingMessageLimitStrategy limit="1000"/>
                  </pendingMessageLimitStrategy>
                </policyEntry>
				
				<!-- 指定队列中消息的优先级  queue 自定义的队列的名称-->
				<policyEntry queue="persistMysql"  prioritizedMessages="true"/>
				
              </policyEntries>

 

消息优先级说明:

消息的优先级有0-9十个级别的优先级,0-4为普通的消息,5-9为加急消息,如果不指定优先级,默认为4。JMS不严格按照这十个优先级发送消息,但必须保证单次加急消息要先于普通消息到达,并不能保证顺序消费机制。

 

 

2、ActiveMQ安转目录lib下需要添加如下jar

mysql-connector-java-5.1.45.jar

commons-dbcp-1.4.jar

commons-pool-1.5.4.jar

 

二、代码示例

(1)生产者

package com.chinasoft.activemqv1;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * ActiveMQ消息的生产者
 * 
 * 存储技术使用Mysql保证数据的可靠性
 * 
 * @author Freedom
 *
 */
public class Producer {

	private ConnectionFactory f = null;
	private Connection c = null;
	private Session session = null;
	private Destination d = null;
	private MessageProducer p = null;

	public Producer() {

		try {

			f = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_USER,
					ActiveMQConnectionFactory.DEFAULT_PASSWORD, "tcp://localhost:61616");
			c = f.createConnection(); // 创建连接对象
			// 创建好连接对象后要打开连接
			c.start();

			session = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
			d = session.createQueue("persistMysql");
			p = session.createProducer(null); // 发送消息时指定消息的目的地

		} catch (JMSException e) {
			e.printStackTrace();
		}
	}

	public void sender() {

		// 创建一个 MapMessage类型的消息
		try {
			MapMessage msg = session.createMapMessage();
			msg.setStringProperty("name", "cc");// 利用此方法设置的消息,当消费端使用选择器可以对消息进行过滤
			msg.setIntProperty("age", 26);
			msg.setIntProperty("salary", 5600);

			MapMessage msg1 = session.createMapMessage();
			msg1.setStringProperty("name", "zs");// 利用此方法设置的消息,当消费端使用选择器可以对消息进行过滤
			msg1.setIntProperty("age", 22);
			msg1.setIntProperty("salary", 4000);

			MapMessage msg2 = session.createMapMessage();
			msg2.setStringProperty("name", "lsi");// 利用此方法设置的消息,当消费端使用选择器可以对消息进行过滤
			msg2.setIntProperty("age", 21);
			msg2.setIntProperty("salary", 9100);

			MapMessage msg3 = session.createMapMessage();
			msg3.setStringProperty("name", "nb");// 利用此方法设置的消息,当消费端使用选择器可以对消息进行过滤
			msg3.setIntProperty("age", 19);
			msg3.setIntProperty("salary", 3600);

			MapMessage msg4 = session.createMapMessage();
			msg4.setStringProperty("name", "ww");// 利用此方法设置的消息,当消费端使用选择器可以对消息进行过滤
			msg4.setIntProperty("age", 27);
			msg4.setIntProperty("salary", 7600);

			// 生产者发送消息
			// 默认情况,数据时需要进行持久化操作,可以指定DeliveryMode不进行初始化操作
			// 2指定消息的优先级,需要active.xml配置 <policyEntity />

			p.send(d, msg1, DeliveryMode.PERSISTENT, 2, 1000 * 60 * 1L);
			p.send(d, msg2);
			p.send(d, msg3);
			p.send(d, msg4);

		} catch (JMSException e) {
			e.printStackTrace();
		} finally {
			if (c != null) {
				try {
					c.close();
				} catch (JMSException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}
		}

	}

	public static void main(String[] args) {

		Producer pro = new Producer();
		pro.sender();

	}

}

 

(2)消费者

消费者中使用了消息监听机制,监听MQ上的消息,并处理消息

package com.chinasoft.activemqv1;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * ActiveMQ消费者
 * 
 * @author Freedom
 *
 */
public class Consumer {

	private static final String SELECTOR = "age>21 and salary>4000";

	private ConnectionFactory f = null;
	private Connection c = null;
	private Session session = null;
	private Destination d = null;
	private MessageConsumer mc = null;

	public Consumer() {

		try {

			f = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_USER,
					ActiveMQConnectionFactory.DEFAULT_PASSWORD, "tcp://localhost:61616");
			c = f.createConnection();
			c.start();

			session = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
			d = session.createQueue("persistMysql");
			mc = session.createConsumer(d, SELECTOR);// 第二个参数为一个selctor选择器用于筛选数据,满足SQL92规范

		} catch (JMSException e) {
			e.printStackTrace();
		}
	}

	public void recevice() {

		// 消费端创建一个监听类,监听MQ上消息并读取消息
		try {
			mc.setMessageListener(new MessageListener() {

				@Override
				public void onMessage(Message m) {

					if (m instanceof MapMessage) {

						System.out.println("消费者接受到的消息****  " + m);
					}

				}
			});
		} catch (JMSException e) {
			e.printStackTrace();
		}

	}

	public static void main(String[] args) {
		Consumer c = new Consumer();
		c.recevice();
	}

}

 

执行完成,打开数据库表,会生成三个表


 

xxx_msgs表记录生产者发送的消息信息,如果消费者消费完成,则会清空表中的消息



 
 

 

  • 大小: 67.1 KB
  • 大小: 3.8 KB
  • 大小: 19.9 KB
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics