1.前言

FutureTask表示异步任务(即当一个线程需要等待另一个线程把某个任务执行完后它才能继续执行,)实现了RunnableFuture接口,而RunnableFuture又继承了Runnable接口和Future接口。

image.png

2.Future接口

Future接口被设计用来代表一个异步操作的执行结果。你可以用它来获取一个操作的执行结果、取消一个操作、判断一个操作是否已经完成或者是否被取消。

method description
get() 获取执行结果,如果任务还在执行中,就阻塞等待
get(long timeout, TimeUnit unit) 指定等待的时间, 如果指定时间内任务没有完成, 则会抛出TimeoutException异常
cancel(boolean mayInterruptIfRunning) 尝试取消一个任务的执行,它的返回值是boolean类型,表示取消操作是否成功
isCancelled() 判断任务是否被取消了。如果一个任务在正常执行完成之前被cancel掉了, 则返回true
isDone() 如果一个任务已经结束,则返回true。注意, 这里的任务结束包含了以下三种情况:正常执行完毕、抛出异常、被取消

3.RunnableFuture接口

RunnableFuture接口同时实现了Runnable接口和Future接口。

1
2
3
4
5
6
7
public interface RunnableFuture<V> extends Runnable, Future<V> {
/**
* Sets this Future to the result of its computation
* unless it has been cancelled.
*/
void run();
}

问:既然已经继承了Runnable,该接口自然就继承了run方法,为什么要在该接口的内部再写一个run方法?
答:单纯从理论上来说,这里确实是没有必要的,再多写一遍,应该是为了看上去直观一点,便于文档或者UML图展示。

4.FutureTask

4.1.核心属性

(1)state
FutureTask中状态由state属性来表示,它是由volatile修饰的,确保了不同线程对它修改的可见性。state的值代表了任务在运行过程中的状态,总共有7种状态:包括了1个初始态(NEW),2个中间态(COMPLETING、INTERRUPTING)和4个终止态(NORMAL、EXCEPTIONAL、CANCELLED、INTERRUPTED)。

1
2
3
4
5
6
7
8
private volatile int state;
private static final int NEW = 0;// 初始状态
private static final int COMPLETING = 1;// 正在设置任务结果
private static final int NORMAL = 2;// 任务正常执行完毕
private static final int EXCEPTIONAL = 3;// 任务执行过程中发生异常
private static final int CANCELLED = 4;// 任务被取消
private static final int INTERRUPTING = 5;// 正在中断运行任务的线程
private static final int INTERRUPTED = 6;// 任务被中断

(2)waiter
waiters表示等待队列的头结点。同一时刻可能有多个线程都在获取该任务的执行结果,如果该任务还在执行过程中,那么这些线程就要进入等待队列中挂起,知道任务执行完毕被唤醒。
1
private volatile WaitNode waiters;

FutureTask中的等待队列是一个单向链表。
1
2
3
4
5
static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}

(3)runner
runner属性表示执行FutureTask中的Task的线程。
问:为什么需要一个属性来记录执行任务的线程呢?
答:这是为了中断或取消任务做准备的,只有知道执行任务的线程是谁,我们才能去中断它。

1
private volatile Thread runner;

(4)callable
callable表示执行的任务本省
1
private Callable<V> callable;

(5)outcome
outcome表示任务的执行结果或抛出的异常,outcome可以是任意类型的对象,所以当我们将正常的执行结果返回给调用者时,需要进行强制类型转换,返回由Callable定义的V类型。
1
private Object outcome; 

4.2.构造函数

