论坛首页 Java企业应用论坛

java线程阻塞中断和LockSupport的常见问题

浏览 13014 次
该帖已经被评为精华帖
作者 正文
   发表时间:2011-03-22   最后修改:2011-03-22

上周五和周末,工作忙里偷闲,在看java cocurrent中也顺便再温故了一下Thread.interrupt和java 5之后的LockSupport的实现。

 

在介绍之前,先抛几个问题。

 

  1. Thread.interrupt()方法和InterruptedException异常的关系?是由interrupt触发产生了InterruptedException异常?
  2. Thread.interrupt()会中断线程什么状态的工作? RUNNING or BLOCKING?
  3. 一般Thread编程需要关注interrupt中断不?一般怎么处理?可以用来做什么?
  4. LockSupport.park()和unpark(),与object.wait()和notify()的区别?
  5. LockSupport.park(Object blocker)传递的blocker对象做什么用?
  6. LockSupport能响应Thread.interrupt()事件不?会抛出InterruptedException异常?
  7. Thread.interrupt()处理是否有对应的回调函数?类似于钩子调用?
如果你都都能很明确的答上来了,说明你已经完全懂Thread.interrupt,可以不用往下看那了。

那如果不清楚的,带着这几个问题,一起来梳理下。
Thread的interrupt处理的几个方法:
  • public void interrupt() :  执行线程interrupt事件
  • public boolean isInterrupted() : 检查当前线程是否处于interrupt
  • public static boolean interrupted() : check当前线程是否处于interrupt,并重置interrupt信息。类似于resetAndGet()

理解:
1. 每个线程都有一个interrupt status标志位,用于表明当前线程是否处于中断状态
2. 一般调用Thread.interrupt()会有两种处理方式
  • 遇到一个低优先级的block状态时,比如object.wait(),object.sleep(),object.join()。它会立马触发一个unblock解除阻塞,并throw一个InterruptedException。
  • 其他情况,Thread.interrupt()仅仅只是更新了status标志位。然后你的工作线程通过Thread.isInterrrupted()进行检查,可以做相应的处理,比如也throw InterruptedException或者是清理状态,取消task等。
在interrupt javadoc中描述:



最佳实践
IBM上有篇文章写的挺不错。Java theory and practice: Dealing with InterruptedException , 里面提到了Interrupt处理的几条最佳实践。
  1. Don't swallow interrupts (别吃掉Interrupt,一般是两种处理:  继续throw InterruptedException异常。  另一种就是继续设置Thread.interupt()异常标志位,让更上一层去进行相应处理。
    public class TaskRunner implements Runnable {
        private BlockingQueue<Task> queue;
    
        public TaskRunner(BlockingQueue<Task> queue) { 
            this.queue = queue; 
        }
    
        public void run() { 
            try {
                 while (true) {
                     Task task = queue.take(10, TimeUnit.SECONDS);
                     task.execute();
                 }
             }
             catch (InterruptedException e) { 
                 // Restore the interrupted status
                 Thread.currentThread().interrupt();
             }
        }
    }
     
  2. Implementing cancelable tasks with Interrupt (使用Thread.interrupt()来设计和支持可被cancel的task)
    public class PrimeProducer extends Thread {
        private final BlockingQueue<BigInteger> queue;
    
        PrimeProducer(BlockingQueue<BigInteger> queue) {
            this.queue = queue;
        }
    
        public void run() {
            try {
                BigInteger p = BigInteger.ONE;
                while (!Thread.currentThread().isInterrupted())
                    queue.put(p = p.nextProbablePrime());
            } catch (InterruptedException consumed) {
                /* Allow thread to exit */
            }
        }
    
        public void cancel() { interrupt(); } // 发起中断
    } 

注册Interrupt处理事件(非正常用法)

一般正常的task设计用来处理cancel,都是采用主动轮询的方式检查Thread.isInterrupt(),对业务本身存在一定的嵌入性,还有就是存在延迟,你得等到下一个检查点(谁知道下一个检查点是在什么时候,特别是进行一个socket.read时,遇到过一个HttpClient超时的问题)。

 

