- 介绍
- 源码分析
- CompletionService()
- ExecutorCompletionService#QueueingFuture
- ExecutorCompletionService 构造函数
- ExecutorCompletionService#newTaskFor()
- AbstractExecutorService#newTaskFor
- ExecutorCompletionService#submit()
- Executor 接口
- ScheduledThreadPoolExecutor#execute()
- FutureTask#done()
- ExecutorCompletionService#QueueingFuture
介绍
ExecutorCompletionService用于执行一批任务,然后按照任务执行完成的顺序来获取任务结果。可以在获取到了若干个执行结果后,把其他的任务取消掉(ThreadPoolExecutor中的invokeAny就是通过这货实现的)。
比如这样的场景:你的业务需要调用10个接口来获取一些信息,业务规定只需要其中任意2个接口的信息,那么就可以使用ExecutorCompletionService,获取前两个成功完成的任务结果,然后将其他的任务取消。
源码分析
ExecutorCompletionService实现CompletionService接口。
CompletionService()
1 | public interface CompletionService<V> { |
标注代码分析
- 提交一个有返回值的任务。
- 提交一个Runnable和一个返回值。
- 获取并移除下一个完成的任务,如果当前没有任务完成,阻塞等待。
- 获取并移除下一个完成的任务,如果当前没有任务完成,返回null。
- 获取并移除下一个完成的任务,如果当前没有任务完成,阻塞等待,如果在超时前仍然没有任务完成,返回null。
ExecutorCompletionService#QueueingFuture
1 | public class ExecutorCompletionService<V> implements CompletionService<V> { |
标注代码分析
- 异步任务完成后,将其放入完成队列。
QueueingFuture用来存放完成任务的阻塞队列。
ExecutorCompletionService 构造函数
1 | /** |
标注代码分析
1.在ExecutorCompletionService#newTaskFor()中aes作用是生成FutureTask对象。
ExecutorCompletionService#newTaskFor()
1 | private RunnableFuture<V> newTaskFor(Callable<V> task) { |
生成FutureTask对象。
AbstractExecutorService#newTaskFor
1 | /** |
ExecutorCompletionService#submit()
1 | public Future<V> submit(Callable<V> task) { |
任务会在submit()被包装成QueueingFuture对象,由Executor#execute()执行,任务执行完成后,会被加入到内部的队列里面,外部程序就可以通过take或者poll方法来获取完成的任务了。
Executor 接口
1 | public interface Executor { |
如下图所示,ScheduledThreadPoolExecutor实现Executor,覆写execute。
ScheduledThreadPoolExecutor#execute()
1 | /** |
调用ScheduledThreadPoolExecutor#schedule(),创建延迟任务,并且执行它。返回ScheduledFutureTask对象。根据JDK1.6 ScheduledThreadPoolExecutor那篇文章,可以知道ScheduledFutureTask继承FutureTask,FutureTask#done()可以知道,任务执行完成后可以覆写此方法(done())。作为callbacks。
FutureTask#done()
1 | /** |
Subclasses may override this method to invoke completion callbacks or perform bookkeeping.
ExecutorCompletionService#QueueingFuture
1 | /** |
QueueingFuture#done()覆写FutureTask#done(),把完成任务添加到BlockingQueue。通过take(),poll()获取任务。1
2
3
4
5
6
7
8
9
10
11public Future<V> take() throws InterruptedException {
return completionQueue.take();
}
public Future<V> poll() {
return completionQueue.poll();
}
public Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException {
return completionQueue.poll(timeout, unit);
}