`
dev_liu
  • 浏览: 110095 次
  • 性别: Icon_minigender_1
  • 来自: 成都
最近访客 更多访客>>
社区版块
存档分类
最新评论

今天把多线程发送搞定了一半,而且实现了服务器和客户端不在同一台主机上运行

    博客分类:
  • JMS
阅读更多

public class Sendert {
 QueueConnectionFactory  queueConnectionFactory = null;
  QueueConnection         queueConnection = null;
    QueueSession            queueSession = null;
    Queue                   queue = null;
   QueueSender             queueSender = null;
      final String            MSG_TEXT1 =
          new String("Here is a client-acknowledge message 4tetrtretertert4t45ewrwerwrgergrt4rqwerew5t479347597505890 34535&%^*&$&^$^%$&%&*)_*)&^*%*^$&$^(*&(*&(*%*%&^%*%^%*&^*&^&*^*&^*&^");
      final String            MSG_TEXT2 =
       new String("Here is a Message  327893475984 ryeirrfr9urtyureyt&&&&&&&)))__^&&&))gerg)%%^%^%^%&(*(*(*)*(**)(*)*)*)T^HJCJBDKJFBKJJDOAIWEU(Q&#(&E(*&#Q*E&QEUODIJEDLJLHDKGDUWTE&QE");
      TextMessage         message1 = null;
      TextMessage         message2 =null;

 public Sendert() throws JMSException, NamingException {

  javax.naming.Context ictx = new InitialContext(JMSConstant.getProp());

  queueConnectionFactory = (QueueConnectionFactory) ictx.lookup("qcf");

  queueConnection = queueConnectionFactory.createQueueConnection("user1",
    "user1");
  queueSession = queueConnection.createQueueSession(false,
    Session.CLIENT_ACKNOWLEDGE);
  queue = (Queue) ictx.lookup("queue");
  ictx.close();
  System.out.println("  SENDER: Created client-acknowledge session");
  queueSender = queueSession.createSender(queue);
  message1 = queueSession.createTextMessage();
  message2 = queueSession.createTextMessage();
  message1.setIntProperty("sessionID", 1);
  message2.setIntProperty("sessionID", 2);
 }
 
   class AsynSender implements Runnable
  {
   public void run ()
   {   
    
       try {
            
             for(int i=0;i<10;i++){
              message1.setText("i"+MSG_TEXT1);
              message2.setText("i"+MSG_TEXT2);
             System.out.println("  SENDER: Sending message: "
                 + message1.getText()+i);
             queueSender.send(message1);
             queueSender.send(message2);
             }
         } catch (JMSException e) {
             System.out.println("Exception occurred: " + e.toString());
           
         }
//         finally {
//          if(queueSender!=null){
//           try{queueSender.close();
//           
//          }catch(JMSException e){}
//          if(queueSession!=null)
//           {
//           try{
//           queueSession.commit();
//           }catch(JMSException e){}
//          }
//             if (queueConnection != null)
//             {
//                 try {
//                     queueConnection.close();
//                 } catch (JMSException e) {}
//             }
//         }
//         }
         }
  }
  
 public void run_threads() {
  AsynSender tt = new AsynSender();
  for (int i = 0; i < 5; i++)
   {   new Thread(tt).start();
   
   
   
   System.out.println("go");}
//  try{
//   tt.join();
//   
//  }catch(InterruptedException je){}
  System.out.println("go2");
  }
  
     
 
 void close() throws JMSException {
  queueSender.close();
  queueSession.close();
   queueConnection.close();
 }


    public static void main(String[] args) throws JMSException, NamingException {
     int count=0;
  Sendert se = new Sendert();
  se.run_threads();
  Thread tt=Thread.currentThread();
  System.out.println("count==: "+tt.activeCount());
//  tt.g
//  System.out.println("tt==: "+tt);
//  myLable:{
//   count=tt.activeCount();
//  }
//  for(;;){
//    break myLable;
//  }
//  while(true){
//   
//  
//  if(tt.activeCount()==3) 
//        se.close();
//  else break myLable;;}
//  se.close();
  System.out.println("count==: "+tt.activeCount());
 }

}
 

说明:用一个内部类更好地封装了可以多线程执行的 run 方法。不过多线程有个问题:我无法知道子线程什么时候执行完毕,也就无法在合适的时机调用 close() 方法关闭所有对象。搞了几个小时,试过判断活动线程数、join(),好像都不行。

分享到:
评论
2 楼 dev_liu 2007-01-05  
package duoxianchengtest;

/**
*
*/

import java.io.IOException;
import java.util.ArrayList;
import java.util.Properties;
import java_cup.runtime.Symbol;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
*
* @author Owner
*
*/

public class Sendert {



private static transient final Log log = LogFactory.getLog(Sendert.class);

    QueueConnection        queueConnection = null;
    QueueSession            queueSession = null;
    QueueSender             queueSender = null;
      final String       MSG_TEXT1 =
          new String("Here is a client-acknowledge message 4tetrtretertert4t45ewrwerwrgergrt4rqwerew5t479347597505890 34535&%^*&$&^$^%$&%&*)_*)&^*%*^$&$^(*&(*&(*%*%&^%*%^%*&^*&^&*^*&^*&^");
      final String       MSG_TEXT2 =
      new String("Here is a Message  327893475984 ryeirrfr9urtyureyt&&&&&&&)))__^&&&))gerg)%%^%^%^%&(*(*(*)*(**)(*)*)*)T^HJCJBDKJFBKJJDOAIWEU(Q&#(&E(*&#Q*E&QEUODIJEDLJLHDKGDUWTE&QE");
      TextMessage         message1 = null;
      TextMessage         message2 = null;
      static long time1;
      static long time4;
public Sendert() throws JMSException, NamingException {

time1 = System.currentTimeMillis();
// javax.naming.Context ictx = new InitialContext(JMSConstant.getProp());
/**
  * 直接创建一个Properties,把jndi配置属性key,value放进去.可以省去读取jdni.properties的时间和大量代码.
  */
Properties pr =new Properties();
pr.put("82.0.176.214","16400");
javax.naming.Context  ictx = new InitialContext(pr);
QueueConnectionFactory  queueConnectionFactory = (QueueConnectionFactory) ictx.lookup("qcf");

queueConnection = queueConnectionFactory.createQueueConnection("user1","user1");
queueSession = queueConnection.createQueueSession(false,
Session.CLIENT_ACKNOWLEDGE);
Queue queue = (Queue) ictx.lookup("queue");
ictx.close();

queueSender = queueSession.createSender(queue);
message1 = queueSession.createTextMessage();
message2 = queueSession.createTextMessage();
message1.setIntProperty("sessionID", 1);
message2.setIntProperty("sessionID", 2);
}

class AsynSender extends Thread {

public void run () {   

      try {
           
            for(int i=0;i<5;i++){
            message1.setText("i"+MSG_TEXT1);
            message2.setText("i"+MSG_TEXT2);
System.out.println("  message1: "  + message1.getText());
System.out.println("  message2: "  + message2.getText());
            queueSender.send(message1);
            queueSender.send(message2);
            }
        } catch (JMSException e) {
            System.out.println("Exception occurred: " + e.toString());
          
        }
      }
}

        /**
         * 由于是多线程,无法控制每个线程同时完成,所以会发生没有发送消息完毕而关闭连接的情况
         * 省去下面注释的代码.
         */
//         finally {
//         if(queueSender!=null){
//         try{queueSender.close();
//        
//         }catch(JMSException e){}
//         if(queueSession!=null)
//         {
//         try{
//         queueSession.commit();
//         }catch(JMSException e){}
//         }
//             if (queueConnection != null)
//             {
//                 try {
//                     queueConnection.close();
//                 } catch (JMSException e) {}
//             }
//         }
//         }
 
public void run_threads() {
AsynSender[] tt = new AsynSender[1000];
for (int i = 0; i < tt.length; i++)
       {
tt[i] = new AsynSender();
tt[i].start();

// new Thread(tt).setPriority(Thread.MAX_PRIORITY);
// new Thread(tt).start();

  log.info("run"+Thread.activeCount());
System.out.println("go");
}

for (int i = 0; i < tt.length; i++) {
System.out.println("go1");
try {
tt[i].join();//让子线程发送消息完毕后,才执行主控线程,关闭发送器,连接,会话对象

} catch (InterruptedException je) {}


System.out.println("go2");
}
}
     
/**
* 创建方法close()关闭消息发送器,会话,连接对象.
*
* @throws JMSException
*/
void close() throws JMSException {
queueSender.close();
queueSession.close();
queueConnection.close();
}

/**
*
* @param args
* @throws JMSException
* @throws NamingException
* @throws IOException
* 主控线程,当子线程发送消息完毕后.调用close()方法关闭消息发送器,会话,连接对象.
* 最后打印出发送消息的时间.和一些活动的线程测试.
*
*/
    public static void main(String[] args) throws JMSException, NamingException, IOException {
    System.out.println(" sender: Created client-acknowledge session");
Sendert se = new Sendert();
long time4 = System.currentTimeMillis();
se.run_threads();
      System.out.println("count==: "+Thread.activeCount());

      se.close();//发送消息完毕关闭消息发送器,会话,连接对象.


// System.out.println("tt==: "+tt);
// myLable:{
// count=tt.activeCount();
// }
// for(;;){
// break myLable;
// }
// while(true){
//
//
// if(tt.activeCount()==3)
//        se.close();
// else break myLable;;}
// System.in.read();

// try{Thread.sleep(10000);
// }catch(InterruptedException e){};

long time = System.currentTimeMillis();
long time2= time-time1;
System.out.println("count==: "+Thread.activeCount());
System.out.println("time=:"+time);//当前时间
System.out.println("time1=:"+time1);//程序初始化时间
System.out.println("time2=:"+time2);//程序执行耗费时间
System.out.println("time4=:"+time4);//程序初始化耗费时间
log.info("time4="+(time4-time1));
System.out.println("count==: "+Thread.activeCount()); //当前线程活动数
}

}
1 楼 dev_liu 2007-01-05  
这个问题终于解决了:把发送线程放进一个线程数组,全部启动后在 for() 循环里对每个线程调用 join(),这样主控线程会一直等到所有发送消息的线程执行完毕后才继续执行。嘿嘿……

相关推荐

Global site tag (gtag.js) - Google Analytics