五、ThreadPoolExecutor

ThreadPoolExecutor 是 JDK 中的线程池实现,这个类实现了一个线程池需要的各个方法,它实现了任务提交、线程管理、监控等方法。我们可以基于它来进行业务上的扩展,以实现我们需要的其他功能,比如实现定时任务的ScheduledThreadPoolExecutor 就继承自 ThreadPoolExecutor

线程池是一种生产者-消费者模式,线程池的使用方式生产者,线程池本身是消费者。

img

写在前面:很多人认为线程池的源码复杂,其实核心逻辑就是上面这幅图展示的,它的复杂在于需要随时随地对线程池的状态进行判断、对线程池中线程数量进行判断、对任务队列是否为空进行判断等等,因为线程池通常是被多个线程同时调用的,很有可能其中某个线程进行了shutdown()shutdownNow()等操作,甚至在线程池运行期间通过setCorePoolSize()setMaximumPoolSize()setKeepAliveTime()allowsCoreThreadTimeOut()等方法动态调整参数,而这些都是需要线程池”时刻提防”的,所以才有了看起来很复杂的判断逻辑,当我们在遇到它们的时候,如果带着上面的思考再去考虑可能遇到的各种情况,或许就豁然开朗了。还有一点,在代码的关键部分保留了英文注释,可以结合着讲解一些看,它是我们读懂作者设计思路的一手资源。

1.构造方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
// 核心线程数
this.corePoolSize = corePoolSize;
// 最大线程数,线程池允许创建的最大线程数
this.maximumPoolSize = maximumPoolSize;
// 任务队列
this.workQueue = workQueue;
// 非核心线程获取任务时的等待时间,超时后就会被销毁
// 当然,也可以通过调用 allowCoreThreadTimeOut(true) 使核心线程数内的线程也可以被回收。
this.keepAliveTime = unit.toNanos(keepAliveTime);
// 线程工厂,它用来生产一组相同任务的线程,线程池的命名是通过给这个 factory 增加组名前缀来实现的。
// 在虚拟机栈分析时,就可以知道线程任务是有哪个线程工厂生产的。
this.threadFactory = threadFactory;
// 拒绝策略
this.handler = handler;
}

上面的构造方法是ThreadPoolExecutor中最全的一个构造方法。这里我们重点说一下threadFactory这个参数。如果在构造函数中不传threadFactory,默认使用的是Executors.defaultThreadFactory(),实现如下,我们在实际应用中通常需要根据自己的业务,重新定义线程工厂,为线程指定有意义的名称和序列号。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class Executors {
// 静态工厂方法,返回线程池默认的线程工厂实现
public static ThreadFactory defaultThreadFactory() {
return new DefaultThreadFactory();
}

static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;

DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}

public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
}

RejectedExecutionHandler表示拒绝策略,它的执行时机是当阻塞队列已满且没有空闲的线程(包括核心和非核心)时对新提交过来的任务的执行策略。ThreadPoolExecutor内部实现了四种拒绝策略,它们是四个公共的静态内部类,都实现了RejectedExecutionHandler这个接口。由于内建的这几种策略都有各自的局限性,所以我们在工作中一般会制定自己的拒绝策略。下面我们看一下内建的四种拒绝策略:

  • AbortPolicy:默认的拒绝策略,直接抛出异常
  • DiscardPolicy:忽略提交的任务(里面是个空实现)
  • DiscardOldestPolicy:抛弃最老的任务,通过poll()方法取出任务队列队头的任务抛弃,然后提交当期任务
  • CallerRunsPolicy:让调用者线程执行当前被拒绝任务,这样原来的异步调用会退化为同步调用,所以我们一般不推荐这么做。
    1
    2
    3
    public interface RejectedExecutionHandler {
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    public class ThreadPoolExecutor extends AbstractExecutorService {

    // 默认的拒绝策略
    private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();

    public static class AbortPolicy implements RejectedExecutionHandler {

    public AbortPolicy() { }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    // 直接抛出异常
    throw new RejectedExecutionException("Task " + r.toString() +
    " rejected from " +
    e.toString());
    }
    }

    public static class DiscardPolicy implements RejectedExecutionHandler {

    public DiscardPolicy() { }

    // 忽略任务 do nothing
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    }
    }
    public static class DiscardOldestPolicy implements RejectedExecutionHandler {

    public DiscardOldestPolicy() { }

    // 抛弃队头任务,提交当前任务
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    if (!e.isShutdown()) {
    e.getQueue().poll();
    e.execute(r);
    }
    }
    }

    public static class CallerRunsPolicy implements RejectedExecutionHandler {

    public CallerRunsPolicy() { }

    // 让当前调用Executor#execute()的线程直接调用任务Runnable#run()
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    if (!e.isShutdown()) {
    r.run();
    }
    }
    }
    }