来看一下,主动抛出InterruptedException异常的实现,借鉴于InterruptibleChannel的设计,比较取巧。

 

 

interface InterruptAble { // 定义可中断的接口

    public void interrupt() throws InterruptedException;
}

abstract class InterruptSupport implements InterruptAble {

    private volatile boolean interrupted = false;
    private Interruptible    interruptor = new Interruptible() {

                                             public void interrupt() {
                                                 interrupted = true;
                                                 InterruptSupport.this.interrupt(); // 位置3
                                             }
                                         };

    public final boolean execute() throws InterruptedException {
        try {
            blockedOn(interruptor); // 位置1
            if (Thread.currentThread().isInterrupted()) { // 立马被interrupted
                interruptor.interrupt();
            }
            // 执行业务代码
            bussiness();
        } finally {
            blockedOn(null);   // 位置2
        }

        return interrupted;
    }

    public abstract void bussiness() ;

    public abstract void interrupt();

    // -- sun.misc.SharedSecrets --
    static void blockedOn(Interruptible intr) { // package-private
        sun.misc.SharedSecrets.getJavaLangAccess().blockedOn(Thread.currentThread(), intr);
    }
}

 

代码说明,几个取巧的点:

位置1:利用sun提供的blockedOn方法,绑定对应的Interruptible事件处理钩子到指定的Thread上。

位置2:执行完代码后,清空钩子。避免使用连接池时,对下一个Thread处理事件的影响。

位置3:定义了Interruptible事件钩子的处理方法,回调InterruptSupport.this.interrupt()方法,子类可以集成实现自己的业务逻辑,比如sock流关闭等等。

 

使用: 

 

class InterruptRead extends InterruptSupport {

    private FileInputStream in;