FutureTask共有2个构造函数,这2个构造函数一个是直接传入Callable对象,一个是传入一个Runnable对象和一个指定的result,然后通过Executors工具类将它适配成callable对象,所以这两个构造函数的本质是一样的:

  1. 将传入的参数初始化为callable成员变量
  2. FutureTask的状态设为NEW
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    public FutureTask(Callable<V> callable) {
    if (callable == null)
    throw new NullPointerException();
    this.callable = callable;
    this.state = NEW; // ensure visibility of callable
    }

    public FutureTask(Runnable runnable, V result) {
    // 之前在线程池源码中分析过
    this.callable = Executors.callable(runnable, result);
    this.state = NEW; // ensure visibility of callable
    }

    4.3.核心方法

    4.3.1.run

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    public void run() {
    // 初始化runner
    if (state != NEW ||
    !RUNNER.compareAndSet(this, null, Thread.currentThread()))
    return;
    try {
    Callable<V> c = callable;
    if (c != null && state == NEW) {
    V result;
    boolean ran;
    try {
    result = c.call();
    ran = true;
    } catch (Throwable ex) {
    result = null;
    ran = false;
    // 发生异常
    setException(ex);
    }
    // 正常结束
    if (ran)
    set(result);
    }
    } finally {
    // runner must be non-null until state is settled to
    // prevent concurrent calls to run()
    runner = null;
    // state must be re-read after nulling runner to prevent
    // leaked interrupts
    int s = state;
    // 如果发生过中断
    if (s >= INTERRUPTING)
    handlePossibleCancellationInterrupt(s);
    }
    }
    set(result)方法:一开始通过CAS操作将state属性由原来的NEW修改为COMPLETING,COMPLETING是一个非常短暂的中间态,表示正在设置执行的结果。状态设置成功后,就把任务执行结果赋值给outcome,然后直接把state状态设置成NORMAL。
    1
    2
    3
    4
    5
    6
    7
    8
    protected void set(V v) {
    // 将state状态由NEW改为COMPLETING
    if (STATE.compareAndSet(this, NEW, COMPLETING)) {
    outcome = v;
    STATE.setRelease(this, NORMAL); // final state
    finishCompletion();
    }
    }
    setException(ex):将执行结果outcome设置为异常t,将任务状态设置为EXCEPTIONAL。
    1
    2
    3
    4
    5
    6
    7
    protected void setException(Throwable t) {
    if (STATE.compareAndSet(this, NEW, COMPLETING)) {
    outcome = t;
    STATE.setRelease(this, EXCEPTIONAL); // final state
    finishCompletion();
    }
    }
    finishCompletion()方法:无论是正常执行完毕还是发生异常,最后都会调用该方法。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    private void finishCompletion() {
    // assert state > COMPLETING;
    for (WaitNode q; (q = waiters) != null;) {
    // waiters设为null
    if (WAITERS.weakCompareAndSet(this, q, null)) {
    // 遍历等待队列,唤醒所有等待的线程
    for (;;) {
    Thread t = q.thread;
    if (t != null) {
    q.thread = null;
    LockSupport.unpark(t);
    }
    WaitNode next = q.next;
    if (next == null)
    break;
    q.next = null; // unlink to help gc
    q = next;
    }
    break;
    }
    }
    // 钩子函数,由子类覆写,以是实现一些任务执行结束前的额外操作
    done();

    callable = null; // to reduce footprint
    }

    // 这是一个空方法
    protected void done() { }
    疑问:前面已经执行过的set方法或者setException方法不是已经将state状态设置成NORMAL或者EXCEPTIONAL了吗?怎么会出现INTERRUPTING或者INTERRUPTED状态呢?
    答:在多线程的环境中,当前线程执行run方法的同时,有可能其他线程取消了任务的执行,此时其他线程就可能对state状态进行改写。

handlePossibleCancellationInterrupt()方法:该方法是一个自旋操作,如果当前的state状态是INTERRUPTING,则在原地自旋,直到state状态转换成终止态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private void handlePossibleCancellationInterrupt(int s) {
// It is possible for our interrupter to stall before getting a
// chance to interrupt us. Let's spin-wait patiently.
if (s == INTERRUPTING)
while (state == INTERRUPTING)
Thread.yield(); // wait out pending interrupt

// assert state == INTERRUPTED;

// We want to clear any interrupt we may have received from
// cancel(true). However, it is permissible to use interrupts
// as an independent mechanism for a task to communicate with
// its caller, and there is no way to clear only the
// cancellation interrupt.
//
// Thread.interrupted();
}