2.线程池状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
// ctl存储线程池状态和线程数
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// Integer 共有 32 位,低 29 位表示工作线程数,高 3 位表示线程池状态
private static final int COUNT_BITS = Integer.SIZE - 3;
// 最大线程数 000-11111 11111111 11111111 11111111
// 这里得到 29 个 1,用法类似于子网掩码,用于位与运算
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;

// -1的补码(32个1): 111-11111 11111111 11111111 11111111
// 所以左移32位结果: 111-00000 00000000 00000000 00000000
// 此状态表示线程池能够接受新任务
private static final int RUNNING = -1 << COUNT_BITS;
// 000-00000 00000000 00000000 00000000
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 001-00000 00000000 00000000 00000000
private static final int STOP = 1 << COUNT_BITS;
// 010-00000 00000000 00000000 00000000
private static final int TIDYING = 2 << COUNT_BITS;
// 011-00000 00000000 00000000 00000000
private static final int TERMINATED = 3 << COUNT_BITS;

// CAPACITY 按位取反,高 3 变为 1,低 29 位变为 0
// 比如: 001-00000 00000000 00000000 00000011 表示当前有3个线程
// ~CAPACITY: 111-00000 00000000 00000000 00000000
// 按位与后: 001-00000 00000000 00000000 00000000 表示线程池处于 stop 状态
// 获取当前线程池的状态(高3位)
private static int runStateOf(int c) { return c & ~COUNT_MASK; }
// 获取线程池中的线程数(低29位)
private static int workerCountOf(int c) { return c & COUNT_MASK; }
// 上面的操作的反向:根据运行状态和线程数获取 ctl
private static int ctlOf(int rs, int wc) { return rs | wc; }

// 下面三个方法都是通过 ctl 来判断线程池状态
private static boolean runStateLessThan(int c, int s) {
return c < s;
}

private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}

private static boolean isRunning(int c) {
return c < SHUTDOWN;
}

// 通过 CAS 使线程数 +1
private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
}

// 通过 CAS 增加线程数 -1
private boolean compareAndDecrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect - 1);
}

/**
* Decrements the workerCount field of ctl. This is called only on
* abrupt termination of a thread (see processWorkerExit). Other
* decrements are performed within getTask.
*/
// 线程数直接 -1
private void decrementWorkerCount() {
ctl.addAndGet(-1);
}

这里重点说一下compareAndDecrementWorkerCount()decrementWorkerCount()的区别:
decrementWorkerCount在JDK8中中是这样实现的:

1
2
3
private void decrementWorkerCount() {
do {} while (! compareAndDecrementWorkerCount(ctl.get()));
}

前者会通过返回值来判断线程数 -1 操作是否成功(可能会失败),而后者是循环直到你 -1 成功!
线程池五种状态的十进制从小到大排序依次为:RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED

  • RUNNING:表示正常的状态,接受新的任务,处理等待队列中的任务
  • SHUTDOWN:不接受新的任务提交,但会继续处理等待队列中的任务
  • STOP:不接受新的任务提交,不再处理等待队列中的任务,中断正在执行任务的线程
  • TIDYING:所有任务都销毁了,线程数为0,会执行钩子方法 terminated()
  • TERMINATED:执行钩子方法 terminated() 完毕,线程池彻底关闭

线程池状态轮转图如下:
img

3.execute方法

execute() 方法是 ThreadPoolExecutor 的核心逻辑,主要负责任务的调度

