论坛首页 Java企业应用论坛

多线程中的状态转换转换问题

浏览 7463 次
精华帖 (0) :: 良好帖 (0) :: 新手帖 (0) :: 隐藏帖 (0)
作者 正文
   发表时间:2007-01-08  
private Queue<byte[]> receivedData = new ArrayBlockingQueue<byte[]>(100);
	
	public void receive(byte[] data, int length) {
		byte [] ldata = new byte[data.length+1];
		System.arraycopy(data, 0, ldata, 0, data.length);
		ldata[data.length]=(byte) length;
		this.receivedData.add(ldata);	

		synchronized(receivedData){					
			this.receivedData.notify();	
		}						

	}
	
	private class HandlingThread implements Runnable{

		public void run() {		
			while(true){				
					if(!receivedData.isEmpty()){
						byte[] data = receivedData.remove();						
						dealReceiveData(data);					
					}						
					else{
						try {
							synchronized(receivedData){
								receivedData.wait();
							}
							
						} catch (InterruptedException e) {
							e.printStackTrace();
							
						}
					}
				
			}
		}	
		
	}

说明,receive(byte[] data, int length)由一个事件触发,读入数据并保存在一个队列中,HandlingThread 负责处理数据,作为一个独立的线程运行。 这个程序多数时候运行正常,但是有的时候HandlingThread处于 TIMED-WAITING,这很奇怪。
我跟踪了HandlingThread的状态,正常情况它处于WAITING状态或者RUNNABLE状态,这个TIMED-WAITING状态是如何变出来的?根据api文档,只有Thread.sleep Object.wait with timeout
Thread.join with timeout LockSupport.parkNanos LockSupport.parkUntil 可以把线程状态变为这个TIMED-WAITING,可是在我的代码里并没有出现过对这些方法的调用,找不到到底在哪里改变了HandlingThread的状态。
   发表时间:2007-01-08  
大哥,你wait了,有没有notify啊?
如果队列有出现一次空,你这个handling线程就永久阻塞了!
假如你在其他地方对receivedData.notify(),你要保证同步到位!
还有,如果handling线程是多个一起处理任务的,那你要能探测到有多少个线程被wait了,然后随机的让notify一个个激活!

本质上,你没设置超时,这个。。。
0 请登录后投票
   发表时间:2007-01-08  
galaxystar 写道
大哥,你wait了,有没有notify啊?
假如你在其他地方对receivedData.notify(),你要保证同步到位!

大哥,仔细的看了又看,我的确notify了呀。
#  synchronized(receivedData){                   
#             this.receivedData.notify();   
#         }       

这里我不保证收到的数据马上能被处理,只保证有机会被处理。我的问题不是线程没有被唤醒,而是它为什么会处于TIMED-WAITING状态,如果处于WAITING我是没有问题的。
0 请登录后投票
   发表时间:2007-01-08  
是不是因为用synchronized同步了同一个对象receivedData,这样在同步块中进行receivedData.wait()等待后,其他线程因为无法获得对象receivedData的同步锁,而不能进入下面块中
        synchronized(receivedData){                   
            this.receivedData.notify();   
        } 
从而产生无法唤醒
1 请登录后投票
   发表时间:2007-01-08  
有点晕,time-waiting出现的可能性有吗?强制wait(0)试试!反正wait() = wait(0)
0 请登录后投票
   发表时间:2007-01-08  
galaxystar 写道
大哥,你wait了,有没有notify啊?
如果队列有出现一次空,你这个handling线程就永久阻塞了!
假如你在其他地方对receivedData.notify(),你要保证同步到位!
还有,如果handling线程是多个一起处理任务的,那你要能探测到有多少个线程被wait了,然后随机的让notify一个个激活!

本质上,你没设置超时,这个。。。

多谢。队列有出现一次空,这个handling线程不会就永久阻塞。它只是阻塞到有数据为止。一旦收到就会被唤醒。我的程序开始就是这样的,队列为空。没设置超时是故意的,用线程的原因就是因为handling经常需要处于阻塞状态,好让出cpu。receivedData.notify()只出现过一次,这里没有疑问。handling线程也只有一个。这个程序多数时候运行是正常的。我试过设置超时时间为100,问题同样出现。问题不是它不能被唤醒,只是在特殊情况下它自己转换线程状态到了TIMED-WAITING状态,当情况结束它自己又回到正常状态了,我始终找不到,跟踪不到是在哪里,是谁改变了它的线程状态
0 请登录后投票
   发表时间:2007-01-08  
可以考虑用BlockingQueue来做,你这个是典型的生产与消费者的模式需求,以下摘抄自JDK文档:

java.util.concurrent
接口 BlockingQueue<E>
类型参数:
E - 在此 collection 中保持的元素类型
所有超级接口:
Collection<E>, Iterable<E>, Queue<E>
所有已知实现类:
ArrayBlockingQueue, DelayQueue, LinkedBlockingQueue, PriorityBlockingQueue, SynchronousQueue

--------------------------------------------------------------------------------

public interface BlockingQueue<E>extends Queue<E>支持两个附加操作的 Queue,这两个操作是:检索元素时等待队列变为非空,以及存储元素时等待空间变得可用。

