JDK1.6 ExecutorCompletionService

Introduction

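ExecutorCompletionService runs a batch of tasks and hands back their results in the order in which the tasks complete, not the order in which they were submitted. Once enough results have arrived, the remaining tasks can be cancelled (invokeAny in AbstractExecutorService, which ThreadPoolExecutor inherits, is implemented with it).
A typical scenario: your business logic calls 10 interfaces to fetch some information, but the rules say that results from any 2 of them are enough. With ExecutorCompletionService you can take the first two tasks that complete successfully and then cancel the rest.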
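The scenario above can be sketched roughly as follows. This is only an illustration: `fakeCall` is a hypothetical stand-in for a real interface call, the sleep times are arbitrary, and the code uses Java 8 lambdas for brevity (on JDK 1.6 they would be anonymous classes).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class FirstTwoResults {

    // Hypothetical stand-in for a remote interface call: sleeps, then returns a label.
    static Callable<String> fakeCall(int id) {
        return () -> {
            Thread.sleep(20L * id);
            return "result-" + id;
        };
    }

    // Submits `total` tasks and returns the first `wanted` successful results.
    static List<String> firstResults(int total, int wanted) {
        ExecutorService pool = Executors.newFixedThreadPool(total);
        CompletionService<String> ecs = new ExecutorCompletionService<>(pool);
        List<Future<String>> futures = new ArrayList<>();
        List<String> results = new ArrayList<>();
        try {
            for (int i = 1; i <= total; i++) {
                futures.add(ecs.submit(fakeCall(i)));
            }
            // take() returns futures in completion order, not submission order.
            for (int taken = 0; taken < total && results.size() < wanted; taken++) {
                try {
                    results.add(ecs.take().get());
                } catch (ExecutionException e) {
                    // This call failed; keep waiting for another one.
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        } finally {
            for (Future<String> f : futures) {
                f.cancel(true); // no effect on tasks that already finished
            }
            pool.shutdown();
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println(firstResults(10, 2));
    }
}
```

Keeping the futures returned by submit() is what makes the final cancellation possible; the completion service itself offers no "cancel everything" method.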

Source code analysis

ExecutorCompletionService implements the CompletionService interface.

CompletionService interface

public interface CompletionService<V> {
    /**
     * Submits a value-returning task for execution and returns a Future
     * representing the pending results of the task. Upon completion,
     * this task may be taken or polled.
     *
     * @param task the task to submit
     * @return a Future representing pending completion of the task
     * @throws RejectedExecutionException if the task cannot be
     *         scheduled for execution
     * @throws NullPointerException if the task is null
     */
    // 1
    Future<V> submit(Callable<V> task);

    /**
     * Submits a Runnable task for execution and returns a Future
     * representing that task. Upon completion, this task may be
     * taken or polled.
     *
     * @param task the task to submit
     * @param result the result to return upon successful completion
     * @return a Future representing pending completion of the task,
     *         and whose <tt>get()</tt> method will return the given
     *         result value upon completion
     * @throws RejectedExecutionException if the task cannot be
     *         scheduled for execution
     * @throws NullPointerException if the task is null
     */
    // 2
    Future<V> submit(Runnable task, V result);

    /**
     * Retrieves and removes the Future representing the next
     * completed task, waiting if none are yet present.
     *
     * @return the Future representing the next completed task
     * @throws InterruptedException if interrupted while waiting
     */
    // 3
    Future<V> take() throws InterruptedException;

    /**
     * Retrieves and removes the Future representing the next
     * completed task or <tt>null</tt> if none are present.
     *
     * @return the Future representing the next completed task, or
     *         <tt>null</tt> if none are present
     */
    // 4
    Future<V> poll();

    /**
     * Retrieves and removes the Future representing the next
     * completed task, waiting if necessary up to the specified wait
     * time if none are yet present.
     *
     * @param timeout how long to wait before giving up, in units of
     *        <tt>unit</tt>
     * @param unit a <tt>TimeUnit</tt> determining how to interpret the
     *        <tt>timeout</tt> parameter
     * @return the Future representing the next completed task or
     *         <tt>null</tt> if the specified waiting time elapses
     *         before one is present
     * @throws InterruptedException if interrupted while waiting
     */
    // 5
    Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;
}

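Notes on the marked code

  1. Submits a task that returns a value.
  2. Submits a Runnable together with the result to return once it completes.
  3. Retrieves and removes the next completed task, blocking if no task has completed yet.
  4. Retrieves and removes the next completed task, returning null if no task has completed yet.
  5. Retrieves and removes the next completed task, waiting up to the given timeout if no task has completed yet; returns null if none completes before the timeout elapses.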
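The difference between the three retrieval methods can be observed with a small experiment. The sketch below is an assumption-laden illustration: the class and method names are made up, and a CountDownLatch holds the task back so that the outcome does not depend on timing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class TakeVsPoll {

    // Records, for each retrieval call, whether a completed task was returned.
    static List<Boolean> observe() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CompletionService<String> ecs = new ExecutorCompletionService<>(pool);
        CountDownLatch gate = new CountDownLatch(1);
        List<Boolean> seen = new ArrayList<>();
        try {
            // The task cannot finish until the gate is opened.
            ecs.submit(() -> {
                gate.await();
                return "ok";
            });
            seen.add(ecs.poll() != null);                          // returns at once, nothing done yet
            seen.add(ecs.poll(50, TimeUnit.MILLISECONDS) != null); // waits 50 ms, then gives up
            gate.countDown();
            seen.add(ecs.take().isDone());                         // blocks until the task completes
            seen.add(ecs.poll() != null);                          // take() already removed it
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdownNow();
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(observe()); // [false, false, true, false]
    }
}
```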

ExecutorCompletionService#QueueingFuture

public class ExecutorCompletionService<V> implements CompletionService<V> {
    private final Executor executor;
    private final AbstractExecutorService aes;
    private final BlockingQueue<Future<V>> completionQueue;

    /**
     * FutureTask extension to enqueue upon completion
     */
    private class QueueingFuture extends FutureTask<Void> {
        QueueingFuture(RunnableFuture<V> task) {
            super(task, null);
            this.task = task;
        }
        // 1
        protected void done() { completionQueue.add(task); }
        private final Future<V> task;
    }

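Notes on the marked code

  1. Once the asynchronous task has completed, it is put into the completion queue.

QueueingFuture wraps a submitted task; when the task finishes, it adds the task to completionQueue, the blocking queue that holds completed tasks.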

ExecutorCompletionService constructors

/**
 * Creates an ExecutorCompletionService using the supplied
 * executor for base task execution and a
 * {@link LinkedBlockingQueue} as a completion queue.
 *
 * @param executor the executor to use
 * @throws NullPointerException if executor is <tt>null</tt>
 */
public ExecutorCompletionService(Executor executor) {
    if (executor == null)
        throw new NullPointerException();
    this.executor = executor;
    this.aes = (executor instanceof AbstractExecutorService) ?
        (AbstractExecutorService) executor : null;
    this.completionQueue = new LinkedBlockingQueue<Future<V>>();
}

/**
 * Creates an ExecutorCompletionService using the supplied
 * executor for base task execution and the supplied queue as its
 * completion queue.
 *
 * @param executor the executor to use
 * @param completionQueue the queue to use as the completion queue
 *        normally one dedicated for use by this service
 * @throws NullPointerException if executor or completionQueue are <tt>null</tt>
 */
public ExecutorCompletionService(Executor executor,
                                 BlockingQueue<Future<V>> completionQueue) {
    if (executor == null || completionQueue == null)
        throw new NullPointerException();
    this.executor = executor;
    // 1
    this.aes = (executor instanceof AbstractExecutorService) ?
        (AbstractExecutorService) executor : null;
    this.completionQueue = completionQueue;
}

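Notes on the marked code

  1. aes is used in ExecutorCompletionService#newTaskFor() to create the FutureTask object. It is null when the executor is not an AbstractExecutorService.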
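The two-argument constructor can be tried out with a caller-supplied queue. The following is a minimal sketch, not taken from the JDK: `CustomQueueDemo` is a hypothetical name, and the executor simply runs each task on the calling thread so that the result is deterministic.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;

class CustomQueueDemo {

    // Returns how many completed futures ended up in the caller-supplied queue.
    static int completedCount() {
        Executor direct = Runnable::run; // runs each task synchronously in the caller
        BlockingQueue<Future<String>> queue = new LinkedBlockingQueue<>();
        ExecutorCompletionService<String> ecs =
                new ExecutorCompletionService<>(direct, queue);
        ecs.submit(() -> "a");
        ecs.submit(() -> "b");
        // Both tasks have already run, so both futures sit in our queue.
        return queue.size();
    }

    public static void main(String[] args) {
        System.out.println(completedCount()); // 2
    }
}
```

Because QueueingFuture#done() calls add(), which throws IllegalStateException on a full capacity-bounded queue, an unbounded queue such as LinkedBlockingQueue is probably the safer choice here.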

ExecutorCompletionService#newTaskFor()

private RunnableFuture<V> newTaskFor(Callable<V> task) {
    if (aes == null)
        return new FutureTask<V>(task);
    else
        return aes.newTaskFor(task);
}

private RunnableFuture<V> newTaskFor(Runnable task, V result) {
    if (aes == null)
        return new FutureTask<V>(task, result);
    else
        return aes.newTaskFor(task, result);
}

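Both overloads create the task object: a plain FutureTask when aes is null, otherwise whatever aes.newTaskFor() returns.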
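The aes == null branch is taken whenever the executor is not an AbstractExecutorService. A minimal sketch of that case, assuming a hypothetical same-thread executor written as a method reference:

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

class PlainExecutorDemo {

    static Integer answer() {
        // A bare Executor, not an AbstractExecutorService, so aes is null
        // and the service falls back to new FutureTask<V>(task).
        Executor direct = Runnable::run;
        ExecutorCompletionService<Integer> ecs = new ExecutorCompletionService<>(direct);
        Future<Integer> f = ecs.submit(() -> 6 * 7);
        if (!(f instanceof FutureTask)) {
            throw new IllegalStateException("expected a FutureTask");
        }
        try {
            return f.get(); // already complete: the task ran inside submit()
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(answer()); // 42
    }
}
```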

AbstractExecutorService#newTaskFor

/**
 * Returns a <tt>RunnableFuture</tt> for the given callable task.
 *
 * @param callable the callable task being wrapped
 * @return a <tt>RunnableFuture</tt> which when run will call the
 * underlying callable and which, as a <tt>Future</tt>, will yield
 * the callable's result as its result and provide for
 * cancellation of the underlying task.
 * @since 1.6
 */
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable);
}
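Since newTaskFor() is protected, an executor subclass can override it, and ExecutorCompletionService will pick up the override through aes. The sketch below illustrates this with a hypothetical `CountingPool` that merely counts the calls; it is not part of the JDK.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class NewTaskForDemo {

    // Hypothetical pool that counts how often newTaskFor(Callable) is invoked.
    static class CountingPool extends ThreadPoolExecutor {
        final AtomicInteger created = new AtomicInteger();

        CountingPool() {
            super(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        }

        @Override
        protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
            created.incrementAndGet();
            return super.newTaskFor(callable);
        }
    }

    static int tasksCreated() {
        CountingPool pool = new CountingPool();
        ExecutorCompletionService<String> ecs = new ExecutorCompletionService<>(pool);
        try {
            ecs.submit(() -> "x"); // goes through aes.newTaskFor(task)
            ecs.take();            // wait for completion
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return pool.created.get();
    }

    public static void main(String[] args) {
        System.out.println(tasksCreated()); // 1
    }
}
```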

ExecutorCompletionService#submit()

public Future<V> submit(Callable<V> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<V> f = newTaskFor(task);
    executor.execute(new QueueingFuture(f));
    return f;
}

public Future<V> submit(Runnable task, V result) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<V> f = newTaskFor(task, result);
    executor.execute(new QueueingFuture(f));
    return f;
}

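submit() wraps the task in a QueueingFuture and hands it to Executor#execute(). When the task finishes, it is added to the internal queue, and callers can then retrieve completed tasks with take() or poll().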
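One consequence of the code above is that the Future returned by submit() and the Future later handed out by take() are the same object: submit() returns f, and done() enqueues that same f. A small check, with hypothetical class and method names:

```java
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class SubmitIdentityDemo {

    // True if take() returns the very Future that submit() returned.
    static boolean sameFuture() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        ExecutorCompletionService<String> ecs = new ExecutorCompletionService<>(pool);
        try {
            Future<String> submitted = ecs.submit(() -> "hello");
            Future<String> completed = ecs.take();
            return submitted == completed && completed.isDone();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sameFuture()); // true
    }
}
```

This is why callers can keep the submitted futures in their own list and cancel them later, as the introduction describes.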

Executor interface

public interface Executor {

    /**
     * Executes the given command at some time in the future. The command
     * may execute in a new thread, in a pooled thread, or in the calling
     * thread, at the discretion of the <tt>Executor</tt> implementation.
     *
     * @param command the runnable task
     * @throws RejectedExecutionException if this task cannot be
     * accepted for execution.
     * @throws NullPointerException if command is null
     */
    void execute(Runnable command);
}

As shown in the figure below, ScheduledThreadPoolExecutor implements Executor and overrides execute().

ScheduledThreadPoolExecutor#execute()

/**
 * Executes command with zero required delay. This has effect
 * equivalent to <tt>schedule(command, 0, anyUnit)</tt>. Note
 * that inspections of the queue and of the list returned by
 * <tt>shutdownNow</tt> will access the zero-delayed
 * {@link ScheduledFuture}, not the <tt>command</tt> itself.
 *
 * @param command the task to execute
 * @throws RejectedExecutionException at discretion of
 * <tt>RejectedExecutionHandler</tt>, if task cannot be accepted
 * for execution because the executor has been shut down.
 * @throws NullPointerException if command is null
 */
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    schedule(command, 0, TimeUnit.NANOSECONDS);
}

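execute() calls ScheduledThreadPoolExecutor#schedule(), which creates a zero-delay task, queues it for execution, and returns a ScheduledFutureTask (execute() ignores the return value). As the JDK1.6 ScheduledThreadPoolExecutor article explains, ScheduledFutureTask extends FutureTask. FutureTask#done() is a hook that runs once a task has completed, and subclasses can override it to act as a completion callback. Here the command passed to execute() is the QueueingFuture, which is itself a FutureTask, so its own done() fires when the wrapped task finishes.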
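Because execute() on a ScheduledThreadPoolExecutor is just a zero-delay schedule(), such a pool can back an ExecutorCompletionService like any other executor. The sketch below assumes a pool of two threads and uses a latch (a detail added here for determinism) to show that results still arrive in completion order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ScheduledThreadPoolExecutor;

class ScheduledPoolDemo {

    static List<String> completionOrder() {
        ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(2);
        ExecutorCompletionService<String> ecs = new ExecutorCompletionService<>(pool);
        CountDownLatch gate = new CountDownLatch(1);
        List<String> order = new ArrayList<>();
        try {
            // Submitted first, but blocked until the gate opens.
            ecs.submit(() -> {
                gate.await();
                return "slow";
            });
            ecs.submit(() -> "fast");
            order.add(ecs.take().get()); // the second submission completes first
            gate.countDown();
            order.add(ecs.take().get());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(completionOrder()); // [fast, slow]
    }
}
```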

FutureTask#done()

/**
 * Protected method invoked when this task transitions to state
 * <tt>isDone</tt> (whether normally or via cancellation). The
 * default implementation does nothing. Subclasses may override
 * this method to invoke completion callbacks or perform
 * bookkeeping. Note that you can query status inside the
 * implementation of this method to determine whether this task
 * has been cancelled.
 */
protected void done() { }

Subclasses may override this method to invoke completion callbacks or perform bookkeeping.
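To see the hook in isolation, here is a small sketch of a FutureTask subclass that records how it finished. `LoggingTask` is a hypothetical name; the tasks are run and cancelled on the calling thread so that no executor is involved.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;

class DoneHookDemo {

    // Hypothetical FutureTask subclass that logs its final state in done().
    static class LoggingTask extends FutureTask<Integer> {
        private final List<String> log;

        LoggingTask(Callable<Integer> callable, List<String> log) {
            super(callable);
            this.log = log;
        }

        @Override
        protected void done() {
            // As the Javadoc notes, status can be queried inside done().
            log.add(isCancelled() ? "cancelled" : "completed");
        }
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        LoggingTask normal = new LoggingTask(() -> 1 + 1, log);
        normal.run();            // completes normally, done() is invoked
        LoggingTask cancelled = new LoggingTask(() -> 42, log);
        cancelled.cancel(false); // never runs, but done() is still invoked
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [completed, cancelled]
    }
}
```

The second case matters for ExecutorCompletionService: a cancelled task also reaches the completion queue, so callers of take() should be prepared for get() to throw CancellationException.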

ExecutorCompletionService#QueueingFuture

/**
 * FutureTask extension to enqueue upon completion
 */
private class QueueingFuture extends FutureTask<Void> {
    QueueingFuture(RunnableFuture<V> task) {
        super(task, null);
        this.task = task;
    }
    protected void done() { completionQueue.add(task); }
    private final Future<V> task;
}

QueueingFuture#done() overrides FutureTask#done() and adds the completed task to the BlockingQueue. Callers then retrieve it with take() or poll().

public Future<V> take() throws InterruptedException {
    return completionQueue.take();
}

public Future<V> poll() {
    return completionQueue.poll();
}

public Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException {
    return completionQueue.poll(timeout, unit);
}
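Putting the pieces together, the whole mechanism fits in a few lines. The following is a deliberately simplified sketch, not the JDK implementation: `MiniCompletionService` is a hypothetical class that overrides done() directly on the FutureTask it creates instead of wrapping it in a separate QueueingFuture.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;

// Simplified sketch of the idea, not the JDK class.
class MiniCompletionService<V> {
    private final Executor executor;
    private final BlockingQueue<Future<V>> completed = new LinkedBlockingQueue<>();

    MiniCompletionService(Executor executor) {
        this.executor = executor;
    }

    Future<V> submit(Callable<V> task) {
        FutureTask<V> f = new FutureTask<V>(task) {
            @Override
            protected void done() {
                completed.add(this); // enqueue once finished or cancelled
            }
        };
        executor.execute(f);
        return f;
    }

    Future<V> take() throws InterruptedException {
        return completed.take();
    }

    Future<V> poll() {
        return completed.poll();
    }
}

class MiniDemo {
    static String firstResult() {
        // Same-thread executor, so the task is finished when submit() returns.
        MiniCompletionService<String> svc = new MiniCompletionService<>(Runnable::run);
        svc.submit(() -> "first");
        try {
            return svc.poll().get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(firstResult()); // first
    }
}
```

The JDK version needs the extra QueueingFuture wrapper because newTaskFor() may return any RunnableFuture supplied by the executor, whose done() hook it cannot override.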
