前言

本源码系列均为JDK11版本,写文章之前曾对比过JDK11与JDK8版本之间的差异,总的来说,核心逻辑变化不大,但JDK11相比之前的JDK8,变量命名更加规范、代码更加简练、优化了很多糟糕的判断逻辑、方法抽象更加模块化。当然读者也可以将JDK11和JDK8配合使用,感受一下代码重构之美!

线程的创建需要开辟虚拟机栈、本地方法栈、程序计数器等线程私有的的内存空间。在线程销毁时需要回收这些系统资源。
线程池的作用:

  • 使用池化技术管理并复用线程、控制最大并发数。
  • 实现任务线程队列的缓存策略和拒绝机制。
  • 利用某些与时间相关的功能,比如定时执行、周期执行。
  • 隔离线程环境,不同的服务配置不同的线程池,避免相互影响。

参考文章:
https://www.cnblogs.com/wang-meng/p/12945703.html 基于JDK8,是我目前看过最牛逼的线程池源码解析
https://juejin.cn/post/6844903494621773832#heading-1基于JDK8,比较全
https://segmentfault.com/a/1190000023546243 基于JDK11,算是比较新了,但是有几处错误需注意

一、总体设计

类关系说明图:

继承关系:
实现关系:image.png
依赖关系(作为参数或返回值):image.png

线程池的设计本质上和JDK中的集合框架、AQS设计一样,都是使用模板方法设计模式ExecutorService接口继承了Executor接口,定义了管理线程任务的方法,AbstractExecutorService抽象类实现了ExecutorService接口,并提供了submit()invokeAll()invokeAny()等部分方法的实现。但是核心方法Executor.execute()交由子类ThreadPoolExecutorForkJoinPool来分别实现。Executors工具类的静态工厂方法可以创建ThreadPoolExecutorScheduledThreadPoolExecutorForkJoinPool等线程池的包装对象。

二、Executor

Executor接口的设计思想是将任务的创建和任务的执行进行解耦,使用者只需制定任务的执行逻辑(Runnable),无需关心任务在线程池中是如何被调度的以及线程创建和销毁。

1
2
3
4
public interface Executor {
// 提交任务给执行器,由执行器负责去执行任务
void execute(Runnable command);
}

三、ExecutorService

  • 扩充Executor能力:可以提交一个或多个任务,获取任务的执行结果

  • 提供管理线程任务的方法:终止线程池的运行,回收资源

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public interface ExecutorService extends Executor {

// 关闭线程池,已提交的任务继续执行,不接受继续提交新任务
void shutdown();

// 关闭线程池,尝试停止所有正在执行的任务,停止等待任务的处理,并且返回等待执行的任务列表
List<Runnable> shutdownNow();

// 判断线程池是否已关闭(不对外接受任务,相当店里于打烊了,里面还在收拾东西)
boolean isShutdown();

// 判断线程池是否终止(店里关灯了,收拾完了)
boolean isTerminated();

// 等待一段时间,如果到超时了,还没有terminated则返回false,反之则线程池已经terminated,返回true。
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;

// 提交一个有返回值的任务,返回一个表示该任务挂起结果的Future,
// 可使用Future的get方法获取任务成功完成时的返回结果
<T> Future<T> submit(Callable<T> task);

// 同上,result可以接受返回结果,
<T> Future<T> submit(Runnable task, T result);

// 提交一个无返回值的任务
Future<?> submit(Runnable task);

// 执行所有任务,返回一个 Future 类型的 List
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;

// 同上,但设置了超时时间
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;

// 只要一个任务结束了,就可以返回该任务的执行结果
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;

// 同上,但设置了超时时间
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}

四、AbstractExecutorService

AbstractExecutorService实现了 submit()invokeAny()invokeAll() 方法(利用模板方法设计模式),但它们只是在方法内部调用了execute方法,需要等具体执行器来实现这个最重要的部分,私有方法newTaskFor()Runnable包装为FutureTask。定义于最上层接口 Executor中的void execute(Runnable command)由于不需要获取结果,不会进行 FutureTask 的包装。需要获取结果FutureTask,用 submit() 方法,不需要获取结果,可以用 execute() 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
public abstract class AbstractExecutorService implements ExecutorService {

// 将Runnable包装为FutureTask,用于submit调用
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}