BlockingQueue 不接受 null 元素。试图 add、put 或 offer 一个 null 元素时,某些实现会抛出 NullPointerException。null 被用作指示 poll 操作失败的警戒值。

BlockingQueue 可以是限定容量的。它在任意给定时间都可以有一个 remainingCapacity,超出此容量,便无法无阻塞地 put 额外的元素。没有任何内部容量约束的 BlockingQueue 总是报告 Integer.MAX_VALUE 的剩余容量。

BlockingQueue 实现主要用于生产者-使用者队列,但它另外还支持 Collection 接口。因此,举例来说,使用 remove(x) 从队列中移除任意一个元素是有可能的。然而,这种操作通常不 会有效执行,只能有计划地偶尔使用,比如在取消排队信息时。

BlockingQueue 实现是线程安全的。所有排队方法都可以使用内部锁定或其他形式的并发控制来自动达到它们的目的。然而,大量的 Collection 操作(addAll、containsAll、retainAll 和 removeAll)没有 必要自动执行,除非在实现中特别说明。因此,举例来说,在只添加了 c 中的一些元素后,addAll(c) 有可能失败(抛出一个异常)。

BlockingQueue 实质上不 支持使用任何一种“close”或“shutdown”操作来指示不再添加任何项。这种功能的需求和使用有依赖于实现的倾向。例如,一种常用的策略是:对于生产者,插入特殊的 end-of-stream 或 poison 对象,并根据使用者获取这些对象的时间来对它们进行解释。

以下是基于典型的生产者-使用者场景的一个用例。注意,BlockingQueue 可以安全地与多个生产者和多个使用者一起使用。
 class Producer implements Runnable {
   private final BlockingQueue queue;
   Producer(BlockingQueue q) { queue = q; }
   public void run() {
     try {
       while(true) { queue.put(produce()); }
     } catch (InterruptedException ex) { ... handle ...}
   }
   Object produce() { ... }
 }

 class Consumer implements Runnable {
   private final BlockingQueue queue;
   Consumer(BlockingQueue q) { queue = q; }
   public void run() {
     try {
       while(true) { consume(queue.take()); }
     } catch (InterruptedException ex) { ... handle ...}
   }
   void consume(Object x) { ... }
 }

 class Setup {
   void main() {
     BlockingQueue q = new SomeQueueImplementation();
     Producer p = new Producer(q);
     Consumer c1 = new Consumer(q);
     Consumer c2 = new Consumer(q);
     new Thread(p).start();
     new Thread(c1).start();
     new Thread(c2).start();
   }
 }


此接口是 Java Collections Framework 的成员。

从以下版本开始:
1.5

--------------------------------------------------------------------------------

方法摘要
boolean add(E o)
          将指定的元素添加到此队列中(如果立即可行),在成功时返回 true,其他情况则抛出 IllegalStateException。
int drainTo(Collection<? super E> c)
          移除此队列中所有可用的元素,并将它们添加到给定 collection 中。
int drainTo(Collection<? super E> c, int maxElements)
          最多从此队列中移除给定数量的可用元素,并将这些元素添加到给定 collection 中。
boolean offer(E o)
          如果可能的话,将指定元素插入此队列中。
boolean offer(E o, long timeout, TimeUnit unit)
          将指定的元素插入此队列中,如果没有可用空间,将等待指定的等待时间(如果有必要)。
E poll(long timeout, TimeUnit unit)
          检索并移除此队列的头部,如果此队列中没有任何元素,则等待指定等待的时间(如果有必要)。
void put(E o)
          将指定元素添加到此队列中,如果没有可用空间,将一直等待(如果有必要)。
int remainingCapacity()
          返回在无阻塞的理想情况下(不存在内存或资源约束)此队列能接受的元素数量;如果没有内部限制,则返回 Integer.MAX_VALUE。
E take()
          检索并移除此队列的头部,如果此队列不存在任何元素,则一直等待。
0 请登录后投票
   发表时间:2007-01-08  
谢谢楼上这么详细的回答,我用的ArrayBlockingQueue就是BlockingQueue呀。
0 请登录后投票
   发表时间:2007-01-08  
javavsnet 写道
谢谢楼上这么详细的回答,我用的ArrayBlockingQueue就是BlockingQueue呀。
哦,对啊,那根本就不要你去关心同步的问题,你只要

添:queue.put()
取:queue.take()

就搞定了,不要你自找麻烦啊。
0 请登录后投票
   发表时间:2007-01-09  
既然使用的是阻塞的queue(ArrayBlockingQueue),就应该使用阻塞的方法:
ArrayBlockingQueue.offer(E o);
ArrayBlockingQueue.offer(E o, long timeout, TimeUnit unit);
ArrayBlockingQueue.put(E o);

ArrayBlockingQueue.poll();
ArrayBlockingQueue.poll(long timeout, TimeUnit unit);
ArrayBlockingQueue.take();

根本不需要使用wait()
0 请登录后投票
论坛首页 Java企业应用版

跳转论坛:
Global site tag (gtag.js) - Google Analytics