img

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
// 获取ctl的值(之前说的那个表示 "线程池状态" 和 "线程数" 的整数)
int c = ctl.get();
// 如果线程数小于核心线程数,则创建新的核心线程并执行传入的任务
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
// 创建成功则返回
return;
// 创建失败,更新 ctl 的临时变量 c
c = ctl.get();
}
// 走到这里两种情况:
// 1.线程数 >= 核心线程数
// 2.addWorker 返回 false (线程池状态非 RUNNING)
// 所以如果线程池此时处于 RUNNING 状态(针对情况 1 ),则将任务加入任务队列
if (isRunning(c) && workQueue.offer(command)) {
// double-check,重新获取线程池的状态
int recheck = ctl.get();
// 如果线程池此时变为非 RUNNING 状态,则从任务队列中删除任务(roll back)
if (! isRunning(recheck) && remove(command))
// 执行拒绝策略处理任务
reject(command);
// 走到这里有两种情况:
// 1.线程池处于 RUNNING 状态,线程数 >= 核心线程数
// 2.线程池处于非 RUNNING 状态,但是上面 remove 失败,
// 失败的原因可能是任务在 remove 之前已经出队被执行,这种情况我们无需关心
else if (workerCountOf(recheck) == 0)
// 对于 RUNNING ,如果当前线程数量为 0,则创建一个非核心线程并且传入的任务对象为 null,
// 这里传入 null 是是因为任务已经在任务队列中
// 所以这块代码的真正意图是:担心任务提交到任务队列中了,但是此时线程数为0(可能设置了allowCoreThreadTimeOut=true)
// 那必须有一个线程来执行任务,可以理解为一种兜底机制
addWorker(null, false);
// 最后的else分支(假设有,哈哈)对应的情况,线程数不为 0,任务也已经成功入队,所以什么也不用做
}
// 走到这里两种情况:
// 1.线程池处于非 RUNNING 状态,这时调用 addWorker 肯定返回 false
// 2.线程池处于 RUNNING 状态,线程数 >= 核心线程数,且任务队列已满,这时需要创建非核心线程来执行该任务
else if (!addWorker(command, false))
// 新增失败则执行拒绝策略处理任务
// 失败原因:1.线程池处于非 RUNNING 状态 2.线程数超过最大线程数 maximumPoolSize
reject(command);
}

// 从任务队列中移除当前任务 task
public boolean remove(Runnable task) {
boolean removed = workQueue.remove(task);
// 因为调用此方法时线程池处于非运行状态,所以会尝试关闭线程线程池
tryTerminate(); // In case SHUTDOWN and now empty
return removed;
}

这里比较难理解的是为什么要double-check?
特意没有删了上面的英文注释,对照着我们来简单看一下第二点:
如果一个任务成功加入任务队列,我们依然需要二次检查是否需要添加一个工作线程(因为所有存活的工作线程有可能在最后一次检查之后已经终结(workerCountOf(recheck) == 0)或者执行当前方法(workQueue.offer(command))的时候线程池已经shut down了(! isRunning(recheck))。所以我们需要二次检查线程池的状态,必要时把任务从任务队列中移除或者在没有可用的工作线程的前提下新建一个工作线程(对应if (! isRunning(recheck) && remove(command))else if (workerCountOf(recheck) == 0)两个分支)

4.addWorker

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
// 参数:firstTask 为每个 Worker 新建时处理的第一个任务,无需从任务队列中取,
// 处理完第一个任务后 Worker 后续会通过 getTask() 方法从任务队列取任务执行
// 参数:core 表示是否创建核心线程
private boolean addWorker(Runnable firstTask, boolean core) {
// 响应下文的 continue retry,快速退出多层嵌套循环
retry:
for (int c = ctl.get();;) {
// runStateAtLeast(c, SHUTDOWN):为了快速短路,线程池为 RUNNING 的话就不需要执行后面的判断
// (runStateAtLeast(c, STOP) || firstTask != null || workQueue.isEmpty())
// 1.STOP 即以上状态 ---->都快打扫完了就别添乱了
// 2.SHUTDOWN 状态且 firstTask 不为空 ----> 打烊了不接新客了
// 3.SHUTDOWN 状态且 firstTask 为空且任务队列为空 -----> 打烊了门口也没新客了店里的人也走完了
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP)
|| firstTask != null
|| workQueue.isEmpty()))
return false;
// 走到这里两种情况:
// 1.线程池为 RUNNING 状态
// 2.线程池为 SHUTDOWN 状态,提交任务firstTask为空,任务队列不为空,即 addWorker(null, false)
for (;;) {
// 1.创建的是核心线程且线程数量 >= 设定的核心线程数
// 2.创建的事非核心线程且线程数量 >= 设定的最大线程数
// 以上两种情况均 return false
if (workerCountOf(c)
>= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
return false;
// 走到这里说明可以创建线程了,CAS 操作线程数量 +1,成功 跳出循环执行后面语句
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get();
// 正常如果是 CAS 失败的话,进到下一个里层的for循环就可以了
// 可是如果是因为其他线程的操作,导致线程池的状态发生了变更,如有其他线程关闭了这个线程池
// 那么需要回到外层的for循环
// 所以这里判断只要线程池不是 RUNNING 状态,就回到最外层循环重新执行
if (runStateAtLeast(c, SHUTDOWN))
continue retry;
// else CAS failed due to workerCount change; retry inner loop
// 走到这说明是 CAS 失败,存在多个线程同时执行 workCount+1,
// 所以只需内部自旋,直到每个线程都 workCount+1 成功
}
}

