JDK1.6 Executors

Introduction

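Executors is a utility helper class in the java.util.concurrent (JUC) package. It provides a set of factory methods and utility methods for ExecutorService, ScheduledExecutorService, ThreadFactory, and Callable.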

Source Code Analysis

Executors#newFixedThreadPool()

/**
* Creates a thread pool that reuses a fixed number of threads
* operating off a shared unbounded queue. At any point, at most
* <tt>nThreads</tt> threads will be active processing tasks.
* If additional tasks are submitted when all threads are active,
* they will wait in the queue until a thread is available.
* If any thread terminates due to a failure during execution
* prior to shutdown, a new one will take its place if needed to
* execute subsequent tasks. The threads in the pool will exist
* until it is explicitly {@link ExecutorService#shutdown shutdown}.
*
* @param nThreads the number of threads in the pool
* @return the newly created thread pool
* @throws IllegalArgumentException if <tt>nThreads &lt;= 0</tt>
*/
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
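  • This method creates a thread pool whose core pool size equals its maximum pool size and whose task queue is unbounded.
  • Because core threads do not time out by default, the keep-alive arguments (0L, TimeUnit.MILLISECONDS) have no practical effect here.
  • If a worker thread dies for some reason before the pool is shut down, the pool automatically creates a new worker thread to replace it when needed.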
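
The points above can be observed with a small experiment. The following is a minimal sketch, not part of the JDK source: the class name FixedPoolDemo and the helper runTasks are hypothetical names chosen for illustration. It submits more tasks than there are threads and records which worker threads ran them; the surplus tasks wait in the unbounded queue.

```java
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class FixedPoolDemo {
    /** Runs taskCount trivial tasks on a fixed pool and returns the distinct worker names seen. */
    static Set<String> runTasks(int nThreads, int taskCount) throws InterruptedException {
        final Set<String> workerNames =
                Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
        ExecutorService pool = Executors.newFixedThreadPool(nThreads);
        try {
            for (int i = 0; i < taskCount; i++) {
                pool.execute(new Runnable() {
                    public void run() {
                        workerNames.add(Thread.currentThread().getName());
                    }
                });
            }
        } finally {
            pool.shutdown(); // stop accepting new tasks; queued tasks still run
        }
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return workerNames;
    }

    public static void main(String[] args) throws InterruptedException {
        Set<String> names = runTasks(3, 20);
        // 20 tasks, but never more than 3 distinct worker threads
        System.out.println("distinct workers: " + names.size() + " " + names);
    }
}
```

Because the queue is unbounded, a producer that is faster than the workers can grow the queue without limit, which is worth keeping in mind when choosing this factory method.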

Executors#newFixedThreadPool(ThreadFactory)

/**
* Creates a thread pool that reuses a fixed number of threads
* operating off a shared unbounded queue, using the provided
* ThreadFactory to create new threads when needed. At any point,
* at most <tt>nThreads</tt> threads will be active processing
* tasks. If additional tasks are submitted when all threads are
* active, they will wait in the queue until a thread is
* available. If any thread terminates due to a failure during
* execution prior to shutdown, a new one will take its place if
* needed to execute subsequent tasks. The threads in the pool will
* exist until it is explicitly {@link ExecutorService#shutdown
* shutdown}.
*
* @param nThreads the number of threads in the pool
* @param threadFactory the factory to use when creating new threads
* @return the newly created thread pool
* @throws NullPointerException if threadFactory is null
* @throws IllegalArgumentException if <tt>nThreads &lt;= 0</tt>
*/
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}

Identical to the previous method, except that the caller can supply a custom ThreadFactory.
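
A common reason to pass a ThreadFactory is to give worker threads recognizable names. Below is a minimal sketch under that assumption; NamedThreadFactory, NamedThreadFactoryDemo, and workerNameFor are hypothetical names, not JDK classes.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

class NamedThreadFactoryDemo {
    /** Hypothetical factory: names threads "<prefix>-<n>" and marks them as daemons. */
    static final class NamedThreadFactory implements ThreadFactory {
        private final String prefix;
        private final AtomicInteger counter = new AtomicInteger(1);

        NamedThreadFactory(String prefix) {
            this.prefix = prefix;
        }

        public Thread newThread(Runnable r) {
            Thread t = new Thread(r, prefix + "-" + counter.getAndIncrement());
            t.setDaemon(true); // these workers will not keep the JVM alive
            return t;
        }
    }

    /** Submits one task and returns the name of the worker thread that ran it. */
    static String workerNameFor(String prefix) throws Exception {
        ExecutorService pool =
                Executors.newFixedThreadPool(2, new NamedThreadFactory(prefix));
        try {
            return pool.submit(new Callable<String>() {
                public String call() {
                    return Thread.currentThread().getName();
                }
            }).get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(workerNameFor("order-worker")); // prints "order-worker-1"
    }
}
```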

Executors#newSingleThreadExecutor()

/**
* Creates an Executor that uses a single worker thread operating
* off an unbounded queue. (Note however that if this single
* thread terminates due to a failure during execution prior to
* shutdown, a new one will take its place if needed to execute
* subsequent tasks.) Tasks are guaranteed to execute
* sequentially, and no more than one task will be active at any
* given time. Unlike the otherwise equivalent
* <tt>newFixedThreadPool(1)</tt> the returned executor is
* guaranteed not to be reconfigurable to use additional threads.
*
* @return the newly created single-threaded Executor
*/
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

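This factory method looks similar to newFixedThreadPool(1), with one difference: the returned executor cannot be reconfigured afterwards (for example, by raising the core pool size at runtime), because the method returns a wrapper class rather than a ThreadPoolExecutor instance.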
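
The difference shows up as soon as a caller tries to cast the returned executor. The sketch below is illustrative only; SingleThreadVsFixedDemo and isReconfigurable are hypothetical names.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

class SingleThreadVsFixedDemo {
    /** True if the executor can be cast to ThreadPoolExecutor and therefore retuned. */
    static boolean isReconfigurable(ExecutorService executor) {
        return executor instanceof ThreadPoolExecutor;
    }

    public static void main(String[] args) {
        ExecutorService fixed = Executors.newFixedThreadPool(1);
        ExecutorService single = Executors.newSingleThreadExecutor();
        try {
            System.out.println("fixed(1) reconfigurable: " + isReconfigurable(fixed));
            System.out.println("single reconfigurable:   " + isReconfigurable(single));
            if (isReconfigurable(fixed)) {
                // The cast succeeds, so the "fixed" pool can be grown after creation.
                ThreadPoolExecutor tpe = (ThreadPoolExecutor) fixed;
                tpe.setMaximumPoolSize(4); // raise the maximum first, then the core size
                tpe.setCorePoolSize(4);
                System.out.println("fixed pool core size now: " + tpe.getCorePoolSize());
            }
        } finally {
            fixed.shutdown();
            single.shutdown();
        }
    }
}
```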

Executors#FinalizableDelegatedExecutorService

  static class FinalizableDelegatedExecutorService
extends DelegatedExecutorService {
FinalizableDelegatedExecutorService(ExecutorService executor) {
super(executor);
}
protected void finalize() {
// 1
super.shutdown();
}
}

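Notes on the marked code

  1. When the wrapper object is garbage collected, finalize() shuts down the underlying thread pool.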

[Figure: FinalizableDelegatedExecutorService class diagram]

Executors#DelegatedExecutorService

/**
* A wrapper class that exposes only the ExecutorService methods
* of an ExecutorService implementation.
*/
static class DelegatedExecutorService extends AbstractExecutorService {
private final ExecutorService e;
DelegatedExecutorService(ExecutorService executor) { e = executor; }
public void execute(Runnable command) { e.execute(command); }
public void shutdown() { e.shutdown(); }
public List<Runnable> shutdownNow() { return e.shutdownNow(); }
public boolean isShutdown() { return e.isShutdown(); }
public boolean isTerminated() { return e.isTerminated(); }
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
return e.awaitTermination(timeout, unit);
}
public Future<?> submit(Runnable task) {
return e.submit(task);
}
public <T> Future<T> submit(Callable<T> task) {
return e.submit(task);
}
public <T> Future<T> submit(Runnable task, T result) {
return e.submit(task, result);
}
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
return e.invokeAll(tasks);
}
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException {
return e.invokeAll(tasks, timeout, unit);
}
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {
return e.invokeAny(tasks);
}
public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return e.invokeAny(tasks, timeout, unit);
}
}

A wrapper class that delegates to the wrapped executor and exposes only the public methods defined by ExecutorService.

Executors#newCachedThreadPool()

/**
* Creates a thread pool that creates new threads as needed, but
* will reuse previously constructed threads when they are
* available. These pools will typically improve the performance
* of programs that execute many short-lived asynchronous tasks.
* Calls to <tt>execute</tt> will reuse previously constructed
* threads if available. If no existing thread is available, a new
* thread will be created and added to the pool. Threads that have
* not been used for sixty seconds are terminated and removed from
* the cache. Thus, a pool that remains idle for long enough will
* not consume any resources. Note that pools with similar
* properties but different details (for example, timeout parameters)
* may be created using {@link ThreadPoolExecutor} constructors.
*
* @return the newly created thread pool
*/
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

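This method creates a thread pool with a core pool size of 0, a maximum pool size that is effectively unlimited (Integer.MAX_VALUE), and a SynchronousQueue as the task queue, which has no real capacity.
For each new task, if no idle thread is available, a new worker thread is created to handle it. A worker thread that stays idle for more than 60 seconds times out and is reclaimed.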
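
To see the "one new thread per task when nobody is idle" behavior, the sketch below keeps several tasks blocked at the same time and then reads the largest pool size reached. It relies on the fact, visible in the listing above, that newCachedThreadPool() returns a ThreadPoolExecutor; CachedPoolDemo and peakThreads are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class CachedPoolDemo {
    /** Submits `tasks` jobs that all block at once and returns the largest pool size reached. */
    static int peakThreads(int tasks) throws InterruptedException {
        // The factory method returns a plain ThreadPoolExecutor, so this cast is valid.
        ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newCachedThreadPool();
        final CountDownLatch allStarted = new CountDownLatch(tasks);
        final CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(new Runnable() {
                    public void run() {
                        allStarted.countDown();
                        try {
                            release.await(); // keep this worker busy
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    }
                });
            }
            allStarted.await(10, TimeUnit.SECONDS); // wait until every task is running
            return pool.getLargestPoolSize();
        } finally {
            release.countDown(); // let the blocked tasks finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("peak threads for 5 blocked tasks: " + peakThreads(5));
    }
}
```

Since no worker is ever idle while the tasks are blocked, each submission has to start its own thread. The same property means a burst of slow tasks can create a very large number of threads.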

Executors#unconfigurableExecutorService()

/**
* Returns an object that delegates all defined {@link
* ExecutorService} methods to the given executor, but not any
* other methods that might otherwise be accessible using
* casts. This provides a way to safely "freeze" configuration and
* disallow tuning of a given concrete implementation.
* @param executor the underlying implementation
* @return an <tt>ExecutorService</tt> instance
* @throws NullPointerException if executor null
*/
public static ExecutorService unconfigurableExecutorService(ExecutorService executor) {
if (executor == null)
throw new NullPointerException();
return new DelegatedExecutorService(executor);
}

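Wraps the given executor in a DelegatedExecutorService, so that only the methods defined by ExecutorService remain reachable and callers can no longer cast to the concrete class to retune it.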
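
A typical use is to hand a pool to other code while keeping its configuration fixed. The following is a small sketch under that assumption; UnconfigurableDemo and newFrozenFixedPool are hypothetical names.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

class UnconfigurableDemo {
    /** Wraps a fixed pool so that callers cannot cast it back and retune it. */
    static ExecutorService newFrozenFixedPool(int nThreads) {
        return Executors.unconfigurableExecutorService(
                Executors.newFixedThreadPool(nThreads));
    }

    public static void main(String[] args) throws Exception {
        ExecutorService frozen = newFrozenFixedPool(2);
        try {
            // The wrapper is not a ThreadPoolExecutor, so setCorePoolSize() is unreachable.
            System.out.println("castable: " + (frozen instanceof ThreadPoolExecutor));
            // Ordinary ExecutorService calls are simply delegated to the wrapped pool.
            Integer answer = frozen.submit(new Callable<Integer>() {
                public Integer call() {
                    return 6 * 7;
                }
            }).get();
            System.out.println("result: " + answer);
        } finally {
            frozen.shutdown(); // shutdown is delegated as well
        }
    }
}
```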

Executors#newScheduledThreadPool()

/**
* Creates a thread pool that can schedule commands to run after a
* given delay, or to execute periodically.
* @param corePoolSize the number of threads to keep in the pool,
* even if they are idle.
* @return a newly created scheduled thread pool
* @throws IllegalArgumentException if <tt>corePoolSize &lt; 0</tt>
*/
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}

/**
* Creates a thread pool that can schedule commands to run after a
* given delay, or to execute periodically.
* @param corePoolSize the number of threads to keep in the pool,
* even if they are idle.
* @param threadFactory the factory to use when the executor
* creates a new thread.
* @return a newly created scheduled thread pool
* @throws IllegalArgumentException if <tt>corePoolSize &lt; 0</tt>
* @throws NullPointerException if threadFactory is null
*/
public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}

Creates a ScheduledThreadPoolExecutor with the given core pool size.
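
As a usage illustration, the sketch below schedules one delayed task and one periodic task on such a pool. ScheduledPoolDemo, measureDelayMillis, and runsPeriodically are hypothetical names, and the delays are arbitrary values chosen for the example.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class ScheduledPoolDemo {
    /** Schedules a one-shot task and returns how many milliseconds passed before it ran. */
    static long measureDelayMillis(long delayMillis) throws Exception {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
        final long start = System.nanoTime();
        try {
            ScheduledFuture<Long> future = scheduler.schedule(new Callable<Long>() {
                public Long call() {
                    return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
                }
            }, delayMillis, TimeUnit.MILLISECONDS);
            return future.get();
        } finally {
            scheduler.shutdown();
        }
    }

    /** Runs a fixed-rate task until it has fired `runs` times, then cancels it. */
    static boolean runsPeriodically(int runs) throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        final CountDownLatch fired = new CountDownLatch(runs);
        try {
            ScheduledFuture<?> handle = scheduler.scheduleAtFixedRate(new Runnable() {
                public void run() {
                    fired.countDown();
                }
            }, 0, 20, TimeUnit.MILLISECONDS);
            boolean done = fired.await(10, TimeUnit.SECONDS);
            handle.cancel(false); // stop further executions
            return done;
        } finally {
            scheduler.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("one-shot task ran after ms: " + measureDelayMillis(100));
        System.out.println("periodic task fired 3 times: " + runsPeriodically(3));
    }
}
```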

Executors#newSingleThreadScheduledExecutor()

/**
* Creates a single-threaded executor that can schedule commands
* to run after a given delay, or to execute periodically.
* (Note however that if this single
* thread terminates due to a failure during execution prior to
* shutdown, a new one will take its place if needed to execute
* subsequent tasks.) Tasks are guaranteed to execute
* sequentially, and no more than one task will be active at any
* given time. Unlike the otherwise equivalent
* <tt>newScheduledThreadPool(1)</tt> the returned executor is
* guaranteed not to be reconfigurable to use additional threads.
* @return the newly created scheduled executor
*/
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1));
}

/**
* Creates a single-threaded executor that can schedule commands
* to run after a given delay, or to execute periodically. (Note
* however that if this single thread terminates due to a failure
* during execution prior to shutdown, a new one will take its
* place if needed to execute subsequent tasks.) Tasks are
* guaranteed to execute sequentially, and no more than one task
* will be active at any given time. Unlike the otherwise
* equivalent <tt>newScheduledThreadPool(1, threadFactory)</tt>
* the returned executor is guaranteed not to be reconfigurable to
* use additional threads.
* @param threadFactory the factory to use when creating new
* threads
* @return a newly created scheduled executor
* @throws NullPointerException if threadFactory is null
*/
public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1, threadFactory));
}

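Both overloads create a ScheduledThreadPoolExecutor with a single thread and wrap it in a DelegatedScheduledExecutorService, so the result cannot be reconfigured to use more threads.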
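
The "no more than one task active at any given time" guarantee from the Javadoc can be checked by counting how many tasks overlap. This is a rough sketch; SingleScheduledDemo and maxConcurrency are hypothetical names, and the short sleep only exists to give tasks a chance to overlap if they could.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class SingleScheduledDemo {
    /** Schedules `tasks` jobs with the same delay and returns the peak number running at once. */
    static int maxConcurrency(ScheduledExecutorService scheduler, int tasks)
            throws InterruptedException {
        final AtomicInteger running = new AtomicInteger();
        final AtomicInteger peak = new AtomicInteger();
        final CountDownLatch done = new CountDownLatch(tasks);
        for (int i = 0; i < tasks; i++) {
            scheduler.schedule(new Runnable() {
                public void run() {
                    int now = running.incrementAndGet();
                    int seen;
                    // raise the recorded peak if this task observed a higher count
                    while ((seen = peak.get()) < now && !peak.compareAndSet(seen, now)) {
                        // retry until the peak is at least `now`
                    }
                    try {
                        Thread.sleep(2);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    running.decrementAndGet();
                    done.countDown();
                }
            }, 5, TimeUnit.MILLISECONDS);
        }
        done.await(10, TimeUnit.SECONDS);
        return peak.get();
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService single = Executors.newSingleThreadScheduledExecutor();
        try {
            // The wrapper hides ScheduledThreadPoolExecutor, so it cannot be resized.
            System.out.println("castable: " + (single instanceof ScheduledThreadPoolExecutor));
            System.out.println("peak concurrency: " + maxConcurrency(single, 10));
        } finally {
            single.shutdown();
        }
    }
}
```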

Executors#DelegatedScheduledExecutorService

/**
* A wrapper class that exposes only the ScheduledExecutorService
* methods of a ScheduledExecutorService implementation.
*/
static class DelegatedScheduledExecutorService
extends DelegatedExecutorService
implements ScheduledExecutorService {
private final ScheduledExecutorService e;
DelegatedScheduledExecutorService(ScheduledExecutorService executor) {
super(executor);
e = executor;
}
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
return e.schedule(command, delay, unit);
}
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
return e.schedule(callable, delay, unit);
}
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
return e.scheduleAtFixedRate(command, initialDelay, period, unit);
}
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
return e.scheduleWithFixedDelay(command, initialDelay, delay, unit);
}
}

DelegatedScheduledExecutorService is a wrapper class that implements the ScheduledExecutorService methods by delegating each call to the wrapped executor.

Executors#unconfigurableScheduledExecutorService()

/**
* Returns an object that delegates all defined {@link
* ScheduledExecutorService} methods to the given executor, but
* not any other methods that might otherwise be accessible using
* casts. This provides a way to safely "freeze" configuration and
* disallow tuning of a given concrete implementation.
* @param executor the underlying implementation
* @return a <tt>ScheduledExecutorService</tt> instance
* @throws NullPointerException if executor null
*/
public static ScheduledExecutorService unconfigurableScheduledExecutorService(ScheduledExecutorService executor) {
if (executor == null)
throw new NullPointerException();
return new DelegatedScheduledExecutorService(executor);
}

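Wraps the given executor in a DelegatedScheduledExecutorService, so that only the methods defined by ScheduledExecutorService remain reachable.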

Executors#DefaultThreadFactory

/**
* The default thread factory
*/
static class DefaultThreadFactory implements ThreadFactory {
static final AtomicInteger poolNumber = new AtomicInteger(1);
final ThreadGroup group;
final AtomicInteger threadNumber = new AtomicInteger(1);
final String namePrefix;

DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null)? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}

public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}

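A simple thread factory. It creates every Thread in the ThreadGroup captured when the factory was constructed, names it pool-N-thread-M, and resets it to non-daemon status and normal priority.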
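
The normalization done in newThread() matters because a new Thread otherwise inherits daemon status and priority from the thread that creates it. The sketch below, with the hypothetical names DefaultThreadFactoryDemo and newWorkerFromDaemonCreator, asks the public accessor Executors.defaultThreadFactory() for a thread from inside a daemon, maximum-priority thread and inspects the result.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

class DefaultThreadFactoryDemo {
    /** Creates a worker from inside a daemon, max-priority thread and returns it (unstarted). */
    static Thread newWorkerFromDaemonCreator() throws InterruptedException {
        final ThreadFactory factory = Executors.defaultThreadFactory();
        final Thread[] created = new Thread[1];
        Thread creator = new Thread(new Runnable() {
            public void run() {
                // Without the factory's reset, the new thread would inherit
                // daemon status and priority from this creator thread.
                created[0] = factory.newThread(new Runnable() {
                    public void run() {
                        // no work needed for this demonstration
                    }
                });
            }
        });
        creator.setDaemon(true);
        creator.setPriority(Thread.MAX_PRIORITY);
        creator.start();
        creator.join(); // join() makes the write to created[0] visible here
        return created[0];
    }

    public static void main(String[] args) throws InterruptedException {
        Thread worker = newWorkerFromDaemonCreator();
        System.out.println("name:     " + worker.getName());
        System.out.println("daemon:   " + worker.isDaemon());
        System.out.println("priority: " + worker.getPriority());
    }
}
```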

Executors#PrivilegedThreadFactory

/**
* Thread factory capturing access control and class loader
*/
static class PrivilegedThreadFactory extends DefaultThreadFactory {
private final ClassLoader ccl;
private final AccessControlContext acc;

PrivilegedThreadFactory() {
super();
this.ccl = Thread.currentThread().getContextClassLoader();
this.acc = AccessController.getContext();
acc.checkPermission(new RuntimePermission("setContextClassLoader"));
}

public Thread newThread(final Runnable r) {
return super.newThread(new Runnable() {
public void run() {
AccessController.doPrivileged(new PrivilegedAction<Object>() {
public Object run() {
Thread.currentThread().setContextClassLoader(ccl);
r.run();
return null;
}
}, acc);
}
});
}

}

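PrivilegedThreadFactory extends DefaultThreadFactory. It captures the context ClassLoader and AccessControlContext of the thread that constructs the factory, and every thread it creates runs its task with that class loader set and under that access control context.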

Executors#callable()

/**
* Returns a {@link Callable} object that, when
* called, runs the given task and returns the given result. This
* can be useful when applying methods requiring a
* <tt>Callable</tt> to an otherwise resultless action.
* @param task the task to run
* @param result the result to return
* @return a callable object
* @throws NullPointerException if task null
*/
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}

/**
* Returns a {@link Callable} object that, when
* called, runs the given task and returns <tt>null</tt>.
* @param task the task to run
* @return a callable object
* @throws NullPointerException if task null
*/
public static Callable<Object> callable(Runnable task) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<Object>(task, null);
}

/**
* Returns a {@link Callable} object that, when
* called, runs the given privileged action and returns its result.
* @param action the privileged action to run
* @return a callable object
* @throws NullPointerException if action null
*/
public static Callable<Object> callable(final PrivilegedAction<?> action) {
if (action == null)
throw new NullPointerException();
return new Callable<Object>() {
public Object call() { return action.run(); }};
}

/**
* Returns a {@link Callable} object that, when
* called, runs the given privileged exception action and returns
* its result.
* @param action the privileged exception action to run
* @return a callable object
* @throws NullPointerException if action null
*/
public static Callable<Object> callable(final PrivilegedExceptionAction<?> action) {
if (action == null)
throw new NullPointerException();
return new Callable<Object>() {
public Object call() throws Exception { return action.run(); }};
}

/**
* Returns a {@link Callable} object that will, when
* called, execute the given <tt>callable</tt> under the current
* access control context. This method should normally be
* invoked within an {@link AccessController#doPrivileged} action
* to create callables that will, if possible, execute under the
* selected permission settings holding within that action; or if
* not possible, throw an associated {@link
* AccessControlException}.
* @param callable the underlying task
* @return a callable object
* @throws NullPointerException if callable null
*
*/
public static <T> Callable<T> privilegedCallable(Callable<T> callable) {
if (callable == null)
throw new NullPointerException();
return new PrivilegedCallable<T>(callable);
}

/**
* Returns a {@link Callable} object that will, when
* called, execute the given <tt>callable</tt> under the current
* access control context, with the current context class loader
* as the context class loader. This method should normally be
* invoked within an {@link AccessController#doPrivileged} action
* to create callables that will, if possible, execute under the
* selected permission settings holding within that action; or if
* not possible, throw an associated {@link
* AccessControlException}.
* @param callable the underlying task
*
* @return a callable object
* @throws NullPointerException if callable null
* @throws AccessControlException if the current access control
* context does not have permission to both set and get context
* class loader.
*/
public static <T> Callable<T> privilegedCallableUsingCurrentClassLoader(Callable<T> callable) {
if (callable == null)
throw new NullPointerException();
return new PrivilegedCallableUsingCurrentClassLoader<T>(callable);
}

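The methods above fall into three groups:

  • callable(Runnable) and callable(Runnable, T) adapt a Runnable to a Callable through RunnableAdapter.
  • callable(PrivilegedAction) and callable(PrivilegedExceptionAction) take a privileged action as the argument and return a Callable that runs it; the PrivilegedExceptionAction version is the one whose call() may throw an Exception.
  • privilegedCallable() returns a PrivilegedCallable object, and privilegedCallableUsingCurrentClassLoader() returns a PrivilegedCallableUsingCurrentClassLoader object.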
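
The first group is the one most code uses directly. The sketch below shows both Runnable overloads; CallableAdapterDemo, runWithResult, and runWithoutResult are hypothetical names, and the StringBuilder only records that the task ran.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;

class CallableAdapterDemo {
    /** Adapts a Runnable with a fixed result; call() runs the task, then returns "done". */
    static String runWithResult(final StringBuilder log) throws Exception {
        Callable<String> adapted = Executors.callable(new Runnable() {
            public void run() {
                log.append("ran;");
            }
        }, "done");
        return adapted.call();
    }

    /** Adapts a Runnable without a result; call() runs the task and returns null. */
    static Object runWithoutResult(final StringBuilder log) throws Exception {
        Callable<Object> adapted = Executors.callable(new Runnable() {
            public void run() {
                log.append("ran;");
            }
        });
        return adapted.call();
    }

    public static void main(String[] args) throws Exception {
        StringBuilder log = new StringBuilder();
        System.out.println(runWithResult(log));    // prints "done"
        System.out.println(runWithoutResult(log)); // prints "null"
        System.out.println(log);                   // prints "ran;ran;"
    }
}
```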

Executors#RunnableAdapter

/**
* A callable that runs given task and returns given result
*/
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result;
}
}

call() invokes Runnable#run() on the wrapped task and then returns the result that was supplied to the constructor.

Executors#PrivilegedCallable

/**
* A callable that runs under established access control settings
*/
static final class PrivilegedCallable<T> implements Callable<T> {
private final AccessControlContext acc;
private final Callable<T> task;
private T result;
private Exception exception;
PrivilegedCallable(Callable<T> task) {
this.task = task;
this.acc = AccessController.getContext();
}

public T call() throws Exception {
AccessController.doPrivileged(new PrivilegedAction<T>() {
public T run() {
try {
result = task.call();
} catch (Exception ex) {
exception = ex;
}
return null;
}
}, acc);
if (exception != null)
throw exception;
else
return result;
}
}

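The AccessControlContext is captured when the PrivilegedCallable is constructed. call() then runs the wrapped callable under that context through AccessController.doPrivileged, and rethrows any exception the task raised.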
