2007-09-03
线程任务的取消
关键字: java 线程 取消
当外部代码能够在活动自然完成之前,把它的状态更改为完成状态,那么这个活动被称为可取消(cancellable)。取消任务是一个很常见的需求,无论 是由于用户请求还是系统错误引起的服务关闭等等原因。最简单的任务取消策略就是在线程中维持一个bool变量,在run方法中判断此变量的bool值来决 定是否取消任务。显然,这个bool变量需要声明为volatile,以保持多线程环境下可见性(所谓可见性,就是当一个线程修改共享对象的状态变量后,另一个线程 可以马上看到修改的结果)。下面是一个来自《java并发编程实践》的例子:
main中启动一个素数生成的任务,线程运行一秒就取消掉。通过线程中的cancelled变量来表征任务是否继续执行。既然是最简单的策略,那么什么是 例外情况?显然,阻塞操作下(比如调用join,wait,sleep方法),这样的策略会出问题。任务因为调用这些阻塞方法而被阻塞,它将不会去检查 volatile变量,导致取消操作失效。那么解决办法是什么?中断!考虑我们用BlockingQueue去保存生成的素数, BlockingQueue的put方法是阻塞的(当BlockingQueue满的时候,put操作会阻塞直到有元素被take),让我们看看不采用中 断,仍然采用简单策略会出现什么情况:
我们在main中通过queue.take来消费产生的素数(虽然仅仅是取出扔掉),我们只消费了1000个素数,然后尝试取消产生素数的任务,很遗憾, 取消不了,因为产生素数的线程产生素数的速度大于我们消费的速度,我们在消费1000后就停止消费了,那么任务将被queue的put方法阻塞,永远也不 会去判断cancelled状态变量,任务取消不了。正确的做法应当是使用中断(interrupt):
在run方法中,通过Thread的isInterrupted来判断interrupt status是否已经被修改,从而正确实现了任务的取消。关于interrupt,有一点需要特别说明,调用interrupt并不意味着必然停止目标线 程的正在进行的工作,它仅仅是传递一个请求中断的信号给目标线程,目标线程会在下一个方便的时刻中断。而对于阻塞方法产生的 InterruptedException的处理,两种选择:要么重新抛出让上层代码来处理,要么在catch块中调用Thread的interrupt 来保存中断状态。除非你确定要让工作线程终止(如上所示代码),否则不要仅仅是catch而不做任务处理工作(生吞了 InterruptedException),更详细可以参考这里。如果不清楚外部线程的中断策略,生搬硬套地调用interrupt可能产生不可预料的后果,可参见书中7.1.4例子。
另外一个取消任务的方法就是采用Future来管理任务,这是JDK5引入的,用于管理任务的生命周期,处理异常等。比如调用ExecutorService的sumit方法会返回一个Future来描述任务,而Future有一个cancel方法用于取消任务。
那么,如果任务调用了不可中断的阻塞方法,比如Socket的read、write方法,java.nio中的同步I/O,那么该怎么处理呢?简单地,关闭它们!参考下面的例子:
Reader线程重写了interrupt方法,其中调用了socket的close方法用于中断read方法,最后,又调用了super.interrupt(),防止当调用可中断的阻塞方法时不能正常中断。
java 代码
- package net.rubyeye.concurrency.chapter7;
- import java.math.BigInteger;
- import java.util.ArrayList;
- import java.util.List;
- import java.util.concurrent.TimeUnit;
- public class PrimeGenerator implements Runnable {
- private final List
primes = new ArrayList(); - private volatile boolean cancelled;
- public void run() {
- BigInteger p = BigInteger.ONE;
- while (!cancelled) {
- p = p.nextProbablePrime();
- synchronized (this) {
- primes.add(p);
- }
- }
- }
- public void cancel() {
- cancelled = true;
- }
- public synchronized List
get() { - return new ArrayList
(primes); - }
- public static void main(String args[]) throws InterruptedException {
- PrimeGenerator generator = new PrimeGenerator();
- new Thread(generator).start();
- try {
- TimeUnit.SECONDS.sleep(1);
- } finally {
- generator.cancel();
- }
- }
- }
main中启动一个素数生成的任务,线程运行一秒就取消掉。通过线程中的cancelled变量来表征任务是否继续执行。既然是最简单的策略,那么什么是 例外情况?显然,阻塞操作下(比如调用join,wait,sleep方法),这样的策略会出问题。任务因为调用这些阻塞方法而被阻塞,它将不会去检查 volatile变量,导致取消操作失效。那么解决办法是什么?中断!考虑我们用BlockingQueue去保存生成的素数, BlockingQueue的put方法是阻塞的(当BlockingQueue满的时候,put操作会阻塞直到有元素被take),让我们看看不采用中 断,仍然采用简单策略会出现什么情况:
java 代码
- package net.rubyeye.concurrency.chapter7;
- import java.math.BigInteger;
- import java.util.concurrent.BlockingQueue;
- import java.util.concurrent.CountDownLatch;
- import java.util.concurrent.LinkedBlockingQueue;
- import java.util.concurrent.TimeUnit;
- public class BrokenPrimeProducer extends Thread {
- static int i = 1000;
- private final BlockingQueue
queue; - private volatile boolean cancelled = false;
- BrokenPrimeProducer(BlockingQueue
queue) { - this.queue = queue;
- }
- public void run() {
- BigInteger p = BigInteger.ONE;
- try {
- while (!cancelled) {
- p = p.nextProbablePrime();
- queue.put(p);
- }
- } catch (InterruptedException cusumed) {
- }
- }
- public void cancel() {
- this.cancelled = false;
- }
- public static void main(String args[]) throws InterruptedException {
- BlockingQueue
queue = new LinkedBlockingQueue ( - 10);
- BrokenPrimeProducer producer = new BrokenPrimeProducer(queue);
- producer.start();
- try {
- while (needMorePrimes())
- queue.take();
- } finally {
- producer.cancel();
- }
- }
- public static boolean needMorePrimes() throws InterruptedException {
- boolean result = true;
- i--;
- if (i == 0)
- result = false;
- return result;
- }
- }
我们在main中通过queue.take来消费产生的素数(虽然仅仅是取出扔掉),我们只消费了1000个素数,然后尝试取消产生素数的任务,很遗憾, 取消不了,因为产生素数的线程产生素数的速度大于我们消费的速度,我们在消费1000后就停止消费了,那么任务将被queue的put方法阻塞,永远也不 会去判断cancelled状态变量,任务取消不了。正确的做法应当是使用中断(interrupt):
java 代码
- package net.rubyeye.concurrency.chapter7;
- import java.math.BigInteger;
- import java.util.concurrent.BlockingQueue;
- import java.util.concurrent.CountDownLatch;
- import java.util.concurrent.LinkedBlockingQueue;
- import java.util.concurrent.TimeUnit;
- public class PrimeProducer extends Thread {
- static int i = 1000;
- private final BlockingQueue
queue; - private volatile boolean cancelled = false;
- PrimeProducer(BlockingQueue
queue) { - this.queue = queue;
- }
- public void run() {
- BigInteger p = BigInteger.ONE;
- try {
- while (!Thread.currentThread().isInterrupted()) {
- p = p.nextProbablePrime();
- queue.put(p);
- }
- } catch (InterruptedException cusumed) {
- }
- }
- public void cancel() {
- interrupt();
- }
- public static void main(String args[]) throws InterruptedException {
- BlockingQueue
queue = new LinkedBlockingQueue ( - 10);
- PrimeProducer producer = new PrimeProducer(queue);
- producer.start();
- try {
- while (needMorePrimes())
- queue.take();
- } finally {
- producer.cancel();
- }
- }
- public static boolean needMorePrimes() throws InterruptedException {
- boolean result = true;
- i--;
- if (i == 0)
- result = false;
- return result;
- }
- }
在run方法中,通过Thread的isInterrupted来判断interrupt status是否已经被修改,从而正确实现了任务的取消。关于interrupt,有一点需要特别说明,调用interrupt并不意味着必然停止目标线 程的正在进行的工作,它仅仅是传递一个请求中断的信号给目标线程,目标线程会在下一个方便的时刻中断。而对于阻塞方法产生的 InterruptedException的处理,两种选择:要么重新抛出让上层代码来处理,要么在catch块中调用Thread的interrupt 来保存中断状态。除非你确定要让工作线程终止(如上所示代码),否则不要仅仅是catch而不做任务处理工作(生吞了 InterruptedException),更详细可以参考这里。如果不清楚外部线程的中断策略,生搬硬套地调用interrupt可能产生不可预料的后果,可参见书中7.1.4例子。
另外一个取消任务的方法就是采用Future来管理任务,这是JDK5引入的,用于管理任务的生命周期,处理异常等。比如调用ExecutorService的sumit方法会返回一个Future来描述任务,而Future有一个cancel方法用于取消任务。
那么,如果任务调用了不可中断的阻塞方法,比如Socket的read、write方法,java.nio中的同步I/O,那么该怎么处理呢?简单地,关闭它们!参考下面的例子:
java 代码
- package net.rubyeye.concurrency.chapter7;
- import java.io.IOException;
- import java.io.InputStream;
- import java.net.Socket;
- /**
- * 展示对于不可中断阻塞的取消任务 通过关闭socket引发异常来中断
- *
- * @author Admin
- *
- */
- public abstract class ReaderThread extends Thread {
- private final Socket socket;
- private final InputStream in;
- public ReaderThread(Socket socket) throws IOException {
- this.socket = socket;
- this.in = socket.getInputStream();
- }
- // 重写interrupt方法
- public void interrupt() {
- try {
- socket.close();
- } catch (IOException e) {
- } finally {
- super.interrupt();
- }
- }
- public void run() {
- try {
- byte[] buf = new byte[1024];
- while (true) {
- int count = in.read(buf);
- if (count < 0)
- break;
- else if (count > 0)
- processBuff(buf, count);
- }
- } catch (IOException e) {
- }
- }
- public abstract void processBuff(byte[] buf, int count);
- }
Reader线程重写了interrupt方法,其中调用了socket的close方法用于中断read方法,最后,又调用了super.interrupt(),防止当调用可中断的阻塞方法时不能正常中断。
评论
yufanmaster
2008-02-17
代码有问题,第二个类BrokenPrimeProducer的cancel方法里应该改为:
this.cancelled = true;线程BrokenPrimeProducer会去判断cancelled状态变量的
this.cancelled = true;线程BrokenPrimeProducer会去判断cancelled状态变量的
Godlikeme
2008-02-03
文章写的很不错,线程可中断实现是非常麻烦的事情,通过volatile标志和中断都可能有问题。java concurrent practice 和 java 多线程设计模式都有讲解,其中比较赞同volatile和interrupt中断同时应用来保证cancel成功。
其他的关于有人评灌水有些时候是很讨厌的事情,真的怀疑投票得人弄清楚问题是什么没有。
其他的关于有人评灌水有些时候是很讨厌的事情,真的怀疑投票得人弄清楚问题是什么没有。
kris_xu
2008-02-03
hellsing42 写道
我前两天解决过这个问题,给你写了解决办法,结果被可笑的弄成灌水帖子,这个问题我研究了3天才弄明白.你自己在我博客上找吧,有个StreamGobbler的方法,用那个就不会无限 waitFor()再次bs下javaeye.还有有问题可以去javaworld上问我现在一般都去那里了.这里已经走上歧途
看来大家的见解基本相同啊。
hellsing42
2007-09-05
我前两天解决过这个问题,给你写了解决办法,结果被可笑的弄成灌水帖子,这个问题我研究了3天才弄明白.你自己在我博客上找吧,有个StreamGobbler的方法,用那个就不会无限 waitFor()再次bs下javaeye.还有有问题可以去javaworld上问我现在一般都去那里了.这里已经走上歧途
dennis_zane
2007-09-05
调用阻塞方法(join,wait,sleep),那么当收到中断信号,会抛出InterruptedException,离开阻塞状态;而如果没有调用 这些阻塞方法,中断信号将可以被忽略,当然你可以通过Thread.currentThread().isInterrupted()判断状态来进行手工 处理。
Groovy
2007-09-04
这个里面如果子线程里面在接收到中断后(interrupt status被改变),如果一直没有阻塞方法 此任务是不是会一直执行下去呢?
也就是说这个"下一个方便的时刻"是不是就是指阻塞方法被调用时呢?
在run方法中,通过Thread的isInterrupted来判断interrupt status是否已经被修改,从而正确实现了任务的取消。关于interrupt,有一点需要特别说明,调用interrupt并不意味着必然停止目标线 程的正在进行的工作,它仅仅是传递一个请求中断的信号给目标线程,目标线程会在下一个方便的时刻中断。而对于阻塞方法产生的 InterruptedException的处理,两种选择:要么重新抛出让上层代码来处理,要么在catch块中调用Thread的interrupt 来保存中断状态。除非你确定要让工作线程终止(如上所示代码),否则不要仅仅是catch而不做任务处理工作(生吞了 InterruptedException),更详细可以参考这里。如果不清楚外部线程的中断策略,生搬硬套地调用interrupt可能产生不可预料的后果,可参见书中7.1.4例子。
也就是说这个"下一个方便的时刻"是不是就是指阻塞方法被调用时呢?
在run方法中,通过Thread的isInterrupted来判断interrupt status是否已经被修改,从而正确实现了任务的取消。关于interrupt,有一点需要特别说明,调用interrupt并不意味着必然停止目标线 程的正在进行的工作,它仅仅是传递一个请求中断的信号给目标线程,目标线程会在下一个方便的时刻中断。而对于阻塞方法产生的 InterruptedException的处理,两种选择:要么重新抛出让上层代码来处理,要么在catch块中调用Thread的interrupt 来保存中断状态。除非你确定要让工作线程终止(如上所示代码),否则不要仅仅是catch而不做任务处理工作(生吞了 InterruptedException),更详细可以参考这里。如果不清楚外部线程的中断策略,生搬硬套地调用interrupt可能产生不可预料的后果,可参见书中7.1.4例子。
发表评论
提醒: 该博客已发表在公共论坛,博客所有留言会成为论坛回贴,留言请注意遵守论坛发贴规则
- 浏览: 145393 次
- 性别:

- 来自: 广州

- 详细资料
搜索本博客
最新评论
-
最近的学习和工作
楼主住在棠下。学的一些技术我都没有做过 不过ruby 还是会一点点的
-- by penghao122 -
PL/SQL学习笔记(五)
ELSEIF不对,应该是ELSIF
-- by gmizr -
oracle table-lock的5种 ...
select for update 应该是row share mode的锁, 也 ...
-- by xiaoxiao1984 -
oracle table-lock的5种 ...
如果允许别的session查询或用select for update锁定记录,不 ...
-- by xiaoxiao1984 -
Hadoop分布式文件系统:架 ...
beijing.josh 写道dennis_zane 写道sunhengxin ...
-- by dogstar






评论排行榜