// 走到这里,我们认为当前这个时刻,可以开始创建线程来执行任务了
// 表示 worker 是否已经启动
boolean workerStarted = false;
// 表示 worker 是否创建成功
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
// 这里判断t != null是因为用户可以自定义ThreadFactory,
// 如果这里用户直接返回null或者因为一些谜之操作创建失败了都应该考虑到,所以需要判断一下
if (t != null) {
// 这里需要全局加锁,因为可能会并发修改一些线程池的指标值和hashset(worker 集合)
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
// 持有锁之后重新获取线程池状态,因为可能在获取锁的过程状态被其他线程更改
int c = ctl.get();

// 根据逻辑或的短路原则,这里有两种情况:
// 1.线程池状态为RUNNING
// 2.线程池状态为SHUTDOWN,并且传入的任务实例 firstTask 为 null,
// 因为在SHUTDOWN状态时不会再添加新的任务,但是可以继续处理workQueue中的任务
if (isRunning(c) ||
(runStateLessThan(c, STOP) && firstTask == null)) {
// 此时线程的状态为新建但没有启动,否则抛出异常
if (t.getState() != Thread.State.NEW)
throw new IllegalThreadStateException();
// 把新建的 worker 加入到 workers 集合中
workers.add(w);
workerAdded = true;
int s = workers.size();
// 更改线程池的峰值容量(历史的最大容量)
if (s > largestPoolSize)
largestPoolSize = s;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 启动线程
t.start();
workerStarted = true;
}
}
} finally {
// 线程启动失败,需要从 workers 集合移除对应的 worker
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}

// 回滚创建的worker
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (w != null)
// 从 workers 集合中移除启动失败的 worker
workers.remove(w);
// 工作线程数量 -1
decrementWorkerCount();
// 基于状态判断尝试终结线程池
tryTerminate();
} finally {
mainLock.unlock();
}
}

6.Worker

Worker 是 Thread 的包装类Worker 继承自 AQS,这里使用了 AQS 的独占模式,构造 Worker 的时候,把 AQS 的资源(状态 state )通过 setState(-1) 设置为 -1,这是因为 Worker 实例刚创建时 AQSstate 的默认值为 0,此时线程尚未启动,不能在这个时候进行线程中断,见 Worker#interruptIfStarted() 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;

// 保存 ThreadFactory 创建的线程实例,如果 ThreadFactory 创建失败则为 null
final Thread thread;
// 保存 Runnable 任务实例
Runnable firstTask;
// 记录每个线程完成的任务总数
volatile long completedTasks;