    @Override
    public void bussiness() {
        File file = new File("/dev/urandom"); // 读取linux黑洞,永远读不完
        try {
            in = new FileInputStream(file);
            byte[] bytes = new byte[1024];
            while (in.read(bytes, 0, 1024) > 0) {
                // Thread.sleep(100);
                // if (Thread.interrupted()) {// 以前的Interrupt检查方式
                // throw new InterruptedException("");
                // }
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public FileInputStream getIn() {
        return in;
    }

    @Override
    public void interrupt() {
        try {
            in.getChannel().close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

}

public static void main(String args[]) throws Exception {
        final InterruptRead test = new InterruptRead();
        Thread t = new Thread() {

            @Override
            public void run() {
                long start = System.currentTimeMillis();
                try {
                    System.out.println("InterruptRead start!");
                    test.execute();
                } catch (InterruptedException e) {
                    System.out.println("InterruptRead end! cost time : " + (System.currentTimeMillis() - start));
                    e.printStackTrace();
                }
            }
        };
        t.start();
        // 先让Read执行3秒
        Thread.sleep(3000);
        // 发出interrupt中断
        t.interrupt();
    }
 

 

jdk源码介绍: 

1. sun提供的钩子可以查看System的相关代码, line : 1125

 

sun.misc.SharedSecrets.setJavaLangAccess(new sun.misc.JavaLangAccess(){
            public sun.reflect.ConstantPool getConstantPool(Class klass) {
                return klass.getConstantPool();
            }
            public void setAnnotationType(Class klass, AnnotationType type) {
                klass.setAnnotationType(type);
            }
            public AnnotationType getAnnotationType(Class klass) {
                return klass.getAnnotationType();
            }
            public <E extends Enum<E>>
		    E[] getEnumConstantsShared(Class<E> klass) {
                return klass.getEnumConstantsShared();
            }
            public void blockedOn(Thread t, Interruptible b) {
                t.blockedOn(b);
            }
        });

 

 2. Thread.interrupt()

 

public void interrupt() {
	if (this != Thread.currentThread())
	    checkAccess();

	synchronized (blockerLock) {
	    Interruptible b = blocker;
	    if (b != null) {
		interrupt0();		// Just to set the interrupt flag
		b.interrupt(); //回调钩子
		return;
	    }
	}
	interrupt0();
    }

 

 

更多

更多关于Thread.stop,suspend,resume,interrupt的使用注意点,可以看一下sun的文档,比如http://download.oracle.com/javase/6/docs/technotes/guides/concurrency/threadPrimitiveDeprecation.html

 

 

最后来解答一下之前的几个问题:

问题1: Thread.interrupt()方法和InterruptedException异常的关系?是由interrupt触发产生了InterruptedException异常? 

答: Thread.interrupt()只是在Object.wait() .Object.join(), Object.sleep()几个方法会主动抛出InterruptedException异常。而在其他的的block常见,只是通过设置了Thread的一个标志位信息,需要程序自我进行处理。

 

if (Thread.interrupted())  // Clears interrupted status!
    throw new InterruptedException();

 

问题2:Thread.interrupt()会中断线程什么状态的工作? RUNNING or BLOCKING?

答:Thread.interrupt设计的目的主要是用于处理线程处于block状态,比如wait(),sleep()状态就是个例子。但可以在程序设计时为支持task cancel,同样可以支持RUNNING状态。比如Object.join()和一些支持interrupt的一些nio channel设计。

 

问题3: 一般Thread编程需要关注interrupt中断不?一般怎么处理?可以用来做什么?

答: interrupt用途: unBlock操作,支持任务cancel, 数据清理等。

 

问题4: LockSupport.park()和unpark(),与object.wait()和notify()的区别?

答:

1.  面向的主体不一样。LockSuport主要是针对Thread进进行阻塞处理,可以指定阻塞队列的目标对象,每次可以指定具体的线程唤醒。Object.wait()是以对象为纬度,阻塞当前的线程和唤醒单个(随机)或者所有线程。

2.  实现机制不同。虽然LockSuport可以指定monitor的object对象,但和object.wait(),两者的阻塞队列并不交叉。可以看下测试例子。object.notifyAll()不能唤醒LockSupport的阻塞Thread.

 

 

 

问题5: LockSupport.park(Object blocker)传递的blocker对象做什么用?

答: 对应的blcoker会记录在Thread的一个parkBlocker属性中,通过jstack命令可以非常方便的监控具体的阻塞对象.

 

public static void park(Object blocker) {
        Thread t = Thread.currentThread();
        setBlocker(t, blocker); // 设置Thread.parkBlocker属性的值
        unsafe.park(false, 0L);
        setBlocker(t, null);  // 清除Thread.parkBlocker属性的值
    }

 具体LockSupport的javadoc描述也比较清楚,可以看下:

 

 

问题6: LockSupport能响应Thread.interrupt()事件不?会抛出InterruptedException异常?

答:能响应interrupt事件,但不会抛出InterruptedException异常。针对LockSupport对Thread.interrupte支持,也先看一下javadoc中的描述:


 

相关测试代码

 

package com.agapple.cocurrent;

import java.io.File;
import java.io.FileInputStream;
import java.lang.reflect.Field;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

public class LockSupportTest {

    private static LockSupportTest blocker = new LockSupportTest();

    public static void main(String args[]) throws Exception {
        lockSupportTest();
        parkTest();
        interruptParkTest();
        interruptSleepTest();
        interruptWaitTest();
    }

    /**
     * LockSupport.park对象后,尝试获取Thread.blocker对象,调用其single唤醒
     * 
     * @throws Exception
     */
    private static void lockSupportTest() throws Exception {
        Thread t = doTest(new TestCallBack() {

            @Override
            public void callback() throws Exception {
                // 尝试sleep 5s
                System.out.println("blocker");
                LockSupport.park(blocker);
                System.out.println("wakeup now!");
            }

            @Override
            public String getName() {
                return "lockSupportTest";
            }

        });
        t.start(); // 启动读取线程

        Thread.sleep(150);
        synchronized (blocker) {
            Field field = Thread.class.getDeclaredField("parkBlocker");
            field.setAccessible(true);
            Object fBlocker = field.get(t);
            System.out.println(blocker == fBlocker);
            Thread.sleep(100);
            System.out.println("notifyAll");
            blocker.notifyAll();
        }
    }

    /**
     * 尝试去中断一个object.wait(),会抛出对应的InterruptedException异常
     * 
     * @throws InterruptedException
     */
    private static void interruptWaitTest() throws InterruptedException {
        final Object obj = new Object();
        Thread t = doTest(new TestCallBack() {

            @Override
            public void callback() throws Exception {
                // 尝试sleep 5s
                obj.wait();
                System.out.println("wakeup now!");
            }

            @Override
            public String getName() {
                return "interruptWaitTest";
            }

        });
        t.start(); // 启动读取线程
        Thread.sleep(2000);
        t.interrupt(); // 检查下在park时,是否响应中断
    }

    /**
     * 尝试去中断一个Thread.sleep(),会抛出对应的InterruptedException异常
     * 
     * @throws InterruptedException
     */
    private static void interruptSleepTest() throws InterruptedException {
        Thread t = doTest(new TestCallBack() {

            @Override
            public void callback() throws Exception {
                // 尝试sleep 5s
                Thread.sleep(5000);
                System.out.println("wakeup now!");
            }

            @Override
            public String getName() {
                return "interruptSleepTest";
            }

        });
        t.start(); // 启动读取线程
        Thread.sleep(2000);
        t.interrupt(); // 检查下在park时,是否响应中断
    }

    /**
     * 尝试去中断一个LockSupport.park(),会有响应但不会抛出InterruptedException异常
     * 
     * @throws InterruptedException
     */
    private static void interruptParkTest() throws InterruptedException {
        Thread t = doTest(new TestCallBack() {

            @Override
            public void callback() {
                // 尝试去park 自己线程
                LockSupport.parkNanos(blocker, TimeUnit.SECONDS.toNanos(5));
                System.out.println("wakeup now!");
            }

            @Override
            public String getName() {
                return "interruptParkTest";
            }

        });
        t.start(); // 启动读取线程
        Thread.sleep(2000);
        t.interrupt(); // 检查下在park时,是否响应中断
    }

    /**
     * 尝试去中断一个LockSupport.unPark(),会有响应
     * 
     * @throws InterruptedException
     */
    private static void parkTest() throws InterruptedException {
        Thread t = doTest(new TestCallBack() {

            @Override
            public void callback() {
                // 尝试去park 自己线程
                LockSupport.park(blocker);
                System.out.println("wakeup now!");
            }

            @Override
            public String getName() {
                return "parkTest";
            }

        });

        t.start(); // 启动读取线程
        Thread.sleep(2000);
        LockSupport.unpark(t);
        t.interrupt();
    }

    public static Thread doTest(final TestCallBack call) {
        return new Thread() {

            @Override
            public void run() {
                File file = new File("/dev/urandom"); // 读取linux黑洞
                try {
                    FileInputStream in = new FileInputStream(file);
                    byte[] bytes = new byte[1024];
                    while (in.read(bytes, 0, 1024) > 0) {
                        if (Thread.interrupted()) {
                            throw new InterruptedException("");
                        }
                        System.out.println(bytes[0]);
                        Thread.sleep(100);
                        long start = System.currentTimeMillis();
                        call.callback();
                        System.out.println(call.getName() + " callback finish cost : "
                                           + (System.currentTimeMillis() - start));
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }

        };
    }
}

interface TestCallBack {

    public void callback() throws Exception;

    public String getName();
}
 

 

最后

发觉文章越写越长,那就索性发到了论坛,大家一起讨论下.毕竟文章中描述的都是一些使用层面的东东,并没有从操作系统或者sun native实现上去介绍Thread的一些机制,熟悉这块的大牛门也可以出来发表下高见.

 

本文仅当抛砖引玉,欢迎发言!

  • 大小: 72.7 KB
  • 大小: 51.2 KB
  • 大小: 105.3 KB
   发表时间:2011-06-23   最后修改:2011-06-23

public void interrupt() {
	if (this != Thread.currentThread())
	    checkAccess();

	synchronized (blockerLock) {
	    Interruptible b = blocker;
	    if (b != null) {
		interrupt0();		// Just to set the interrupt flag
		b.interrupt();
		return;
	    }
	}
	interrupt0();
    }



设置interrupt flag 调用的是interrupt0(); ,该方法是native方法,请问这个flag是不是类似一个boolean 变量?
0 请登录后投票
   发表时间:2011-06-23  
楼主,您好,能否请教下如下一段代码:

/**
 * 线程A: 循环5次后等待并放弃锁,让线程B执行。
 */
class ThreadA extends Thread {
	// 线程同步的公共数据区
	Object oa = null;

	ThreadA(Object o) {
		this.oa = o;
	}

	// 线程A执行逻辑
	public void run() {
		// 线程同步区域,需要申请公共数据的锁
		synchronized (oa) {
			System.out.println("ThreadA is running......");
			for (int i = 0; i < 10; i++) {
				System.out.println("   ThreadA value is " + i);
				if (i == 4) {
					try {
						// 当前线程等待
						oa.wait(1000*10);
					} catch (InterruptedException e) {
						System.out.println("我被中断了");
						boolean flg = Thread.interrupted();
						System.out.println("重置中断状态后,我当前的状态" + flg);
					}
				}
				
			}
		}
	}
}

/**
 * 线程B:等待线程A放弃锁,然后获得锁并执行,完成后唤醒线程A
 */
class ThreadB extends Thread {
	// 线程同步的公共数据区
	Object ob = null;

	ThreadB(Object o) {
		this.ob = o;
	}

	// 线程B执行逻辑
	public void run() {
		// 线程同步区域,需要申请公共数据的锁
		synchronized (ob) {
			System.out.println("ThreadB is running......");
			for (int i = 0; i < 5; i++) {
				System.out.println("   ThreadB value is " + i);
			}
			System.out.println("------------------ThreadB is over--------------------");
			// 唤醒等待的线程
			ob.notifyAll();
		}
	}
}

// 测试
public class ThreadTest {
	public static void main(String[] args) throws InterruptedException {
		Object lock = new Object(); // 公共数据区
		ThreadA threada = new ThreadA(lock);
		ThreadB threadb = new ThreadB(lock);
		threada.start(); // 线程A执行
		threadb.start(); // 线程B执行
		Thread.sleep(1000 * 2);
		threada.interrupt();
	}
}



我本次测试有两种运行结果,对其中的一种运行结果不明白,
ThreadA is running......
   ThreadA value is 0
   ThreadA value is 1
   ThreadA value is 2
   ThreadA value is 3
   ThreadA value is 4
ThreadB is running......
   ThreadB value is 0
   ThreadB value is 1
   ThreadB value is 2
   ThreadB value is 3
   ThreadB value is 4
------------------ThreadB is over--------------------
   ThreadA value is 5
   ThreadA value is 6
   ThreadA value is 7
   ThreadA value is 8
   ThreadA value is 9


为什么ThreadA中的oa.wait(1000*10); 这句代码没有执行,main方法里的threada.interrupt(); 这句也没起作用,谢谢。
0 请登录后投票
   发表时间:2011-06-23  
刚刚再仔细看了下,明白了为什么会有这样的结果,ThreadB运行完后,调用了notifyAll方法,所以ThreadA对象被唤醒了,不再需要等待。
0 请登录后投票
   发表时间:2011-06-23   最后修改:2011-06-23
zhangyou1010 写道

public void interrupt() {
	if (this != Thread.currentThread())
	    checkAccess();

	synchronized (blockerLock) {
	    Interruptible b = blocker;
	    if (b != null) {
		interrupt0();		// Just to set the interrupt flag
		b.interrupt();
		return;
	    }
	}
	interrupt0();
    }



设置interrupt flag 调用的是interrupt0(); ,该方法是native方法,请问这个flag是不是类似一个boolean 变量?


猜的基本正确,我看了下native源码。

thread.c: 
#include "jni.h"
#include "jvm.h"

#include "java_lang_Thread.h"

#define THD "Ljava/lang/Thread;"
#define OBJ "Ljava/lang/Object;"
#define STE "Ljava/lang/StackTraceElement;"

#define ARRAY_LENGTH(a) (sizeof(a)/sizeof(a[0]))

static JNINativeMethod methods[] = {
    {"start0",           "()V",        (void *)&JVM_StartThread},
    {"stop0",            "(" OBJ ")V", (void *)&JVM_StopThread},
    {"isAlive",          "()Z",        (void *)&JVM_IsThreadAlive},
    {"suspend0",         "()V",        (void *)&JVM_SuspendThread},
    {"resume0",          "()V",        (void *)&JVM_ResumeThread},
    {"setPriority0",     "(I)V",       (void *)&JVM_SetThreadPriority},
    {"yield",            "()V",        (void *)&JVM_Yield},
    {"sleep",            "(J)V",       (void *)&JVM_Sleep},
    {"currentThread",    "()" THD,     (void *)&JVM_CurrentThread},
    {"countStackFrames", "()I",        (void *)&JVM_CountStackFrames},
    {"interrupt0",       "()V",        (void *)&JVM_Interrupt},
    {"isInterrupted",    "(Z)Z",       (void *)&JVM_IsInterrupted},
    {"holdsLock",        "(" OBJ ")Z", (void *)&JVM_HoldsLock},
    {"getThreads",        "()[" THD,   (void *)&JVM_GetAllThreads},
    {"dumpThreads",      "([" THD ")[[" STE, (void *)&JVM_DumpThreads},
};

#undef THD
#undef OBJ
#undef STE

JNIEXPORT void JNICALL
Java_java_lang_Thread_registerNatives(JNIEnv *env, jclass cls)
{
    (*env)->RegisterNatives(env, cls, methods, ARRAY_LENGTH(methods));
}


最终的实现类:jvm.c

在oracle jdk提供的源码中没有jvm.c,所以网上找了一份
JNIEXPORT void JNICALL
JVM_Interrupt(JNIEnv *env, jobject thread)
{
    CVMExecEnv *ee = CVMjniEnv2ExecEnv(env);
    CVMExecEnv *targetEE;
    CVMJavaLong eetopVal;
   
    CVMsysMutexLock(ee, &CVMglobals.threadLock);

    CVMID_fieldReadLong(ee, thread,
			CVMoffsetOfjava_lang_Thread_eetop,
			eetopVal);
    targetEE = (CVMExecEnv *)CVMlong2VoidPtr(eetopVal);

    /* %comment: rt035 */
    if (targetEE != NULL) {
	if (!targetEE->interruptsMasked) {
	    CVMthreadInterruptWait(CVMexecEnv2threadID(targetEE));
	} else {
	    targetEE->maskedInterrupt = CVM_TRUE;  //设置个变量
	}
    }

    CVMsysMutexUnlock(ee, &CVMglobals.threadLock);
}

JNIEXPORT jboolean JNICALL
JVM_IsInterrupted(JNIEnv *env, jobject thread, jboolean clearInterrupted)
{
    CVMExecEnv *ee = CVMjniEnv2ExecEnv(env);
    CVMExecEnv *targetEE;
    CVMThreadICell *thisThreadCell;
    CVMJavaLong eetopVal;
    jboolean result = JNI_FALSE;
    CVMBool isSelf;

    thisThreadCell = CVMcurrentThreadICell(ee);
    CVMID_icellSameObject(ee, thisThreadCell, thread, isSelf);

    if (isSelf) {
	/*
	 * The thread is the current thread, so we can avoid the
	 * locking because we know we aren't going away.
	 *
	 * Current thread will not see any interrupts while
	 * interrupts are masked.
	 */
	result = !ee->interruptsMasked &&
	    CVMthreadIsInterrupted(CVMexecEnv2threadID(ee), clearInterrupted);
    } else {
	/* a thread can only clear its own interrupt */
	CVMassert(!clearInterrupted);
	CVMsysMutexLock(ee, &CVMglobals.threadLock);
	CVMID_fieldReadLong(ee, thread,
			    CVMoffsetOfjava_lang_Thread_eetop,
			    eetopVal);
	if (!CVMlongEqz(eetopVal)) {
	    targetEE = (CVMExecEnv *)CVMlong2VoidPtr(eetopVal);
	    result = !targetEE->interruptsMasked ?
		CVMthreadIsInterrupted(CVMexecEnv2threadID(targetEE),
		    clearInterrupted)
		: targetEE->maskedInterrupt;
	}
	CVMsysMutexUnlock(ee, &CVMglobals.threadLock);
    }

    return result;
}


虽然对C语言不是很了解,不过基本的意思相信大家也能看的懂了。
0 请登录后投票
   发表时间:2011-06-23   最后修改:2011-06-23
zhangyou1010 写道
刚刚再仔细看了下,明白了为什么会有这样的结果,ThreadB运行完后,调用了notifyAll方法,所以ThreadA对象被唤醒了,不再需要等待。


多线程编码还是有蛮多的陷阱,以前大家都比较忽略interrupt事件的处理,这是不应该。
还有在面试中,如果一个人说精通多线程编程,我一般都会问问interrupt和锁的问题。

ps : 3个月之前的老帖也让你捞起来了 
0 请登录后投票
   发表时间:2011-06-23  
很奇怪,貌似关注这帖子的人不多啊。
0 请登录后投票
   发表时间:2011-06-23  
zhangyou1010 写道
很奇怪,貌似关注这帖子的人不多啊。


因为interrupt很多时候都是人忽视的,至少我一段时间之前写多线程代码,都是直接catch Exception,不会单独处理interrupt。

平时还是多看看I/O,多线程和一些opensource代码的阅读
0 请登录后投票
   发表时间:2011-06-25  

说说俺的看到,Thread.isInterrupt()这种主动轮询的方法主要是在没有阻塞的时候处理的,它可以充当一个标志位的作用但是,在有阻塞的时候,比如sleep/wait他们会响应中断,抛出异常,类似socket、内部锁这种是不响应中断的,所以就死在那里了,类似你说的HTTPCLIENT超时的问题,

Thread.isInterrupted 本身设计上来讲,就不能处理这种情况;

InterruptAble 真是一种好方法,学习了,不过貌似如果是通过Thread这种方式起线程的话,也可以
覆盖Interrupted方法就好了,假设有个不响应中断的Socket阻塞:

public void interrupt(){
		try{
			println "interruptSelf"
			socket.close();
		}catch(IOException e){
		
		}finally{
			super.interrupt()
		}
	}
 

如果用FutureTask和Cancellable的话,也可以考虑覆盖FutureTask的cancel方法

 

 

0 请登录后投票
   发表时间:2011-06-26  
碧海山城 写道

说说俺的看到,Thread.isInterrupt()这种主动轮询的方法主要是在没有阻塞的时候处理的,它可以充当一个标志位的作用但是,在有阻塞的时候,比如sleep/wait他们会响应中断,抛出异常,类似socket、内部锁这种是不响应中断的,所以就死在那里了,类似你说的HTTPCLIENT超时的问题,

Thread.isInterrupted 本身设计上来讲,就不能处理这种情况;

InterruptAble 真是一种好方法,学习了,不过貌似如果是通过Thread这种方式起线程的话,也可以
覆盖Interrupted方法就好了,假设有个不响应中断的Socket阻塞:

 

public void interrupt(){
		try{
			println "interruptSelf"
			socket.close();
		}catch(IOException e){
		
		}finally{
			super.interrupt()
		}
	}
 

如果用FutureTask和Cancellable的话,也可以考虑覆盖FutureTask的cancel方法

 

 

 

FutureTask已经实现了cancel方法,也是发起了一个interrupt事件给运行的Thread,至于是否可以正常的响应中断,还是得看写任务代码的是否有处理中断异常。

 

你的这种处理方式也是一种思路,原理主要还是获取中断事件进行请求处理。

不过一般不建议这么做,如果是你设计一个多线程处理的tool工具,你不能假定使用者是否会正确/响应的线程中断,是把?

0 请登录后投票
论坛首页 Java企业应用版

跳转论坛:
Global site tag (gtag.js) - Google Analytics