实战篇
本文来自CSDN博客,转载请标明出处:http://blog.csdn.net/Linyufa/archive/2009/07/24/4375670.aspx
前面对JMS概念的作了一个基本介绍,下面我们看一个具体的例子程序
Pub/sub方式的消息传递的例子:
l HelloPublisher.java
package com.jms.test;
import java.util.Hashtable;
import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
/**
* pub/sub方式的消息发送程序
*/
public class HelloPublisher {
TopicConnection topicConnection;// JMS连接,属于Pub/Sub方式的连接
TopicSession topicSession; //JMS会话,属于Pub/Sub方式的会话
TopicPublisher topicPublisher; //消息发布者
Topic topic; //主题
public HelloPublisher(String factoryJNDI, String topicJNDI)
throws JMSException, NamingException {
Hashtable<String, String> env = new Hashtable<String, String>();
//设置好连接JMS容器的属性,不同的容器需要的属性可能不同,需要查阅相关文档
env.put(Context.INITIAL_CONTEXT_FACTORY,
"org.jnp.interfaces.NamingContextFactory");
env.put(Context.PROVIDER_URL, "localhost:1099");
env.put("java.naming.rmi.security.manager", "yes");
env.put(Context.URL_PKG_PREFIXES, "org.jboss.naming");
//创建连接JMS容器的上下文(context)
Context context = new InitialContext(env);
//通过连接工厂的JNDI名查找ConnectionFactory
TopicConnectionFactory topicFactory =
(TopicConnectionFactory) context.lookup(factoryJNDI);
//用连接工厂创建一个JMS连接
topicConnection = topicFactory.createTopicConnection();
//通过JMS连接创建一个Session
topicSession = topicConnection.createTopicSession(false,
Session.AUTO_ACKNOWLEDGE);
//通过上下文查找到一个主题(topic)
topic = (Topic) context.lookup(topicJNDI);
//用session来创建一个特定主题的消息发送者
topicPublisher = topicSession.createPublisher(topic);
}
/**
* 发布一条文本消息
* @param msg 待发布的消息
* @throws JMSException
*/
public void publish(String msg) throws JMSException {
//用session来创建一个文本类型的消息
TextMessage message = topicSession.createTextMessage();
message.setText(msg);//设置消息内容
topicPublisher.publish(topic, message);//消息发送,发送到特定主题
}
public void close() throws JMSException {
topicSession.close();//关闭session
topicConnection.close();//关闭连接
}
public static void main(String[] args)
throws JMSException, NamingException {
HelloPublisher publisher =
new HelloPublisher("ConnectionFactory", "topic/testTopic");
try {
for (int i = 1; i < 11; i++) {
String msg = "Hello World no. " + i;
System.out.println("Publishing message: " + msg);
publisher.publish(msg);
}
publisher.close();//session和connection用完之后一定记得关闭
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
程序在控制台输出:
Publishing message: Hello World no. 1
Publishing message: Hello World no. 2
Publishing message: Hello World no. 3
Publishing message: Hello World no. 4
Publishing message: Hello World no. 5
Publishing message: Hello World no. 6
Publishing message: Hello World no. 7
Publishing message: Hello World no. 8
Publishing message: Hello World no. 9
Publishing message: Hello World no. 10
l HelloSubscriber.java
package com.jms.test;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
/**
* pub/sub方式下的消息接收器。注意,这个消息接收器可以与上面的消息发送器可以工作
* 在不同的JVM中,只要保证它们各自能够连通JMS容器(JMS Provider)
*
*/
public class HelloSubscriber implements MessageListener {
TopicConnection topicConnection;
TopicSession topicSession;
TopicSubscriber topicSubscriber;
Topic topic;
public HelloSubscriber(String factoryJNDI, String topicJNDI)
throws JMSException, NamingException {
Hashtable<String, String> env = new Hashtable<String, String>();
//设置好连接JMS容器的属性,不同的容器需要的属性可能不同,需要查阅相关文档
env.put(Context.INITIAL_CONTEXT_FACTORY,
"org.jnp.interfaces.NamingContextFactory");
env.put(Context.PROVIDER_URL, "localhost:1099");
env.put("java.naming.rmi.security.manager", "yes");
env.put(Context.URL_PKG_PREFIXES, "org.jboss.naming");
Context context = new InitialContext();
TopicConnectionFactory topicFactory =
(TopicConnectionFactory) context.lookup(factoryJNDI);
//创建连接
topicConnection = topicFactory.createTopicConnection();
topicSession = topicConnection.createTopicSession(false,
Session.AUTO_ACKNOWLEDGE);//创建session
topic = (Topic) context.lookup(topicJNDI);//查找到主题
//用session创建一个特定queue的消息接收者
topicSubscriber = topicSession.createSubscriber(topic);
//注册监听,这里设置的监听是自己,因为本类已经实现了MessageListener接口,
//一旦queueReceiver接收到了消息,就会调用本类的onMessage方法
topicSubscriber.setMessageListener(this);
System.out.println("HelloSubscriber subscribed to topic: "
+ topicJNDI);
topicConnection.start();//启动连接,这时监听器才真正生效
}
public void onMessage(Message msg) {
try {
if (msg instanceof TextMessage) {
//把Message 转型成 TextMessage 并提取消息内容
String msgTxt = ((TextMessage) msg).getText();
System.out.println("HelloSubscriber got message: " +
msgTxt);
}
} catch (JMSException ex) {
System.err.println("Could not get text message: " + ex);
ex.printStackTrace();
}
}
public void close() throws JMSException {
topicSession.close();
topicConnection.close();
}
public static void main(String[] args) {
try {
new HelloSubscriber("TopicConnectionFactory",
"topic/testTopic");
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
程序在控制台输出:
HelloSubscriber subscribed to topic: topic/testTopic
HelloSubscriber got message: Hello World no. 1
HelloSubscriber got message: Hello World no. 2
HelloSubscriber got message: Hello World no. 3
HelloSubscriber got message: Hello World no. 4
HelloSubscriber got message: Hello World no. 5
HelloSubscriber got message: Hello World no. 6
HelloSubscriber got message: Hello World no. 7
HelloSubscriber got message: Hello World no. 8
HelloSubscriber got message: Hello World no. 9
HelloSubscriber got message: Hello World no. 10
P2P方式下的消息传递
l HelloQueue.java
package com.jms.test;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueConnection;
import javax.jms.QueueSession;
import javax.jms.QueueSender;
import javax.jms.Queue;
import javax.jms.TextMessage;
import javax.jms.Session;
import javax.jms.JMSException;
import java.util.Hashtable;
public class HelloQueue {
QueueConnection queueConnection; //queue方式的JMS连接
QueueSession queueSession; //queue会话
QueueSender queueSender; //queue消息发送者
Queue queue; //消息队列
public HelloQueue(String factoryJNDI, String topicJNDI)
throws JMSException, NamingException {
//连接JMS Provider的环境参数
Hashtable<String, String> props = new Hashtable<String, String>();
props.put(Context.INITIAL_CONTEXT_FACTORY,
"org.jnp.interfaces.NamingContextFactory");
//JMS provider的主机和端口
props.put(Context.PROVIDER_URL, "localhost:1099");
props.put("java.naming.rmi.security.manager", "yes");
props.put(Context.URL_PKG_PREFIXES, "org.jboss.naming");
Context context = new InitialContext(props);
//lookup到连接工厂
QueueConnectionFactory queueFactory =
(QueueConnectionFactory) context.lookup(factoryJNDI);
queueConnection = queueFactory.createQueueConnection();//创建连接
queueSession = queueConnection.createQueueSession(false,
Session.AUTO_ACKNOWLEDGE);//创建会话
queue = (Queue) context.lookup(topicJNDI);//lookup到特定的消息队列
queueSender = queueSession.createSender(queue);//创建队列消息的发送者
}
public void send(String msg) throws JMSException {
TextMessage message = queueSession.createTextMessage();
message.setText(msg);
queueSender.send(queue, message);
}
public void close() throws JMSException {
queueSession.close();
queueConnection.close();
}
public static void main(String[] args) {
try {
HelloQueue queue = new HelloQueue("ConnectionFactory",
"queue/testQueue");
for (int i = 11; i < 21; i++) {
String msg = "Hello World no. " + i;
System.out.println("Hello Queue Publishing message: " + msg);
queue.send(msg);
}
queue.close();
} catch (Exception ex) {
System.err.println("An exception occurred " +
"while testing HelloPublisher25: " + ex);
ex.printStackTrace();
}
}
}
程序在控制台输出:
Hello Queue Publishing message: " Hello World no. 11
Hello Queue Publishing message: " Hello World no. 12
Hello Queue Publishing message: " Hello World no. 13
Hello Queue Publishing message: " Hello World no. 14
Hello Queue Publishing message: " Hello World no. 15
Hello Queue Publishing message: " Hello World no. 16
Hello Queue Publishing message: " Hello World no. 17
Hello Queue Publishing message: " Hello World no. 18
Hello Queue Publishing message: " Hello World no. 19
Hello Queue Publishing message: " Hello World no. 20
l HelloRecvQueue.java
package com.jms.test;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.QueueReceiver;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
public class HelloRecvQueue implements MessageListener {
QueueConnection queueConnection;
QueueSession queueSession;
QueueReceiver queueReceiver;
Queue queue;
public HelloRecvQueue(String factoryJNDI, String topicJNDI)
throws JMSException, NamingException {
Context context = new InitialContext();
QueueConnectionFactory queueFactory =
(QueueConnectionFactory) context.lookup(factoryJNDI);
queueConnection = queueFactory.createQueueConnection();
queueSession = queueConnection.createQueueSession(false,
Session.AUTO_ACKNOWLEDGE);
queue = (Queue) context.lookup(topicJNDI);
queueReceiver = queueSession.createReceiver(queue);
queueReceiver.setMessageListener(this);
System.out.println("HelloReceQueue receiver to queue: " + topicJNDI);
queueConnection.start();
}
public void onMessage(Message m) {
try {
String msg = ((TextMessage) m).getText();
System.out.println("HelloReceQueue got message: " + msg);
} catch (JMSException ex) {
System.err.println("Could not get text message: " + ex);
ex.printStackTrace();
}
}
public void close() throws JMSException {
queueSession.close();
queueConnection.close();
}
Public ovid main(String[] args) {
new HelloRecvQueue();
}
}
程序在控制台输出:
HelloReceQueue got message: Hello World no. 11
HelloReceQueue got message: Hello World no. 12
HelloReceQueue got message: Hello World no. 13
HelloReceQueue got message: Hello World no. 14
HelloReceQueue got message: Hello World no. 15
HelloReceQueue got message: Hello World no. 16
HelloReceQueue got message: Hello World no. 17
HelloReceQueue got message: Hello World no. 18
HelloReceQueue got message: Hello World no. 19
HelloReceQueue got message: Hello World no. 20
分享到:
相关推荐
JMS简明教程 JMS文档 JMS标准规范
JMS规范和简明教程JMS规范和简明教程JMS规范和简明教程JMS规范和简明教程JMS规范和简明教程JMS规范和简明教程
JMS简明教程(Word版),JMS1.1标准说明
JMS简明教程
JMS简明教程 简体中文 简体中文 简体中文
JMS规范并不要求供应商同时支持这两种消息模型,但开发者应该熟悉这两种消息模型的优势与缺点。 Java 语言的客户端和 Java 语言的中间层服务必须能够使用这些消息系统。JMS 为 Java 语言程序提供了一个通用的方式来...
JMS 是一个接口和相关语义的集合,那些语义定义了 JMS 客户端如何获取企业消息产品 的功能。 由于消息是点对点的,所以 JMS 的所有用户都称为客户端(clients)。JMS 应用由定义 消息的应用和一系列与他们交互的...
技术分享:JMS详细实例学习教程
JMS-详细教程.pdf
JMS 规范培训教程 手册 教程 指南 pdf
JMS规范培训教程 SUN MQ
JMS简明教程+JMS规范教程+activemq以及activemq和tomcat的整合+整合实例代码+持久化消息配置以及工程+tomcat服务器的配置+整合需要的lib文件+部署多个tomcat服务器方案等
jms学习笔记jms学习笔记jms学习笔记
jms JMS规范培训教程 jms简介 jms JMS规范培训教程 jms简介jms JMS规范培训教程 jms简介
JMS的中文教程(Java的消息驱动)很好的一本书,我的珍藏~
JMS教程,学习笔记,基于XML和JMS的异构数据交换集成
JMS规范教程,学习JMS的朋友可以好好看看
SUN JMS 教程 APACHE ACTIVEMQ 教程
Sun官方JMS教程,非常之权威,是学习JMS的必备好资料。