// 构造函数,传入任务实例 firstTask,可以为 null
Worker(Runnable firstTask) {
// 禁止线程中断,直到 runWorker() 方法运行,看后面 interruptIfStarted() 方法的实现
setState(-1);
this.firstTask = firstTask;
// 通过 ThreadFactory 创建线程实例,注意一下 Worker 实例自身作为 Runnable 用于创建新的线程
this.thread = getThreadFactory().newThread(this);
}

// 委托到外部的 runWorker() 方法,注意 runWorker() 是线程池的方法,不是 Worker 的方法
public void run() {
runWorker(this);
}


// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
// 是否持有独占锁,state 值为 1 的时候表示持有锁,state 值为 0 的时候表示释放锁
protected boolean isHeldExclusively() {
return getState() != 0;
}

// 独占模式下尝试获取资源,这里没有判断传入的变量,直接 CAS 判断 0 更新为 1 是否成功,
// 成功则设置独占线程为当前线程
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}

// 独占模式下尝试释放资源,这里没有判断传入的变量,直接把 state 设置为 0
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}

// 加锁
public void lock() { acquire(1); }
// 尝试加锁
public boolean tryLock() { return tryAcquire(1); }
// 解锁
public void unlock() { release(1); }
// 当前 worker 是否被锁定
public boolean isLocked() { return isHeldExclusively(); }

// 线程启动后进行中断
// 中断条件:线程已经启动,并且中断标志位为 false
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}

7.runWorker

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
final void runWorker(Worker w) {
// 获取当前线程,实际上和 Worker 持有的线程实例是相同的
Thread wt = Thread.currentThread();
// 获取 Worker 持有的任务对象,存放在临时变量 task 中
Runnable task = w.firstTask;
w.firstTask = null;
// 由于 Worker 初始化时 AQS 中 state 值为 -1,这里要先做一次解锁把 state 置为0,允许线程中断
w.unlock(); // allow interrupts
// 记录线程是否因用户异常终结,默认 true
boolean completedAbruptly = true;
try {
// 任务对象不为 null,或者从任务队列获取任务不为空,
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
// 如果线程池正在停止,那么要确保当期那工作线程是中断状态,否则,要确保当前线程不是中断状态
// 两种情况:
// 1.当前线程池状态 >= STOP
// 2.当前线程目前是已中断的状态 并且线程池的状态也是 >= Stop 的
// 注意 Thread.interrupted是会擦除中断标识符的,所以 !wt.isInterrupted() 一定返回 true
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 钩子方法,任务执行前
beforeExecute(wt, task);
try {
task.run();
// 钩子方法,任务执行后 - 正常情况
afterExecute(task, null);
} catch (Throwable ex) {
// 钩子方法,任务执行后 - 异常情况
afterExecute(task, ex);
throw ex;
}
} finally {
// 清空 task,准备 getTask 获取下一个任务
// 这个很重要,否则 while 会死循环执行同一个 task
task = null;
// 累计该 Worker 完成的任务数
w.completedTasks++;
// Worker 解锁,本质是 AQS 释放资源,设置 state 为 0
w.unlock();
}
}
//
completedAbruptly = false;
} finally {
// 走到这里,说明 Worker 的一生要结束了:
// 1.正常结束,completedAbruptly 为false,getTask 返回为 null,没任务可执行了
// 2.异常结束,completedAbruptly 为true,throw ex 中抛出了异常
// completedAbruptly为true说明task.run()出现了异常
processWorkerExit(w, completedAbruptly);
}
}

Thread.interrupted()方法用于获取调用方线程的中断状态同时会清空该中断状态,这里之所以会调用这个方法是因为在执行上面这个if逻辑同时外部有可能调用shutdownNow()方法,shutdownNow()方法中也存在中断所有Worker线程的逻辑,但是由于shutdownNow()方法中会遍历所有Worker做线程中断,有可能无法及时在任务提交到Worker执行之前进行中断,所以这个中断逻辑会在Worker内部执行,就是if代码块的逻辑。这里还要注意的是:STOP状态下会拒绝所有新提交的任务,不会再执行任务队列中的任务,同时会中断所有Worker线程。也就是,即使任务Runnable已经在runWorker()中前半段逻辑取出,只要还没走到调用其Runnable#run(),都有可能被中断。假设刚好发生了进入if代码块的逻辑同时外部调用了shutdownNow()方法,那么if逻辑内会判断线程中断状态并且重置,那么shutdownNow()方法中调用的interruptWorkers()就不会因为中断状态判断出现问题导致二次中断线程(会导致异常)。

