首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 开发语言 > 编程 >

java线程阻塞中断和LockSupport的常见有关问题

2012-12-26 
java线程阻塞中断和LockSupport的常见问题上周五和周末,工作忙里偷闲,在看java cocurrent中也顺便再温故了

java线程阻塞中断和LockSupport的常见问题

上周五和周末,工作忙里偷闲,在看java cocurrent中也顺便再温故了一下Thread.interrupt和java 5之后的LockSupport的实现。

?

在介绍之前,先抛几个问题。

?

    Thread.interrupt()方法和InterruptedException异常的关系?是由interrupt触发产生了InterruptedException异常?Thread.interrupt()会中断线程什么状态的工作? RUNNING or BLOCKING?一般Thread编程需要关注interrupt中断不?一般怎么处理?可以用来做什么?LockSupport.park()和unpark(),与object.wait()和notify()的区别?LockSupport.park(Object blocker)传递的blocker对象做什么用?LockSupport能响应Thread.interrupt()事件不?会抛出InterruptedException异常?Thread.interrupt()处理是否有对应的回调函数?类似于钩子调用?
如果你都都能很明确的答上来了,说明你已经完全懂Thread.interrupt,可以不用往下看那了。
那如果不清楚的,带着这几个问题,一起来梳理下。Thread的interrupt处理的几个方法:public void interrupt() : ?执行线程interrupt事件public boolean isInterrupted() : 检查当前线程是否处于interruptpublic static boolean interrupted() : check当前线程是否处于interrupt,并重置interrupt信息。类似于resetAndGet()
理解:1. 每个线程都有一个interrupt status标志位,用于表明当前线程是否处于中断状态2. 一般调用Thread.interrupt()会有两种处理方式遇到一个低优先级的block状态时,比如object.wait(),object.sleep(),object.join()。它会立马触发一个unblock解除阻塞,并throw一个InterruptedException。其他情况,Thread.interrupt()仅仅只是更新了status标志位。然后你的工作线程通过Thread.isInterrrupted()进行检查,可以做相应的处理,比如也throw?InterruptedException或者是清理状态,取消task等。在interrupt javadoc中描述:
java线程阻塞中断和LockSupport的常见有关问题

