JDK1.7 FutureTask

功能简介

  • FutureTask是一种异步任务(或异步计算),举个栗子,主线程的逻辑中需要使用某个值,但这个值需要负责的运算得来,那么主线程可以提前建立一个异步任务来计算这个值(在其他的线程中计算),然后去做其他事情,当需要这个值的时候再通过刚才建立的异步任务来获取这个值,有点并行的意思,这样可以缩短整个主线程逻辑的执行时间。
  • 1.7与1.6版本不同,1.7的FutureTask不再基于AQS来构建,而是在内部采用简单的Treiber Stack来保存等待线程。

源码分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class FutureTask<V> implements RunnableFuture<V> {  


private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
// 1
private Callable<V> callable;
// 2
private Object outcome;
// 3
private volatile Thread runner;
// 4
private volatile WaitNode waiters;

内部状态可能得迁转过程

  • NEW -> COMPLETING -> NORMAL //正常完成
  • NEW -> COMPLETING -> EXCEPTIONAL //发生异常
  • NEW -> CANCELLED //取消
  • NEW -> INTERRUPTING -> INTERRUPTED //中断

标注代码分析

  1. 内部的callable,运行完成后设置为null。
  2. 如果正常完成,就是执行结果,通过get方法获取;如果发生异常,就是具体的异常对象,通过get方法抛出。本身没有volatile修饰, 依赖state的读写来保证可见性。
  3. 执行内部callable的线程。
  4. 存放等待线程的Treiber Stack。

JDK1.7 Condition

介绍

Condition是一个多线程间协调通信的工具类。使得某个或者某些线程一起等待某个条件(Condition),只有当该条件具备(signal()或者signalAll()被带调用)时,这些等待线程才会被唤醒,从而重新争夺锁。
ConditionObject监视器方法(wait()notify()notifyAll())分解成截然不同的对象,以便通过将这些对象与任意Lock实现组合使用,为每个对象提供多个等待set()wait-set)。其中,Lock替代synchronized和语句的使用,Condition替代Object监视器方法的使用。
Lock对应Synchronized,使用之前都要先获取锁。

Object Condition
休眠 wait await
唤醒个线程 notify signal
唤醒所有线程 notifyAll signalAll

Condition它更强大的地方在于。能够更加精细的控制多线程的休眠与唤醒。对于同一个锁,我们可以创建多个Condition,就是多个监视器的意思。在不同的情况下使用不同的Condition

Fork-Join其他方法

ForkJoinTask

ForkJoinTask本身也是Future的实现,所以也会有取消过程。