// 将Callable包装为FutureTask,用于submit调用
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}

// 提交任务,使用了模板方法设计模式
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
// 1.将任务包装成FutureTask
RunnableFuture<Void> ftask = newTaskFor(task, null);
// 2.交给执行器执行,具体执行逻辑由子类实现
execute(ftask);
return ftask;
}

// 提交任务,带返回结果
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
// 1.将任务包装成FutureTask
RunnableFuture<T> ftask = newTaskFor(task, result);
// 2.交给执行器执行,具体执行逻辑由子类实现
execute(ftask);
return ftask;
}

// 提交任务,带返回结果
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
// 1.将任务包装成FutureTask
RunnableFuture<T> ftask = newTaskFor(task);
// 2.交给执行器执行,具体执行逻辑由子类实现
execute(ftask);
return ftask;
}

// Invoke:调用、激活
// 将 tasks 集合中的任务提交到线程池执行,任意一个线程执行完后就可以结束了
// 第二个参数 timed 代表是否设置超时机制,超时时间为第三个参数,
// 如果 timed 为 true,同时超时了还没有一个线程返回结果,那么抛出 TimeoutException 异常
private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
boolean timed, long nanos)
throws InterruptedException, ExecutionException, TimeoutException {
if (tasks == null)
throw new NullPointerException();
int ntasks = tasks.size();
if (ntasks == 0)
throw new IllegalArgumentException();
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
// 初始化ExecutorCompletionService,this作为其内部的执行器
// ExecutorCompletionService使我们可以按任务的完成顺序从中取出对应的Future
ExecutorCompletionService<T> ecs =
new ExecutorCompletionService<T>(this);

// For efficiency, especially in executors with limited
// parallelism, check to see if previously submitted tasks are
// done before submitting more of them. This interleaving
// plus the exception mechanics account for messiness of main
// loop.
// 为了更高效的利用执行器(有限的并行度),在提交更多任务之前检查之前已经提交的
// 任务是否已经完成。这个错综复杂的逻辑再加上异常处理机制可能导致了下面for循环的复杂度
try {
// Record exceptions so that if we fail to obtain any
// result, we can throw the last exception we got.
// 记录异常,如果我们得到任何有效结果,我们可以抛出最后一个得到的异常
ExecutionException ee = null;
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Iterator<? extends Callable<T>> it = tasks.iterator();

// 先提交一个任务,后续任务到下面的for循环中一个一个提交
futures.add(ecs.submit(it.next()));
// 上面提交了一个任务,所以任务数减 1
--ntasks;
// 正在执行的任务数(提交的时候 +1,任务结束的时候 -1)
int active = 1;

for (;;) {
// 以非阻塞的方式获取一个任务的执行结果 Future
Future<T> f = ecs.poll();
// Future 获取失败
if (f == null) {
// 提交下一个任务
// 假设一个任务执行时间很长(一个也没获取到),那么只好先把所有的任务先提交
if (ntasks > 0) {
--ntasks;
futures.add(ecs.submit(it.next()));
++active;
}

// 此处应用场景:因为break退出循环之后,必然会抛出异常ee,所以此处的发生应该是在
// 下面执行return f.get()时发生了异常,而此处正是该异常的一个出口,跳出for循环后
// 会调用throw ee 抛出
else if (active == 0)
break;
// 这里是else if,说明ntask <= 0,即已经提交完所有的任务,但是还没有获取到一个任务
// 如果设置了获取任务的超时时间,那么将尝试在nanos时间内获取一个任务的执行结果
else if (timed) {
f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
// 超时的话抛出异常
if (f == null)
throw new TimeoutException();
nanos = deadline - System.nanoTime();
}
// 如果已经提交完所有任务也没有设置超时时间,
// 那么就尝试以阻塞的方式获取一个任务的执行结果
// 这个时候会一直等,直到获取到一个任务的执行结果
else
f = ecs.take();
}
// Future 获取成功
if (f != null) {
// 正在执行的任务数 -1
--active;
try {
// 返回执行结果,如果有异常,都包装成ExecutionException
return f.get();
} catch (ExecutionException eex) {
ee = eex;
} catch (RuntimeException rex) {
ee = new ExecutionException(rex);
}
}
}

if (ee == null)
ee = new ExecutionException();
throw ee;

} finally {
// 方法退出之前,取消其他任务,
// 不会取消已完成的任务(对于已完成的任务,取消没有什么效果)
cancelAll(futures);
}
}

