一、一个简单的任务编排器:
在异步任务程序中,一种常见的场景是,主线程提交多个异步任务,然后希望有任务完成就处理结果,并且按任务完成顺序逐个处理,对于这种场景,Java并发包提供了一个方便的方法,使用CompletionService
,这是一个接口,它的实现类是ExecutorCompletionService
。
CompletionService
实现了一种行为间的解耦方式,它内部并不关心任务具体如何执行,而是将其交给Executor
,而自己只负责对任务执行后的结果进行处理。
参考文章:https://www.jianshu.com/p/9a42c5338e95
1、类间关系图:

组合关系:
依赖关系:
2、基本原理:
CompletionService
的内部维护了一个阻塞队列BlockingQueue<Future<V>> completionQueue
,当任务执行结束就把任务的执行结果Future
加入到阻塞队列中,然后我们就可以从阻塞队列中获取完成的任务。对于FutureTask
,任务完成后,不管是正常完成、异常结束、还是被取消,都会调用finishCompletion
方法,而该方法会调用一个done
方法,ExecutorCompletionService
重写了FutureTask
的done
方法,把Executor
执行的计算结果放入BlockingQueue
中。
3、接口关系图:

二、核心属性
1 2 3 4 5
| private final Executor executor; private final AbstractExecutorService aes;
private final BlockingQueue<Future<V>> completionQueue;
|
三、核心内部类
1 2 3 4 5 6 7 8 9 10 11 12
| private class QueueingFuture extends FutureTask<Void> { QueueingFuture(RunnableFuture<V> task) { super(task, null); this.task = task; } protected void done() { completionQueue.add(task); } private final Future<V> task; }
|
四、构造函数
在构造函数中我们至少需要传入一个Executor
线程池的实现来执行异步任务,但是建议再传入一个阻塞队列,默认的LinkedBlockingQueue
是一个无界队列,有内存溢出的风险。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| public ExecutorCompletionService(Executor executor) { if (executor == null) throw new NullPointerException(); this.executor = executor; this.aes = (executor instanceof AbstractExecutorService) ? (AbstractExecutorService) executor : null; this.completionQueue = new LinkedBlockingQueue<Future<V>>(); }
public ExecutorCompletionService(Executor executor, BlockingQueue<Future<V>> completionQueue) { if (executor == null || completionQueue == null) throw new NullPointerException(); this.executor = executor; this.aes = (executor instanceof AbstractExecutorService) ? (AbstractExecutorService) executor : null; this.completionQueue = completionQueue; }
|
五、submit提交任务
提交任务前将task封装为QueueingFuture
,当任务执行完成后就会回调done
方法,放入任务队列中
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| public Future<V> submit(Callable<V> task) { if (task == null) throw new NullPointerException(); RunnableFuture<V> f = newTaskFor(task); executor.execute(new QueueingFuture(f)); return f; }
public Future<V> submit(Runnable task, V result) { if (task == null) throw new NullPointerException(); RunnableFuture<V> f = newTaskFor(task, result); executor.execute(new QueueingFuture(f)); return f; }
private RunnableFuture<V> newTaskFor(Callable<V> task) { if (aes == null) return new FutureTask<V>(task); else return aes.newTaskFor(task); }
private RunnableFuture<V> newTaskFor(Runnable task, V result) { if (aes == null) return new FutureTask<V>(task, result); else return aes.newTaskFor(task, result); }
|
六获取已完成的任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| public Future<V> take() throws InterruptedException { return completionQueue.take(); }
public Future<V> poll() { return completionQueue.poll(); }
public Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException { return completionQueue.poll(timeout, unit); }
|