至此,run方法的分析就真的结束了。我们来总结一下:
run方法重点做了以下几件事:

  1. 将runner属性设置成当前正在执行run方法的线程
  2. 调用callable成员变量的call方法来执行任务
  3. 设置执行结果outcome,如果执行成功,则outcome保存的就是执行结果;如果执行过程中发生了异常, 则outcome中保存的就是异常,设置结果之前,先将state状态设为中间态
  4. 对outcome的赋值完成后,设置state状态为终止态(NORMAL或者EXCEPTIONAL)
  5. 唤醒等待队列中所有等待的线程
  6. 善后清理(waiters、callable、runner设为null)
  7. 检查是否有遗漏的中断,如果有,等待中断状态完成。

“state只要不是NEW状态,就说明任务已经执行完成了”就体现在这里,因为run方法中,我们是在c.call()执行完毕或者抛出了异常之后才开始设置中间态和终止态的。

4.3.2.cancel(boolean mayInterruptIfRunning)

既然上面在分析run方法的最后,我们提到了任务可能被别的线程取消,那我们就趁热打铁,看看怎么取消一个任务的执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public boolean cancel(boolean mayInterruptIfRunning) {
if (!(state == NEW && STATE.compareAndSet
(this, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
STATE.setRelease(this, INTERRUPTED);
}
}
} finally {
finishCompletion();
}
return true;
}


首先有以下三种情况之一的,cancel操作一定是失败的:

  1. 任务已经执行完成了
  2. 任务已经被取消过了
  3. 任务因为某种原因不能被取消

其它情况下,cancel操作将返回true。值得注意的是,cancel操作返回true并不代表任务真的就是被取消了,这取决于发动cancel状态时,任务所处的状态:

  1. 如果发起cancel时任务还没有开始运行,则随后任务就不会被执行;
  2. 如果发起cancel时任务已经在运行了,则这时就需要看mayInterruptIfRunning参数了:
    • 如果mayInterruptIfRunning 为true, 则当前在执行的任务会被中断
    • 如果mayInterruptIfRunning 为false, 则可以允许正在执行的任务继续运行,直到它执行完

我们来看看FutureTask是怎么实现cancel方法的这几个规范的:
首先,对于“任务已经执行完成了或者任务已经被取消过了,则cancel操作一定是失败的(返回false)”这两条,是通过简单的判断state值是否为NEW实现的,因为我们前面说过了,只要state不为NEW,说明任务已经执行完毕了。从代码中可以看出,只要state不为NEW,则直接返回false。

如果state还是NEW状态,我们再往下看:STATE.compareAndSet (this, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))
这一段是根据mayInterruptIfRunning的值将state的状态由NEW设置成INTERRUPTING或者CANCELLED,当这一操作也成功之后,就可以执行后面的try语句了,但无论怎么,该方法最后都返回了true。

实际上,cancel方法实际上完成以下两种状态转换之一:

  1. NEW -> CANCELLED (对应于mayInterruptIfRunning=false)
  2. NEW -> INTERRUPTING -> INTERRUPTED (对应于mayInterruptIfRunning=true)

对于第一条路径,虽说cancel方法最终返回了true,但它只是简单的把state状态设为CANCELLED,并不会中断线程的执行。但是这样带来的后果是,任务即使执行完毕了,也无法设置任务的执行结果,因为前面分析run方法的时候我们知道,设置任务结果有一个中间态,而这个中间态的设置,是以当前state状态为NEW为前提的。

