- 介绍
- 源码分析
- Executor
- ExecutorService
- AbstractExecutorService#newTaskFor
- AbstractExecutorService#submit
- AbstractExecutorService#doInvokeAny()
- AbstractExecutorService#invokeAny()
- AbstractExecutorService#invokeAll()
- ThreadPoolExecutor
- ThreadPoolExecutor 构造函数
- Executors#defaultThreadFactory()
- Executors#DefaultThreadFactory
- ThreadPoolExecutor#execute()
- ThreadPoolExecutor#addIfUnderCorePoolSize()
- ThreadPoolExecutor#addThread()
- ThreadPoolExecutor#Worker
- ThreadPoolExecutor#beforeExecute()
- ThreadPoolExecutor#afterExecute()
- ThreadPoolExecutor#getTask()
- ThreadPoolExecutor#workerCanExit()
- ThreadPoolExecutor#interruptIdleWorkers()
- ThreadPoolExecutor#workerDone()
- ThreadPoolExecutor#tryTerminate()
- ThreadPoolExecutor#ensureQueuedTaskHandled()
- ThreadPoolExecutor#addIfUnderMaximumPoolSize()
- ThreadPoolExecutor#shutdown()
- ThreadPoolExecutor#shutdownNow()
- ThreadPoolExecutor#drainQueue()
- ThreadPoolExecutor#awaitTermination()
- ThreadPoolExecutor#finalize()
- ThreadPoolExecutor#prestartCoreThread()
- ThreadPoolExecutor#prestartAllCoreThreads()
- ThreadPoolExecutor#setCorePoolSize()
- ThreadPoolExecutor#setMaximumPoolSize()
- 实现RejectedExecutionHandler接口
介绍
ThreadPoolExecutor是JUC包中提供的线程池,使用ThreadPoolExecutor的好处一方面是能重用线程资源,避免重复创建线程带来的开销;另一方面是ThreadPoolExecutor提供了内部资源(线程、任务)的管理功能,方便我们监控线程池工作状态。
源码分析
ThreadPoolExecutor类结构图
Executor
1 | public interface Executor { |
标注代码分析
- 根据注释很好理解。command可以在新线程里执行,可以在线程池里执行,可以被调用线程执行等等。
ExecutorService
1 | // 1 |
标注代码分析
- 关闭之前,已经提交的任务会被执行。新任务被拒绝。服务被关闭,则调用这个方法没有影响。
- 尝试停止所有正在执行的任务,停止处理正在等待的任务,并返回等待执行任务列表。并不能确保一定可以停止正在执行的任务。比如,通常的实现方式是中断执行任务的线程,但如果任务执行过程中并不响应中断,那就无法停止这个任务。
- 是否已经关闭。
- 关闭后,是否所有的任务都执行完毕。
- 在关闭、超时、任务中断后,任务执行完毕前,都阻塞。
- 提交任务。
- 执行任务集合。
- 执行一批任务,如果其中有一个任务完成,就返回结果。其他没有完成的任务会被取消。
AbstractExecutorService#newTaskFor
1 | /** |
将runnable、callable转换成FutureTask。
AbstractExecutorService#submit
1 | public Future<?> submit(Runnable task) { |
先转成RunnableFuture,然后提交到Executor,然后返回RunnableFuture(异步任务)。
AbstractExecutorService#doInvokeAny()
1 | /** |
标注代码分析
- 首先提交1个任务,任务数量-1。
- 轮询。
- ExecutorCompletionService中获取任务。
- 如果Future f=null,还有剩余任务(ntasks > 0),提交任务到ExecutorCompletionService里,active+1。
- 没有在提交的任务,执行任务数量。(active = 0)
- 超时处理。
- 可能ExecutorCompletionService中的任务执行完毕,赋值给Future f。
- f!=null,说明任务执行完成,active-1,返回Future#f#get()
- 没执行完的任务取消掉。
AbstractExecutorService#invokeAny()
1 | public <T> T invokeAny(Collection<? extends Callable<T>> tasks) |
AbstractExecutorService#invokeAll()
1 | public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) |
标注代码分析
- 添加任务,执行任务
- 等待所有的任务都执行完成。
- done判断任务执行完成。
- done=false,所有任务并没有执行完成,取消没有完成的任务。
ThreadPoolExecutor
1 | public class ThreadPoolExecutor extends AbstractExecutorService { |
标注代码分析
- 线程池运行状态。
- 正在运行。会接受新任务,并会处理任务队列中的任务。
- 已经关闭。不接受新任务,但仍然会处理任务队列中的任务。
- 已经停止。不接受新任务,不处理任务队列中的任务,会中断正在执行的任务。
- STOP的基础上,在加上所有的任务都已经结束。
- 任务队列,将任务交给工作线程处理。
- 在更新内部数据(如:线程数量,运行状态,工作线程集等)时要使用的锁。
- 用于支持awaitTermination的等待条件。
- 包含所有工作类的集合。只能在持有mainLock的情况下使用。
- 空闲工作线程等待任务的超时时间,单位:纳秒。当前线程数大于核心线程数时,超出的线程会使用这个超时时间。如果设置了allowCoreThreadTimeOut,核心线程数也会使用这个超时时间。否则,线程会一直等待新任务,不会超时。
- 如果为false(默认情况下),核心线程就算空闲也会一直存活。如果为true,等待任务的核心线程会使用keepAliveTime作为超时时间,如果超时,线程被回收。
- 核心线程数量,只能在持有mainLock的情况下修改。volatile可以保证可见性。
- 最大线程数量,只能在持有mainLock的情况下修改。
- 当前线程数量,只能在持有mainLock的情况下修改。
- 当线程池饱和或者关闭时,负责处理新来任务的处理,称为拒绝任务处理器。
- 线程工厂。用于创建新线程。
- 最大的线程数量记录。
- 统计任务完成数量的计数器。在工作线程终止(termination)的时候才会更新。
- 默认的拒绝任务处理。
ThreadPoolExecutor 构造函数
1 |
|
- 内部的keepAliveTime都使用纳秒,所以构造方法中会有1个时间转换。
- 不指定线程工厂,会使用Executors.defaultThreadFactory()。
Executors#defaultThreadFactory()
1 | public static ThreadFactory defaultThreadFactory(){ |
Executors#DefaultThreadFactory
1 | /** |
通过DefaultThreadFactory创建Thread Name,Thread Group的Thread,由poolNumber.getAndIncrement() 知道线程序号是递增的。
ThreadPoolExecutor#execute()
1 | /** |
标注代码分析
- 如果当前线程数量大于等于核心线程数量(poolSize >= corePoolSize),执行#2。如果当前线程数量小于核心线程数量(poolSize < corePoolSize),那么尝试添加1个工作线程(addIfUnderCorePoolSize),同时让这个工作线程处理当前提交的任务,提交任务流程结束;如果添加工作线程失败,那么进入#2。
- 首先判断当前线程池状态是否为正在运行,如果正在运行,就将当前任务放入任务队列中。
- 如果当前线程池状态不是正在运行,或者workQueue.offer(command)操作失败,所以任务队列 == 0。
- x
- 添加一个工作线程,同时让这个工作线程处理当前提交的任务,但不能超时最大工作线程数。
- addIfUnderMaximumPoolSize(command)如果添加成功,提交任务流程结束;如果添加失败,使用拒绝任务处理器来处理任务。
ThreadPoolExecutor#addIfUnderCorePoolSize()
1 | /** |
标注代码分析
- 新增线程需要加锁。
- 当前线程数量小于核心线程数量,当前线程池处于运行状态,那么添加1个工作线程。
- 任务执行。
ThreadPoolExecutor#addThread()
1 | /** |
标注代码分析
- 创建1个worker任务。
- 根据threadFactory创建1个Thread。
- 新建的Thread赋值给worker#thread。
- worker集合添加worker对象。
- 线程池中线程数量+1。
- 线程池最大值重设值为nt。
ThreadPoolExecutor#Worker
1 | ... |
根据注释可以知道,Worker是Runnable,Worker用到ReentrantLock防止pool在运行中中断,worker thread确保不会被中断,除非pool停止。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123private final class Worker implements Runnable {
/**
* The runLock is acquired and released surrounding each task
* execution. It mainly protects against interrupts that are
* intended to cancel the worker thread from instead
* interrupting the task being run.
*/
// 1
private final ReentrantLock runLock = new ReentrantLock();
/**
* Initial task to run before entering run loop. Possibly null.
*/
// 2
private Runnable firstTask;
/**
* Per thread completed task counter; accumulated
* into completedTaskCount upon termination.
*/
// 3
volatile long completedTasks;
/**
* Thread this worker is running in. Acts as a final field,
* but cannot be set until thread is created.
*/
// 4
Thread thread;
Worker(Runnable firstTask) {
this.firstTask = firstTask;
}
// 5
boolean isActive() {
return runLock.isLocked();
}
/**
* Interrupts thread if not running a task.
*/
// 6
void interruptIfIdle() {
final ReentrantLock runLock = this.runLock;
if (runLock.tryLock()) {
try {
if (thread != Thread.currentThread())
thread.interrupt();
} finally {
runLock.unlock();
}
}
}
/**
* Interrupts thread even if running a task.
*/
// 7
void interruptNow() {
thread.interrupt();
}
/**
* Runs a single task between before/after methods.
*/
private void runTask(Runnable task) {
final ReentrantLock runLock = this.runLock;
runLock.lock();
try {
/*
* Ensure that unless pool is stopping, this thread
* does not have its interrupt set. This requires a
* double-check of state in case the interrupt was
* cleared concurrently with a shutdownNow -- if so,
* the interrupt is re-enabled.
*/
// 8
if (runState < STOP &&
Thread.interrupted() &&
runState >= STOP)
thread.interrupt();
/*
* Track execution state to ensure that afterExecute
* is called only if task completed or threw
* exception. Otherwise, the caught runtime exception
* will have been thrown by afterExecute itself, in
* which case we don't want to call it again.
*/
// 9
boolean ran = false;
beforeExecute(thread, task);
try {
task.run();
ran = true;
afterExecute(task, null);
++completedTasks;
} catch (RuntimeException ex) {
if (!ran)
afterExecute(task, ex);
throw ex;
}
} finally {
runLock.unlock();
}
}
/**
* Main run loop
*/
public void run() {
try {
Runnable task = firstTask;
firstTask = null;
// 10
while (task != null || (task = getTask()) != null) {
runTask(task);
task = null;
}
} finally {
workerDone(this);
}
}
}
标注代码分析
- 防止中断线程,取消worker线程
- 初始任务,可能是null
- 线程完成任务数量
- worker运行在这Thread上
- 调用ReentrantLock#isLocked(),作为监控Thread状态,非sync操作。Thread hold this lock,true表示当前线程持有这锁,Thread活跃状态。
- 如果worker是闲置,中断当前worker#thread
- 任务运行中,中断thread
- 当pool是停止,中断当前thread。
- ran=false,执行task#run()后出现异常,则会调用afterExecute(task, ex),如果task#run()出现异常,则不会调用afterExecute(task, ex)。ThreadPoolExecutor#beforeExecute()和ThreadPoolExecutor#afterExecute()作为子类实现。
- 检查firstTask不为空,ThreadPoolExecutor#getTask()赋值task,运行task。
ThreadPoolExecutor#beforeExecute()
1 | /** |
ThreadPoolExecutor#afterExecute()
1 | /** |
ThreadPoolExecutor#getTask()
1 | /** |
标注代码分析
- 线程池停止、终端,则返回null。
- 线程池关闭,获取队列里的任务。
- 如果设置超时获取队列中的任务,通过超时poll(long)获取。
- 阻塞等待获取任务。
- 检测是否可以退出这for循环。ThreadPoolExecutor#workerCanExit()代码,可以知道线程池中断、workQueue是空、设置超时时间都可以作为退出条件。
- 如果线程池中断、关闭,workers集合中可能还有空闲Thread,中断workers集合中的任务。
ThreadPoolExecutor#workerCanExit()
1 | /** |
ThreadPoolExecutor#interruptIdleWorkers()
1 | /** |
ThreadPoolExecutor#workerDone()
1 | /** |
标注代码分析
- 加锁,线程任务完成记录+1,从worker集合中移除当前work。
- 如果线程池当前数量=0;如果线程状态是停止、关闭,中断线程池。如果线程状态是运行且workQueue阻塞队列有任务,则至少1个活线程。
ThreadPoolExecutor#tryTerminate()
1 | /** |
标注代码分析
- 线程池数量为0。
- 当前线程池状态赋值为局部变量state。
- 当线程池是RUNNING或者SHUTDOWN状态,且workQueue不为空。新建1个null的线程,修改线程池状态为RUNNING。
- 线程池是STOP,设置当前线程池状态是TERMINATED。同时唤醒1条等待线程。
ThreadPoolExecutor#ensureQueuedTaskHandled()
1 | /** |
标注代码分析
- 拒绝标志符变量。
- 线程池不是RUNNING,则从阻塞队列workQueue删除当前任务。设置拒绝标识符。
- 线程池状态不是STOP,workQueue非空,ThreadPoolExecutor#execute()中的runState != RUNNING ,可以知道线程池状态是SHUTDOWN。和ensureQueuedTaskHandled#tryTerminate()一样,必需有1个活线程执行任务。
- 执行拒绝方法RejectedExecutionHandler#rejectedExecution,或者运行null runnable。
ThreadPoolExecutor#addIfUnderMaximumPoolSize()
1 | /** |
标注代码分析
- 线程池中线程数量<最大线程池线程数量,线程池状态RUNNING,添加1个worker任务。创建thread,赋值给worker#thread。
ThreadPoolExecutor#shutdown()
1 | public void shutdown() { |
标注代码分析
- 检测是否允许关闭。
- 是否有些线程允许被修改。
- 设置线程运行被修改。
- 设置线程池状态SHUTDOWN。
- worker阻塞队列终端线程。
- 根据注释
Some workers may have been killed but we remain in non-shutdown state (which may entail tryTerminate from workerDone starting a new worker to maintain liveness.
知道保留1个null的thread。
ThreadPoolExecutor#shutdownNow()
1 | public List<Runnable> shutdownNow() { |
标注代码分析
- 情况worker阻塞队列,返回阻塞队列任务集合。
shutdownNow()相对shutdown()多1个drainQueue()。
ThreadPoolExecutor#drainQueue()
1 | /** |
标注代码分析
- 清空workQueue阻塞队列,并把workQueue阻塞队列数据放到新建list集合。
- 根据注释
If the queue is a DelayQueue or any other kind of queue for which poll or drainTo may fail to remove some elements
drainTo()可能会失败,所以需要遍历workQueue阻塞队列,再次清除1遍。 - 根据注释
guarantee atomicity wrt other threads using this queue, we need to create a new iterator for each element removed.
保证原子性。每次循环都创建新的iterator对象。
ThreadPoolExecutor#awaitTermination()
1 | public boolean awaitTermination(long timeout, TimeUnit unit) |
标注代码分析
- 线程状态TERMINATED,直接返回true。
- 时间负数,线程中断失败。
- 等待(或等待超时),等待线程被唤醒或者中断。
ThreadPoolExecutor#finalize()
1 | /** |
覆写finalize(),根据注释executor没有强引用的时候,可以执行shutdown()。可以参考finalize章节。
ThreadPoolExecutor#prestartCoreThread()
1 | /** |
如果poolSize < corePoolSize,创建1个thread,返回true。根据注释知道,poolSize >= corePoolSize,线程池里线程都启动,所以返回false。
ThreadPoolExecutor#prestartAllCoreThreads()
1 | /** |
prestartAllCoreThreads()相对prestartCoreThread()多while循环。在poolSize < corePoolSize时候,不断创建runnable = null的线程。
ThreadPoolExecutor#setCorePoolSize()
1 | public void setCorePoolSize(int corePoolSize) { |
标注代码分析
- new corePoolSize > corePoolSize,新增extra数量的runnable = null的Thread。
- new corePoolSize < corePoolSize,中断worker阻塞队列中闲置的线程。
ThreadPoolExecutor#setMaximumPoolSize()
1 | public void setMaximumPoolSize(int maximumPoolSize) { |
标注代码分析
- 根据注释可以理解,如果new maximumPoolSize > maximumPoolSize,则直接设值。如果new maximumPoolSize < maximumPoolSize,中断workers阻塞队列中的闲置Thread。
实现RejectedExecutionHandler接口
ThreadPoolExecutor#AbortPolicy
拒绝当前任务,抛出RejectedExecutionException。ThreadPoolExecutor#CallerRunsPolicy
线程池未关闭,直接运行当前任务。ThreadPoolExecutor#DiscardPolicy
丢弃当前任务,什么都不操作。ThreadPoolExecutor#DiscardOldestPolicy
丢弃Queue的第1个任务,也就是最早添加的Runnable,运行当前任务。