8.getTask

getTask()只做一件事,就是从线程池的阻塞队列中获取任务并返回。如果当前线程小于核心线程,那么当阻塞队列中没有任务时就会阻塞,反之会等待keepAliveTime后返回。此处再理解keepAliveTime的使用含义:非核心线程等待新任务的时间,超时的话getTask()方法就会返回nullrunWorker()就会执行processWorkerExit()尝试去清理空闲的线程(此时worker的state 等于 0)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54

private Runnable getTask() {
// 上次从任务队列中 poll 是否超时
boolean timedOut = false; // Did the last poll() time out?

for (;;) {
int c = ctl.get();

// 1.线程池状态 >= STOP ,runStateAtLeast(c, SHUTDOWN) && runStateAtLeast(c, STOP)
// 2.线程池状态 == SHUTDOWN && workQueue.isEmpty(),runStateAtLeast(c, SHUTDOWN) && !runStateAtLeast(c, STOP) && workQueue.isEmpty()
// 上面的分析其实是剖开了揉碎了来分析很不利于理解,所以大可不必那样
// 在我看来这还是一种快速失败机制,因为线程池在正常情况下都是 RUNNING 状态,所以这里第一条件都不符合
// 只有当线程池是 SHUTDOWN 的时候,才进一步判断是要变为 STOP(不处理任务队列中的任务)
// 还是 任务队列为空(没任务可处理),故返回 null
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
// 无论如何都要让线程数 -1
decrementWorkerCount();
return null;
}

int wc = workerCountOf(c);

// Are workers subject to culling?
// 判断 worker 是否需要被剔除
// 外部如果调用了 allowCoreThreadTimeOut(true):允许核心线程也使用 keepAliveTime
// 那么如果允许核心线程数内的线程回收,或工作线程数超过了核心线程数,都有可能发生超时关闭
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

// 第一个条件:工作线程数大于最大线程数或者线程已经超时
// 第二个条件:存活线程数不止一个或者任务队列空了
// 同时满足上面两个条件:线程数 -1,不返回任务
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}

// 走到这里说明 wc <= maximumPoolSize 并且没有超时
try {
// timed 为 true:限时等待提取任务队列中的任务(非核心线程或设置了允许超时的核心线程),超时返回 null
// timed 为 false:阻塞等待提取任务队列中的任务(核心线程才会阻塞)
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
if (r != null)
return r;
// r == null 说明已超时
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}

9.processWorkerExit

走到 processWorkerExit() 这个方法时,有两种情况:一种是 runWorker()中调用 getTask() 返回 null (线程池要 SHUTDOWN 或者非核心线程超时了),另一种是 runWorker() 中执行任务发生异常了。这时需要终止这个 Worker,所以才会调用这个方法。
启示:task#run()里最好是把所有的异常捕获而不要抛出,不然抛出异常就会立刻销毁线程,然后创建一个新线程,线程池的作用就没有。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// completedAbruptly 为 true:说明线程执行时出现异常
// completedAbruptly 为 false:说明 runWorker 由于 getTask 返回 null 正常结束的,
// getTask方法中已经对 workerCount 进行减一
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 非正常结束没来记得及将 workerCount -1,所以此时需要 -1
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();

final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 更新线程池已完成任务数
completedTaskCount += w.completedTasks;
// 从 workers 集合中移除该 worker
workers.remove(w);
} finally {
mainLock.unlock();
}
// 尝试关闭线程池
tryTerminate();