对于第二条路径,则会中断执行任务的线程。虽然第二条路径中断了当前正在执行的线程,但是,响不响应这个中断是由执行任务的线程自己决定的,更具体的说,这取决于c.call()方法内部是否对中断进行了响应,是否将中断异常抛出。
那call方法中是怎么处理中断的呢?从run的代码中可以看出,catch语句处理了所有的Throwable的异常,这自然也包括了中断异常。
然而,值得一提的是,即使这里进入了catch (Throwable ex){}代码块,setException(ex)的操作一定是失败的,因为在我们取消任务执行的线程中,我们已经先把state状态设为INTERRUPTING了,而setException(ex)的操作要求设置前线程的状态为NEW。所以这里响应cancel方法所造成的中断最大的意义不是为了对中断进行处理,而是简单的停止任务线程的执行,节省CPU资源。
那读者可能会问了,既然这个setException(ex)的操作一定是失败的,那放在这里有什么用呢?事实上,这个setException(ex)是用来处理任务自己在正常执行过程中产生的异常的,在我们没有主动去cancel任务时,任务的state状态在执行过程中就会始终是NEW,如果任务此时自己发生了异常,则这个异常就会被setException(ex)方法成功的记录到outcome中。
反正无论如何,run方法最终都会进入finally块,而这时候它会发现s >= INTERRUPTING,如果检测发现s = INTERRUPTING,说明cancel方法还没有执行到中断当前线程的地方,那就等待它将state状态设置成INTERRUPTED。到这里,对cancel方法的分析就和上面对run方法的分析对接上了。
cancel方法到这里就分析完了,如果你一条条的去对照Future接口对于cancel方法的规范,它每一条都是实现了的,而它实现的核心机理,就是对state的当前状态的判断和设置。由此可见,state属性是贯穿整个FutureTask的最核心的属性。

4.3.3.isCancelled()

1
2
3
public boolean isCancelled() {
return state >= CANCELLED;
}

那么state >= CANCELLED 包含了那些状态呢,它包括了: CANCELLED、INTERRUPTING、INTERRUPTED

4.3.4.isDone()

与 isCancelled方法类似,isDone方法也是简单地通过state状态来判断。关于这一点,其实我们之前已经说过了,只要state状态不是NEW,则任务已经执行完毕了,因为state状态不存在类似“任务正在执行中”这种状态,即使是短暂的中间态,也是发生在任务已经执行完毕,正在设置任务结果的时候。

1
2
3
public boolean isDone() {
return state != NEW;
}

4.3.5.get()

最后我们来看看获取执行结果的get方法,先来看看无参的版本:

1
2
3
4
5
6
7
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}


该方法其实很简单,当任务还没有执行完毕或者正在设置执行结果时,我们就使用awaitDone方法等待任务进入终止态,注意,awaitDone的返回值是任务的状态,而不是任务的结果。任务进入终止态之后,我们就根据任务的执行结果来返回计算结果或者抛出异常。

在具体分析它的源码之前,有一点我们先特别说明一下,FutureTask中会涉及到两类线程,一类是执行任务的线程,它只有一个,FutureTask的run方法就由该线程来执行;一类是获取任务执行结果的线程,它可以有多个,这些线程可以并发执行,每一个线程都是独立的,都可以调用get方法来获取任务的执行结果。如果任务还没有执行完,则这些线程就需要进入等待队列中挂起,直到任务执行结束,或者等待的线程自身被中断。

awaitDone方法:该方法是获取任务结果最核心的方法,它完成了获取结果,挂起线程,响应中断等诸多操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
// The code below is very delicate, to achieve these goals:
// - call nanoTime exactly once for each call to park
// - if nanos <= 0L, return promptly without allocation or nanoTime
// - if nanos == Long.MIN_VALUE, don't underflow
// - if nanos == Long.MAX_VALUE, and nanoTime is non-monotonic
// and we suffer a spurious wakeup, we will do no worse than
// to park-spin for a while
long startTime = 0L; // Special value 0L means not yet parked
WaitNode q = null;
boolean queued = false;
for (;;) {
int s = state;
// 任务已经进入终止态,就直接返回任务状态
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
// 任务正在设置执行结果,我们就让出当前线程让出CPU的使用权,因为最重要的部分已经完成了
else if (s == COMPLETING)
// We may have already promised (via isDone) that we are done
// so never return empty-handed or throw InterruptedException
Thread.yield();
// 当前线程被中断了
else if (Thread.interrupted()) {
// 将线程从等待队列中移除
removeWaiter(q);
throw new InterruptedException();
}
// 当跟前线程还没有进入等待队列,则新建一个等待队列
else if (q == null) {
// 如果已经超时就无需入队了
if (timed && nanos <= 0L)
return s;
q = new WaitNode();
}
// 头插法入队
else if (!queued)
// 这个CAS操作就是为了保证同一时刻如果有多个线程在同时入栈,则只有一个能够操作成功
queued = WAITERS.weakCompareAndSet(this, q.next = waiters, q);
else if (timed) {
final long parkNanos;
if (startTime == 0L) { // first time
startTime = System.nanoTime();
if (startTime == 0L)
startTime = 1L;
parkNanos = nanos;
} else {
long elapsed = System.nanoTime() - startTime;
if (elapsed >= nanos) {
removeWaiter(q);
return state;
}
parkNanos = nanos - elapsed;
}
// nanoTime may be slow; recheck before parking
if (state < COMPLETING)
LockSupport.parkNanos(this, parkNanos);
}
else
// 在任务没有执行完毕的情况下,获取任务执行结果的线程就会在Treiber栈中
LockSupport.park(this);
}
}

