`
wenzi5a321
  • 浏览: 3895 次
最近访客 更多访客>>
社区版块
存档分类
最新评论

MQ客户端

MQ 
阅读更多
public class mqSendSample {
@SuppressWarnings("rawtypes")
private static Hashtable<String, Comparable> env = new Hashtable<String, Comparable>();

// 队列管理器名
private static String queueManagerName;
// 队列管理器引用
private static MQQueueManager queueManager;
// 队列名
private static String queueName;
// 队列引用
private MQQueue queue;

/**
* <应用启动时初始化队列管理器连
* 由于连接队列管理器如同连接数据一样,建立时需要资源较多, 连接时间较长,因此不要每次创建关闭,建议应用程序保持
* 或多个队列管理器连接但应用关闭时注意关闭连接,释放资源!
*
* @throws Exception
*/
@BeforeClass
public static void initEnvironment() throws Exception {
// 服务器地id、名称
env.put(MQConstants.HOST_NAME_PROPERTY, "192.168.102.29");
// 连接通道
env.put(MQConstants.CHANNEL_PROPERTY, "ESB_TEST");
// 服务器MQ服务使用的编1381代表GBK,1208代表UTF(Coded Character Set Identifier:CCSID)
env.put(MQConstants.CCSID_PROPERTY, 1208);
// 端口号
env.put(MQConstants.PORT_PROPERTY, 1414);
// 传输类型
env.put(MQConstants.TRANSPORT_PROPERTY, MQConstants.TRANSPORT_MQSERIES);
MQEnvironment.properties.put(MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES_CLIENT);
// 设置目标队列管理器
queueManagerName = "AIRP_QMGR_002";
// 设置目标队列
queueName = "topic_in3";

// 建立队列管理器连接
connectQM();
}

/**
* 程序结束时释放队列管理连接资源
*
* @throws Exception
*/
@AfterClass
public static void destroyEnvironment() throws Exception {
disconnectQM();
}

@Test
public void testSend() throws Exception {
// 队列打开参数
int openOptions = MQConstants.MQOO_BIND_AS_Q_DEF
| MQConstants.MQOO_OUTPUT;
// 打开队列(同一线程内,同时只能打开该队列一次)
queue = queueManager.accessQueue(queueName, openOptions);
// 设置发送消息参数为:具有同步性,及支持事务
MQPutMessageOptions pmo = new MQPutMessageOptions();
pmo.options = MQConstants.MQPMO_SYNCPOINT;
try {
// 发送消息(这是为同时多条消息发送)
for (int i = 0; i < 10; i++) {
// 设置消息格式为字符串类型
MQMessage msg = new MQMessage();
msg.format = MQConstants.MQFMT_STRING;
/*
* 设置自定义消息头
*/
msg.setStringProperty("sys", "md"); // 服务id
msg.characterSet = 1208;
// 消息内容
String message = "<msg>你好<msg>";
// 设置消息内容
msg.writeString(message);
// 发�?消息
queue.put(msg, pmo);

}
// 提交事务
queueManager.commit();

} catch (MQException e) {
// 事务回滚
queueManager.backout();
e.printStackTrace();
} finally {
// 关闭队列
if (queue != null) {
queue.close();
}
}

}

private static void connectQM() throws Exception {
queueManager = new MQQueueManager(queueManagerName, env);
}

private static void disconnectQM() throws Exception {
if (queueManager != null) {
queueManager.disconnect();
}
}

public static void main(String[] args) throws Exception {
mqSendSample queue1 = new mqSendSample();
queue1.initEnvironment();
// 队列打开参数
int openOptions = MQConstants.MQOO_BIND_AS_Q_DEF
| MQConstants.MQOO_OUTPUT;
// 打开队列(同一线程内,同时只能打开该队列一次)
queue1.queue = queue1.queueManager.accessQueue(queueName, openOptions);
// 设置发送消息参数为:具有同步性,及支持事务
MQPutMessageOptions pmo = new MQPutMessageOptions();
pmo.options = MQConstants.MQPMO_SYNCPOINT;
try {
// 设置消息格式为字符串类型
MQMessage msg = new MQMessage();
msg.format = MQConstants.MQFMT_STRING;
// 消息内容编码(1208:utf-8)
msg.characterSet = 1208;
msg.setStringProperty("sys", "md");
String message = "<root><RequestHead><RequestID>123</RequestID><SourceSystem>VMSCODE</SourceSystem>" +
"<TargetSystem>DCECODE</TargetSystem><ServiceName>S0802001A</ServiceName>" +
"<ServiceOperation>TO_ZF</ServiceOperation>" +
"<ServiceVersion></ServiceVersion></RequestHead><RequestBody><updateProductReq><product><action>2</action><prod_id>1010002956</prod_id><prod_code>1010002956</prod_code><prod_simple_code></prod_simple_code><prod_name>测试产品00013</prod_name><unit_id>118</unit_id><prod_memo2k></prod_memo2k><prod_list_price>200.0000</prod_list_price></product></updateProductReq></RequestBody></root>";
// 设置消息内容
msg.writeString(message);
// 发�?消息
queue1.queue.put(msg, pmo);

// 提交事务
queue1.queueManager.commit();

} catch (MQException e) {
// 事务回滚
queue1.queueManager.backout();
e.printStackTrace();
} finally {
// 关闭队列
if (queue1.queue != null) {
queue1.queue.close();
}
}
}
}
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics