JDK1.6 Executors

介绍

Executors是JUC包提供的一个工具性质的帮助类,它针对ExecutorService、ScheduledExecutorService、ThreadFactory和Callable提供了一系列工厂方法和工具方法。

源码解析

Executors#newFixedThreadPool()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
* Creates a thread pool that reuses a fixed number of threads
* operating off a shared unbounded queue. At any point, at most
* <tt>nThreads</tt> threads will be active processing tasks.
* If additional tasks are submitted when all threads are active,
* they will wait in the queue until a thread is available.
* If any thread terminates due to a failure during execution
* prior to shutdown, a new one will take its place if needed to
* execute subsequent tasks. The threads in the pool will exist
* until it is explicitly {@link ExecutorService#shutdown shutdown}.
*
* @param nThreads the number of threads in the pool
* @return the newly created thread pool
* @throws IllegalArgumentException if <tt>nThreads &lt;= 0</tt>
*/
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
  • 这个方法创建了一个核心线程数量和最大线程数量一致的,并且任务队列是无界队列的线程池。
  • 由于默认核心线程不会超时,所以超时相关的参数也没有意义。
  • 如果在线程关闭之前,一个工作线程由于某种原因挂了,那么线程池会自动补上一个新的工作线程。

JDK1.6 ExecutorCompletionService

介绍

ExecutorCompletionService用于执行一批任务,然后按照任务执行完成的顺序来获取任务结果。可以在获取到了若干个执行结果后,把其他的任务取消掉(ThreadPoolExecutor中的invokeAny就是通过这货实现的)。
比如这样的场景:你的业务需要调用10个接口来获取一些信息,业务规定只需要其中任意2个接口的信息,那么就可以使用ExecutorCompletionService,获取前两个成功完成的任务结果,然后将其他的任务取消。

源码分析

ExecutorCompletionService实现CompletionService接口。

CompletionService()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
public interface CompletionService<V> {
/**
* Submits a value-returning task for execution and returns a Future
* representing the pending results of the task. Upon completion,
* this task may be taken or polled.
*
* @param task the task to submit
* @return a Future representing pending completion of the task
* @throws RejectedExecutionException if the task cannot be
* scheduled for execution
* @throws NullPointerException if the task is null
*/
// 1
Future<V> submit(Callable<V> task);

/**
* Submits a Runnable task for execution and returns a Future
* representing that task. Upon completion, this task may be
* taken or polled.
*
* @param task the task to submit
* @param result the result to return upon successful completion
* @return a Future representing pending completion of the task,
* and whose <tt>get()</tt> method will return the given
* result value upon completion
* @throws RejectedExecutionException if the task cannot be
* scheduled for execution
* @throws NullPointerException if the task is null
*/
// 2
Future<V> submit(Runnable task, V result);

/**
* Retrieves and removes the Future representing the next
* completed task, waiting if none are yet present.
*
* @return the Future representing the next completed task
* @throws InterruptedException if interrupted while waiting
*/
// 3
Future<V> take() throws InterruptedException;


/**
* Retrieves and removes the Future representing the next
* completed task or <tt>null</tt> if none are present.
*
* @return the Future representing the next completed task, or
* <tt>null</tt> if none are present
*/
// 4
Future<V> poll();

/**
* Retrieves and removes the Future representing the next
* completed task, waiting if necessary up to the specified wait
* time if none are yet present.
*
* @param timeout how long to wait before giving up, in units of
* <tt>unit</tt>
* @param unit a <tt>TimeUnit</tt> determining how to interpret the
* <tt>timeout</tt> parameter
* @return the Future representing the next completed task or
* <tt>null</tt> if the specified waiting time elapses
* before one is present
* @throws InterruptedException if interrupted while waiting
*/
// 5
Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;
}

标注代码分析

  1. 提交一个有返回值的任务。
  2. 提交一个Runnable和一个返回值。
  3. 获取并移除下一个完成的任务,如果当前没有任务完成,阻塞等待。
  4. 获取并移除下一个完成的任务,如果当前没有任务完成,返回null。
  5. 获取并移除下一个完成的任务,如果当前没有任务完成,阻塞等待,如果在超时前仍然没有任务完成,返回null。

JDK1.6 CyclicBarrier

介绍

  1. CyclicBarrier是一种可重复使用的栅栏机制,可以让一组线程在某个点上相互等待,这个点就可以类比为栅栏。并且这个栅栏是可重复使用的,这点可以和前面分析过的CountDownLatch做对比,CountDownLatch只能用一次。
  2. CyclicBarrier还支持在所有线程到达栅栏之后,在所有线程从等待状态转到可运行状态之前,执行一个命令(或者说是动作)。当然,在某些情况下,栅栏可以被打破。比如某个线程无法在规定的时间内到达栅栏。

JDK1.6 CountDownLatch

介绍CountDownLatch

一种锁,称为闭锁。可以让一个或多个线程等待另外一个或多个线程执行完毕后再执行。
CountDownLatch是基于AQS构建,使用共享模式。
CountDownLatch中提供一个count值来表示要等待的(其他任务)完成次数,常规用法有两种:Count(1)和Count(N)。举个栗子,百米赛跑,N个选手,每个选手可以看成是一个线程。起跑前,选手准备(线程启动,然后在Count(1)上阻塞),当发令枪响后(相当于Count(1)闭锁释放),选手一起起跑(相当于线程通过Count(1)继续执行),当所有选手都通过终点(相当于Count(N)闭锁释放),然后再统计成绩。

JDK1.6 ConcurrentSkipListSet

ConcurrentSkipListSet基于ConcurrentSkipListMap实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class ConcurrentSkipListSet<E>
extends AbstractSet<E>
implements NavigableSet<E>, Cloneable, java.io.Serializable {

private static final long serialVersionUID = -2479143111061671589L;

/**
* The underlying map. Uses Boolean.TRUE as value for each
* element. This field is declared final for the sake of thread
* safety, which entails some ugliness in clone()
*/
private final ConcurrentNavigableMap<E,Object> m;

/**
* Constructs a new, empty set that orders its elements according to
* their {@linkplain Comparable natural ordering}.
*/
public ConcurrentSkipListSet() {
m = new ConcurrentSkipListMap<E,Object>();
}
...

JDK1.6 ConcurrentSkipListMap

介绍

ConcurrentSkipListMap是一种线程安全的有序的Map。一般我们使用有序Map,不要求线程安全的情况下,可以使用TreeMap,要求线程安全的话,就可以使用ConcurrentSkipListMap。
ConcurrentSkipListMap内部的数据结构是SkipList(跳表),内部Entry顺序是由实现了Comparable的key或者构造时指定的Comparator来保证。和TreeMap一样,对ConcurrentSkipListMap中元素的put()、get()和remove()等操作的平均时间复杂度也是O(log(n))。

源码分析

数据结构

源码中也提供图形化的注释。
有3中节点:Head节点、Index节点和普通的Node节点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
* Head nodes          Index nodes
* +-+ right +-+ +-+
* |2|---------------->| |--------------------->| |->null
* +-+ +-+ +-+
* | down | |
* v v v
* +-+ +-+ +-+ +-+ +-+ +-+
* |1|----------->| |->| |------>| |----------->| |------>| |->null
* +-+ +-+ +-+ +-+ +-+ +-+
* v | | | | |
* Nodes next v v v v v
* +-+ +-+ +-+ +-+ +-+ +-+ +-+ +-+ +-+ +-+ +-+ +-+
* | |->|A|->|B|->|C|->|D|->|E|->|F|->|G|->|H|->|I|->|J|->|K|->null
* +-+ +-+ +-+ +-+ +-+ +-+ +-+ +-+ +-+ +-+ +-+ +-+

ConcurrentSkipListMap#Node(K key, Object value, Node<K,V> next)

1
2
3
4
5
6
7
8
/**
* Creates a new regular node.
*/
Node(K key, Object value, Node<K,V> next) {
this.key = key;
this.value = value;
this.next = next;
}

创建一个普通节点。

ConcurrentSkipListMap#Node(Node<K,V> next)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
        /**
* Creates a new marker node. A marker is distinguished by
* having its value field point to itself. Marker nodes also
* have null keys, a fact that is exploited in a few places,
* but this doesn't distinguish markers from the base-level
* header node (head.node), which also has a null key.
*/
Node(Node<K,V> next) {
this.key = null;
this.value = this;
this.next = next;
}
```
创建一个标记节点。key==null无法区分标记节点和base-level链表头节点,因为base-level链表头节点的key也是null

<!-- more -->
## ConcurrentSkipListMap#Node#原子更新
指定的volatile引用字段进行原子更新。
``` java
/** Updater for casNext */
static final AtomicReferenceFieldUpdater<Node, Node>
nextUpdater = AtomicReferenceFieldUpdater.newUpdater
(Node.class, Node.class, "next");

/** Updater for casValue */
static final AtomicReferenceFieldUpdater<Node, Object>
valueUpdater = AtomicReferenceFieldUpdater.newUpdater
(Node.class, Object.class, "value");

nextUpdater和valueUpdater分别对next、value进行更新

ConcurrentSkipListMap#Node#helpDelete()

1
2
3
4
5
6
7
8
9
void helpDelete(Node<K,V> b, Node<K,V> f) {
if (f == next && this == b.next) {
// 1
if (f == null || f.value != f) // not already marked
appendMarker(f);
else // 2
b.casNext(this, f.next);
}
}

标注代码分析

  1. 后继节点f是null,新增一个节点标记(marker)。
  2. 删除节点b与f.next之间的节点,b#next指向f#next。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
         +------+       +------+      +------+
    ... | b |------>| n |----->| f | ...
    +------+ +------+ +------+

    +------+ +------+ +------+ +------+
    ... | b |------>| n |----->|marker|------>| f | ...
    +------+ +------+ +------+ +------+

    +------+ +------+
    ... | b |----------------------------------->| f | ...
    +------+ +------+
  3. 当前节点为n,n的前驱节点为b,n的后继节点为f。

  4. 现在要删除节点n,那么首先要对n进行标记。要删除节点n,首先是往节点n后面追加一个标记节点。
  5. 直接将节点n和后面的标记节点一起删除。

ConcurrentSkipListMap#Index属性

Index节点表示跳表的层级。

1
2
3
4
5
6
7
8
9
10
11
12
// 1
final Node<K,V> node;
// 2
final Index<K,V> down;
// 3
volatile Index<K,V> right;

/** Updater for casRight */
// 4
static final AtomicReferenceFieldUpdater<Index, Index>
rightUpdater = AtomicReferenceFieldUpdater.newUpdater
(Index.class, Index.class, "right");

标注代码分析

  1. 索引指向的节点, 纵向上所有索引指向链表最下面的节点。
  2. 下level层的Index。
  3. 右边的Index。
  4. 原子更新right属性。
1
2
3
4
5
final boolean link(Index<K,V> succ, Index<K,V> newSucc) {
Node<K,V> n = node;
newSucc.right = succ;
return n.value != null && casRight(succ, newSucc);
}

在index本身和succ之间插入一个新的节点newSucc。

1
2
3
final boolean unlink(Index<K,V> succ) {
return !indexesDeletedNode() && casRight(succ, succ.right);
}

将当前的节点index设置其的right为 succ.right等于删除succ节点。

ConcurrentSkipListMap#HeadIndex

1
2
3
4
5
6
7
8
9
10
/**
* Nodes heading each level keep track of their level.
*/
static final class HeadIndex<K,V> extends Index<K,V> {
final int level;
HeadIndex(Node<K,V> node, Index<K,V> down, Index<K,V> right, int level) {
super(node, down, right);
this.level = level;
}
}

头索引节点,只是增加level属性用来标示索引层级。

ConcurrentSkipListMap属性

1
2
3
4
5
6
7
8
9
10
11
/**
* Special value used to identify base-level header
*/
// 1
private static final Object BASE_HEADER = new Object();

/**
* The topmost head index of the skiplist.
*/
// 2
private transient volatile HeadIndex<K,V> head;

标注代码分析

  1. 用来定义base-level的头结点。
  2. 跳表最高层的head index。

ConcurrentSkipListMap构造方法

1
2
3
4
5
6
7
8
/**
* Constructs a new, empty map, sorted according to the
* {@linkplain Comparable natural ordering} of the keys.
*/
public ConcurrentSkipListMap() {
this.comparator = null;
initialize();
}

keys排序,并且调用ConcurrentSkipListMap#initialize()。

ConcurrentSkipListMap#initialize()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
 /**
* Initializes or resets state. Needed by constructors, clone,
* clear, readObject. and ConcurrentSkipListSet.clone.
* (Note that comparator must be separately initialized.)
*/
final void initialize() {
keySet = null;
entrySet = null;
values = null;
descendingMap = null;
randomSeed = seedGenerator.nextInt() | 0x0100; // ensure nonzero
// 1
head = new HeadIndex<K,V>(new Node<K,V>(null, BASE_HEADER, null),
null, null, 1);
}

标注代码分析

  1. 生成头节点,该节点value是BASE_HEADER,level是1。

ConcurrentSkipListMap#put()

1
2
3
4
5
public V put(K key, V value) {
if (value == null)
throw new NullPointerException();
return doPut(key, value, false);
}

ConcurrentSkipListMap#doPut

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
private V doPut(K kkey, V value, boolean onlyIfAbsent) {
// 1
Comparable<? super K> key = comparable(kkey);
for (;;) {
// 2
Node<K,V> b = findPredecessor(key);
Node<K,V> n = b.next;
for (;;) {
if (n != null) {
// 3
Node<K,V> f = n.next;
// 4
if (n != b.next) // inconsistent read
break;;
Object v = n.value;
// 5
if (v == null) { // n is deleted
n.helpDelete(b, f);
break;
}
// 6
if (v == n || b.value == null) // b is deleted
break;
int c = key.compareTo(n.key);
// 7
if (c > 0) {
b = n;
n = f;
continue;
}
if (c == 0) {
// 8
if (onlyIfAbsent || n.casValue(v, value))
return (V)v;
else
break; // restart if lost race to replace value
}
// else c < 0; fall through
}
// 9
Node<K,V> z = new Node<K,V>(kkey, value, n);
if (!b.casNext(n, z))
break; // restart if lost race to append to b
int level = randomLevel();
// 10
if (level > 0)
insertIndex(z, level);
return null;
}
}
}

标注代码分析

  1. 以key为参数生成Comparable接口的子类,用于比较key,确定前驱节点和后续节点位置。
  2. 通过key找到要插入位置的前驱节点。key的前驱节点是b,b的后续节点是n,key的Node节点是n(key会和n#key做判断,才能确定是不是节点n)。
  3. n#next值是f,f是n的后续节点。
  4. b#next != n 可能是put冲突,造成读不一样。结束本次put()。
  5. 节点n被删除,删除节点b和节点f之间的节点。
  6. 节点b被删除,无法找到前驱节点,退出本次循环。
  7. n#key和key比较,c > 0当前的节点应该排在n的后面,节点b和节点n重新赋值,后移1个节点。
  8. c = 0,key的节点是n,CAS进行替换value,如果CAS失败结束本次循环。
  9. 新建1个节点z,b#next和n比较,如果相等,b#next=z。
  10. 插入成功后,随机生成一个层级。

ConcurrentSkipListMap#comparable()

1
2
3
4
5
6
7
8
private Comparable<? super K> comparable(Object key) throws ClassCastException {
if (key == null)
throw new NullPointerException();
if (comparator != null)
return new ComparableUsingComparator<K>((K)key, comparator);
else
return (Comparable<? super K>)key;
}

如果指定Comparator,就使用Comparator;没指定Comparator,就使用Key的自然序(Key需要实现Comparable接口)。

ConcurrentSkipListMap#addIndex()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
/**
* Adds given index nodes from given level down to 1.
* @param idx the topmost index node being inserted
* @param h the value of head to use to insert. This must be
* snapshotted by callers to provide correct insertion level
* @param indexLevel the level of the index
*/
private void addIndex(Index<K,V> idx, HeadIndex<K,V> h, int indexLevel) {
// Track next level to insert in case of retries
int insertionLevel = indexLevel;
Comparable<? super K> key = comparable(idx.node.key);
if (key == null) throw new NullPointerException();

// Similar to findPredecessor, but adding index nodes along
// path to key.
for (;;) {
int j = h.level;
Index<K,V> q = h;
Index<K,V> r = q.right;
Index<K,V> t = idx;
for (;;) {
if (r != null) {
Node<K,V> n = r.node;
// compare before deletion check avoids needing recheck
int c = key.compareTo(n.key);
if (n.value == null) {
if (!q.unlink(r))
break;
r = q.right;
continue;
}
if (c > 0) {
q = r;
r = r.right;
continue;
}
}

if (j == insertionLevel) {
// Don't insert index if node already deleted
if (t.indexesDeletedNode()) {
findNode(key); // cleans up
return;
}
if (!q.link(r, t))
break; // restart
if (--insertionLevel == 0) {
// need final deletion check before return
if (t.indexesDeletedNode())
findNode(key);
return;
}
}

if (--j >= insertionLevel && j < indexLevel)
t = t.down;
q = q.down;
r = q.right;
}
}
}

ConcurrentSkipListMap#findPredecessor()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
/**
* Returns a base-level node with key strictly less than given key,
* or the base-level header if there is no such node. Also
* unlinks indexes to deleted nodes found along the way. Callers
* rely on this side-effect of clearing indices to deleted nodes.
* @param key the key
* @return a predecessor of key
*/
private Node<K,V> findPredecessor(Comparable<? super K> key) {
if (key == null)
throw new NullPointerException(); // don't postpone errors
for (;;) {
Index<K,V> q = head;
Index<K,V> r = q.right;
for (;;) {
if (r != null) {
Node<K,V> n = r.node;
K k = n.key;
if (n.value == null) {
if (!q.unlink(r))
break; // restart
r = q.right; // reread r
continue;
}
if (key.compareTo(k) > 0) {
q = r;
r = r.right;
continue;
}
}
Index<K,V> d = q.down;
if (d != null) {
q = d;
r = d.right;
} else
return q.node;
}
}
}

从最高层的头节点开始找,给定key大于当前节点,就往右找,否则就往下找,一直找到最底层链上的节点,这个节点就是给定key在base_level上的前驱节点。

ConcurrentSkipListMap#randomLevel()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
* Returns a random level for inserting a new node.
* Hardwired to k=1, p=0.5, max 31 (see above and
* Pugh's "Skip List Cookbook", sec 3.4).
*
* This uses the simplest of the generators described in George
* Marsaglia's "Xorshift RNGs" paper. This is not a high-quality
* generator but is acceptable here.
*/
private int randomLevel() {
int x = randomSeed;
x ^= x << 13;
x ^= x >>> 17;
randomSeed = x ^= x << 5;
if ((x & 0x8001) != 0) // test highest and lowest bits
return 0;
int level = 1;
while (((x >>>= 1) & 1) != 0) ++level;
return level;
}

50%的几率返回0,25%的几率返回1,12.5%的几率返回2,最大返回31。

ConcurrentSkipListMap#insertIndex()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
/**
* Creates and adds index nodes for the given node.
* @param z the node
* @param level the level of the index
*/
private void insertIndex(Node<K,V> z, int level) {
HeadIndex<K,V> h = head;
int max = h.level;

if (level <= max) {
Index<K,V> idx = null;
for (int i = 1; i <= level; ++i)
idx = new Index<K,V>(z, idx, null);
addIndex(idx, h, level);

} else { // Add a new level
/*
* To reduce interference by other threads checking for
* empty levels in tryReduceLevel, new levels are added
* with initialized right pointers. Which in turn requires
* keeping levels in an array to access them while
* creating new head index nodes from the opposite
* direction.
*/
level = max + 1;
Index<K,V>[] idxs = (Index<K,V>[])new Index[level+1];
Index<K,V> idx = null;
for (int i = 1; i <= level; ++i)
idxs[i] = idx = new Index<K,V>(z, idx, null);

HeadIndex<K,V> oldh;
int k;
for (;;) {
oldh = head;
int oldLevel = oldh.level;
if (level <= oldLevel) { // lost race to add level
k = level;
break;
}
HeadIndex<K,V> newh = oldh;
Node<K,V> oldbase = oldh.node;
for (int j = oldLevel+1; j <= level; ++j)
newh = new HeadIndex<K,V>(oldbase, newh, idxs[j], j);
if (casHead(oldh, newh)) {
k = oldLevel;
break;
}
}
addIndex(idxs[k], oldh, k);
}
}

ConcurrentSkipListMap#addIndex()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
/**
* Adds given index nodes from given level down to 1.
* @param idx the topmost index node being inserted
* @param h the value of head to use to insert. This must be
* snapshotted by callers to provide correct insertion level
* @param indexLevel the level of the index
*/
private void addIndex(Index<K,V> idx, HeadIndex<K,V> h, int indexLevel) {
// Track next level to insert in case of retries
int insertionLevel = indexLevel;
Comparable<? super K> key = comparable(idx.node.key);
if (key == null) throw new NullPointerException();

// Similar to findPredecessor, but adding index nodes along
// path to key.
for (;;) {
int j = h.level;
Index<K,V> q = h;
Index<K,V> r = q.right;
Index<K,V> t = idx;
for (;;) {
if (r != null) {
Node<K,V> n = r.node;
// compare before deletion check avoids needing recheck
int c = key.compareTo(n.key);
if (n.value == null) {
if (!q.unlink(r))
break;
r = q.right;
continue;
}
if (c > 0) {
q = r;
r = r.right;
continue;
}
}

if (j == insertionLevel) {
// Don't insert index if node already deleted
if (t.indexesDeletedNode()) {
findNode(key); // cleans up
return;
}
if (!q.link(r, t))
break; // restart
if (--insertionLevel == 0) {
// need final deletion check before return
if (t.indexesDeletedNode())
findNode(key);
return;
}
}

if (--j >= insertionLevel && j < indexLevel)
t = t.down;
q = q.down;
r = q.right;
}
}
}

给定的节点和level值,将之前建立的从上到下的Index节点链接进来。

检测当前的idx节点有没有被删除,如果有,要调用一个ConcurrentSkipListMap#findNode()来做调整。

ConcurrentSkipListMap#findNode()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private Node<K,V> findNode(Comparable<? super K> key) {
for (;;) {
Node<K,V> b = findPredecessor(key);
Node<K,V> n = b.next;
for (;;) {
if (n == null)
return null;
Node<K,V> f = n.next;
if (n != b.next) // inconsistent read
break;
Object v = n.value;
if (v == null) { // n is deleted
n.helpDelete(b, f);
break;
}
if (v == n || b.value == null) // b is deleted
break;
int c = key.compareTo(n.key);
if (c == 0)
return n;
if (c < 0)
return null;
b = n;
n = f;
}
}
}

put()总结

  1. 首先要根据给定的key找出在base_level链表中对应的前驱节点(从结构图的左上角往右或往下一路找过来),注意put方法使用的log(n)时间主要体现在这个过程,这个查找过程中会顺便帮忙推进一些节点的删除。
  2. 找到前驱节点后,然后从这个前驱节点往后找到要插入的位置(注意当前已经在base_level上,所以只需要往后找),这个查找过程中也会顺便帮忙推进一些节点的删除。
  3. 找到了要插入的位置,尝试插入,如果竞争导致插入失败,返回到第1步重试;如果插入成功,接下来会随机生成一个level,如果这个level大于0,需要将插入的节点在垂直线上生成level(level<=maxLevel + 1)个Index节点。

ConcurrentSkipListMap#get()

1
2
3
public V get(Object key) {
return doGet(key);
}

ConcurrentSkipListMap#doGet()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
private V doGet(Object okey) {
Comparable<? super K> key = comparable(okey);
Node<K,V> bound = null;
Index<K,V> q = head;
Index<K,V> r = q.right;
Node<K,V> n;
K k;
int c;
for (;;) {
Index<K,V> d;
// Traverse rights
if (r != null && (n = r.node) != bound && (k = n.key) != null) {
if ((c = key.compareTo(k)) > 0) {
q = r;
r = r.right;
continue;
} else if (c == 0) {
Object v = n.value;
return (v != null)? (V)v : getUsingFindNode(key);
} else
bound = n;
}

// Traverse down
if ((d = q.down) != null) {
q = d;
r = d.right;
} else
break;
}

// Traverse nexts
for (n = q.node.next; n != null; n = n.next) {
if ((k = n.key) != null) {
if ((c = key.compareTo(k)) == 0) {
Object v = n.value;
return (v != null)? (V)v : getUsingFindNode(key);
} else if (c < 0)
break;
}
}
return null;
}

从最左最高的头节点开始往右或者往下(通过key的比较)遍历,一直找到base_level,然后往后一直找到给定节点,找不到的话返回null。
代码中还会看到,如果找到了节点,还会判断节点上的value是否为null。如果不为null,直接返回这个value;如果为null,说明这个节点被删除了(正在删除过程中),那么需要调用一个ConcurrentSkipListMap#getUsingFindNode()。

ConcurrentSkipListMap#getUsingFindNode()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private V getUsingFindNode(Comparable<? super K> key) {
/*
* Loop needed here and elsewhere in case value field goes
* null just as it is about to be returned, in which case we
* lost a race with a deletion, so must retry.
*/
for (;;) {
Node<K,V> n = findNode(key);
if (n == null)
return null;
Object v = n.value;
if (v != null)
return (V)v;
}
}

调用前面分析过的findNode方法来查找一个key对应的Node,注意方法中外侧还是包了一层无限循环,为的是避免由于竞争导致findNode方法返回的Node又是一个被删除的节点。

ConcurrentSkipListMap#containsKey()

1
2
3
public boolean containsKey(Object key) {
return doGet(key) != null;
}

ConcurrentSkipListMap#doGet()来实现。

ConcurrentSkipListMap#remove()

1
2
3
public V remove(Object key) {
return doRemove(key, null);
}

ConcurrentSkipListMap#doRemove()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
final V doRemove(Object okey, Object value) {
Comparable<? super K> key = comparable(okey);
for (;;) {
Node<K,V> b = findPredecessor(key);
Node<K,V> n = b.next;
for (;;) {
if (n == null)
return null;
Node<K,V> f = n.next;
if (n != b.next) // inconsistent read
break;
Object v = n.value;
if (v == null) { // n is deleted
n.helpDelete(b, f);
break;
}
if (v == n || b.value == null) // b is deleted
break;
int c = key.compareTo(n.key);
if (c < 0)
return null;
if (c > 0) {
b = n;
n = f;
continue;
}
if (value != null && !value.equals(v))
return null;
if (!n.casValue(v, null))
break;
if (!n.appendMarker(f) || !b.casNext(n, f))
findNode(key); // Retry via findNode
else {
findPredecessor(key); // Clean index
if (head.right == null)
tryReduceLevel();
}
return (V)v;
}
}
}

ConcurrentSkipListMap#tryReduceLevel()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private void tryReduceLevel() {
HeadIndex<K,V> h = head;
HeadIndex<K,V> d;
HeadIndex<K,V> e;
if (h.level > 3 &&
(d = (HeadIndex<K,V>)h.down) != null &&
(e = (HeadIndex<K,V>)d.down) != null &&
e.right == null &&
d.right == null &&
h.right == null &&
casHead(h, d) && // try to set
h.right != null) // recheck
casHead(d, h); // try to backout
}

如果最高的前三个HeadIndex都为null(当前看起来),那么就将level减少1层,其实就是将h(当前的head)设置为d(head的下一层),设置完成之后,还有检测h(之前的head)的right是否为null(因为可能刚才由于竞争的原因,导致看到h的right为null),如果这会儿又不是null,那么还得回退回来,再次将head设置为h。

ConcurrentSkipListMap#containsValue()

1
2
3
4
5
6
7
8
9
10
public boolean containsValue(Object value) {
if (value == null)
throw new NullPointerException();
for (Node<K,V> n = findFirst(); n != null; n = n.next) {
V v = n.getValidValue();
if (v != null && value.equals(v))
return true;
}
return false;
}

base_level链的第一个节点,然后一直往后找,比较value值。

ConcurrentSkipListMap#findFirst()

1
2
3
4
5
6
7
8
9
10
11
Node<K,V> findFirst() {
for (;;) {
Node<K,V> b = head.node;
Node<K,V> n = b.next;
if (n == null)
return null;
if (n.value != null)
return n;
n.helpDelete(b, n.next);
}
}

就是找到head中node(BASE_HEADER节点)的next,有可能next节点被删除了,所以会做检测,删除的话,推进一下删除,然后继续获取。ConcurrentSkipListMap#size()和ConcurrentSkipListMap#isEmpty()也是基于这个方法实现的。

ConcurrentSkipListMap#size()

1
2
3
4
5
6
7
8
public int size() {
long count = 0;
for (Node<K,V> n = findFirst(); n != null; n = n.next) {
if (n.getValidValue() != null)
++count;
}
return (count >= Integer.MAX_VALUE)? Integer.MAX_VALUE : (int)count;
}

ConcurrentSkipListMap#isEmpty()

1
2
3
public boolean isEmpty() {
return findFirst() == null;
}

ConcurrentSkipListMap实现ConcurrentMap接口。覆写putIfAbsent()、 remove()、replace()。
ConcurrentSkipListMap同样实现SortedMap。

ConcurrentSkipListMap#lastKey()

1
2
3
4
5
6
public K lastKey() {
Node<K,V> n = findLast();
if (n == null)
throw new NoSuchElementException();
return n.key;
}

ConcurrentSkipListMap#findLast()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
Node<K,V> findLast() {
/*
* findPredecessor can't be used to traverse index level
* because this doesn't use comparisons. So traversals of
* both levels are folded together.
*/
Index<K,V> q = head;
for (;;) {
Index<K,V> d, r;
if ((r = q.right) != null) {
if (r.indexesDeletedNode()) {
q.unlink(r);
q = head; // restart
}
else
q = r;
} else if ((d = q.down) != null) {
q = d;
} else {
Node<K,V> b = q.node;
Node<K,V> n = b.next;
for (;;) {
if (n == null)
return (b.isBaseHeader())? null : b;
Node<K,V> f = n.next; // inconsistent read
if (n != b.next)
break;
Object v = n.value;
if (v == null) { // n is deleted
n.helpDelete(b, f);
break;
}
if (v == n || b.value == null) // b is deleted
break;
b = n;
n = f;
}
q = head; // restart
}
}
}

ConcurrentSkipListMap#lowerEntry()

1
2
3
public Map.Entry<K,V> lowerEntry(K key) {
return getNear(key, LT);
}

找到一个比给定key小的所有key里面最大的key对应的Entry,里面调用了getNear()。

ConcurrentSkipListMap#getNear()

1
2
3
4
5
6
7
8
9
10
AbstractMap.SimpleImmutableEntry<K,V> getNear(K key, int rel) {
for (;;) {
Node<K,V> n = findNear(key, rel);
if (n == null)
return null;
AbstractMap.SimpleImmutableEntry<K,V> e = n.createSnapshot();
if (e != null)
return e;
}
}

ConcurrentSkipListMap#findNear()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
Node<K,V> findNear(K kkey, int rel) {
Comparable<? super K> key = comparable(kkey);
for (;;) {
Node<K,V> b = findPredecessor(key);
Node<K,V> n = b.next;
for (;;) {
if (n == null)
return ((rel & LT) == 0 || b.isBaseHeader())? null : b;
Node<K,V> f = n.next;
if (n != b.next) // inconsistent read
break;
Object v = n.value;
if (v == null) { // n is deleted
n.helpDelete(b, f);
break;
}
if (v == n || b.value == null) // b is deleted
break;
int c = key.compareTo(n.key);
if ((c == 0 && (rel & EQ) != 0) ||
(c < 0 && (rel & LT) == 0))
return n;
if ( c <= 0 && (rel & LT) != 0)
return (b.isBaseHeader())? null : b;
b = n;
n = f;
}
}
}
1
2
3
private static final int EQ = 1;
private static final int LT = 2;
private static final int GT = 0; // Actually checked as !LT

这个方法其实和findNode类似,都是从head开始先找到base_level的上给定key的前驱节点,然后再往后找。区别是这里传入关系参数-EQ、LT、GT(GT在代码里面没直接用,而是通过没有LT来判断),我们可以通过lowerEntry方法来分析,lowerEntry方法中间接调用的findNear方法传入的是LT,所以当在findNear方法中定位到目标节点n的时候,节点关系是这样的:[b->n->f],节点key和给定key的大小关系是这样的:[b<k<=n<f],所以代码会从findNear中的出口3(见注释)返回。

ConcurrentSkipListMap#floorEntry()

1
2
3
public Map.Entry<K,V> floorEntry(K key) {
return getNear(key, LT|EQ);
}

floorEntry方法中间接调用的findNear方法传入的是LT|EQ,所以当在findNear方法中定位到目标节点n的时候,节点关系是这样的:[b->n->f],节点key和给定key的大小关系是这样的:[b<k<=n<f],这里分两种情况:1.如果k<n,那么会从出口3退出,返回b;2.如果k=n,那么会从出口2退出(满足条件(c == 0 && (rel & EQ) != 0)),返回n。

ConcurrentSkipListMap#ceilingEntry()

1
2
3
public Map.Entry<K,V> ceilingEntry(K key) {
return getNear(key, GT|EQ);
}

ceilingEntry方法中间接调用的findNear方法传入的是GT|EQ,所以当在findNear方法中定位到目标节点n的时候,节点关系是这样的:[b->n->f],节点key和给定key的大小关系是这样的:[b<k<=n<f],这里分两种情况:1.如果k<n(k>b),那么会从出口2退出(满足条件(c < 0 && (rel & LT) == 0)),返回b;2.如果k=n,那么也会从出口2退出(满足条件(c == 0 && (rel & EQ) != 0)),返回n。

ConcurrentSkipListMap#higherEntry()

1
2
3
public Map.Entry<K,V> higherEntry(K key) {
return getNear(key, GT);
}

higherEntry方法中间接调用的findNear方法传入的是GT,所以当在findNear方法中定位到目标节点n的时候,节点关系是这样的:[b->n->f],节点key和给定key的大小关系是这样的:[b<k<=n<f],这里分两种情况:1.如果k<n,那么会从出口2退出(满足条件(c < 0 && (rel & LT) == 0)),返回b;2.如果k=n,那么会进入下一次循环,关系变成这样:[b<n=k<f],这时会从出口2退出(满足条件(c < 0 && (rel & LT) == 0)),这时返回的是f。
上面分析的所有方法,都会遇到在findNear中遇到n==null的可能,这时关系图如下[b<k-null],k一定大于b,所以只有传入LT,才可以返回b;否则都是null。而且如果b本身是BaseHead,也只能返回null。

JDK1.6 ConcurrentLinkedQueue

介绍

  • ConcurrentLinkedQueue是一种基于单向链表实现的无界的线程安全队列。队列中的元素遵循先入先出(FIFO)的规则。新元素插入到队列的尾部,从队列头部取出元素。
  • ConcurrentLinkedQueue内部采用一种wait-free(无等待)算法来实现。

    源码分析

    ConcurrentLinkedQueue

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
    implements Queue<E>, java.io.Serializable {
    private static final long serialVersionUID = 196745693267521676L;

    /*
    * ...略
    */

    private static class Node<E> {
    private volatile E item;
    private volatile Node<E> next;

    Node(E item) {
    // Piggyback on imminent casNext()
    lazySetItem(item);
    }

    E getItem() {
    return item;
    }

    boolean casItem(E cmp, E val) {
    return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
    }

    void setItem(E val) {
    item = val;
    }

    void lazySetItem(E val) {
    UNSAFE.putOrderedObject(this, itemOffset, val);
    }

    void lazySetNext(Node<E> val) {
    UNSAFE.putOrderedObject(this, nextOffset, val);
    }

    Node<E> getNext() {
    return next;
    }

    boolean casNext(Node<E> cmp, Node<E> val) {
    return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
    }

    // Unsafe mechanics

    private static final sun.misc.Unsafe UNSAFE =
    sun.misc.Unsafe.getUnsafe();
    private static final long nextOffset =
    objectFieldOffset(UNSAFE, "next", Node.class);
    private static final long itemOffset =
    objectFieldOffset(UNSAFE, "item", Node.class);

    }
    /**
    * A node from which the first live (non-deleted) node (if any)
    * can be reached in O(1) time.
    * Invariants:
    * - all live nodes are reachable from head via succ()
    * - head != null
    * - (tmp = head).next != tmp || tmp != head
    * Non-invariants:
    * - head.item may or may not be null.
    * - it is permitted for tail to lag behind head, that is, for tail
    * to not be reachable from head!
    */
    // 1
    private transient volatile Node<E> head = new Node<E>(null);

    /**
    * A node from which the last node on list (that is, the unique
    * node with node.next == null) can be reached in O(1) time.
    * Invariants:
    * - the last node is always reachable from tail via succ()
    * - tail != null
    * Non-invariants:
    * - tail.item may or may not be null.
    * - it is permitted for tail to lag behind head, that is, for tail
    * to not be reachable from head!
    * - tail.next may or may not be self-pointing to tail.
    */
    // 2
    private transient volatile Node<E> tail = head;


    /**
    * Creates a {@code ConcurrentLinkedQueue} that is initially empty.
    */
    public ConcurrentLinkedQueue() {}

标注代码分析

  1. 节点head可以指向live nodes(live节点);head != null,head.item == null||head.item != null;节点head有可能无法指向节点tail。
  2. tail != null;节点tail是最后1个节点;节点head有可能无法指向节点tail;tail.next可能指向自己或者不指向自己。

JDK1.6 ConcurrentHashMap

简介

ConcurrentHashMap是一种线程安全的HashMap。相对于HashTable和Collections.synchronizedMap(),ConcurrentHashMap具有更好的性能和伸缩性,是由于其使用了分段锁的策略,将内部数据分为多个段,每个段单独加锁,而不是整个HashMap加锁,这样能减少很多不必要的锁争用。

源码分析

类结构图

ConcurrentMap#putIfAbsent()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* If the specified key is not already associated
* with a value, associate it with the given value.
* This is equivalent to
* <pre>
* if (!map.containsKey(key))
* return map.put(key, value);
* else
* return map.get(key);</pre>
* except that the action is performed atomically.
* ...
* ...
*/
V putIfAbsent(K key, V value);

如果map中已经存在给定的key,返回map中key对应的value;如果不存在给定的key,插入给定的key和value。
这个是一个原子操作,逻辑相当于如下代码。

1
2
3
4
if (!map.containsKey(key))
return map.put(key, value);
else
return map.get(key);

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×