removeWaiter方法:将参数中的node从等待队列(即Treiber栈)中移除,被中断的线程就不用在等待队列中等待了。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private void removeWaiter(WaitNode node) {
if (node != null) {
// 将node中的线程引用置为null
node.thread = null;
retry:
for (;;) { // restart on removeWaiter race
for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
s = q.next;
// 排除掉thread为null的node
if (q.thread != null)
pred = q;
// 要移除的元素不在栈顶
else if (pred != null) {
pred.next = s;
if (pred.thread == null) // check for race
continue retry;
}
// 要移除的元素在栈顶
else if (!WAITERS.compareAndSet(this, q, s))
continue retry;
}
break;
}
}
}


那么这个挂起的线程什么时候会被唤醒呢?有两种情况:

  1. 任务执行完毕了,在finishCompletion方法中会唤醒所有在Treiber栈中等待的线程
  2. 等待的线程自身因为被中断等原因而被唤醒。

我们接下来就继续看看线程被唤醒后的情况,此时,线程将回到for(;;)循环的开头,继续下一轮循环。至此我们知道,除非被中断,否则get方法会在原地自旋等待(用的是Thread.yield,对应于s == COMPLETING)或者直接挂起(对应任务还没有执行完的情况),直到任务执行完成。而我们前面分析run方法和cancel方法的时候知道,在run方法结束后,或者cancel方法取消完成后,都会调用finishCompletion()来唤醒挂起的线程,使它们得以进入下一轮循环,获取任务执行结果。
最后,等awaitDone函数返回后,get方法返回了report(s),以根据任务的状态,汇报执行结果:

1
2
3
4
5
6
7
8
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}

4.3.6.get(long timeout, TimeUnit unit)

最后我们来看看带超时版本的get方法:

1
2
3
4
5
6
7
8
9
10
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
}

它和上面不带超时时间的get方法很类似,只是在awaitDone方法中多了超时检测。如果指定的超时时间到了,则直接返回,如果返回时,任务还没有进入终止状态,则直接抛出TimeoutException异常,否则就像get()方法一样,正常的返回执行结果。

5.总结

FutureTask实现了Runnable和Future接口,它表示了一个带有任务状态和任务结果的任务,它的各种操作都是围绕着任务的状态展开的,值得注意的是,在所有的7个任务状态中,只要不是NEW状态,就表示任务已经执行完毕或者不再执行了,并没有表示“任务正在执行中”的状态
除了代表了任务的Callable对象、代表任务执行结果的outcome属性,FutureTask还包含了一个代表所有等待任务结束的线程的Treiber栈,这一点其实和各种锁的等待队列特别像,即如果拿不到锁,则当前线程就会被扔进等待队列中;这里则是如果任务还没有执行结束,则所有等待任务执行完毕的线程就会被扔进Treiber栈中,直到任务执行完毕了,才会被唤醒。
FutureTask虽然为我们提供了获取任务执行结果的途径,遗憾的是,在获取任务结果时,如果任务还没有执行完成,则当前线程会自旋或者挂起等待,这和我们实现异步的初衷是相违背的,我们后面将继续介绍另一个同步工具类CompletableFuture,它解决了这个问题。


参考文章:
FutureTask源码解析(1)——预备知识
FutureTask源码解析(2)——深入理解FutureTask