int c = ctl.get();
// 线程池为 RUNNING 或 SHUTDOWN,如果不是这两个状态,说明线程已经停止了,啥都不会要干了
if (runStateLessThan(c, STOP)) {
// 如果是正常结束
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
// 如果允许核心线程超时并且当前队列里面还有任务没跑呢,那就必须留一个线程,不能全死掉.
if (min == 0 && ! workQueue.isEmpty())
min = 1;
// 如果线程池中线程的数量至少有 1 个,那没事了,至少能执行完任务队列中的任务
if (workerCountOf(c) >= min)
return; // replacement not needed
}
// 走到这里有三种情况
// 1.worker 是因为某个异常 task 结束的,并不是我真正想结束,所以需要新增一个 worker
// 2.任务队列里还有任务,但是核心线程都关了,这种情况下最起码要留一个线程(可能正在 SHUTDOWN)
// 3.当前线程数量 < corePoolSize值,此时会创建线程,维护线程池数量在corePoolSize这个水平
addWorker(null, false);
}
}

10.tryTerminate

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
final void tryTerminate() {
for (;;) {
int c = ctl.get();
// 1.线程池状态为 RUNNING
// 2.线程池状态为 TIDYING,不需要你来关了
// 3.线程池状态为 SHUTDOWN,但是任务队列还有任务,得等队列中的任务处理完毕后再关
// 上面三种情况直接返回
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateLessThan(c, STOP) && ! workQueue.isEmpty()))
return;
// 走到这里两种情况
// 1.线程池状态为 STOP
// 2.线程池状态为 SHUTDOWN,但是任务队列为空
if (workerCountOf(c) != 0) { // Eligible to terminate
// 中断一个空闲的 worker
interruptIdleWorkers(ONLY_ONE);
return;
}
// 走到这里,说明workerCountOf(c) == 0,
// 在调用 tryTerminate 之前,workerCount已经减为 0,表示这是最后一个退出的线程
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 设置线程池状态为 TIDYING,如果设置成功,则调用 terminated() 方法
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
// terminated() 执行完毕之后设置状态为 TERMINATED
ctl.set(ctlOf(TERMINATED, 0));
// 唤醒那些阻塞在 termination 上的线程
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}

11.interruptIdleWorker

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// 中断空闲的 worker
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 遍历 workers,根据 onlyOne 判断是中断一个 worker 还是所有
for (Worker w : workers) {
Thread t = w.thread;
// 线程没有被中断并且线程是空闲状态 tryLock() 判断是否空闲
// 因为worker执行任务上会调用 w.lock(),这时再调用 tryLock() 返回false
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}

遍历workers,如果线程是空闲状态(空闲状态:queue.take()queue.poll()返回空),则给其一个中断信号,如果是处于workQueue阻塞的线程,会被唤醒,唤醒后,进入下一次自旋时,可能会return null执行退出相关的逻辑,接着又会调用processWorkerExit()->tryTerminate(),回到上面场景,当前线程退出的时候还是会继续唤醒下一个空现线程。

12.一些和关闭相关的方法

1.awaitTermination

还记得上面的 tryTerminate() 方法中的 termination.signalAll(),没错唤醒的就是调用 awaitTermination() 这个方法的线程。因为awaitTermination()这个方法支持超时,所以如果调用了该方法,并且在超时间内线程池状态还不是 TERMINATED,那么调用该方法的线程就会在条件变private final _Condition _termination = mainLock.newCondition();上等待,直到被中断或者超时。如果等待期间线程池状态变为TERMINATED,那么调用该方法的线程就可能会被唤醒,从而返回 true

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
while (runStateLessThan(ctl.get(), TERMINATED)) {
if (nanos <= 0L)
return false;
nanos = termination.awaitNanos(nanos);
}
return true;
} finally {
mainLock.unlock();
}
}

2.shutdown

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 安全检查:判断调用该方法的线程是否具有关闭线程池的权限
checkShutdownAccess();
// RUNNING -> SHUTDOWN 状态转换
advanceRunState(SHUTDOWN);
// 中断所有空闲线程,默认 onlyOne 为false
interruptIdleWorkers();
// ScheduledThreadPoolExecutor 预留的钩子
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}

private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}

3.shutdownNow

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
// 中断所有线程,注意和 interruptIdleWorkers 的区别
interruptWorkers();
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}

private void interruptWorkers() {
// assert mainLock.isHeldByCurrentThread();
for (Worker w : workers)
w.interruptIfStarted();
}

void interruptIfStarted() {
Thread t;
// 不管有没有获得锁 w.lock(),只要线程启动后(可能在执行任务也可能空闲)都可以被中断
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}