最佳实践IBM上有篇文章写的挺不错。Java theory and practice: Dealing with InterruptedException?,?里面提到了Interrupt处理的几条最佳实践。
    Don't swallow interrupts (别吃掉Interrupt,一般是两种处理: ?继续throw InterruptedException异常。 ?另一种就是继续设置Thread.interupt()异常标志位,让更上一层去进行相应处理。
    public class TaskRunner implements Runnable {    private BlockingQueue<Task> queue;    public TaskRunner(BlockingQueue<Task> queue) {         this.queue = queue;     }    public void run() {         try {             while (true) {                 Task task = queue.take(10, TimeUnit.SECONDS);                 task.execute();             }         }         catch (InterruptedException e) {              // Restore the interrupted status             Thread.currentThread().interrupt();         }    }}
    ?Implementing cancelable tasks with Interrupt (使用Thread.interrupt()来设计和支持可被cancel的task)
    public class PrimeProducer extends Thread {    private final BlockingQueue<BigInteger> queue;    PrimeProducer(BlockingQueue<BigInteger> queue) {        this.queue = queue;    }    public void run() {        try {            BigInteger p = BigInteger.ONE;            while (!Thread.currentThread().isInterrupted())                queue.put(p = p.nextProbablePrime());        } catch (InterruptedException consumed) {            /* Allow thread to exit */        }    }    public void cancel() { interrupt(); } // 发起中断}?
注册Interrupt处理事件(非正常用法)

一般正常的task设计用来处理cancel,都是采用主动轮询的方式检查Thread.isInterrupt(),对业务本身存在一定的嵌入性,还有就是存在延迟,你得等到下一个检查点(谁知道下一个检查点是在什么时候,特别是进行一个socket.read时,遇到过一个HttpClient超时的问题)。

?

来看一下,主动抛出InterruptedException异常的实现,借鉴于InterruptibleChannel的设计,比较取巧。

?

?

interface InterruptAble { // 定义可中断的接口    public void interrupt() throws InterruptedException;}abstract class InterruptSupport implements InterruptAble {    private volatile boolean interrupted = false;    private Interruptible    interruptor = new Interruptible() {                                             public void interrupt() {                                                 interrupted = true;                                                 InterruptSupport.this.interrupt(); // 位置3                                             }                                         };    public final boolean execute() throws InterruptedException {        try {            blockedOn(interruptor); // 位置1            if (Thread.currentThread().isInterrupted()) { // 立马被interrupted                interruptor.interrupt();            }            // 执行业务代码            bussiness();        } finally {            blockedOn(null);   // 位置2        }        return interrupted;    }    public abstract void bussiness() ;    public abstract void interrupt();    // -- sun.misc.SharedSecrets --    static void blockedOn(Interruptible intr) { // package-private        sun.misc.SharedSecrets.getJavaLangAccess().blockedOn(Thread.currentThread(), intr);    }}

?

代码说明,几个取巧的点:

位置1:利用sun提供的blockedOn方法,绑定对应的Interruptible事件处理钩子到指定的Thread上。

位置2:执行完代码后,清空钩子。避免使用连接池时,对下一个Thread处理事件的影响。

位置3:定义了Interruptible事件钩子的处理方法,回调InterruptSupport.this.interrupt()方法,子类可以集成实现自己的业务逻辑,比如sock流关闭等等。

?

使用:?

?

class InterruptRead extends InterruptSupport {    private FileInputStream in;    @Override    public void bussiness() {        File file = new File("/dev/urandom"); // 读取linux黑洞,永远读不完        try {            in = new FileInputStream(file);            byte[] bytes = new byte[1024];            while (in.read(bytes, 0, 1024) > 0) {                // Thread.sleep(100);                // if (Thread.interrupted()) {// 以前的Interrupt检查方式                // throw new InterruptedException("");                // }            }        } catch (Exception e) {            throw new RuntimeException(e);        }    }    public FileInputStream getIn() {        return in;    }    @Override    public void interrupt() {        try {            in.getChannel().close();        } catch (IOException e) {            e.printStackTrace();        }    }}public static void main(String args[]) throws Exception {        final InterruptRead test = new InterruptRead();        Thread t = new Thread() {            @Override            public void run() {                long start = System.currentTimeMillis();                try {                    System.out.println("InterruptRead start!");                    test.execute();                } catch (InterruptedException e) {                    System.out.println("InterruptRead end! cost time : " + (System.currentTimeMillis() - start));                    e.printStackTrace();                }            }        };        t.start();        // 先让Read执行3秒        Thread.sleep(3000);        // 发出interrupt中断        t.interrupt();    }
?

?

jdk源码介绍:?

1. sun提供的钩子可以查看System的相关代码, line : 1125

?

sun.misc.SharedSecrets.setJavaLangAccess(new sun.misc.JavaLangAccess(){            public sun.reflect.ConstantPool getConstantPool(Class klass) {                return klass.getConstantPool();            }            public void setAnnotationType(Class klass, AnnotationType type) {                klass.setAnnotationType(type);            }            public AnnotationType getAnnotationType(Class klass) {                return klass.getAnnotationType();            }            public <E extends Enum<E>>    E[] getEnumConstantsShared(Class<E> klass) {                return klass.getEnumConstantsShared();            }            public void blockedOn(Thread t, Interruptible b) {                t.blockedOn(b);            }        });

?

?2. Thread.interrupt()

?

public void interrupt() {if (this != Thread.currentThread())    checkAccess();synchronized (blockerLock) {    Interruptible b = blocker;    if (b != null) {interrupt0();// Just to set the interrupt flagb.interrupt(); //回调钩子return;    }}interrupt0();    }

?

?

更多

更多关于Thread.stop,suspend,resume,interrupt的使用注意点,可以看一下sun的文档,比如http://download.oracle.com/javase/6/docs/technotes/guides/concurrency/threadPrimitiveDeprecation.html

?

?

最后来解答一下之前的几个问题:

问题1:?Thread.interrupt()方法和InterruptedException异常的关系?是由interrupt触发产生了InterruptedException异常??

答:?Thread.interrupt()只是在Object.wait() .Object.join(), Object.sleep()几个方法会主动抛出InterruptedException异常。而在其他的的block常见,只是通过设置了Thread的一个标志位信息,需要程序自我进行处理。

?

if (Thread.interrupted())  // Clears interrupted status!    throw new InterruptedException();

?

问题2:Thread.interrupt()会中断线程什么状态的工作? RUNNING or BLOCKING?

答:Thread.interrupt设计的目的主要是用于处理线程处于block状态,比如wait(),sleep()状态就是个例子。但可以在程序设计时为支持task cancel,同样可以支持RUNNING状态。比如Object.join()和一些支持interrupt的一些nio channel设计。

?

问题3:?一般Thread编程需要关注interrupt中断不?一般怎么处理?可以用来做什么?

答:?interrupt用途: unBlock操作,支持任务cancel, 数据清理等。

?

问题4:?LockSupport.park()和unpark(),与object.wait()和notify()的区别?

答:

1. ?面向的主体不一样。LockSuport主要是针对Thread进进行阻塞处理,可以指定阻塞队列的目标对象,每次可以指定具体的线程唤醒。Object.wait()是以对象为纬度,阻塞当前的线程和唤醒单个(随机)或者所有线程。

2. ?实现机制不同。虽然LockSuport可以指定monitor的object对象,但和object.wait(),两者的阻塞队列并不交叉。可以看下测试例子。object.notifyAll()不能唤醒LockSupport的阻塞Thread.

?

?

?

问题5:?LockSupport.park(Object blocker)传递的blocker对象做什么用?

答: 对应的blcoker会记录在Thread的一个parkBlocker属性中,通过jstack命令可以非常方便的监控具体的阻塞对象.

?

public static void park(Object blocker) {        Thread t = Thread.currentThread();        setBlocker(t, blocker); // 设置Thread.parkBlocker属性的值        unsafe.park(false, 0L);        setBlocker(t, null);  // 清除Thread.parkBlocker属性的值    }

?具体LockSupport的javadoc描述也比较清楚,可以看下:

?

java线程阻塞中断和LockSupport的常见有关问题

?

问题6:?LockSupport能响应Thread.interrupt()事件不?会抛出InterruptedException异常?

答:能响应interrupt事件,但不会抛出InterruptedException异常。针对LockSupport对Thread.interrupte支持,也先看一下javadoc中的描述:


java线程阻塞中断和LockSupport的常见有关问题

?

相关测试代码

?

package com.agapple.cocurrent;import java.io.File;import java.io.FileInputStream;import java.lang.reflect.Field;import java.util.concurrent.TimeUnit;import java.util.concurrent.locks.LockSupport;public class LockSupportTest {    private static LockSupportTest blocker = new LockSupportTest();    public static void main(String args[]) throws Exception {        lockSupportTest();        parkTest();        interruptParkTest();        interruptSleepTest();        interruptWaitTest();    }    /**     * LockSupport.park对象后,尝试获取Thread.blocker对象,调用其single唤醒     *      * @throws Exception     */    private static void lockSupportTest() throws Exception {        Thread t = doTest(new TestCallBack() {            @Override            public void callback() throws Exception {                // 尝试sleep 5s                System.out.println("blocker");                LockSupport.park(blocker);                System.out.println("wakeup now!");            }            @Override            public String getName() {                return "lockSupportTest";            }        });        t.start(); // 启动读取线程        Thread.sleep(150);        synchronized (blocker) {            Field field = Thread.class.getDeclaredField("parkBlocker");            field.setAccessible(true);            Object fBlocker = field.get(t);            System.out.println(blocker == fBlocker);            Thread.sleep(100);            System.out.println("notifyAll");            blocker.notifyAll();        }    }    /**     * 尝试去中断一个object.wait(),会抛出对应的InterruptedException异常     *      * @throws InterruptedException     */    private static void interruptWaitTest() throws InterruptedException {        final Object obj = new Object();        Thread t = doTest(new TestCallBack() {            @Override            public void callback() throws Exception {                // 尝试sleep 5s                obj.wait();                System.out.println("wakeup now!");            }            @Override            public String getName() {                return "interruptWaitTest";            }        });        t.start(); // 启动读取线程        Thread.sleep(2000);        t.interrupt(); // 检查下在park时,是否响应中断    }    /**     * 尝试去中断一个Thread.sleep(),会抛出对应的InterruptedException异常     *      * @throws InterruptedException     */    private static void interruptSleepTest() throws InterruptedException {        Thread t = doTest(new TestCallBack() {            @Override            public void callback() throws Exception {                // 尝试sleep 5s                Thread.sleep(5000);                System.out.println("wakeup now!");            }            @Override            public String getName() {                return "interruptSleepTest";            }        });        t.start(); // 启动读取线程        Thread.sleep(2000);        t.interrupt(); // 检查下在park时,是否响应中断    }    /**     * 尝试去中断一个LockSupport.park(),会有响应但不会抛出InterruptedException异常     *      * @throws InterruptedException     */    private static void interruptParkTest() throws InterruptedException {        Thread t = doTest(new TestCallBack() {            @Override            public void callback() {                // 尝试去park 自己线程                LockSupport.parkNanos(blocker, TimeUnit.SECONDS.toNanos(5));                System.out.println("wakeup now!");            }            @Override            public String getName() {                return "interruptParkTest";            }        });        t.start(); // 启动读取线程        Thread.sleep(2000);        t.interrupt(); // 检查下在park时,是否响应中断    }    /**     * 尝试去中断一个LockSupport.unPark(),会有响应     *      * @throws InterruptedException     */    private static void parkTest() throws InterruptedException {        Thread t = doTest(new TestCallBack() {            @Override            public void callback() {                // 尝试去park 自己线程                LockSupport.park(blocker);                System.out.println("wakeup now!");            }            @Override            public String getName() {                return "parkTest";            }        });        t.start(); // 启动读取线程        Thread.sleep(2000);        LockSupport.unpark(t);        t.interrupt();    }    public static Thread doTest(final TestCallBack call) {        return new Thread() {            @Override            public void run() {                File file = new File("/dev/urandom"); // 读取linux黑洞                try {                    FileInputStream in = new FileInputStream(file);                    byte[] bytes = new byte[1024];                    while (in.read(bytes, 0, 1024) > 0) {                        if (Thread.interrupted()) {                            throw new InterruptedException("");                        }                        System.out.println(bytes[0]);                        Thread.sleep(100);                        long start = System.currentTimeMillis();                        call.callback();                        System.out.println(call.getName() + " callback finish cost : "                                           + (System.currentTimeMillis() - start));                    }                } catch (Exception e) {                    e.printStackTrace();                }            }        };    }}interface TestCallBack {    public void callback() throws Exception;    public String getName();}
?

?

最后

发觉文章越写越长,那就索性发到了论坛,大家一起讨论下.毕竟文章中描述的都是一些使用层面的东东,并没有从操作系统或者sun native实现上去介绍Thread的一些机制,熟悉这块的大牛门也可以出来发表下高见.

?

本文仅当抛砖引玉,欢迎发言!

public void interrupt() {if (this != Thread.currentThread()) checkAccess();synchronized (blockerLock) { Interruptible b = blocker; if (b != null) {interrupt0();// Just to set the interrupt flagb.interrupt();return; }}interrupt0(); }


设置interrupt flag 调用的是interrupt0(); ,该方法是native方法,请问这个flag是不是类似一个boolean 变量?/** * 线程A: 循环5次后等待并放弃锁,让线程B执行。 */class ThreadA extends Thread {// 线程同步的公共数据区Object oa = null;ThreadA(Object o) {this.oa = o;}// 线程A执行逻辑public void run() {// 线程同步区域,需要申请公共数据的锁synchronized (oa) {System.out.println("ThreadA is running......");for (int i = 0; i < 10; i++) {System.out.println(" ThreadA value is " + i);if (i == 4) {try {// 当前线程等待oa.wait(1000*10);} catch (InterruptedException e) {System.out.println("我被中断了");boolean flg = Thread.interrupted();System.out.println("重置中断状态后,我当前的状态" + flg);}}}}}}/** * 线程B:等待线程A放弃锁,然后获得锁并执行,完成后唤醒线程A */class ThreadB extends Thread {// 线程同步的公共数据区Object ob = null;ThreadB(Object o) {this.ob = o;}// 线程B执行逻辑public void run() {// 线程同步区域,需要申请公共数据的锁synchronized (ob) {System.out.println("ThreadB is running......");for (int i = 0; i < 5; i++) {System.out.println(" ThreadB value is " + i);}System.out.println("------------------ThreadB is over--------------------");// 唤醒等待的线程ob.notifyAll();}}}// 测试public class ThreadTest {public static void main(String[] args) throws InterruptedException {Object lock = new Object(); // 公共数据区ThreadA threada = new ThreadA(lock);ThreadB threadb = new ThreadB(lock);threada.start(); // 线程A执行threadb.start(); // 线程B执行Thread.sleep(1000 * 2);threada.interrupt();}}


我本次测试有两种运行结果,对其中的一种运行结果不明白,
ThreadA is running......   ThreadA value is 0   ThreadA value is 1   ThreadA value is 2   ThreadA value is 3   ThreadA value is 4ThreadB is running......   ThreadB value is 0   ThreadB value is 1   ThreadB value is 2   ThreadB value is 3   ThreadB value is 4------------------ThreadB is over--------------------   ThreadA value is 5   ThreadA value is 6   ThreadA value is 7   ThreadA value is 8   ThreadA value is 9


为什么ThreadA中的oa.wait(1000*10); 这句代码没有执行,main方法里的threada.interrupt(); 这句也没起作用,谢谢。public void interrupt() {if (this != Thread.currentThread()) checkAccess();synchronized (blockerLock) { Interruptible b = blocker; if (b != null) {interrupt0();// Just to set the interrupt flagb.interrupt();return; }}interrupt0(); }


设置interrupt flag 调用的是interrupt0(); ,该方法是native方法,请问这个flag是不是类似一个boolean 变量?

猜的基本正确,我看了下native源码。

thread.c: 
#include "jni.h"#include "jvm.h"#include "java_lang_Thread.h"#define THD "Ljava/lang/Thread;"#define OBJ "Ljava/lang/Object;"#define STE "Ljava/lang/StackTraceElement;"#define ARRAY_LENGTH(a) (sizeof(a)/sizeof(a[0]))static JNINativeMethod methods[] = {    {"start0",           "()V",        (void *)&JVM_StartThread},    {"stop0",            "(" OBJ ")V", (void *)&JVM_StopThread},    {"isAlive",          "()Z",        (void *)&JVM_IsThreadAlive},    {"suspend0",         "()V",        (void *)&JVM_SuspendThread},    {"resume0",          "()V",        (void *)&JVM_ResumeThread},    {"setPriority0",     "(I)V",       (void *)&JVM_SetThreadPriority},    {"yield",            "()V",        (void *)&JVM_Yield},    {"sleep",            "(J)V",       (void *)&JVM_Sleep},    {"currentThread",    "()" THD,     (void *)&JVM_CurrentThread},    {"countStackFrames", "()I",        (void *)&JVM_CountStackFrames},    {"interrupt0",       "()V",        (void *)&JVM_Interrupt},    {"isInterrupted",    "(Z)Z",       (void *)&JVM_IsInterrupted},    {"holdsLock",        "(" OBJ ")Z", (void *)&JVM_HoldsLock},    {"getThreads",        "()[" THD,   (void *)&JVM_GetAllThreads},    {"dumpThreads",      "([" THD ")[[" STE, (void *)&JVM_DumpThreads},};#undef THD#undef OBJ#undef STEJNIEXPORT void JNICALLJava_java_lang_Thread_registerNatives(JNIEnv *env, jclass cls){    (*env)->RegisterNatives(env, cls, methods, ARRAY_LENGTH(methods));}


最终的实现类:jvm.c

在oracle jdk提供的源码中没有jvm.c,所以网上找了一份
JNIEXPORT void JNICALLJVM_Interrupt(JNIEnv *env, jobject thread){    CVMExecEnv *ee = CVMjniEnv2ExecEnv(env);    CVMExecEnv *targetEE;    CVMJavaLong eetopVal;       CVMsysMutexLock(ee, &CVMglobals.threadLock);    CVMID_fieldReadLong(ee, thread,CVMoffsetOfjava_lang_Thread_eetop,eetopVal);    targetEE = (CVMExecEnv *)CVMlong2VoidPtr(eetopVal);    /* %comment: rt035 */    if (targetEE != NULL) {if (!targetEE->interruptsMasked) {    CVMthreadInterruptWait(CVMexecEnv2threadID(targetEE));} else {    targetEE->maskedInterrupt = CVM_TRUE;  //设置个变量}    }    CVMsysMutexUnlock(ee, &CVMglobals.threadLock);}JNIEXPORT jboolean JNICALLJVM_IsInterrupted(JNIEnv *env, jobject thread, jboolean clearInterrupted){    CVMExecEnv *ee = CVMjniEnv2ExecEnv(env);    CVMExecEnv *targetEE;    CVMThreadICell *thisThreadCell;    CVMJavaLong eetopVal;    jboolean result = JNI_FALSE;    CVMBool isSelf;    thisThreadCell = CVMcurrentThreadICell(ee);    CVMID_icellSameObject(ee, thisThreadCell, thread, isSelf);    if (isSelf) {/* * The thread is the current thread, so we can avoid the * locking because we know we aren't going away. * * Current thread will not see any interrupts while * interrupts are masked. */result = !ee->interruptsMasked &&    CVMthreadIsInterrupted(CVMexecEnv2threadID(ee), clearInterrupted);    } else {/* a thread can only clear its own interrupt */CVMassert(!clearInterrupted);CVMsysMutexLock(ee, &CVMglobals.threadLock);CVMID_fieldReadLong(ee, thread,    CVMoffsetOfjava_lang_Thread_eetop,    eetopVal);if (!CVMlongEqz(eetopVal)) {    targetEE = (CVMExecEnv *)CVMlong2VoidPtr(eetopVal);    result = !targetEE->interruptsMasked ?CVMthreadIsInterrupted(CVMexecEnv2threadID(targetEE),    clearInterrupted): targetEE->maskedInterrupt;}CVMsysMutexUnlock(ee, &CVMglobals.threadLock);    }    return result;}


虽然对C语言不是很了解,不过基本的意思相信大家也能看的懂了。  <p>?</p>
<p>?</p><p>?</p>
<p>?</p>
</div>
<p>?</p>
<p>FutureTask已经实现了cancel方法,也是发起了一个interrupt事件给运行的Thread,至于是否可以正常的响应中断,还是得看写任务代码的是否有处理中断异常。</p>
<p>?</p>
<p>你的这种处理方式也是一种思路,原理主要还是获取中断事件进行请求处理。</p>
<p>不过一般不建议这么做,如果是你设计一个多线程处理的tool工具,你不能假定使用者是否会正确/响应的线程中断,是把?</p>    12 楼 mojunbin 2012-04-24   这类帖子比较好,Java多线程很多地方还是很值得讨论.

热点排行