一、什么是JMS,为什么需要它
采用异步通信模式:发送消息者可以在发送消息后进行其它的工作,不用等待接收者的回应,而接收者也不必在接到消息后立即对发送者的请求进行处理;
客户和服务对象生命周期的松耦合关系:客户进程和服务对象进程不要求都正常运行,如果由于服务对象崩溃或者网络故障导致客户的请求不可达,客户不会接收到异常,消息中间件能保证消息不会丢失。
整体结构如下:
(1)、ConnectionFactory
package javax.jms; public interface ConnectionFactory { Connection createConnection() throws JMSException;//创建一个连接 Connection createConnection(String userName, String password)//创建一个有密码的连接 throws JMSException; }
(2)、Connection
package javax.jms; public interface Connection { Session createSession(boolean transacted, int acknowledgeMode) throws JMSException; String getClientID() throws JMSException; //唯一客户端ID void setClientID(String clientID) throws JMSException; ConnectionMetaData getMetaData() throws JMSException; ExceptionListener getExceptionListener() throws JMSException; void setExceptionListener(ExceptionListener listener) throws JMSException; void start() throws JMSException; //开启连接 void stop() throws JMSException; //停止连接 void close() throws JMSException; //关闭连接 ConnectionConsumer createConnectionConsumer( Destination destination, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException; ConnectionConsumer createDurableConnectionConsumer( Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException; }
JMS 客户端到JMS Provider 的连接,可以理解为数据库里的Connection
(3)、Session:一个发送或接收消息的线程
package javax.jms; import java.io.Serializable; public interface Session extends Runnable { static final int AUTO_ACKNOWLEDGE = 1; //四种模式 static final int CLIENT_ACKNOWLEDGE = 2; static final int DUPS_OK_ACKNOWLEDGE = 3; static final int SESSION_TRANSACTED = 0; BytesMessage createBytesMessage() throws JMSException; //创建消息 MapMessage createMapMessage() throws JMSException; Message createMessage() throws JMSException; ObjectMessage createObjectMessage() throws JMSException; ObjectMessage createObjectMessage(Serializable object) throws JMSException; StreamMessage createStreamMessage() throws JMSException; TextMessage createTextMessage() throws JMSException; TextMessage createTextMessage(String text) throws JMSException; boolean getTransacted() throws JMSException; int getAcknowledgeMode() throws JMSException; void commit() throws JMSException; void rollback() throws JMSException; void close() throws JMSException; void recover() throws JMSException; MessageListener getMessageListener() throws JMSException; void setMessageListener(MessageListener listener) throws JMSException; public void run(); MessageProducer createProducer(Destination destination) //创建消息生产者 throws JMSException; MessageConsumer createConsumer(Destination destination) throws JMSException; MessageConsumer createConsumer( Destination destination, java.lang.String messageSelector) throws JMSException; MessageConsumer createConsumer( Destination destination, java.lang.String messageSelector, boolean NoLocal) throws JMSException; Queue createQueue(String queueName) throws JMSException; //创建队列 Topic createTopic(String topicName) throws JMSException; //创建主题 TopicSubscriber createDurableSubscriber(Topic topic, String name)//创建主题订阅者 throws JMSException; TopicSubscriber createDurableSubscriber( Topic topic, String name, String messageSelector, boolean noLocal) throws JMSException; QueueBrowser createBrowser(Queue queue) throws JMSException; QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException; TemporaryQueue createTemporaryQueue() throws JMSException; TemporaryTopic createTemporaryTopic() throws JMSException; void unsubscribe(String name) throws JMSException; }
JMS 消息由以下几部分组成:消息头,属性,消息体。
消息头(header):JMS消息头包含了许多字段,它们是消息发送后由JMS提供者或消息发送者产生,用来表示消息、设置优先权和失效时间等等,并且为消息确定路由。
属性(property):由消息发送者产生,用来添加删除消息头以外的附加信息。
消息体(body):由消息发送者产生,JMS中定义了5种消息体:ByteMessage、MapMessage、ObjectMessage、StreamMessage和TextMessage。
JMS Parent |
PTP Domain |
Pub/Sub Domain |
ConnectionFactory |
QueueConnectionFactory |
TopicConnectionFactory |
Connection |
QueueConnection |
TopicConnection |
Destination |
Queue |
Topic |
Session |
QueueSession |
TopicSession |
MessageProducer |
QueueSender |
TopicPublisher |
MessageConsumer |
QueueReceiver |
TopicSubscriber |
广义上说,一个JMS应用是几个JMS 客户端交换消息,开发JMS客户端应用由以下几步构成
(1)、用JNDI 得到ConnectionFactory对象;
(2)、用JNDI 得到目标队列或主题对象,即Destination对象;
(3)、用ConnectionFactory创建Connection 对象;
(4)、用Connection对象创建一个或多个JMS Session;
(5)、用Session 和Destination 创建MessageProducer和MessageConsumer;
(6)、通知Connection 开始传递消息。
import java.io.*; import javax.jms.*; import javax.naming.*; public class Sender { public static void main(String[] args) { new Sender().send(); } public void send() { BufferedReader reader = new BufferedReader(new InputStreamReader(System.in)); try { //Prompt for JNDI names System.out.println("Enter ConnectionFactory name:"); String factoryName = reader.readLine(); System.out.println("Enter Destination name:"); String destinationName = reader.readLine(); //Look up administered objects InitialContext initContext = new InitialContext(); ConnectionFactory factory = (ConnectionFactory) initContext.lookup(factoryName); Destination destination = (Destination) initContext.lookup(destinationName); initContext.close(); //Create JMS objects Connection connection = factory.createConnection(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageProducer sender = session.createProducer(queue); //Send messages String messageText = null; while (true) { System.out.println("Enter message to send or 'quit':"); messageText = reader.readLine(); if ("quit".equals(messageText)) break; TextMessage message = session.createTextMessage(messageText); sender.send(message); } //Exit System.out.println("Exiting..."); reader.close(); connection.close(); System.out.println("Goodbye!"); } catch (Exception e) { e.printStackTrace(); System.exit(1); } } }消息消费者
import java.io.*; import javax.jms.*; import javax.naming.*; public class Receiver implements MessageListener { private boolean stop = false; public static void main(String[] args) { new Receiver().receive(); } public void receive() { BufferedReader reader = new BufferedReader(new InputStreamReader(System.in)); try { //Prompt for JNDI names System.out.println("Enter ConnectionFactory name:"); String factoryName = reader.readLine(); System.out.println("Enter Destination name:"); String destinationName = reader.readLine(); reader.close(); //Look up administered objects InitialContext initContext = new InitialContext(); ConnectionFactory factory = (ConnectionFactory) initContext.lookup(factoryName); Destination destination = (Destination) initContext.lookup(destinationName); initContext.close(); //Create JMS objects Connection connection = factory.createConnection(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageConsumer receiver = session.createConsumer(queue); receiver.setMessageListener(this); connection.start(); //Wait for stop while (!stop) { Thread.sleep(1000); } //Exit System.out.println("Exiting..."); connection.close(); System.out.println("Goodbye!"); } catch (Exception e) { e.printStackTrace(); System.exit(1); } } public void onMessage(Message message) { try { String msgText = ((TextMessage) message).getText(); System.out.println(msgText); if ("stop".equals(msgText)) stop = true; } catch (JMSException e) { e.printStackTrace(); stop = true; } } }
Active MQ是一个基于Apcache 2.0 licenced发布,开放源码的JMS产品。其特点为:
(1)、提供点到点消息模式和发布/订阅消息模式;
(2)、支持JBoss、Geronimo等开源应用服务器,支持Spring框架的消息驱动;
(3)、新增了一个P2P传输层,可以用于创建可靠的P2P JMS网络连接;
(4)、拥有消息持久化、事务、集群支持等JMS基础设施服务。
下篇学习Active MQ
相关推荐
JMS规范详情 AMQP协议详情 RocketMQ RabbitMQ Kafka ActiveMQ ......对比
activemq中间件视频 jms
JMS规范并不要求供应商同时支持这两种消息模型,但开发者应该熟悉这两种消息模型的优势与缺点。 企业消息产品(或者有时称为面向消息的中间件产品)正逐渐成为公司内操作集成的关 键组件。这些产品可以将分离的业务...
JMS1.1规范教程 包含jms基本的接口说明,等
本文来自于csdn,文章分享了分布式消息中间件,主要基于JMS规范、Rocketmq的介绍、部署方式、特性的一些使用几大模块阐述。 今天要给大家分享的是分布式消息中间件。消息中间件主要是实现分布式系统中解耦、异步消息...
Java Message Service(JMS)是SUN提出的旨在统一各种MOM(Message-Oriented Middleware )系统接口的规范,它包含点对点(Point to Point,PTP)和发布/订阅(Publish/Subscribe,pub/sub)两种消息模型,提供可靠...
java消息中间件入门资料,中间件 非底层操作系统软件,非...activeMQ(支持多语言,实现jms1.1,j2ee1.4规范),RabbitMQ(支持更多语言,基于AMQP规范),kafka(高吞吐量,分布式,分区,O(1)磁盘顺序提供消息持久化)
企业消息产品(有时也被称为面向消息的中间件MOMMessageOriented Middleware),正成为一种用来整合公司内部操作的重要组件。它们使得分离的业务组件变成可靠而又灵活性的系统。Java语言编写的客户端以及中间层服务...
ActiveMQ消息中间件是一种在分布式系统中应用程序借以传递消息的媒介。常用的有ActiveMQ,RabbitMQ,kafka。ActiveMQ是Apache下的开源项目,完全支持JMS1.1和J2EE1.4规范的JMS Provider实现。
现今,越来越多的企业面临着各种各样的数据集成和系统整合,CORBA、DCOM、RMI等RPC中间件技术也应运而生,但由于采用RPC同步处理...最后详细分析了SUN及其伙伴公司提出的旨在统一各种消息中间件系统接口的规范(JMS)。
1999年,原来的SUN公司领衔提出了一种面向消息的中间件服务--JMS规范(标准);常用的几种信息交互技术(httpClient、hessian、dubbo、jms、webservice五种).JMS即Java消息服务(JavaMessageService的简称),是...
jms api chm 消息中间件 叫你认识jms规范!
JMS1.1规范是一个来源于现有各种消息系统,用于Java应用系统的消息规范。所有期望在消息系统间进行移植的应用都要使用该规范的接口,同样期望用于J2EE架构的消息系统实现也要遵循本规范。
针对JMS规范和消息中间件的特点,提出面向JMS消息中间件的安全协议MSTP(包括消息传输协议、认证协议、审计协议、分布式信任协议),为解决消息中间件的安全问题提供了一个可选的方案。在具有自主知识产权的JTang...
本文来自于csdn,文章主要介绍了JMS作用,模型,基本构件,消息...简单的说,JMS制定了一个发消息的规范,是一个与具体平台无关的API,绝大多数MOM(面向消息中间件)提供商都对JMS提供支持。ActiveMQ是Apache出品的
ActiveMQ是Apache软件基金会所研发开源的消息中间件,为应用程序提供高效的、可扩展的、稳定的和安全的企业级消息通信。抛开网络服务(Network Services)不说,我们应该知道的是ActiveMQ的三个重要组成部分:连接...
ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息...ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。