ForkJoinTask#cancel()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
* Attempts to cancel execution of this task. This attempt will
* fail if the task has already completed or could not be
* cancelled for some other reason. If successful, and this task
* has not started when {@code cancel} is called, execution of
* this task is suppressed. After this method returns
* successfully, unless there is an intervening call to {@link
* #reinitialize}, subsequent calls to {@link #isCancelled},
* {@link #isDone}, and {@code cancel} will return {@code true}
* and calls to {@link #join} and related methods will result in
* {@code CancellationException}.
*
* <p>This method may be overridden in subclasses, but if so, must
* still ensure that these properties hold. In particular, the
* {@code cancel} method itself must not throw exceptions.
*
* <p>This method is designed to be invoked by <em>other</em>
* tasks. To terminate the current task, you can just return or
* throw an unchecked exception from its computation method, or
* invoke {@link #completeExceptionally}.
*
* @param mayInterruptIfRunning this value has no effect in the
* default implementation because interrupts are not used to
* control cancellation.
*
* @return {@code true} if this task is now cancelled
*/
public boolean cancel(boolean mayInterruptIfRunning) {
return setCompletion(CANCELLED) == CANCELLED;
}

ForkJoinTask#setCompletion()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* Marks completion and wakes up threads waiting to join this task,
* also clearing signal request bits.
*
* @param completion one of NORMAL, CANCELLED, EXCEPTIONAL
* @return completion status on exit
*/
private int setCompletion(int completion) {
for (int s;;) {
if ((s = status) < 0)
return s;
if (UNSAFE.compareAndSwapInt(this, statusOffset, s, completion)) {
if (s != 0)
synchronized (this) { notifyAll(); }
return completion;
}
}
}

设置任务运行状态为取消,然后唤醒在任务上等待的线程。

非Fork/Join源码分析

ForkJoinPool属性

部分参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
/**
* Main pool control -- a long packed with:
* AC: Number of active running workers minus target parallelism (16 bits)
* TC: Number of total workers minus target parallelism (16bits)
* ST: true if pool is terminating (1 bit)
* EC: the wait count of top waiting thread (15 bits)
* ID: ~poolIndex of top of Treiber stack of waiting threads (16 bits)
*
* When convenient, we can extract the upper 32 bits of counts and
* the lower 32 bits of queue state, u = (int)(ctl >>> 32) and e =
* (int)ctl. The ec field is never accessed alone, but always
* together with id and st. The offsets of counts by the target
* parallelism and the positionings of fields makes it possible to
* perform the most common checks via sign tests of fields: When
* ac is negative, there are not enough active workers, when tc is
* negative, there are not enough total workers, when id is
* negative, there is at least one waiting worker, and when e is
* negative, the pool is terminating. To deal with these possibly
* negative fields, we use casts in and out of "short" and/or
* signed shifts to maintain signedness.
*/
volatile long ctl;

// bit positions/shifts for fields
private static final int AC_SHIFT = 48;
private static final int TC_SHIFT = 32;
private static final int ST_SHIFT = 31;
private static final int EC_SHIFT = 16;

// bounds
private static final int MAX_ID = 0x7fff; // max poolIndex
private static final int SMASK = 0xffff; // mask short bits
private static final int SHORT_SIGN = 1 << 15;
private static final int INT_SIGN = 1 << 31;

// masks
private static final long STOP_BIT = 0x0001L << ST_SHIFT;
private static final long AC_MASK = ((long)SMASK) << AC_SHIFT;
private static final long TC_MASK = ((long)SMASK) << TC_SHIFT;

// units for incrementing and decrementing
private static final long TC_UNIT = 1L << TC_SHIFT;
private static final long AC_UNIT = 1L << AC_SHIFT;

// masks and units for dealing with u = (int)(ctl >>> 32)
private static final int UAC_SHIFT = AC_SHIFT - 32;
private static final int UTC_SHIFT = TC_SHIFT - 32;
private static final int UAC_MASK = SMASK << UAC_SHIFT;
private static final int UTC_MASK = SMASK << UTC_SHIFT;
private static final int UAC_UNIT = 1 << UAC_SHIFT;
private static final int UTC_UNIT = 1 << UTC_SHIFT;

// masks and units for dealing with e = (int)ctl
private static final int E_MASK = 0x7fffffff; // no STOP_BIT
private static final int EC_UNIT = 1 << EC_SHIFT;

ForkJoinPool的总控制信息,包含在一个long(64bit)里面。

  • AC: 表示当前活动的工作线程的数量减去并行度得到的数值。(16 bits)
  • TC: 表示全部工作线程的数量减去并行度得到的数值。(16bits)
  • ST: 表示当前ForkJoinPool是否正在关闭。(1 bit)
  • EC: 表示Treiber stack顶端的等待工作线程的等待次数。(15 bits)
  • ID: Treiber stack顶端的等待工作线程的下标取反。(16 bits)
1111111111111111 1111111111111111 1 111111111111111 1111111111111111
AC TC ST EC ID
  • 如果AC为负数,说明没有足够的活动工作线程。
  • 如果TC为负数,说明工作线程数量没达到最大工作线程数量。
  • 如果ID为负数,说明至少有一个等待的工作线程。
  • 如果(int)ctl为负数,说明ForkJoinPool正在关闭。

Fork-Join源码分析

这篇源码分析基于JDK7。

Fork

通过分析一个Fork-Join任务的执行过程来分析Fork-Join的相关代码,主要侧重于分裂(Fork)/合并(Join)过程。

SumTask

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class SumTask extends RecursiveTask<Long>{  
private static final int THRESHOLD = 10;

private long start;
private long end;

public SumTask(long n) {
this(1,n);
}

private SumTask(long start, long end) {
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
long sum = 0;
if((end - start) <= THRESHOLD){
for(long l = start; l <= end; l++){
sum += l;
}
}else{
long mid = (start + end) >>> 1;
SumTask left = new SumTask(start, mid);
SumTask right = new SumTask(mid + 1, end);
left.fork();
right.fork();
sum = left.join() + right.join();
}
return sum;
}
private static final long serialVersionUID = 1L;
}

ForkJoinTask#fork()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
* Arranges to asynchronously execute this task. While it is not
* necessarily enforced, it is a usage error to fork a task more
* than once unless it has completed and been reinitialized.
* Subsequent modifications to the state of this task or any data
* it operates on are not necessarily consistently observable by
* any thread other than the one executing it unless preceded by a
* call to {@link #join} or related methods, or a call to {@link
* #isDone} returning {@code true}.
*
* <p>This method may be invoked only from within {@code
* ForkJoinPool} computations (as may be determined using method
* {@link #inForkJoinPool}). Attempts to invoke in other contexts
* result in exceptions or errors, possibly including {@code
* ClassCastException}.
*
* @return {@code this}, to simplify usage
*/
public final ForkJoinTask<V> fork() {
((ForkJoinWorkerThread) Thread.currentThread())
.pushTask(this);
return this;
}

ForkJoinWorkerThread#pushTask()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
* Pushes a task. Call only from this thread.
*
* @param t the task. Caller must ensure non-null.
*/
final void pushTask(ForkJoinTask<?> t) {
ForkJoinTask<?>[] q; int s, m;
if ((q = queue) != null) { // ignore if queue removed
// 1
long u = (((s = queueTop) & (m = q.length - 1)) << ASHIFT) + ABASE;
// 2
UNSAFE.putOrderedObject(q, u, t);
// 3
queueTop = s + 1; // or use putOrderedInt
if ((s -= queueBase) <= 2)
pool.signalWork();
else if (s == m)
// 4
growQueue();
}
}

标注代码分析

  1. 这里首先根据当前的queueTop对队列(数组)长度取模来算出放置任务的下标,然后再通过下标算出偏移地址,提供给Unsafe使用。
  2. 设置任务。
  3. 修改queueTop
  4. 如果队列满了,扩展一下队列容量。

    ForkJoinWorkerThread#growQueue()

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    /**
    * Creates or doubles queue array. Transfers elements by
    * emulating steals (deqs) from old array and placing, oldest
    * first, into new array.
    */
    private void growQueue() {
    ForkJoinTask<?>[] oldQ = queue;
    int size = oldQ != null ? oldQ.length << 1 : INITIAL_QUEUE_CAPACITY;
    // 1
    if (size > MAXIMUM_QUEUE_CAPACITY)
    throw new RejectedExecutionException("Queue capacity exceeded");
    // 2
    if (size < INITIAL_QUEUE_CAPACITY)
    size = INITIAL_QUEUE_CAPACITY;
    ForkJoinTask<?>[] q = queue = new ForkJoinTask<?>[size];
    int mask = size - 1;
    int top = queueTop;
    int oldMask;
    if (oldQ != null && (oldMask = oldQ.length - 1) >= 0) {
    for (int b = queueBase; b != top; ++b) {
    long u = ((b & oldMask) << ASHIFT) + ABASE;
    Object x = UNSAFE.getObjectVolatile(oldQ, u);
    if (x != null && UNSAFE.compareAndSwapObject(oldQ, u, x, null))
    UNSAFE.putObjectVolatile
    (q, ((b & mask) << ASHIFT) + ABASE, x);
    }
    }
    }

标注代码分析

  1. 容量为原来的2倍,不超过MAXIMUM_QUEUE_CAPACITY(1 << 24)
  2. 最小为INITIAL_QUEUE_CAPACITY(1 << 13)

JDK1.7 Proxy

Proxy 1.7

以下源码分析取核心代码

1 变量定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
   /** prefix for all proxy class names */
// #1
private final static String proxyClassNamePrefix = "$Proxy";

/** parameter types of a proxy class constructor */
private final static Class[] constructorParams =
{ InvocationHandler.class };

/** maps a class loader to the proxy class cache for that loader */
// #2
private static Map loaderToCache = new WeakHashMap();

/** marks that a particular proxy class is currently being generated */
// #3
private static Object pendingGenerationMarker = new Object();

/** next number to use for generation of unique proxy class names */
private static long nextUniqueNumber = 0;
private static Object nextUniqueNumberLock = new Object();

/** set of all generated proxy classes, for isProxyClass implementation */
// #4
private static Map proxyClasses = Collections.synchronizedMap(new WeakHashMap());

标注代码分析:

  1. 定义代理类名字前缀。
  2. 定义弱引用,以ClassLoader loader做为key,定义一个class load 缓存。
  3. 为了生成代理类,初始化的cache时候,一直存在的“临时”对象。
  4. proxyClass的缓存,key存储proxy class,value存储null,用于判断是否是代理类。

    2 newProxyInstance

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
     public static Object newProxyInstance(ClassLoader loader,
    Class<?>[] interfaces,
    InvocationHandler h)
    .....

    /*
    5. Look up or generate the designated proxy class.
    */
    // #1
    Class cl = getProxyClass(loader, interfaces);

    /*
    6. Invoke its constructor with the designated invocation handler.
    */
    try {
    // #2
    Constructor cons = cl.getConstructor(constructorParams);
    return (Object) cons.newInstance(new Object[] { h });
    } catch (NoSuchMethodException e) {
    throw new InternalError(e.toString());
    } catch (IllegalAccessException e) {
    throw new InternalError(e.toString());
    } catch (InstantiationException e) {
    throw new InternalError(e.toString());
    } catch (InvocationTargetException e) {
    throw new InternalError(e.toString());
    }
    }
  5. 生成proxy class的$Proxy0

  6. 生成$Proxy0的构造函数。
Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×