// 提交任务,带返回结果 public <T> Future<T> submit(Runnable task, T result){ if (task == null) thrownew NullPointerException(); // 1.将任务包装成FutureTask RunnableFuture<T> ftask = newTaskFor(task, result); // 2.交给执行器执行,具体执行逻辑由子类实现 execute(ftask); return ftask; } // 提交任务,带返回结果 public <T> Future<T> submit(Callable<T> task){ if (task == null) thrownew NullPointerException(); // 1.将任务包装成FutureTask RunnableFuture<T> ftask = newTaskFor(task); // 2.交给执行器执行,具体执行逻辑由子类实现 execute(ftask); return ftask; } // Invoke:调用、激活 // 将 tasks 集合中的任务提交到线程池执行,任意一个线程执行完后就可以结束了 // 第二个参数 timed 代表是否设置超时机制,超时时间为第三个参数, // 如果 timed 为 true,同时超时了还没有一个线程返回结果,那么抛出 TimeoutException 异常 private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks, boolean timed, long nanos) throws InterruptedException, ExecutionException, TimeoutException { if (tasks == null) thrownew NullPointerException(); int ntasks = tasks.size(); if (ntasks == 0) thrownew IllegalArgumentException(); ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks); // 初始化ExecutorCompletionService,this作为其内部的执行器 // ExecutorCompletionService使我们可以按任务的完成顺序从中取出对应的Future ExecutorCompletionService<T> ecs = new ExecutorCompletionService<T>(this);
// For efficiency, especially in executors with limited // parallelism, check to see if previously submitted tasks are // done before submitting more of them. This interleaving // plus the exception mechanics account for messiness of main // loop. // 为了更高效的利用执行器(有限的并行度),在提交更多任务之前检查之前已经提交的 // 任务是否已经完成。这个错综复杂的逻辑再加上异常处理机制可能导致了下面for循环的复杂度 try { // Record exceptions so that if we fail to obtain any // result, we can throw the last exception we got. // 记录异常,如果我们得到任何有效结果,我们可以抛出最后一个得到的异常 ExecutionException ee = null; finallong deadline = timed ? System.nanoTime() + nanos : 0L; Iterator<? extends Callable<T>> it = tasks.iterator();
// 先提交一个任务,后续任务到下面的for循环中一个一个提交 futures.add(ecs.submit(it.next())); // 上面提交了一个任务,所以任务数减 1 --ntasks; // 正在执行的任务数(提交的时候 +1,任务结束的时候 -1) int active = 1; for (;;) { // 以非阻塞的方式获取一个任务的执行结果 Future Future<T> f = ecs.poll(); // Future 获取失败 if (f == null) { // 提交下一个任务 // 假设一个任务执行时间很长(一个也没获取到),那么只好先把所有的任务先提交 if (ntasks > 0) { --ntasks; futures.add(ecs.submit(it.next())); ++active; } // 此处应用场景:因为break退出循环之后,必然会抛出异常ee,所以此处的发生应该是在 // 下面执行return f.get()时发生了异常,而此处正是该异常的一个出口,跳出for循环后 // 会调用throw ee 抛出 elseif (active == 0) break; // 这里是else if,说明ntask <= 0,即已经提交完所有的任务,但是还没有获取到一个任务 // 如果设置了获取任务的超时时间,那么将尝试在nanos时间内获取一个任务的执行结果 elseif (timed) { f = ecs.poll(nanos, TimeUnit.NANOSECONDS); // 超时的话抛出异常 if (f == null) thrownew TimeoutException(); nanos = deadline - System.nanoTime(); } // 如果已经提交完所有任务也没有设置超时时间, // 那么就尝试以阻塞的方式获取一个任务的执行结果 // 这个时候会一直等,直到获取到一个任务的执行结果 else f = ecs.take(); } // Future 获取成功 if (f != null) { // 正在执行的任务数 -1 --active; try { // 返回执行结果,如果有异常,都包装成ExecutionException return f.get(); } catch (ExecutionException eex) { ee = eex; } catch (RuntimeException rex) { ee = new ExecutionException(rex); } } }
if (ee == null) ee = new ExecutionException(); throw ee;