// 不带超时时间的 invokeAny
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {
try {
return doInvokeAny(tasks, false, 0);
} catch (TimeoutException cannotHappen) {
assert false;
return null;
}
}

// 带超时时间的 invokeAny
public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return doInvokeAny(tasks, true, unit.toNanos(timeout));
}

// 执行所有的任务,返回任务结果
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
if (tasks == null)
throw new NullPointerException();
// 包含所有任务执行结果的 Future 列表
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
try {
for (Callable<T> t : tasks) {
// 包装成 FutureTask
RunnableFuture<T> f = newTaskFor(t);
futures.add(f);
// 提交任务
execute(f);
}
for (int i = 0, size = futures.size(); i < size; i++) {
Future<T> f = futures.get(i);
if (!f.isDone()) {
try {
// 这是一个阻塞方法,直到获取到值,或抛出了异常
// 这里有个小细节,其实 get 方法签名上是会抛出 InterruptedException 的
// 可是这里没有进行处理,而是抛给外层去了。此异常发生于还没执行完的任务被取消了
f.get();
}
catch (CancellationException | ExecutionException ignore) {}
}
}
// 这个方法返回,不像其他的场景,返回 List<Future>,其实执行结果还没出来
// 这个方法返回是包含所有任务执行结果的 Future 列表
return futures;
} catch (Throwable t) {
// 发生异常,取消所有的 Future
cancelAll(futures);
throw t;
}
}

// 带超时的 invokeAll
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException {
if (tasks == null)
throw new NullPointerException();
long nanos = unit.toNanos(timeout);
// 截止时间
final long deadline = System.nanoTime() + nanos;
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
int j=0;
timeOut:try {
for (Callable<T> t : tasks)
futures.add(newTaskFor(t));

final int size = futures.size();

// Interleave time checks and calls to execute in case
// executor doesn't have any/much parallelism.
for (int i = 0; i < size; i++) {
if (((i == 0) ? nanos : deadline - System.nanoTime()) <= 0L)
// 如果提交任务时就超时,直接break
break timedOut;
// 提交任务
execute((Runnable)futures.get(i));
}
// 获取完成的任务
for (; j < size; j++) {
Future<T> f = futures.get(j);
if (!f.isDone()) {
try { f.get(deadline - System.nanoTime(), NANOSECONDS); }
catch (CancellationException | ExecutionException ignore) {}
catch (TimeoutException timedOut) {
// 某个任务获取超时了,就 break
break timedOut;
}
}
}
return futures;
}
// 走到这里应该是超时 break 了,所以取消还没有完成的 Future
cancelAll(futures, j);
return futures;

}

// 取消所有的 Future
private static <T> void cancelAll(ArrayList<Future<T>> futures) {
cancelAll(futures, 0);
}

// 取消索引 j 之后所有的 Future
private static <T> void cancelAll(ArrayList<Future<T>> futures, int j) {
for (int size = futures.size(); j < size; j++)
futures.get(j).cancel(true);
}
}

番外篇:Runnable在执行时也是通过适配器模式被包装成Callable。
看如下解释:

1
2
3
4
5
6
public abstract class AbstractExecutorService implements ExecutorService {
// 上面代码的其中一小段(将Runnable包装为FutureTask,用于submit调用)
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
}
1
2
3
4
5
6
7
public class FutureTask<V> implements RunnableFuture<V> {
public FutureTask(Runnable runnable, V result) {
// 调用Executors工具类的静态工厂callable方法
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class Executors {
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
// 将Runnable包装成一个向Callable转化的适配器RunnableAdapter
return new RunnableAdapter<T>(task, result);
}

// 适配器设计模式
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result;
}
}
}