`
阅读更多

 我在IBM MQ7和suse9的环境下做的开发。

 我在发送一个大文件消息时,使用分段的方式进行操作,处理完成后放到远程队列当中,到达下一个目的地。

 如果基于单独的mq的队列管理,如上的方法没有问题。但如果通过远程队列放到一个集群当中时,就会把一个完整的消息体四分五裂。在网上找N多资料,大都是要使用BIND_ON_OPEN做为打开的options。但每次都不管用。

 说下具体情况。

  QM_FIRST 表示要发送的队列管理器

  QM_PROXY 表示集群的代理服务器,也是一个队列管理器,属于集群ANT

       QM_EXE1 一个队列管理器,属于集群ANT,权重25%

       QM_EXE2 一个队列管理器,属于集群ANT,权重25%

       QM_EXE3 一个队列管理器,属于集群ANT,权重25%

       QM_EXE4 一个队列管理器,属于集群ANT,权重25%

 

      QM_FIRST通过对象UR_FIRST_TO_PROXY来完成于PROXY的发送。

 

  目前的效果。


 

  想要达到的效果。

 


 

      说明一下,远程队列的使用方式,远程队列通过一个别名队列QM_PROXY_ALIAS放在集群当中的本地队列当中。


 

  经过反复的论证和测试,发现这个问题的罪魁祸首就是这个远程队列,最后的解决方法是,弃用这个远程队列,直接连接到对方的集群当中的本地队列,也就是如上图的远程队列里。就可以了

  没有代码哟。大家可以自己直接写,比较简单,

 

  部分代码示例。
 private int SINGLE_MSG_MAX_LENGTH = 1048576; // 1M 默认是1M

/**
	 * 分段发送文件.
	 * 
	 * @param b
	 * @param queueName
	 * @throws QueueAccessException
	 */
	@SuppressWarnings("deprecation")
	public void putMessageMultiple(byte[] b, String queueName) throws QueueAccessException {

		MQQueue mqPut = null;
		try {
			mqPut = getPutQueue(queueName);

			int length, m, n;
			int packNum;

			length = b.length;

			m = length / SINGLE_MSG_MAX_LENGTH;
			n = length % SINGLE_MSG_MAX_LENGTH;

			if (n == 0) {
				packNum = m;
			} else {
				packNum = m + 1;
			}

			// second, put the msg to queue

			// 设置放消息时的参数
			for (int packCount = 1; packCount <= packNum; packCount++) {				
				MQMessage myMsg = new MQMessage();
				MQPutMessageOptions pmo = new MQPutMessageOptions();
				pmo.options = MQC.MQPMO_LOGICAL_ORDER + MQC.MQPMO_SYNCPOINT;
				if (packCount == packNum) {
					myMsg.messageFlags = MQC.MQMF_LAST_SEGMENT;
					if (n == 0) {						
						myMsg.write(b, (packCount - 1) * SINGLE_MSG_MAX_LENGTH, SINGLE_MSG_MAX_LENGTH);
					} else {						
						myMsg.write(b, (packCount - 1) * SINGLE_MSG_MAX_LENGTH, n);
					}
				} else {
					myMsg.messageFlags = MQC.MQMF_SEGMENT;
					
					myMsg.write(b, (packCount - 1) * SINGLE_MSG_MAX_LENGTH, SINGLE_MSG_MAX_LENGTH);
				}				
				mqPut.put(myMsg, pmo);
			}

		} catch (MQException e) {

			if (e.completionCode == MQException.MQCC_FAILED && e.reasonCode == MQException.MQRC_UNKNOWN_OBJECT_NAME) {
				throw new InvalidQueueException(queueName + " MQException occur  when getting the messsage.", e);
			} else {
				throw new QueueAccessException(queueName + " MQException occur  when sending  the messsage.", e);
			}

		} catch (IOException e) {

			throw new QueueAccessException(queueName + " IOException occur  when sending  the messsage.", e);
		}
	}

 

/**
	 * 得到发送消息队列的实例
	 */
	@SuppressWarnings("deprecation")
	private MQQueue getPutQueue(String queueName) throws MQException {
		MQQueue queue = null;
		queue = (MQQueue) putQueues.get(queueName);
		if (queue == null) {
			int openOptions = MQC.MQOO_OUTPUT | MQC.MQOO_FAIL_IF_QUIESCING;
			queue = qMgr.accessQueue(queueName, openOptions);
			putQueues.put(queueName, queue);
		}
		return queue;
	}

 

 

如上只是发送的代码。

 

 

如下是取出的代码。

 

 

/**
	 * 从队列中浏览分段消息
	 * 
	 * @param queueName
	 * @return byte[]
	 * @roseuid 447FA30301A0
	 */
	@SuppressWarnings("deprecation")
	public byte[] browseBigMessage(String queueName) throws QueueAccessException {

		byte[] b = null;
		byte[] retbb = null;

		ByteArrayOutputStream out = null;

		MQQueue browseQueue = null;
		try {
			browseQueue = getBrowseQueueBySeq(queueName);
			out = new ByteArrayOutputStream();
			MQMessage inMsg;

			// 设置取消息时的参数
			MQGetMessageOptions gmo = new MQGetMessageOptions();			
			
			gmo.options = MQC.MQGMO_LOGICAL_ORDER + MQC.MQGMO_SYNCPOINT + MQC.MQGMO_ALL_SEGMENTS_AVAILABLE;
			boolean isLastSegment = false;
			while (!isLastSegment) {
				inMsg = new MQMessage();
				browseQueue.get(inMsg, gmo);
				if (inMsg.messageFlags == MQC.MQMF_SEGMENT + MQC.MQMF_LAST_SEGMENT)
					isLastSegment = true;
				b = new byte[inMsg.getMessageLength()];
				inMsg.readFully(b);
				out.write(b);
				out.flush();
				b = null;
			}
			retbb = out.toByteArray();

		} catch (MQException e) {

			if (e.completionCode == MQException.MQCC_FAILED && e.reasonCode == MQException.MQRC_UNKNOWN_OBJECT_NAME) {
				throw new InvalidQueueException(queueName + " MQException occur  when browsing the messsage.", e);
			} else if (!(e.completionCode == 2 && e.reasonCode == 2033)) {
				throw new QueueAccessException(queueName + " MQException occur  when browsing the messsage.", e);
			}
		} catch (IOException e) {

			throw new QueueAccessException(queueName + " IOException occur  when browsing the messsage.", e);
		} catch (NumberFormatException e) {

			throw new QueueAccessException(queueName + " NumberFormatException occur  when browsing the messsage.", e);
		}
		return retbb;
	}


/**
	 * 得到浏览消息队列的实例,分段时实用
	 */
	@SuppressWarnings("deprecation")
	private MQQueue getBrowseQueueBySeq(String queueName) throws MQException {
		MQQueue queue = null;
		queue = (MQQueue) browseQueues.get(queueName);
		if (queue == null) {
			int openOptions = MQC.MQOO_INPUT_SHARED | MQC.MQOO_FAIL_IF_QUIESCING | MQC.MQOO_INQUIRE;// 本属性是browse and
			queue = qMgr.accessQueue(queueName, openOptions);
			browseQueues.put(queueName, queue);
		}
		return queue;
	}

 呵呵。

  • 大小: 39.3 KB
  • 大小: 37.5 KB
  • 大小: 3.9 KB
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics