`

在jboss4.2.2GA中部署JMS

阅读更多
JMS是Java EE消息服务标准。应用程序可以通过JMS的API,发送/接收获发布/订阅消息。JMS的消息服务有应用服务器实现,不同的服务器实现方式不,部署方式也不同。jboss 4.2 使用了 jbossMQ 来实现消息服务。
警告: 注意,应用程序不能够自行创建物理的消息主题(topic)或队列(queue),它们由应用服务器负责实现。
一、部署物理topic 或 queue
[edit] jboss里默认已经有了一些示例的消息配置, 在jboss/server/default/deploy/jms/jbossmq-destinations-service.xml 文件里。打开这个文件,你会看到里面有很多的mbean,例如:

<mbean code="org.jboss.mq.server.jmx.Queue"
name="jboss.mq.destination:service=Queue,name=testQueue">
    <depends optional-attribute-name="DestinationManager">jboss.mq:service=DestinationManager</depends>
    <depends optional-attribute-name="SecurityManager">jboss.mq:service=SecurityManager</depends>
    <attribute name="MessageCounterHistoryDayLimit">-1</attribute>
    <attribute name="SecurityConf">
      <security>
        <role name="guest" read="true" write="true"/>
        <role name="publisher" read="true" write="true" create="false"/>
        <role name="noacc" read="false" write="false" create="false"/>
      </security>
    </attribute>
  </mbean>

当code属性的值为org.jboss.mq.server.jmx.Queue时,表示部署一个消息队列,而为 org.jboss.mq.server.jmx.Topic时,则为主题。name属性定义了消息的名称。要部署消息队列或主题,只要把这个文件复制出来放到deploye目录里,并修改消息队列或出题的名称就可以了。这个文件放到deploy/jms目录下也可以被部署。

二、使用mysql来持久化消息
[edit] 默认情况小,jbossMQ会用内存数据库hsqldb来存储消息,如果消息量大,会占用过多内存,如果关机,消息可能会丢失掉。因此大部分情况下,你需要使用其他的数据库来存储消息。在jboss/docs/examples/jms/ 下有示例的消息持久服务的配置文件,你只需要复制一个放到jboss/server/default/deploy/jms/目录下,并删除jboss /server/default/deploy/jms/hsqldb-jdbc2-service.xml就可以了。比如,你可以用mysql- jdbc2-service.xml文件,来使用mysql存储消息。
把mysql-jdbc2-service.xml放到jboss/server/default/deploy/jms/目录下后,打开它,你会看到

<mbean code="org.jboss.mq.pm.jdbc2.PersistenceManager"
name="jboss.mq:service=PersistenceManager">
    <depends optional-attribute-name="ConnectionManager">jboss.jca:service=DataSourceBinding,name=MySqlDS</depends>
    <attribute name="SqlProperties">
      BLOB_TYPE=BYTES_BLOB
      INSERT_TX = INSERT INTO JMS_TRANSACTIONS (TXID) values(?)
      INSERT_MESSAGE = INSERT INTO JMS_MESSAGES (MESSAGEID, DESTINATION, MESSAGEBLOB, TXID, TXOP) VALUES(?,?,?,?,?)
      SELECT_ALL_UNCOMMITED_TXS = SELECT TXID FROM JMS_TRANSACTIONS
      SELECT_MAX_TX = SELECT MAX(TXID) FROM JMS_MESSAGES
      DELETE_ALL_TX = DELETE FROM JMS_TRANSACTIONS
      SELECT_MESSAGES_IN_DEST = SELECT MESSAGEID, MESSAGEBLOB FROM JMS_MESSAGES WHERE DESTINATION=?
      SELECT_MESSAGE_KEYS_IN_DEST = SELECT MESSAGEID FROM JMS_MESSAGES WHERE DESTINATION=?
      SELECT_MESSAGE = SELECT MESSAGEID, MESSAGEBLOB FROM JMS_MESSAGES WHERE MESSAGEID=? AND DESTINATION=?
      MARK_MESSAGE = UPDATE JMS_MESSAGES SET TXID=?, TXOP=? WHERE MESSAGEID=? AND DESTINATION=?
      UPDATE_MESSAGE = UPDATE JMS_MESSAGES SET MESSAGEBLOB=? WHERE MESSAGEID=? AND DESTINATION=?
      UPDATE_MARKED_MESSAGES = UPDATE JMS_MESSAGES SET TXID=?, TXOP=? WHERE TXOP=?
      UPDATE_MARKED_MESSAGES_WITH_TX = UPDATE JMS_MESSAGES SET TXID=?, TXOP=? WHERE TXOP=? AND TXID=?
      DELETE_MARKED_MESSAGES_WITH_TX = DELETE JMS_MESSAGES FROM JMS_MESSAGES, JMS_TRANSACTIONS WHERE JMS_MESSAGES.TXID = JMS_TRANSACTIONS.TXID AND JMS_MESSAGES.TXOP=?
      DELETE_TX = DELETE FROM JMS_TRANSACTIONS WHERE TXID = ?
      DELETE_MARKED_MESSAGES = DELETE FROM JMS_MESSAGES WHERE TXID=? AND TXOP=?
      DELETE_TEMPORARY_MESSAGES = DELETE FROM JMS_MESSAGES WHERE TXOP='T'
      DELETE_MESSAGE = DELETE FROM JMS_MESSAGES WHERE MESSAGEID=? AND DESTINATION=?
      CREATE_MESSAGE_TABLE = CREATE TABLE JMS_MESSAGES (MESSAGEID INTEGER NOT NULL, DESTINATION VARCHAR(150) NOT NULL, TXID INTEGER, TXOP CHAR(1), MESSAGEBLOB LONGBLOB, PRIMARY KEY (MESSAGEID, DESTINATION))
      CREATE_IDX_MESSAGE_TXOP_TXID = CREATE INDEX JMS_MESSAGES_TXOP_TXID ON JMS_MESSAGES (TXOP, TXID)
      CREATE_IDX_MESSAGE_DESTINATION = CREATE INDEX JMS_MESSAGES_DESTINATION ON JMS_MESSAGES (DESTINATION)
      CREATE_TX_TABLE = CREATE TABLE JMS_TRANSACTIONS ( TXID INTEGER, PRIMARY KEY (TXID) )
      CREATE_TABLES_ON_STARTUP = TRUE
    </attribute>
    <!-- Uncomment to override the transaction timeout for recovery per queue/subscription, in seconds -->
    <!--attribute name="RecoveryTimeout">0</attribute-->
    <!-- The number of blobs to load at once during message recovery -->
    <attribute name="RecoverMessagesChunk">1</attribute>
  </mbean>

其中 jboss.jca:service=DataSourceBinding,name=MySqlDS 定义了要使用的mysql数据源的名称,默认为 MySqlDS,你可以把它改成自己要使用的名字。

三、使用jbossmq-state.xml来存储认证好授权信息
[edit] 默认的jboss会使用hsqldb来存储认证和授权信息,并不是很好用,使用jbossmq-state.xml文件会更方便。你要做的是
第一步 修改jboss/server/default/conf/login-config.xml,把:

<!-- Security domain for JBossMQ -->
    <application-policy name = "jbossmq">
       <authentication>
          <login-module code = "org.jboss.security.auth.spi.DatabaseServerLoginModule"
             flag = "required">
             <module-option name = "unauthenticatedIdentity">guest</module-option>
             <module-option name = "dsJndiName">java:/DefaultDS</module-option>
             <module-option name = "principalsQuery">SELECT PASSWD FROM JMS_USERS WHERE USERID=?</module-option>
             <module-option name = "rolesQuery">SELECT ROLEID, 'Roles' FROM JMS_ROLES WHERE USERID=?</module-option>
          </login-module>
       </authentication>
    </application-policy>

    <!-- Security domain for JBossMQ when using file-state-service.xml
    <application-policy name = "jbossmq">
       <authentication>
          <login-module code = "org.jboss.mq.sm.file.DynamicLoginModule"
             flag = "required">
             <module-option name = "unauthenticatedIdentity">guest</module-option>
             <module-option name = "sm.objectname">jboss.mq:service=StateManager</module-option>
          </login-module>
       </authentication>
    </application-policy>
    -->

变成:

<!-- Security domain for JBossMQ
    <application-policy name = "jbossmq">
       <authentication>
          <login-module code = "org.jboss.security.auth.spi.DatabaseServerLoginModule"
             flag = "required">
             <module-option name = "unauthenticatedIdentity">guest</module-option>
             <module-option name = "dsJndiName">java:/DefaultDS</module-option>
             <module-option name = "principalsQuery">SELECT PASSWD FROM JMS_USERS WHERE USERID=?</module-option>
             <module-option name = "rolesQuery">SELECT ROLEID, 'Roles' FROM JMS_ROLES WHERE USERID=?</module-option>
          </login-module>
       </authentication>
    </application-policy>
    -->

    <!-- Security domain for JBossMQ when using file-state-service.xml-->
    <application-policy name = "jbossmq">
       <authentication>
          <login-module code = "org.jboss.mq.sm.file.DynamicLoginModule"
             flag = "required">
             <module-option name = "unauthenticatedIdentity">guest</module-option>
             <module-option name = "sm.objectname">jboss.mq:service=StateManager</module-option>
          </login-module>
       </authentication>
    </application-policy>

第二步 复制 jboss/docs/examples/jms/file-state-server.xml 到 jboss/server/default/deploye/jms/ 下
第三步 把jboss/docs/examples/jms/conf/jbossmq-state.xml复制到 jboss/server/default/conf/目录下。
第四步 删除jboss/server/default/deploy/jms/hsqldb-jdbc-state-service.xml
至于jbossmq-state.xml里怎么用,打开看看就知道的,没什么好说的。



那么如何在程序中指定jms客户端的用户名和密码呢?如果你是手动直接连的,用 connectionFactory.openConnection(String username, String password)指定用户名密码。如果是用的MessageDriven Bean的话,用annotation在头上加上@ActivationConfigProperty, 看看例子:

@AutoCreate
@MessageDriven(mappedName="queue/paymentOrder",
activationConfig= {
@ActivationConfigProperty(propertyName="destinationType", propertyValue="javax.jms.Queue"),
@ActivationConfigProperty(propertyName="destination", propertyValue="queue/paymentOrder"),
@ActivationConfigProperty(propertyName="user", propertyValue="paymentOrderReceiver"),
@ActivationConfigProperty(propertyName="password", propertyValue="secret")
})
public class PaymentOrderTopicReceiver implements MessageListener {
private static final Log log = LogFactory.getLog(PaymentOrderTopicReceiver.class);

@EJB(name="paymentOrderMessageConsumer")
PaymentOrderMessageConsumer paymentOrderMessageConsumer;

@Resource
private MessageDrivenContext mdc;

public void onMessage(Message msg) {
try {
if (!(msg instanceof ObjectMessage)) {
throw new IllegalArgumentException("accept ObjectMessage only");
}
ObjectMessage message = (ObjectMessage) msg;
OrderEventDTO dto = (OrderEventDTO) message.getObject();

paymentOrderMessageConsumer.consume(dto);
} catch (JMSException e) {
log.error("jms exception while consuming payment jms message", e);
}
}

}
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics