为了长时间的存储和管理消息,一般会使用数据库。在Activemq中默认使用的是Derby DB。当然也可更改配置来使用其他的DB。Activemq支持以下这些DB:
- Apache Derby
- Axion
- DB2
- HSQL
- Informix
- MaxDB
- MySQL
- Oracle
- Postgresql
- SQLServer
- Sybase
如果要使用不在上面列表中的DB,可以通过配置SQL语句和JDBC驱动来支持自己的DB。Broker在启动的时候读取配置文件,若在配置文件中指定了特定的JDBC驱动,则会在classpath路径下自动检测配置的JDBC驱动。下面是关于oracle的一个配置:
<beans ...>
<broker xmlns="http://activemq.apache.org/schema/core"
brokerName="localhost">
...
<persistenceAdapter>
<jdbcPersistenceAdapter
dataDirectory="${activemq.base}/data"
dataSource="#oracle-ds"/>
</persistenceAdapter>
...
</broker>
<!-- Oracle DataSource Sample Setup -->
<bean id="oracle-ds"
class="org.apache.commons.dbcp.BasicDataSource"
destroy-method="close">
<property name="driverClassName" value="oracle.jdbc.driver.OracleDriver"/>
<property name="url" value="jdbc:oracle:thin:@localhost:1521:AMQDB"/>
<property name="username" value="scott"/>
<property name="password" value="tiger"/>
<property name="maxActive" value="200"/>
<property name="poolPreparedStatements" value="true"/>
</bean>
</beans>
在InstallDir/conf/activemq-jdbc.xml有一个jdbc配置样例。
在使用JDBC持久化的时候根据是否支持Activemq提供的高效的journal文件分为两种。
1、支持高效日志
在消息消费者能跟上生产者的速度时,journal文件能大大减少需要写入到DB中的消息。举个例子:生产者产生了10000个消息,这10000个消息会保存到journal文件中,但是消费者的速度很快,在journal文件还未同步到DB之前,以消费了9900个消息。那么后面就只需要写入100个消息到DB了。如果消费者不能跟上生产者的速度,journal文件可以使消息以批量的方式写入DB中,JDBC驱动进行DB写入的优化。从而提升了性能。另外,journal文件支持JMS事务的一致性。
下面是一段支持高效日志的JDBC持久化的配置:
<beans ...>
<broker ...>
...
<persistenceFactory>
<journaledJDBC journalLogFiles="5" dataSource="#mysql-ds" />
</persistenceFactory>
...
<broker>
...
<bean id="mysql-ds"
class="org.apache.commons.dbcp.BasicDataSource"
destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>
<property name="username" value="activemq"/>
<property name="password" value="activemq"/>
<property name="poolPreparedStatements" value="true"/>
</bean>
</beans>
journaledJDBC 节点的有以下可配置属性:
属性 | 默认值 | 描述 |
adapter | | 指定持久存储的DB |
createTablesOnStartup | true | 是否在启动的时候创建相应的表 |
dataDirectory | activemq-data | 指定Derby存储数据文件的路径 |
dataSource | #derby | 指定要引用的数据源 |
journalArchiveDirectory | | 指定归档日志文件存储的路径 |
journalLogFiles | 2 | 指定日志文件的数量 |
journalLogFileSize | 20MB | 指定每个日志文件的大小 |
journalThreadPriority | 10 | 指定写入日志的线程的优先级 |
useDatabaseLock | true | 在主从配置的时候是否使用排他锁 |
useJournal | true | 指定是否使用日志 |
2、不使用高效日志
下面是不使用高效日志的配置:
<beans ...>
<broker ...>
...
<persistenceAdapter>
<jdbcPersistenceAdapter dataSource="#derby-ds" />
</persistenceAdapter>
...
<broker>
...
<bean id="derby-ds" class="org.apache.derby.jdbc.EmbeddedDataSource">
<property name="databaseName" value="derbydb"/>
<property name="createDatabase" value="create"/>
</bean>
</beans>
可以看出,使用高效日志的配置使在persistenceFactory节点内的。而不使用高效日志的使配置在persistenceAdapter节点内的。
下面是jdbcPersistenceAdapter 节点可配置的属性:
属性 | 默认值 | 描述 |
adapter | | 指定持久存储的DB |
cleanupPeriod | 300000 | 指定删除DB中已收到确认信息的消息的时间周期(ms) |
createTablesOnStartup | true | 是否在启动的时候创建相应的表 |
databaseLocker | DefaultDatabaseLocker instance | 防止多个broker同时访问DB的锁实例 |
dataDirectory | activemq-data | 指定Derby存储数据文件的路径 |
dataSource | #derby | 指定要引用的数据源 |
lockAcquireSleepInterval | 1000 | 等待获取锁的时间长度(ms) |
lockKeepAlivePeriod | 30000 | 定期向lock表中写入当前时间,指定了锁的存活时间 |
useDatabaseLock | true | 在主从配置的时候是否使用排他锁 |
transactionIsolation | Connection.TRANSACTION_READ_UNCOMMITTED | 指定事务隔离级别 |
如果需要配置的DB不再Activemq默认支持的DB列表中,可以通过配置SQL语句和JDBC持久层来实现。在journaledJDBC节点下配置statements 节点来指定JDBC持久层对各类数据的处理方式。下面是一个样例:
<persistenceFactory>
<journaledJDBC ...>
<statements>
<statements stringIdDataType ="VARCHAR(128)"/>
</statements>
</journaledJDBC>
</persistenceFactory>
statements 节点的各个属性:
属性 | 默认值 | 描述 |
tablePrefix | | 指定创建的表名的前缀 |
messageTableName | ACTIVEMQ_MSGS | 存储消息内容的表名 |
durableSubAcksTableName | ACTIVEMQ_ACKS | 存储持久订阅者的确认消息的表名 |
lockTableName | ACTIVEMQ_LOCK | lock表名 |
binaryDataType | BLOB | 指定存放消息的数据类型 |
containerNameDataType | VARCHAR(250) | 指定存放Destination名称的数据类型 |
msgIdDataType | VARCHAR(250) | 存放消息ID的数据类型 |
sequenceDataType | INTEGER | 存放消息序列ID的数据类型 |
longDataType | BIGINT | DB中存放JAVA中long型的数据类型 |
stringIdDataType | VARCHAR(250) | 存放长字符串的DB数据类型 |
定制自己插入和读取数据的SQL语句,可以设置下面的这些属性
- addMessageStatement
- updateMessageStatement
- removeMessageStatement
- findMessageSequenceIdStatement
- findMessageStatement
- findAllMessagesStatement
- findLastSequenceIdInMsgsStatement
- findLastSequenceIdInAcksStatement
- createDurableSubStatement
- findDurableSubStatement
- findAllDurableSubsStatement
- updateLastAckOfDurableSubStatement
- deleteSubscriptionStatement
- findAllDurableSubMessagesStatement
- findDurableSubMessagesStatement
- findAllDestinationsStatement
- removeAllMessagesStatement
- removeAllSubscriptionsStatement
- deleteOldMessagesStatement
- lockCreateStatement
- lockUpdateStatement
- nextDurableSubscriberMessageStatement
- durableSubscriberMessageCountStatement
- lastAckedDurableSubscriberMessageStatement
- destinationMessageCountStatement
- findNextMessageStatement
- createSchemaStatements
- dropSchemaStatements
设置adapter属性可以设定JDBC适配器存储和访问BLOB字段的方式,可设置的方式有:
- org.activemq.store.jdbc.adapter.BlobJDBCAdapter
- org.activemq.store.jdbc.adapter.BytesJDBCAdapter
- org.activemq.store.jdbc.adapter.DefaultJDBCAdapter
- org.activemq.store.jdbc.adapter.ImageJDBCAdapter
具体需要查看JDBC的驱动数据库的文档。
<broker persistent="true" ...>
...
<persistenceFactory>
<journaledJDBC adapter="org.activemq.store.jdbc.adapter.BlobJDBCAdapter" ... />
</persistenceFactory>
...
</broker>
分享到:
相关推荐
ActiveMQ配置Mysql8为持久化方式所需Jar包
activemq消息持久化所需Jar包,详情请参见博文:http://blog.csdn.net/l1028386804/article/details/68997105
一个订阅通道,支持多个客户端监听,当某个客户端掉线后,再上线的时候可以收到它没有接收到的消息。
spring +activemq topic消息持久化订阅实例,整个项目中有activemq和spring的整合的所有实例,topic的持久化配置是在ApplicationContext3C、ApplicationContext3C2以及ApplicationContext3P三个中,消息生产者:...
NULL 博文链接:https://showlike.iteye.com/blog/2000117
在网上找了很多的topic持久化的Demo做了很多的测试,现把熟肉呈上。
本人在学习activemq,然后 测试完成的demo, 包含了queue,topic,持久化到mysql,订阅模式,包好用
activemq-5.15.15 JDBC持久化mysql8.0+的activemq.xml
activemq消息持久化所需Jar包,亲测可用https://blog.csdn.net/weixin_42109071/article/details/91349406
该文档详细描述了linux环境下的 Activemq 持久化、集群环境的搭建步骤,以及测试步骤
本篇文章记录centos6下ActiveMQ+Zookeeper消息中间件集群-完整部署过程,讲解十分详细,本人线上实操手册。在此分享出来,希望能帮助到有用到的朋友。
ActiveMQ的持久化(数据库)[归类].pdf
可用于调试MSMQ、RabbitMQ、ActiveMQ三种消息队列 其中MSMQ支持Active、Binary、XML格式(要勾选事务) RabbitMQ支持逐条接发、批量接发、RPC回调模式、新建队列、建立持久化队列、连接测试等功能。
主要介绍了ActiveMQ持久化机制代码实例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
JMS简明教程+JMS规范教程+activemq以及activemq和tomcat的整合+整合实例代码+持久化消息配置以及工程+tomcat服务器的配置+整合需要的lib文件+部署多个tomcat服务器方案等
一、JMS基本概念 二、activemq介绍及安装 1、消息中间件简介 2、activemq 2.1、activemq简介 2.2、activemq下载 2.3、运行activemq服务 2.4、测试 2.5、监控 3、activemq特性 ...5.3 activemq 持久化机制
——学习参考资料:仅用于个人学习使用! 本代码仅作学习交流,切勿用于商业用途,否则后果自负。若涉及侵权,请联系,会尽快处理! 未进行详尽测试,请自行调试!
集成activemq和spring 的消息推送 包含普通队列消息和持久化订阅消息
⒉ 完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务) ⒊ 对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性 ⒋ 通过了常见J2EE服务器(如 Geronimo,JBoss 4,...
自己写的ActiveMQ简单demo,包括生产者、消费者之间发送消息、持久化到文件和持久化到数据库,期中持久化需要修改activemq.xml文件