4.isShutdown

1
2
3
4
public boolean isShutdown() {
// 线程池状态 >= SHUTDOWN,注意并不只是等于
return runStateAtLeast(ctl.get(), SHUTDOWN);
}

5.isTerminating

1
2
3
4
5
public boolean isTerminating() {
// SHUTDOWN =< 线程池状态 <= TERMINATED
int c = ctl.get();
return runStateAtLeast(c, SHUTDOWN) && runStateLessThan(c, TERMINATED);
}

6.isTerminated

1
2
3
4
public boolean isTerminated() {
// 线程池状态 >= TERMINATED
return runStateAtLeast(ctl.get(), TERMINATED);
}

13线程池的预热和动态调参

1.核心线程的预热功能

:::info
核心线程启动后可能任务队列为空,此时会阻塞在任务队列上直到有任务进来。
:::

1
2
3
4
5
6
7
8
9
10
11
12
// 启动一个核心线程
public boolean prestartCoreThread() {
return workerCountOf(ctl.get()) < corePoolSize &&
addWorker(null, true);
}
// 启动所有核心线程
public int prestartAllCoreThreads() {
int n = 0;
while (addWorker(null, true))
++n;
return n;
}

2.动态调整线程池参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public void setCorePoolSize(int corePoolSize) {
if (corePoolSize < 0 || maximumPoolSize < corePoolSize)
throw new IllegalArgumentException();
int delta = corePoolSize - this.corePoolSize;
this.corePoolSize = corePoolSize;
if (workerCountOf(ctl.get()) > corePoolSize)
interruptIdleWorkers();
else if (delta > 0) {
// We don't really know how many new threads are "needed".
// As a heuristic, prestart enough new workers (up to new
// core size) to handle the current number of tasks in
// queue, but stop if queue becomes empty while doing so.
int k = Math.min(delta, workQueue.size());
while (k-- > 0 && addWorker(null, true)) {
if (workQueue.isEmpty())
break;
}
}
}
public void allowCoreThreadTimeOut(boolean value) {
if (value && keepAliveTime <= 0)
throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
if (value != allowCoreThreadTimeOut) {
allowCoreThreadTimeOut = value;
if (value)
interruptIdleWorkers();
}
}

public void setMaximumPoolSize(int maximumPoolSize) {
if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
throw new IllegalArgumentException();
this.maximumPoolSize = maximumPoolSize;
if (workerCountOf(ctl.get()) > maximumPoolSize)
interruptIdleWorkers();
}

public void setKeepAliveTime(long time, TimeUnit unit) {
if (time < 0)
throw new IllegalArgumentException();
if (time == 0 && allowsCoreThreadTimeOut())
throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
long keepAliveTime = unit.toNanos(time);
long delta = keepAliveTime - this.keepAliveTime;
this.keepAliveTime = keepAliveTime;
if (delta < 0)
interruptIdleWorkers();
}

3.线程池的一些其他参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
// 线程池中线程的数量(包含活跃线程和空闲线程)
public int getPoolSize() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Remove rare and surprising possibility of
// isTerminated() && getPoolSize() > 0
return runStateAtLeast(ctl.get(), TIDYING) ? 0
: workers.size();
} finally {
mainLock.unlock();
}
}

// 线程池中活跃线程的数量
public int getActiveCount() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int n = 0;
for (Worker w : workers)
if (w.isLocked())
++n;
return n;
} finally {
mainLock.unlock();
}
}

// 线程池线程的历史最大数量
public int getLargestPoolSize() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
return largestPoolSize;
} finally {
mainLock.unlock();
}
}

// 线程池中任务总和(包括已经完成的、正在执行的和未执行的)
public long getTaskCount() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
long n = completedTaskCount;
for (Worker w : workers) {
n += w.completedTasks;
if (w.isLocked())
++n;
}
return n + workQueue.size();
} finally {
mainLock.unlock();
}
}

// 线程池中已经完成的任务总和
public long getCompletedTaskCount() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
long n = completedTaskCount;
for (Worker w : workers)
n += w.completedTasks;
return n;
} finally {
mainLock.unlock();
}

14.总结

image.png