一、一个简单的任务编排器:

在异步任务程序中,一种常见的场景是,主线程提交多个异步任务,然后希望有任务完成就处理结果,并且按任务完成顺序逐个处理,对于这种场景,Java并发包提供了一个方便的方法,使用CompletionService,这是一个接口,它的实现类是ExecutorCompletionService。​

CompletionService实现了一种行为间的解耦方式,它内部并不关心任务具体如何执行,而是将其交给Executor,而自己只负责对任务执行后的结果进行处理。

参考文章:https://www.jianshu.com/p/9a42c5338e95

1、类间关系图:

image.png

组合关系:image.png

依赖关系:image.png

2、基本原理:

CompletionService 的内部维护了一个阻塞队列BlockingQueue<Future<V>> completionQueue,当任务执行结束就把任务的执行结果Future加入到阻塞队列中,然后我们就可以从阻塞队列中获取完成的任务。对于FutureTask,任务完成后,不管是正常完成、异常结束、还是被取消,都会调用finishCompletion方法,而该方法会调用一个done方法,ExecutorCompletionService重写了FutureTaskdone方法,把Executor执行的计算结果放入BlockingQueue中。

3、接口关系图:

image.png

二、核心属性

1
2
3
4
5
// 执行任务的线程池
private final Executor executor;
private final AbstractExecutorService aes;
// 存放已完成任务的阻塞队列,默认使用 LinkedBlockingQueue
private final BlockingQueue<Future<V>> completionQueue;

三、核心内部类

1
2
3
4
5
6
7
8
9
10
11
12
// 将任务FutureTask做了扩展,实现了FutureTask的done方法
private class QueueingFuture extends FutureTask<Void> {
QueueingFuture(RunnableFuture<V> task) {
super(task, null);
this.task = task;
}
// 当任务完成后会回调这个方法,这时我们在这个方法中将完成的任务放到队列中,
// 就实现了按照异步任务完成的顺序,逐个处理任务的结果了。
protected void done() { completionQueue.add(task); }
private final Future<V> task;
}

四、构造函数

在构造函数中我们至少需要传入一个Executor线程池的实现来执行异步任务,但是建议再传入一个阻塞队列,默认的LinkedBlockingQueue是一个无界队列,有内存溢出的风险。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public ExecutorCompletionService(Executor executor) {
if (executor == null)
throw new NullPointerException();
this.executor = executor;
// 自定义executor没有继承AbstractExecutorService,aes就为null
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
// 构造没有指定的话,默认使用一个无界阻塞队列
this.completionQueue = new LinkedBlockingQueue<Future<V>>();
}

public ExecutorCompletionService(Executor executor,
BlockingQueue<Future<V>> completionQueue) {
if (executor == null || completionQueue == null)
throw new NullPointerException();
this.executor = executor;
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
this.completionQueue = completionQueue;
}

五、submit提交任务

提交任务前将task封装为QueueingFuture,当任务执行完成后就会回调done方法,放入任务队列中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// 提交 Callable 任务
public Future<V> submit(Callable<V> task) {
if (task == null) throw new NullPointerException();
// 封装为 QueueingFuture
RunnableFuture<V> f = newTaskFor(task);
executor.execute(new QueueingFuture(f));
return f;
}

// 提交 Runnable 任务,带返回结果
public Future<V> submit(Runnable task, V result) {
if (task == null) throw new NullPointerException();
// 封装为 QueueingFuture
RunnableFuture<V> f = newTaskFor(task, result);
executor.execute(new QueueingFuture(f));
return f;
}

private RunnableFuture<V> newTaskFor(Callable<V> task) {
if (aes == null)
return new FutureTask<V>(task);
else
return aes.newTaskFor(task);
}

private RunnableFuture<V> newTaskFor(Runnable task, V result) {
// 如果是自定义Executor的话,默认使用FutureTask
if (aes == null)
return new FutureTask<V>(task, result);
else
return aes.newTaskFor(task, result);
}

六获取已完成的任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 如果没有任务,一直阻塞,直到有新任务进来
public Future<V> take() throws InterruptedException {
return completionQueue.take();
}

// 如果没有任务返回null
public Future<V> poll() {
return completionQueue.poll();
}

// timeout时间内获取任务,没有返回null
public Future<V> poll(long timeout, TimeUnit unit)
throws InterruptedException {
return completionQueue.poll(timeout, unit);
}