JDK1.6 ThreadPoolExecutor

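Introduction

ThreadPoolExecutor is the thread pool implementation in the java.util.concurrent (JUC) package. It offers two main benefits. First, it reuses threads, which avoids the cost of creating them over and over. Second, it manages its internal resources (threads and tasks), which makes the working state of the pool easy to monitor.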

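Source Code Analysis

ThreadPoolExecutor class diagram (ThreadPoolExecutor extends AbstractExecutorService, which implements ExecutorService, which in turn extends Executor)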

Executor

public interface Executor {

/**
* Executes the given command at some time in the future. The command
* may execute in a new thread, in a pooled thread, or in the calling
* thread, at the discretion of the <tt>Executor</tt> implementation.
*
* @param command the runnable task
* @throws RejectedExecutionException if this task cannot be
* accepted for execution.
* @throws NullPointerException if command is null
*/
// 1
void execute(Runnable command);
}

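Annotated code analysis

  1. The Javadoc is self-explanatory: the command may run in a new thread, in a pooled thread, or in the calling thread, at the discretion of the Executor implementation.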
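To make the "new thread or calling thread" point concrete, here is a minimal sketch of two Executor implementations. The class names ExecutorSketch, DirectExecutor and ThreadPerTaskExecutor are illustrative only and are not part of the analyzed source; the code is kept in Java 6 compatible syntax.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;

// Illustrative names; only Executor and CountDownLatch are JDK types.
class ExecutorSketch {
    /** Runs each command synchronously in the calling thread. */
    static class DirectExecutor implements Executor {
        public void execute(Runnable command) {
            command.run();
        }
    }

    /** Starts a brand-new thread for every command. */
    static class ThreadPerTaskExecutor implements Executor {
        public void execute(Runnable command) {
            new Thread(command).start();
        }
    }

    /** Returns the name of the thread on which the executor ran a command. */
    static String runningThreadName(Executor executor) throws InterruptedException {
        final String[] name = new String[1];
        final CountDownLatch done = new CountDownLatch(1);
        executor.execute(new Runnable() {
            public void run() {
                name[0] = Thread.currentThread().getName();
                done.countDown();
            }
        });
        done.await(); // countDown() happens-before await() returns
        return name[0];
    }
}
```

With DirectExecutor the reported name is the caller's own thread name; with ThreadPerTaskExecutor it is the name of a freshly created thread.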

ExecutorService

// 1 
void shutdown();
// 2
List<Runnable> shutdownNow();
// 3
boolean isShutdown();
// 4
boolean isTerminated();
// 5
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
// 6
<T> Future<T> submit(Callable<T> task);

<T> Future<T> submit(Runnable task, T result);

Future<?> submit(Runnable task);
// 7
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;

<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
// 8
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;

<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;

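Annotated code analysis

  1. Starts an orderly shutdown: tasks that were already submitted are still executed, but new tasks are rejected. Calling this method on a service that has already been shut down has no additional effect.
  2. Attempts to stop all actively executing tasks, halts the processing of waiting tasks, and returns the list of tasks that were awaiting execution. There is no guarantee that running tasks will actually stop. A typical implementation interrupts the threads running the tasks, so a task that does not respond to interruption cannot be stopped.
  3. Returns whether the service has been shut down.
  4. Returns whether all tasks have completed following shutdown. It never returns true unless shutdown() or shutdownNow() was called first.
  5. Blocks until all tasks have completed after a shutdown request, the timeout elapses, or the current thread is interrupted, whichever happens first.
  6. Submits a task and returns a Future representing its pending result.
  7. Executes a collection of tasks and returns a list of Futures once all of them have completed (or, in the timed version, once the timeout expires).
  8. Executes a batch of tasks and returns the result of one that completed successfully, that is, without throwing an exception. Tasks that have not completed by then are cancelled.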
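The lifecycle described above can be sketched as follows. This is only an illustration under assumptions: the class LifecycleSketch, the method sumWithPool, the pool size of 2 and the 5 second wait are all made up for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Illustrative helper, not part of the analyzed source.
class LifecycleSketch {
    /** Sums 1..n by running one Callable per number, then shuts the pool down. */
    static int sumWithPool(int n) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            List<Callable<Integer>> tasks = new ArrayList<Callable<Integer>>();
            for (int i = 1; i <= n; i++) {
                final int value = i;
                tasks.add(new Callable<Integer>() {
                    public Integer call() { return value; }
                });
            }
            int sum = 0;
            for (Future<Integer> f : pool.invokeAll(tasks)) // blocks until all tasks are done
                sum += f.get();
            return sum;
        } finally {
            pool.shutdown();                            // already submitted tasks still run
            pool.awaitTermination(5, TimeUnit.SECONDS); // wait for the workers to exit
        }
    }
}
```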

AbstractExecutorService#newTaskFor

/**
* Returns a <tt>RunnableFuture</tt> for the given runnable and default
* value.
*
* @param runnable the runnable task being wrapped
* @param value the default value for the returned future
* @return a <tt>RunnableFuture</tt> which when run will run the
* underlying runnable and which, as a <tt>Future</tt>, will yield
* the given value as its result and provide for cancellation of
* the underlying task.
* @since 1.6
*/
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}

/**
* Returns a <tt>RunnableFuture</tt> for the given callable task.
*
* @param callable the callable task being wrapped
* @return a <tt>RunnableFuture</tt> which when run will call the
* underlying callable and which, as a <tt>Future</tt>, will yield
* the callable's result as its result and provide for
* cancellation of the underlying task.
* @since 1.6
*/
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}

newTaskFor wraps a Runnable or a Callable in a FutureTask.

AbstractExecutorService#submit

public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Object> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}

public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}

public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}

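Each overload first wraps the task in a RunnableFuture, then hands it to execute(), and finally returns the RunnableFuture so that the caller can track the asynchronous task.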
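A small sketch of what the three submit() overloads yield through their Futures. SubmitSketch and submitAll are illustrative names, and a single-thread executor is assumed purely for simplicity.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Illustrative helper, not part of the analyzed source.
class SubmitSketch {
    /** Returns the values produced by the three submit() overloads, in order. */
    static Object[] submitAll() throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Runnable noop = new Runnable() { public void run() { } };
            Future<?> f1 = pool.submit(noop);              // get() yields null
            Future<String> f2 = pool.submit(noop, "done"); // get() yields the given result
            Future<Integer> f3 = pool.submit(new Callable<Integer>() {
                public Integer call() { return 42; }       // get() yields call()'s value
            });
            return new Object[] { f1.get(), f2.get(), f3.get() };
        } finally {
            pool.shutdown();
        }
    }
}
```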

AbstractExecutorService#doInvokeAny()

/**
* the main mechanics of invokeAny.
*/
private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
boolean timed, long nanos)
throws InterruptedException, ExecutionException, TimeoutException {
if (tasks == null)
throw new NullPointerException();
int ntasks = tasks.size();
if (ntasks == 0)
throw new IllegalArgumentException();
List<Future<T>> futures= new ArrayList<Future<T>>(ntasks);
ExecutorCompletionService<T> ecs =
new ExecutorCompletionService<T>(this);

// For efficiency, especially in executors with limited
// parallelism, check to see if previously submitted tasks are
// done before submitting more of them. This interleaving
// plus the exception mechanics account for messiness of main
// loop.

try {
// Record exceptions so that if we fail to obtain any
// result, we can throw the last exception we got.
ExecutionException ee = null;
long lastTime = (timed)? System.nanoTime() : 0;
Iterator<? extends Callable<T>> it = tasks.iterator();

// Start one task for sure; the rest incrementally
// 1
futures.add(ecs.submit(it.next()));
--ntasks;
int active = 1;
// 2
for (;;) {
// 3
Future<T> f = ecs.poll();
if (f == null) {
// 4
if (ntasks > 0) {
--ntasks;
futures.add(ecs.submit(it.next()));
++active;
}
// 5
else if (active == 0)
break;
// 6
else if (timed) {
f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
if (f == null)
throw new TimeoutException();
long now = System.nanoTime();
nanos -= now - lastTime;
lastTime = now;
}
else // 7
f = ecs.take();
}// 8
if (f != null) {
--active;
try {
return f.get();
} catch (InterruptedException ie) {
throw ie;
} catch (ExecutionException eex) {
ee = eex;
} catch (RuntimeException rex) {
ee = new ExecutionException(rex);
}
}
}

if (ee == null)
ee = new ExecutionException();
throw ee;

} finally {
// 9
for (Future<T> f : futures)
f.cancel(true);
}
}

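Annotated code analysis

  1. Submit the first task and decrement ntasks.
  2. Loop until a result is obtained or every task has failed.
  3. Poll the ExecutorCompletionService, without blocking, for a task that has already completed.
  4. If nothing has completed yet (f == null) and tasks remain (ntasks > 0), submit the next task to the ExecutorCompletionService and increment active.
  5. If no tasks remain to be submitted and none are still running (active == 0), every task has failed, so leave the loop and throw the last recorded exception.
  6. In the timed version, wait up to nanos for a task to complete. Throw TimeoutException if none does; otherwise subtract the elapsed time from the remaining budget.
  7. In the untimed version, block in take() until some task completes and assign it to f.
  8. f != null means a task has completed: decrement active and return f.get(). If the task failed, record the exception and keep looping.
  9. On exit, cancel every submitted task. cancel() has no effect on tasks that have already completed.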
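The "record the exception and keep looping" behavior can be observed from the outside. In this sketch (InvokeAnySketch and firstSuccess are illustrative names) one task always fails and one succeeds, so invokeAny is expected to return the successful value rather than throw.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Illustrative helper, not part of the analyzed source.
class InvokeAnySketch {
    /** One task fails, one succeeds; invokeAny returns the successful result. */
    static String firstSuccess() throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            List<Callable<String>> tasks = new ArrayList<Callable<String>>();
            tasks.add(new Callable<String>() {
                public String call() { throw new IllegalStateException("boom"); }
            });
            tasks.add(new Callable<String>() {
                public String call() { return "ok"; }
            });
            return pool.invokeAny(tasks); // the failure is recorded, not rethrown
        } finally {
            pool.shutdown();
        }
    }
}
```

If every task failed, invokeAny would instead throw an ExecutionException wrapping the last failure.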

AbstractExecutorService#invokeAny()

public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {
try {
return doInvokeAny(tasks, false, 0);
} catch (TimeoutException cannotHappen) {
assert false;
return null;
}
}

public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return doInvokeAny(tasks, true, unit.toNanos(timeout));
}

AbstractExecutorService#invokeAll()

public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
if (tasks == null)
throw new NullPointerException();
List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
boolean done = false;
try {
// 1
for (Callable<T> t : tasks) {
RunnableFuture<T> f = newTaskFor(t);
futures.add(f);
execute(f);
}
// 2
for (Future<T> f : futures) {
if (!f.isDone()) {
try {
f.get();
} catch (CancellationException ignore) {
} catch (ExecutionException ignore) {
}
}
}
// 3
done = true;
return futures;
} finally {
// 4
if (!done)
for (Future<T> f : futures)
f.cancel(true);
}
}

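Annotated code analysis

  1. Wrap each task in a RunnableFuture, add it to the list, and execute it.
  2. Wait for every task to complete. CancellationException and ExecutionException are ignored here because the caller inspects each Future afterwards.
  3. done = true records that all tasks were waited for.
  4. If done is still false, the wait was cut short (for example by an InterruptedException), so the tasks that have not completed are cancelled.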
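A short sketch of the caller's view of invokeAll: every returned Future is already done, and a failed task only surfaces when its Future is queried. InvokeAllSketch and runMixed are illustrative names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Illustrative helper, not part of the analyzed source.
class InvokeAllSketch {
    /** Returns {futures already done on return, futures whose task failed}. */
    static int[] runMixed() throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            List<Callable<Integer>> tasks = new ArrayList<Callable<Integer>>();
            tasks.add(new Callable<Integer>() { public Integer call() { return 1; } });
            tasks.add(new Callable<Integer>() {
                public Integer call() { throw new IllegalStateException("boom"); }
            });
            tasks.add(new Callable<Integer>() { public Integer call() { return 3; } });
            int done = 0;
            int failed = 0;
            for (Future<Integer> f : pool.invokeAll(tasks)) {
                if (f.isDone()) done++;
                try {
                    f.get();
                } catch (ExecutionException e) {
                    failed++; // the task's exception is reported here, not by invokeAll
                }
            }
            return new int[] { done, failed };
        } finally {
            pool.shutdown();
        }
    }
}
```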

ThreadPoolExecutor

public class ThreadPoolExecutor extends AbstractExecutorService {

/**
* Permission for checking shutdown
*/
private static final RuntimePermission shutdownPerm =
new RuntimePermission("modifyThread");


/**
* runState provides the main lifecyle control, taking on values:
*
* RUNNING: Accept new tasks and process queued tasks
* SHUTDOWN: Don't accept new tasks, but process queued tasks
* STOP: Don't accept new tasks, don't process queued tasks,
* and interrupt in-progress tasks
* TERMINATED: Same as STOP, plus all threads have terminated
*
* RUNNING -> SHUTDOWN
* On invocation of shutdown(), perhaps implicitly in finalize()
* (RUNNING or SHUTDOWN) -> STOP
* On invocation of shutdownNow()
* SHUTDOWN -> TERMINATED
* When both queue and pool are empty
* STOP -> TERMINATED
* When pool is empty
*/
// 1
volatile int runState;
// 2
static final int RUNNING = 0;
// 3
static final int SHUTDOWN = 1;
// 4
static final int STOP = 2;
// 5
static final int TERMINATED = 3;

/**
* The queue used for holding tasks and handing off to worker
* threads. Note that when using this queue, we do not require
* that workQueue.poll() returning null necessarily means that
* workQueue.isEmpty(), so must sometimes check both. This
* accommodates special-purpose queues such as DelayQueues for
* which poll() is allowed to return null even if it may later
* return non-null when delays expire.
*/
// 6
private final BlockingQueue<Runnable> workQueue;

/**
* Lock held on updates to poolSize, corePoolSize,
* maximumPoolSize, runState, and workers set.
*/
// 7
private final ReentrantLock mainLock = new ReentrantLock();

/**
* Wait condition to support awaitTermination
*/
// 8
private final Condition termination = mainLock.newCondition();

/**
* Set containing all worker threads in pool. Accessed only when
* holding mainLock.
*/
// 9
private final HashSet<Worker> workers = new HashSet<Worker>();

/**
* Timeout in nanoseconds for idle threads waiting for work.
* Threads use this timeout when there are more than corePoolSize
* present or if allowCoreThreadTimeOut. Otherwise they wait
* forever for new work.
*/
// 10
private volatile long keepAliveTime;

/**
* If false (default) core threads stay alive even when idle. If
* true, core threads use keepAliveTime to time out waiting for
* work.
*/
// 11
private volatile boolean allowCoreThreadTimeOut;

/**
* Core pool size, updated only while holding mainLock, but
* volatile to allow concurrent readability even during updates.
*/
// 12
private volatile int corePoolSize;

/**
* Maximum pool size, updated only while holding mainLock but
* volatile to allow concurrent readability even during updates.
*/
// 13
private volatile int maximumPoolSize;

/**
* Current pool size, updated only while holding mainLock but
* volatile to allow concurrent readability even during updates.
*/
// 14
private volatile int poolSize;

/**
* Handler called when saturated or shutdown in execute.
*/
// 15
private volatile RejectedExecutionHandler handler;

/**
* Factory for new threads. All threads are created using this
* factory (via method addThread). All callers must be prepared
* for addThread to fail by returning null, which may reflect a
* system or user's policy limiting the number of threads. Even
* though it is not treated as an error, failure to create threads
* may result in new tasks being rejected or existing ones
* remaining stuck in the queue. On the other hand, no special
* precautions exist to handle OutOfMemoryErrors that might be
* thrown while trying to create threads, since there is generally
* no recourse from within this class.
*/
// 16
private volatile ThreadFactory threadFactory;

/**
* Tracks largest attained pool size.
*/
// 17
private int largestPoolSize;

/**
* Counter for completed tasks. Updated only on termination of
* worker threads.
*/
// 18
private long completedTaskCount;

/**
* The default rejected execution handler
*/
// 19
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();

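Annotated code analysis

  1. The run state of the pool.
  2. RUNNING: accepts new tasks and processes queued tasks.
  3. SHUTDOWN: does not accept new tasks, but still processes queued tasks.
  4. STOP: does not accept new tasks, does not process queued tasks, and interrupts tasks that are in progress.
  5. TERMINATED: same as STOP, plus all worker threads have terminated.
  6. The task queue, which holds tasks and hands them off to worker threads.
  7. The lock that must be held when updating internal state such as the pool size, the run state, and the workers set.
  8. The wait condition that supports awaitTermination.
  9. The set of all Worker objects in the pool. It may only be accessed while holding mainLock.
  10. The timeout, in nanoseconds, for idle worker threads waiting for work. Threads in excess of corePoolSize use this timeout. If allowCoreThreadTimeOut is set, core threads use it as well. Otherwise threads wait for new work forever.
  11. If false (the default), core threads stay alive even when idle. If true, core threads waiting for work use keepAliveTime as their timeout and are reclaimed once it expires.
  12. The core pool size. It is only modified while holding mainLock; volatile guarantees visibility to concurrent readers.
  13. The maximum pool size. It is only modified while holding mainLock.
  14. The current pool size. It is only modified while holding mainLock.
  15. The rejected execution handler, which deals with newly submitted tasks when the pool is saturated or shut down.
  16. The thread factory used to create new threads.
  17. The largest pool size ever attained.
  18. The counter of completed tasks. It is updated only when a worker thread terminates.
  19. The default rejected execution handler (AbortPolicy).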
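Several of these fields are exposed through public getters, which is what makes the pool easy to monitor. The following sketch assumes a pool with corePoolSize = maximumPoolSize = 2; PoolStatsSketch and runAndReport are illustrative names.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Illustrative helper, not part of the analyzed source.
class PoolStatsSketch {
    /** Runs taskCount no-op tasks and returns {largestPoolSize, completedTaskCount}. */
    static long[] runAndReport(int taskCount) throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        for (int i = 0; i < taskCount; i++)
            pool.execute(new Runnable() { public void run() { } });
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        // After termination every worker has folded its counter into the pool total.
        return new long[] { pool.getLargestPoolSize(), pool.getCompletedTaskCount() };
    }
}
```

While the pool is still running, getCompletedTaskCount() is only an approximation, because per-worker counters are merged into the total when a worker exits.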

ThreadPoolExecutor constructors


public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}

public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}

public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}

public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
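  • Internally keepAliveTime is always stored in nanoseconds, so the constructor converts it with unit.toNanos().
  • If no thread factory is given, Executors.defaultThreadFactory() is used. If no handler is given, defaultHandler (AbortPolicy) is used.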
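The argument checks and the time conversion in the full constructor can be exercised directly. This is a sketch with illustrative names (ConstructorSketch and its three methods); the concrete sizes and timeouts are arbitrary.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Illustrative helper, not part of the analyzed source.
class ConstructorSketch {
    /** maximumPoolSize < corePoolSize must be rejected. */
    static boolean rejectsMaxBelowCore() {
        try {
            new ThreadPoolExecutor(4, 2, 1L, TimeUnit.SECONDS,
                    new LinkedBlockingQueue<Runnable>());
            return false;
        } catch (IllegalArgumentException expected) {
            return true;
        }
    }

    /** A null work queue must be rejected. */
    static boolean rejectsNullQueue() {
        try {
            new ThreadPoolExecutor(1, 2, 1L, TimeUnit.SECONDS, null);
            return false;
        } catch (NullPointerException expected) {
            return true;
        }
    }

    /** A keep-alive of 3 seconds, read back in nanoseconds. */
    static long keepAliveNanos() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 3L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        try {
            return pool.getKeepAliveTime(TimeUnit.NANOSECONDS);
        } finally {
            pool.shutdown();
        }
    }
}
```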

Executors#defaultThreadFactory()

public static ThreadFactory defaultThreadFactory(){
return new DefaultThreadFactory();
}

Executors#DefaultThreadFactory

/**
* The default thread factory
*/
static class DefaultThreadFactory implements ThreadFactory {
static final AtomicInteger poolNumber = new AtomicInteger(1);
final ThreadGroup group;
final AtomicInteger threadNumber = new AtomicInteger(1);
final String namePrefix;

DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null)? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}

public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}

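DefaultThreadFactory creates every thread in the same ThreadGroup and names it pool-N-thread-M. poolNumber.getAndIncrement() gives each factory instance the next pool number, and threadNumber.getAndIncrement() gives each thread the next number within that pool, so both sequences are increasing. New threads are always non-daemon and run at NORM_PRIORITY.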
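The naming scheme is easy to check through the public Executors.defaultThreadFactory() method. DefaultFactorySketch and firstTwoThreads are illustrative names; the pool number N depends on how many default factories were created earlier in the JVM, so only the pattern is asserted.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

// Illustrative helper, not part of the analyzed source.
class DefaultFactorySketch {
    /** Creates (but does not start) the first two threads of a fresh default factory. */
    static Thread[] firstTwoThreads() {
        ThreadFactory factory = Executors.defaultThreadFactory();
        Runnable noop = new Runnable() { public void run() { } };
        return new Thread[] { factory.newThread(noop), factory.newThread(noop) };
    }

    public static void main(String[] args) {
        for (Thread t : firstTwoThreads())
            System.out.println(t.getName() + " daemon=" + t.isDaemon()
                    + " priority=" + t.getPriority());
    }
}
```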

ThreadPoolExecutor#execute()

/**
* Executes the given task sometime in the future. The task
* may execute in a new thread or in an existing pooled thread.
*
* If the task cannot be submitted for execution, either because this
* executor has been shutdown or because its capacity has been reached,
* the task is handled by the current <tt>RejectedExecutionHandler</tt>.
*
* @param command the task to execute
* @throws RejectedExecutionException at discretion of
* <tt>RejectedExecutionHandler</tt>, if task cannot be accepted
* for execution
* @throws NullPointerException if command is null
*/
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// 1
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
// 2
if (runState == RUNNING && workQueue.offer(command)) {
// 3
if (runState != RUNNING || poolSize == 0)
// 4
ensureQueuedTaskHandled(command);
}
// 5
else if (!addIfUnderMaximumPoolSize(command))
// 6
reject(command); // is shutdown or saturated
}
}

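Annotated code analysis

  1. If the current pool size is at least the core size (poolSize >= corePoolSize), go to #2. If it is below the core size (poolSize < corePoolSize), try to add a worker thread with addIfUnderCorePoolSize and let it run the submitted task; on success the submission is finished. If adding the worker fails, go to #2.
  2. Check that the pool is RUNNING and, if so, try to put the task into the task queue with workQueue.offer(command).
  3. The task has been queued, so re-check the state: if the pool is no longer RUNNING (it was shut down concurrently) or there are no worker threads at all (poolSize == 0), go to #4.
  4. ensureQueuedTaskHandled(command) rejects the task if the pool has been shut down and the task is still in the queue; otherwise it makes sure that at least one live thread exists to handle it.
  5. If the pool is not RUNNING or the queue is full (offer failed), try to add a worker thread that runs the submitted task, as long as the pool size stays below the maximum pool size.
  6. If addIfUnderMaximumPoolSize(command) succeeds, the submission is finished. If it fails, the task is passed to the rejected execution handler.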
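The order "core thread, then queue, then extra thread, then reject" can be traced with a deliberately tiny pool. The sketch below assumes corePoolSize 1, maximumPoolSize 2 and a queue of capacity 1, and uses tasks that block on a latch so that nothing finishes early; ExecuteFlowSketch and trace are illustrative names.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Illustrative helper, not part of the analyzed source.
class ExecuteFlowSketch {
    /** Returns {poolSize after 1st task, queue size after 2nd, poolSize after 3rd, 1 if 4th rejected}. */
    static int[] trace() throws InterruptedException {
        final CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = new Runnable() {
            public void run() {
                try {
                    release.await(); // keep the worker busy until released
                } catch (InterruptedException e) {
                    // fall through and let the worker finish
                }
            }
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 1L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1));
        int[] seen = new int[4];
        try {
            pool.execute(blocker);             // below corePoolSize: new core thread
            seen[0] = pool.getPoolSize();
            pool.execute(blocker);             // core thread busy: task is queued
            seen[1] = pool.getQueue().size();
            pool.execute(blocker);             // queue full: extra thread up to the maximum
            seen[2] = pool.getPoolSize();
            try {
                pool.execute(blocker);         // saturated: default AbortPolicy throws
            } catch (RejectedExecutionException expected) {
                seen[3] = 1;
            }
        } finally {
            release.countDown();
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        }
        return seen;
    }
}
```

Note that the extra thread is created only after the queue is full, which is why an unbounded queue effectively caps the pool at corePoolSize.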

ThreadPoolExecutor#addIfUnderCorePoolSize()

/**
* Creates and starts a new thread running firstTask as its first
* task, only if fewer than corePoolSize threads are running
* and the pool is not shut down.
* @param firstTask the task the new thread should run first (or
* null if none)
* @return true if successful
*/
private boolean addIfUnderCorePoolSize(Runnable firstTask) {
Thread t = null;
final ReentrantLock mainLock = this.mainLock;
// 1
mainLock.lock();
try {
// 2
if (poolSize < corePoolSize && runState == RUNNING)
t = addThread(firstTask);
} finally {
mainLock.unlock();
}
if (t == null)
return false;
// 3
t.start();
return true;
}

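Annotated code analysis

  1. Adding a thread requires holding mainLock.
  2. If the current pool size is below the core size and the pool is RUNNING, add one worker thread.
  3. Start the thread after releasing the lock; the new worker then runs firstTask.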

ThreadPoolExecutor#addThread()

/**
* Creates and returns a new thread running firstTask as its first
* task. Call only while holding mainLock.
*
* @param firstTask the task the new thread should run first (or
* null if none)
* @return the new thread, or null if threadFactory fails to create thread
*/
private Thread addThread(Runnable firstTask) {
// 1
Worker w = new Worker(firstTask);
// 2
Thread t = threadFactory.newThread(w);
if (t != null) {
// 3
w.thread = t;
// 4
workers.add(w);
// 5
int nt = ++poolSize;
// 6
if (nt > largestPoolSize)
largestPoolSize = nt;
}
return t;
}

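Annotated code analysis

  1. Create a Worker that wraps firstTask.
  2. Ask threadFactory to create a Thread that runs the worker.
  3. Store the new Thread in worker#thread.
  4. Add the worker to the workers set.
  5. Increment the pool size.
  6. If the new pool size nt exceeds largestPoolSize, record nt as the new peak.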

ThreadPoolExecutor#Worker

...
When starting to run a task, unless the pool is stopped, each
worker thread ensures that it is not interrupted, and uses
runLock to prevent the pool from interrupting it in the midst
of execution.
...

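The comment tells us that Worker is a Runnable. Unless the pool is stopped, each worker thread makes sure its interrupt status is clear before it starts a task, and it holds runLock (a ReentrantLock) while the task runs so that the pool cannot interrupt it in the middle of execution.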

private final class Worker implements Runnable {
/**
* The runLock is acquired and released surrounding each task
* execution. It mainly protects against interrupts that are
* intended to cancel the worker thread from instead
* interrupting the task being run.
*/
// 1
private final ReentrantLock runLock = new ReentrantLock();

/**
* Initial task to run before entering run loop. Possibly null.
*/
// 2
private Runnable firstTask;

/**
* Per thread completed task counter; accumulated
* into completedTaskCount upon termination.
*/
// 3
volatile long completedTasks;

/**
* Thread this worker is running in. Acts as a final field,
* but cannot be set until thread is created.
*/
// 4
Thread thread;

Worker(Runnable firstTask) {
this.firstTask = firstTask;
}
// 5
boolean isActive() {
return runLock.isLocked();
}

/**
* Interrupts thread if not running a task.
*/
// 6
void interruptIfIdle() {
final ReentrantLock runLock = this.runLock;
if (runLock.tryLock()) {
try {
if (thread != Thread.currentThread())
thread.interrupt();
} finally {
runLock.unlock();
}
}
}

/**
* Interrupts thread even if running a task.
*/
// 7
void interruptNow() {
thread.interrupt();
}

/**
* Runs a single task between before/after methods.
*/
private void runTask(Runnable task) {
final ReentrantLock runLock = this.runLock;
runLock.lock();
try {
/*
* Ensure that unless pool is stopping, this thread
* does not have its interrupt set. This requires a
* double-check of state in case the interrupt was
* cleared concurrently with a shutdownNow -- if so,
* the interrupt is re-enabled.
*/
// 8
if (runState < STOP &&
Thread.interrupted() &&
runState >= STOP)
thread.interrupt();
/*
* Track execution state to ensure that afterExecute
* is called only if task completed or threw
* exception. Otherwise, the caught runtime exception
* will have been thrown by afterExecute itself, in
* which case we don't want to call it again.
*/
// 9
boolean ran = false;
beforeExecute(thread, task);
try {
task.run();
ran = true;
afterExecute(task, null);
++completedTasks;
} catch (RuntimeException ex) {
if (!ran)
afterExecute(task, ex);
throw ex;
}
} finally {
runLock.unlock();
}
}

/**
* Main run loop
*/
public void run() {
try {
Runnable task = firstTask;
firstTask = null;
// 10
while (task != null || (task = getTask()) != null) {
runTask(task);
task = null;
}
} finally {
workerDone(this);
}
}
}

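Annotated code analysis

  1. runLock is held around each task execution. It prevents an interrupt that is meant to cancel an idle worker thread from interrupting a running task instead.
  2. The initial task, possibly null.
  3. The number of tasks this worker has completed.
  4. The thread this worker runs in.
  5. isActive() calls ReentrantLock#isLocked(), which is intended for monitoring rather than synchronization. true means runLock is currently held, that is, the worker is running a task.
  6. If the worker is idle (runLock can be acquired with tryLock), interrupt worker#thread, unless it is the calling thread itself.
  7. Interrupt the thread unconditionally, even if it is running a task.
  8. If the pool is not stopping, clear the thread's interrupt status with Thread.interrupted(). The state is then checked again: if shutdownNow() ran concurrently (runState >= STOP), the interrupt is restored.
  9. ran records whether task.run() returned normally. If task.run() throws a RuntimeException, ran is still false and afterExecute(task, ex) is called. If the exception came from afterExecute(task, null) itself, ran is already true and afterExecute is not called a second time. ThreadPoolExecutor#beforeExecute() and ThreadPoolExecutor#afterExecute() are hooks for subclasses to override.
  10. Run firstTask first if it is not null, then keep fetching tasks with ThreadPoolExecutor#getTask() and running them until getTask() returns null. workerDone(this) is called on the way out.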
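The hook behavior from note 9 can be sketched with a small subclass. HookedPool, its counters and quietFactory are illustrative names; the quiet uncaught-exception handler is only an assumption made to keep the failing task from printing a stack trace.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative subclass; only the overridden hooks are JDK API.
class HookedPool extends ThreadPoolExecutor {
    final AtomicInteger before = new AtomicInteger();
    final AtomicInteger afterOk = new AtomicInteger();
    final AtomicInteger afterFailed = new AtomicInteger();

    HookedPool() {
        super(1, 1, 0L, TimeUnit.MILLISECONDS,
              new LinkedBlockingQueue<Runnable>(), quietFactory());
    }

    /** Thread factory that swallows uncaught exceptions to keep the output clean. */
    private static ThreadFactory quietFactory() {
        return new ThreadFactory() {
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                t.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
                    public void uncaughtException(Thread thread, Throwable e) { }
                });
                return t;
            }
        };
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        before.incrementAndGet();
        super.beforeExecute(t, r); // the Javadoc suggests calling super at the end
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);  // the Javadoc suggests calling super at the beginning
        if (t == null) afterOk.incrementAndGet();
        else afterFailed.incrementAndGet();
    }

    /** Returns {beforeExecute calls, normal completions, failed completions}. */
    static int[] demo() throws InterruptedException {
        HookedPool pool = new HookedPool();
        pool.execute(new Runnable() { public void run() { } });
        pool.execute(new Runnable() {
            public void run() { throw new IllegalStateException("task failed"); }
        });
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return new int[] { pool.before.get(), pool.afterOk.get(), pool.afterFailed.get() };
    }
}
```

The tasks are passed to execute() on purpose: with submit() the FutureTask wrapper catches the exception, so afterExecute would receive null for the failing task as well.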

ThreadPoolExecutor#beforeExecute()

/**
* Method invoked prior to executing the given Runnable in the
* given thread. This method is invoked by thread <tt>t</tt> that
* will execute task <tt>r</tt>, and may be used to re-initialize
* ThreadLocals, or to perform logging.
*
* <p>This implementation does nothing, but may be customized in
* subclasses. Note: To properly nest multiple overridings, subclasses
* should generally invoke <tt>super.beforeExecute</tt> at the end of
* this method.
*
* @param t the thread that will run task r.
* @param r the task that will be executed.
*/
protected void beforeExecute(Thread t, Runnable r) { }

ThreadPoolExecutor#afterExecute()

/**
* Method invoked upon completion of execution of the given Runnable.
* This method is invoked by the thread that executed the task. If
* non-null, the Throwable is the uncaught <tt>RuntimeException</tt>
* or <tt>Error</tt> that caused execution to terminate abruptly.
*
* <p><b>Note:</b> When actions are enclosed in tasks (such as
* {@link FutureTask}) either explicitly or via methods such as
* <tt>submit</tt>, these task objects catch and maintain
* computational exceptions, and so they do not cause abrupt
* termination, and the internal exceptions are <em>not</em>
* passed to this method.
*
* <p>This implementation does nothing, but may be customized in
* subclasses. Note: To properly nest multiple overridings, subclasses
* should generally invoke <tt>super.afterExecute</tt> at the
* beginning of this method.
*
* @param r the runnable that has completed.
* @param t the exception that caused termination, or null if
* execution completed normally.
*/
protected void afterExecute(Runnable r, Throwable t) { }

ThreadPoolExecutor#getTask()

/**
* Gets the next task for a worker thread to run. The general
* approach is similar to execute() in that worker threads trying
* to get a task to run do so on the basis of prevailing state
* accessed outside of locks. This may cause them to choose the
* "wrong" action, such as trying to exit because no tasks
* appear to be available, or entering a take when the pool is in
* the process of being shut down. These potential problems are
* countered by (1) rechecking pool state (in workerCanExit)
* before giving up, and (2) interrupting other workers upon
* shutdown, so they can recheck state. All other user-based state
* changes (to allowCoreThreadTimeOut etc) are OK even when
* performed asynchronously wrt getTask.
*
* @return the task
*/
Runnable getTask() {
for (;;) {
try {
int state = runState;
// 1
if (state > SHUTDOWN)
return null;
Runnable r;
// 2
if (state == SHUTDOWN) // Help drain queue
r = workQueue.poll();
else if (poolSize > corePoolSize || allowCoreThreadTimeOut)// 3
r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
else // 4
r = workQueue.take();
if (r != null)
return r;
// 5
if (workerCanExit()) {
// 6
if (runState >= SHUTDOWN) // Wake up others
interruptIdleWorkers();
return null;
}
// Else retry
} catch (InterruptedException ie) {
// On interruption, re-check runState
}
}
}

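Annotated code analysis

  1. If the pool is stopped or terminated (state > SHUTDOWN), return null.
  2. If the pool is shut down, fetch a task from the queue with a non-blocking poll() to help drain it.
  3. If the worker is subject to a timeout (poolSize > corePoolSize or allowCoreThreadTimeOut), fetch a task with the timed poll(keepAliveTime, NANOSECONDS).
  4. Otherwise block in take() until a task is available.
  5. If no task was obtained, check whether the worker may leave the loop. According to ThreadPoolExecutor#workerCanExit(), it may exit when the pool is stopping, when workQueue is empty, or when core timeouts are allowed and more than max(1, corePoolSize) threads exist. Otherwise the loop retries.
  6. If the pool is shut down or stopped, other idle threads in the workers set may still be waiting, so interrupt the idle workers to let them re-check the state.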
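The timed poll() branch is what lets idle threads disappear. The following sketch assumes a 50 ms keep-alive with allowCoreThreadTimeOut(true) and polls getPoolSize() for up to 5 seconds; KeepAliveSketch and coreThreadsTimeOut are illustrative names.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Illustrative helper, not part of the analyzed source.
class KeepAliveSketch {
    /** Returns true if the pool shrinks to zero threads after being idle. */
    static boolean coreThreadsTimeOut() throws Exception {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 50L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        pool.allowCoreThreadTimeOut(true); // core threads now use the timed poll()
        try {
            pool.submit(new Runnable() { public void run() { } }).get(); // starts the core thread
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
            while (pool.getPoolSize() > 0 && System.nanoTime() - deadline < 0)
                Thread.sleep(10);
            return pool.getPoolSize() == 0;
        } finally {
            pool.shutdown();
        }
    }
}
```

Without the allowCoreThreadTimeOut(true) call the single core thread would block in take() and the pool size would stay at 1.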

ThreadPoolExecutor#workerCanExit()

/**
* Check whether a worker thread that fails to get a task can
* exit. We allow a worker thread to die if the pool is stopping,
* or the queue is empty, or there is at least one thread to
* handle possibly non-empty queue, even if core timeouts are
* allowed.
*/
private boolean workerCanExit() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
boolean canExit;
try {
canExit = runState >= STOP ||
workQueue.isEmpty() ||
(allowCoreThreadTimeOut &&
poolSize > Math.max(1, corePoolSize));
} finally {
mainLock.unlock();
}
return canExit;
}

ThreadPoolExecutor#interruptIdleWorkers()

/**
* Wakes up all threads that might be waiting for tasks so they
* can check for termination. Note: this method is also called by
* ScheduledThreadPoolExecutor.
*/
void interruptIdleWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
w.interruptIfIdle();
} finally {
mainLock.unlock();
}
}

ThreadPoolExecutor#workerDone()

/**
* Performs bookkeeping for an exiting worker thread.
* @param w the worker
*/
void workerDone(Worker w) {
final ReentrantLock mainLock = this.mainLock;
// 1
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
// 2
if (--poolSize == 0)
tryTerminate();
} finally {
mainLock.unlock();
}
}

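Annotated code analysis

  1. Take the lock, add the worker's completedTasks to completedTaskCount, and remove the worker from the workers set.
  2. Decrement the pool size. If it reaches 0, call tryTerminate(): if the pool is shut down or stopped it moves to TERMINATED; if tasks are still waiting in workQueue and the pool is not stopped, a new thread is started so that at least one live thread remains.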

ThreadPoolExecutor#tryTerminate()

/**
* Transitions to TERMINATED state if either (SHUTDOWN and pool
* and queue empty) or (STOP and pool empty), otherwise unless
* stopped, ensuring that there is at least one live thread to
* handle queued tasks.
*
* This method is called from the three places in which
* termination can occur: in workerDone on exit of the last thread
* after pool has been shut down, or directly within calls to
* shutdown or shutdownNow, if there are no live threads.
*/
private void tryTerminate() {
// 1
if (poolSize == 0) {
// 2
int state = runState;
// 3
if (state < STOP && !workQueue.isEmpty()) {
state = RUNNING; // disable termination check below
Thread t = addThread(null);
if (t != null)
t.start();
}
// 4
if (state == STOP || state == SHUTDOWN) {
runState = TERMINATED;
termination.signalAll();
terminated();
}
}
}

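Annotated code analysis

  1. Proceed only if the pool has no threads left.
  2. Copy the current run state into the local variable state.
  3. If the pool is RUNNING or SHUTDOWN (state < STOP) and workQueue is not empty, start a new worker with a null first task to handle the queued tasks. Only the local variable state is set to RUNNING, which skips the termination check below; the pool's runState is not changed.
  4. If state is STOP or SHUTDOWN, set runState to TERMINATED, wake up all threads waiting in awaitTermination with signalAll(), and call the terminated() hook.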
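From the outside, the transition to TERMINATED is visible through awaitTermination(), isTerminated() and the terminated() hook. TerminationSketch and shutdownAndWait are illustrative names in this sketch.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative helper, not part of the analyzed source.
class TerminationSketch {
    /** Returns {awaitTermination result, isTerminated(), whether terminated() ran}. */
    static boolean[] shutdownAndWait() throws InterruptedException {
        final AtomicBoolean hookRan = new AtomicBoolean(false);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()) {
            @Override
            protected void terminated() {
                hookRan.set(true); // invoked once, when the pool terminates
            }
        };
        pool.execute(new Runnable() { public void run() { } });
        pool.shutdown();
        boolean finished = pool.awaitTermination(5, TimeUnit.SECONDS);
        return new boolean[] { finished, pool.isTerminated(), hookRan.get() };
    }
}
```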

ThreadPoolExecutor#ensureQueuedTaskHandled()

/**
* Rechecks state after queuing a task. Called from execute when
* pool state has been observed to change after queuing a task. If
* the task was queued concurrently with a call to shutdownNow,
* and is still present in the queue, this task must be removed
* and rejected to preserve shutdownNow guarantees. Otherwise,
* this method ensures (unless addThread fails) that there is at
* least one live thread to handle this task
* @param command the task
*/
private void ensureQueuedTaskHandled(Runnable command) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
// 1
boolean reject = false;
Thread t = null;
try {
int state = runState;
// 2
if (state != RUNNING && workQueue.remove(command))
reject = true;
else if (state < STOP &&
poolSize < Math.max(corePoolSize, 1) &&
!workQueue.isEmpty())// 3
t = addThread(null);
} finally {
mainLock.unlock();
}
// 4
if (reject)
reject(command);
else if (t != null)
t.start();
}

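Annotated code analysis

  1. The flag that records whether the task must be rejected.
  2. If the pool is not RUNNING and the task can still be removed from workQueue, remove it and set the reject flag.
  3. Otherwise, if the pool is not stopped (state < STOP), has fewer than max(corePoolSize, 1) threads, and workQueue is not empty, add a worker with a null first task. This covers both a RUNNING pool that has no threads (poolSize == 0 in ThreadPoolExecutor#execute()) and a SHUTDOWN pool that still has queued tasks. As in ThreadPoolExecutor#tryTerminate(), there must be at least one live thread to run queued tasks.
  4. After releasing the lock, either reject the task, which ends up in RejectedExecutionHandler#rejectedExecution, or start the new thread, whose worker has a null first task and therefore goes straight to the queue.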
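One visible consequence of this method is that a pool with corePoolSize 0 and an unbounded queue still runs its tasks: the task is queued first, and a thread is then started because poolSize is 0. A minimal sketch, with ZeroCoreSketch and runOnZeroCorePool as illustrative names:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Illustrative helper, not part of the analyzed source.
class ZeroCoreSketch {
    /** Submits a task to a pool with corePoolSize 0 and returns its result. */
    static String runOnZeroCorePool() throws Exception {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                0, 1, 1L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        try {
            Future<String> f = pool.submit(new Callable<String>() {
                public String call() { return "handled"; }
            });
            return f.get(5, TimeUnit.SECONDS); // would time out if no thread were started
        } finally {
            pool.shutdown();
        }
    }
}
```

Later JDK releases restructured execute(), but as far as I know they keep the same guarantee for this configuration.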

ThreadPoolExecutor#addIfUnderMaximumPoolSize()

/**
* Creates and starts a new thread running firstTask as its first
* task, only if fewer than maximumPoolSize threads are running
* and pool is not shut down.
* @param firstTask the task the new thread should run first (or
* null if none)
* @return true if successful
*/
private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {
Thread t = null;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 1
if (poolSize < maximumPoolSize && runState == RUNNING)
t = addThread(firstTask);
} finally {
mainLock.unlock();
}
if (t == null)
return false;
t.start();
return true;
}

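Annotated code analysis

  1. If the pool size is below the maximum pool size and the pool is RUNNING, add one worker through addThread(firstTask), which creates the thread and stores it in worker#thread.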

ThreadPoolExecutor#shutdown()

public void shutdown() {
// 1
SecurityManager security = System.getSecurityManager();
if (security != null)
security.checkPermission(shutdownPerm);

final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 2
if (security != null) { // Check if caller can modify our threads
// 3
for (Worker w : workers)
security.checkAccess(w.thread);
}
// 4
int state = runState;
if (state < SHUTDOWN)
runState = SHUTDOWN;

try {
// 5
for (Worker w : workers) {
w.interruptIfIdle();
}
} catch (SecurityException se) { // Try to back out
runState = state;
// tryTerminate() here would be a no-op
throw se;
}
// 6
tryTerminate(); // Terminate now if pool and queue empty
} finally {
mainLock.unlock();
}
}

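Annotated code analysis

  1. Check whether the caller has permission to shut the pool down (the modifyThread runtime permission).
  2. If a SecurityManager is installed, check whether the caller may modify the pool's threads.
  3. Call checkAccess on every worker thread.
  4. Set the run state to SHUTDOWN if the pool has not been shut down yet.
  5. Interrupt the idle workers so that they wake up and re-check the state.
  6. tryTerminate() terminates the pool right away if both the pool and the queue are empty. The JDK comment "Some workers may have been killed but we remain in non-shutdown state (which may entail tryTerminate from workerDone starting a new worker to maintain liveness." explains the other case: if the queue is not empty, a worker with a null first task is started to handle the remaining tasks.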
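The externally visible contract of shutdown(), queued tasks still run but new ones are rejected, can be sketched like this. ShutdownSketch and demo are illustrative names.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative helper, not part of the analyzed source.
class ShutdownSketch {
    /** Returns {tasks completed, 1 if a task submitted after shutdown() was rejected}. */
    static int[] demo() throws InterruptedException {
        final AtomicInteger completed = new AtomicInteger();
        Runnable task = new Runnable() {
            public void run() { completed.incrementAndGet(); }
        };
        ExecutorService pool = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 3; i++)
            pool.execute(task);        // accepted while RUNNING
        pool.shutdown();               // state becomes SHUTDOWN
        int rejected = 0;
        try {
            pool.execute(task);        // no longer accepted
        } catch (RejectedExecutionException expected) {
            rejected = 1;
        }
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return new int[] { completed.get(), rejected };
    }
}
```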

ThreadPoolExecutor#shutdownNow()

public List<Runnable> shutdownNow() {
    /*
     * shutdownNow differs from shutdown only in that
     * 1. runState is set to STOP,
     * 2. all worker threads are interrupted, not just the idle ones, and
     * 3. the queue is drained and returned.
     */
    SecurityManager security = System.getSecurityManager();
    if (security != null)
        security.checkPermission(shutdownPerm);

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        if (security != null) { // Check if caller can modify our threads
            for (Worker w : workers)
                security.checkAccess(w.thread);
        }

        int state = runState;
        if (state < STOP)
            runState = STOP;

        try {
            for (Worker w : workers) {
                w.interruptNow();
            }
        } catch (SecurityException se) { // Try to back out
            runState = state;
            // tryTerminate() here would be a no-op
            throw se;
        }
        // 1
        List<Runnable> tasks = drainQueue();
        tryTerminate(); // Terminate now if pool and queue empty
        return tasks;
    } finally {
        mainLock.unlock();
    }
}

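Annotated code analysis

  1. Empty the workQueue blocking queue and return the tasks that were still waiting in it.

Compared with shutdown(), shutdownNow() sets runState to STOP instead of SHUTDOWN, interrupts every worker through interruptNow() rather than only the idle ones, and additionally drains the queue with drainQueue().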
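
The three differences are visible through the public API. The following is a minimal sketch, assuming a current JDK and a hypothetical class name ShutdownNowDemo: one task is running and two are queued when shutdownNow() is called.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class ShutdownNowDemo {
    static String run() {
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch never = new CountDownLatch(1); // never counted down
        AtomicBoolean interrupted = new AtomicBoolean(false);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        pool.execute(() -> {
            started.countDown();
            try {
                never.await(); // only an interrupt can end this wait
            } catch (InterruptedException e) {
                interrupted.set(true);
            }
        });
        pool.execute(() -> { }); // stays in the queue
        pool.execute(() -> { }); // stays in the queue
        try {
            started.await(); // make sure the first task is really running
            List<Runnable> pending = pool.shutdownNow();
            boolean terminated = pool.awaitTermination(5, TimeUnit.SECONDS);
            return "pending=" + pending.size()
                    + " interrupted=" + interrupted.get()
                    + " terminated=" + terminated;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // pending=2 interrupted=true terminated=true
    }
}
```

The running task is interrupted even though it is not idle, and the two queued tasks come back in the returned list instead of being executed.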

ThreadPoolExecutor#drainQueue()

/**
 * Drains the task queue into a new list. Used by shutdownNow.
 * Call only while holding main lock.
 */
private List<Runnable> drainQueue() {
    List<Runnable> taskList = new ArrayList<Runnable>();
    // 1
    workQueue.drainTo(taskList);
    // 2
    while (!workQueue.isEmpty()) {
        // 3
        Iterator<Runnable> it = workQueue.iterator();
        try {
            if (it.hasNext()) {
                Runnable r = it.next();
                if (workQueue.remove(r))
                    taskList.add(r);
            }
        } catch (ConcurrentModificationException ignore) {
        }
    }
    return taskList;
}

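Annotated code analysis

  1. Drain the workQueue blocking queue into a newly created list with drainTo().
  2. Per the source comment "If the queue is a DelayQueue or any other kind of queue for which poll or drainTo may fail to remove some elements", drainTo() may leave elements behind, so the loop walks workQueue again and removes whatever remains.
  3. Per the source comment "guarantee atomicity wrt other threads using this queue, we need to create a new iterator for each element removed", a new iterator is created on every pass, and an element is added to the list only if workQueue.remove(r) succeeds, so a task taken concurrently by another thread is not collected twice.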
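
Why the second loop is needed becomes clear with a DelayQueue, whose drainTo() only removes elements whose delay has expired. The following is a minimal sketch, assuming a current JDK; DrainDemo, Later and drainFully are hypothetical names, and drainFully mirrors the structure of drainQueue() on a generic queue.

```java
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

class DrainDemo {
    /** Hypothetical element whose delay expires one hour after creation. */
    static final class Later implements Delayed {
        private final long deadline = System.nanoTime() + TimeUnit.HOURS.toNanos(1);

        public long getDelay(TimeUnit unit) {
            return unit.convert(deadline - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS),
                    other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    /** Same two-phase approach as drainQueue(): drainTo first, then remove one by one. */
    static <T> List<T> drainFully(BlockingQueue<T> queue) {
        List<T> out = new ArrayList<T>();
        queue.drainTo(out);
        while (!queue.isEmpty()) {
            Iterator<T> it = queue.iterator(); // fresh iterator for each removal
            try {
                if (it.hasNext()) {
                    T item = it.next();
                    if (queue.remove(item)) // only collect what we removed ourselves
                        out.add(item);
                }
            } catch (ConcurrentModificationException ignore) {
            }
        }
        return out;
    }

    static String run() {
        DelayQueue<Later> queue = new DelayQueue<Later>();
        for (int i = 0; i < 3; i++)
            queue.add(new Later());
        List<Later> plain = new ArrayList<Later>();
        queue.drainTo(plain); // nothing has expired yet, so nothing is removed
        List<Later> full = drainFully(queue);
        return "drainTo=" + plain.size() + " drainFully=" + full.size()
                + " left=" + queue.size();
    }

    public static void main(String[] args) {
        System.out.println(run()); // drainTo=0 drainFully=3 left=0
    }
}
```

Plain drainTo() moves nothing because no element has expired, while the iterator-and-remove loop empties the queue completely.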

ThreadPoolExecutor#awaitTermination()

public boolean awaitTermination(long timeout, TimeUnit unit)
    throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (;;) {
            // 1
            if (runState == TERMINATED)
                return true;
            // 2
            if (nanos <= 0)
                return false;
            // 3
            nanos = termination.awaitNanos(nanos);
        }
    } finally {
        mainLock.unlock();
    }
}

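Annotated code analysis

  1. If the pool state is TERMINATED, return true immediately.
  2. If no waiting time remains (nanos <= 0), the timeout has elapsed before termination; return false.
  3. termination.awaitNanos(nanos) waits until the condition is signalled, the thread is interrupted, or the timeout elapses. It returns the remaining time, which is checked on the next loop iteration.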
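
Both return values can be provoked deliberately. The following is a minimal sketch, assuming a current JDK and a hypothetical class name AwaitTerminationDemo: the first wait times out because a task is still blocked, the second succeeds once the task has been released.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class AwaitTerminationDemo {
    static String run() {
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        pool.execute(() -> {
            started.countDown();
            try {
                release.await(); // simulates a long-running task
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        try {
            started.await();
            pool.shutdown(); // the running task is not idle, so it is not interrupted
            boolean early = pool.awaitTermination(100, TimeUnit.MILLISECONDS);
            release.countDown();
            boolean late = pool.awaitTermination(5, TimeUnit.SECONDS);
            return "early=" + early + " late=" + late;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // early=false late=true
    }
}
```

Note that awaitTermination() never shuts the pool down by itself; without the preceding shutdown() call the second wait would also time out.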

ThreadPoolExecutor#finalize()

/**
 * Invokes <tt>shutdown</tt> when this executor is no longer
 * referenced.
 */
protected void finalize() {
    shutdown();
}

Overrides finalize(). Per the comment, shutdown() is invoked once the executor is no longer referenced. See the finalize chapter.

ThreadPoolExecutor#prestartCoreThread()

/**
 * Starts a core thread, causing it to idly wait for work. This
 * overrides the default policy of starting core threads only when
 * new tasks are executed. This method will return <tt>false</tt>
 * if all core threads have already been started.
 * @return true if a thread was started
 */
public boolean prestartCoreThread() {
    return addIfUnderCorePoolSize(null);
}

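If poolSize < corePoolSize, one thread is created and started with a null first task, and the method returns true. Per the comment, once poolSize >= corePoolSize all core threads have already been started, so it returns false.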

ThreadPoolExecutor#prestartAllCoreThreads()

/**
 * Starts all core threads, causing them to idly wait for work. This
 * overrides the default policy of starting core threads only when
 * new tasks are executed.
 * @return the number of threads started
 */
public int prestartAllCoreThreads() {
    int n = 0;
    while (addIfUnderCorePoolSize(null))
        ++n;
    return n;
}

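Compared with prestartCoreThread(), prestartAllCoreThreads() adds a while loop: as long as poolSize < corePoolSize it keeps creating threads with a null first task, and it returns the number of threads started.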
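
The return values of the two prestart methods can be checked directly. The following is a minimal sketch, assuming a current JDK and a hypothetical class name PrestartDemo, with corePoolSize set to 3.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PrestartDemo {
    static String run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                3, 3, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        try {
            int before = pool.getPoolSize();            // no thread exists before the first task
            boolean first = pool.prestartCoreThread();  // starts core thread 1
            int rest = pool.prestartAllCoreThreads();   // starts the remaining core threads
            boolean again = pool.prestartCoreThread();  // nothing left to start
            return "before=" + before + " first=" + first + " rest=" + rest
                    + " again=" + again + " size=" + pool.getPoolSize();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // before=0 first=true rest=2 again=false size=3
    }
}
```

Prestarting is mainly useful when the pool is constructed with a queue that already contains tasks, or when the first requests should not pay the thread creation cost.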

ThreadPoolExecutor#setCorePoolSize()

public void setCorePoolSize(int corePoolSize) {
    if (corePoolSize < 0)
        throw new IllegalArgumentException();
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        int extra = this.corePoolSize - corePoolSize;
        this.corePoolSize = corePoolSize;
        // 1
        if (extra < 0) {
            int n = workQueue.size(); // don't add more threads than tasks
            while (extra++ < 0 && n-- > 0 && poolSize < corePoolSize) {
                Thread t = addThread(null);
                if (t != null)
                    t.start();
                else
                    break;
            }
        }
        else if (extra > 0 && poolSize > corePoolSize) { // 2
            try {
                Iterator<Worker> it = workers.iterator();
                while (it.hasNext() &&
                       extra-- > 0 &&
                       poolSize > corePoolSize &&
                       workQueue.remainingCapacity() == 0)
                    it.next().interruptIfIdle();
            } catch (SecurityException ignore) {
                // Not an error; it is OK if the threads stay live
            }
        }
    } finally {
        mainLock.unlock();
    }
}

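Annotated code analysis

  1. If the new corePoolSize is larger than the old one (extra < 0), start new threads with a null first task. The loop starts at most as many threads as the increase, never more than the number of queued tasks, and stops once poolSize reaches the new corePoolSize.
  2. If the new corePoolSize is smaller than the old one and poolSize exceeds it, interrupt idle threads in the workers set, at most extra of them. The loop only continues while workQueue.remainingCapacity() == 0.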
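
The growing branch can be observed with a backlog of queued tasks. The following is a minimal sketch, assuming a current JDK and a hypothetical class name SetCorePoolSizeDemo: one task runs, three wait in an unbounded queue, and corePoolSize is then raised from 1 to 3.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class SetCorePoolSizeDemo {
    static String run() {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 4, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        try {
            for (int i = 0; i < 4; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // every task blocks until released
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            int before = pool.getPoolSize(); // unbounded queue: pool stays at core size
            pool.setCorePoolSize(3);         // two more threads are started for queued tasks
            int after = pool.getPoolSize();
            return "before=" + before + " after=" + after;
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // before=1 after=3
    }
}
```

Because the new threads are created only for tasks that are already queued, raising corePoolSize on an idle pool with an empty queue starts no thread at all.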

ThreadPoolExecutor#setMaximumPoolSize()

public void setMaximumPoolSize(int maximumPoolSize) {
    if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
        throw new IllegalArgumentException();
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        int extra = this.maximumPoolSize - maximumPoolSize;
        this.maximumPoolSize = maximumPoolSize;
        // 1
        if (extra > 0 && poolSize > maximumPoolSize) {
            try {
                Iterator<Worker> it = workers.iterator();
                while (it.hasNext() &&
                       extra > 0 &&
                       poolSize > maximumPoolSize) {
                    it.next().interruptIfIdle();
                    --extra;
                }
            } catch (SecurityException ignore) {
                // Not an error; it is OK if the threads stay live
            }
        }
    } finally {
        mainLock.unlock();
    }
}

Annotated code analysis

  1. If the new maximumPoolSize is larger than the old one, the value is simply stored. If it is smaller and poolSize exceeds it, idle threads in the workers set are interrupted, at most extra of them.

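Implementations of the RejectedExecutionHandler interface

ThreadPoolExecutor#AbortPolicy

Rejects the current task by throwing RejectedExecutionException.

ThreadPoolExecutor#CallerRunsPolicy

If the pool has not been shut down, runs the current task directly in the thread that called execute(); otherwise the task is discarded.

ThreadPoolExecutor#DiscardPolicy

Silently discards the current task.

ThreadPoolExecutor#DiscardOldestPolicy

If the pool has not been shut down, discards the task at the head of the queue, i.e. the Runnable that was added earliest, and then resubmits the current task through execute().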
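
The four policies can be compared under the same saturation scenario. The following is a minimal sketch, assuming a current JDK and a hypothetical class name RejectionPolicyDemo: task A occupies the only thread, task B fills the queue of capacity 1, and task C is handed to the policy.

```java
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class RejectionPolicyDemo {
    /** Returns which of the tasks A, B, C ran and whether execute() threw. */
    static String outcome(RejectedExecutionHandler handler) {
        Set<String> ran = new ConcurrentSkipListSet<String>();
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1), handler);
        boolean thrown = false;
        try {
            pool.execute(() -> {              // A: blocks the only worker
                try {
                    release.await();
                    ran.add("A");
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            pool.execute(() -> ran.add("B")); // B: fills the queue
            try {
                pool.execute(() -> ran.add("C")); // C: pool saturated, policy decides
            } catch (RejectedExecutionException e) {
                thrown = true;
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "ran=" + ran + " thrown=" + thrown;
    }

    public static void main(String[] args) {
        System.out.println(outcome(new ThreadPoolExecutor.AbortPolicy()));         // ran=[A, B] thrown=true
        System.out.println(outcome(new ThreadPoolExecutor.CallerRunsPolicy()));    // ran=[A, B, C] thrown=false
        System.out.println(outcome(new ThreadPoolExecutor.DiscardPolicy()));       // ran=[A, B] thrown=false
        System.out.println(outcome(new ThreadPoolExecutor.DiscardOldestPolicy())); // ran=[A, C] thrown=false
    }
}
```

Under DiscardOldestPolicy it is the queued task B that is lost, not the newly submitted C, which is easy to overlook when choosing a policy.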
