- 介绍
- 源码分析
- ConcurrentLinkedQueue
- ConcurrentLinkedQueue#add()
- ConcurrentLinkedQueue#offer()
- ConcurrentLinkedQueue#succ()
- ConcurrentLinkedQueue#HOPS
- ConcurrentLinkedQueue#poll()
- ConcurrentLinkedQueue#updateHead()
- ConcurrentLinkedQueue#peek()
- ConcurrentLinkedQueue#first()
- ConcurrentLinkedQueue#isEmpty()
- ConcurrentLinkedQueue#size()
- 总结
介绍
- ConcurrentLinkedQueue是一种基于单向链表实现的无界的线程安全队列。队列中的元素遵循先入先出(FIFO)的规则。新元素插入到队列的尾部,从队列头部取出元素。
- ConcurrentLinkedQueue内部采用一种wait-free(无等待)算法来实现。
源码分析
ConcurrentLinkedQueue
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
implements Queue<E>, java.io.Serializable {
private static final long serialVersionUID = 196745693267521676L;
/*
* ...略
*/
private static class Node<E> {
private volatile E item;
private volatile Node<E> next;
Node(E item) {
// Piggyback on imminent casNext()
lazySetItem(item);
}
E getItem() {
return item;
}
boolean casItem(E cmp, E val) {
return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
void setItem(E val) {
item = val;
}
void lazySetItem(E val) {
UNSAFE.putOrderedObject(this, itemOffset, val);
}
void lazySetNext(Node<E> val) {
UNSAFE.putOrderedObject(this, nextOffset, val);
}
Node<E> getNext() {
return next;
}
boolean casNext(Node<E> cmp, Node<E> val) {
return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE =
sun.misc.Unsafe.getUnsafe();
private static final long nextOffset =
objectFieldOffset(UNSAFE, "next", Node.class);
private static final long itemOffset =
objectFieldOffset(UNSAFE, "item", Node.class);
}
/**
* A node from which the first live (non-deleted) node (if any)
* can be reached in O(1) time.
* Invariants:
* - all live nodes are reachable from head via succ()
* - head != null
* - (tmp = head).next != tmp || tmp != head
* Non-invariants:
* - head.item may or may not be null.
* - it is permitted for tail to lag behind head, that is, for tail
* to not be reachable from head!
*/
// 1
private transient volatile Node<E> head = new Node<E>(null);
/**
* A node from which the last node on list (that is, the unique
* node with node.next == null) can be reached in O(1) time.
* Invariants:
* - the last node is always reachable from tail via succ()
* - tail != null
* Non-invariants:
* - tail.item may or may not be null.
* - it is permitted for tail to lag behind head, that is, for tail
* to not be reachable from head!
* - tail.next may or may not be self-pointing to tail.
*/
// 2
private transient volatile Node<E> tail = head;
/**
* Creates a {@code ConcurrentLinkedQueue} that is initially empty.
*/
public ConcurrentLinkedQueue() {}
标注代码分析
- 节点head可以指向live nodes(live节点);head != null,head.item == null||head.item != null;节点head有可能无法指向节点tail。
- tail != null;节点tail是最后1个节点;节点head有可能无法指向节点tail;tail.next可能指向自己或者不指向自己。
ConcurrentLinkedQueue#add()
1 | /** |
ConcurrentLinkedQueue#offer()
1 | /** |
标注代码分析
- 获取p的后继节点。(如果p的next指向自身,返回head节点)
- 如果自旋次数大于HOPS,且t不是尾节点,跳出2层循环重试。
- 如果自旋字数小于HOPS或者t是尾节点,将p指向next。
- 如果next为null,尝试将p的next节点设置为n,然后自旋。
- 如果设置成功且自旋次数大于HOPS,尝试将n设置为尾节点,失败也没关系。
- 如果第5步尝试将p的next节点设置为n失败,那么将p指向p的后继节点,然后自旋。
没有竞争
初始状态时head和tail都指向一个节点,这时来了一个新节点n1,代码走向上面注释第1行,得到的next为null,然后走向第5行,然后将p(也就是tail节点)的next节点设置为n1,成功返回。注意这里并没有调整tail指针,head和tail还是指向之前的节点。然后再来一个新节点n2,还是走向第1行,得到的next是n1,然后走向第8行,将p指向n1,自旋一次,又来到第1行,得到的next为null,然后走向第5行,将p(也就是n1)的next节点设置为n2。由于已经自旋一次,所以这时还会走向第6行,将tail指向n2。
有竞争
假设在注释第5行竞争失败,那么会走向第8行,将p向队尾推进一步,然后重试;如果重试了多次后(超过一次),在注释第一行获取的next仍然不为空,且同时尾节点也被其他线程推进了,那说明当前节点离尾节点太远了,跳出循环,重新定位尾节点然后再试,这样可以减少自旋次数。
ConcurrentLinkedQueue#succ()
1 | /** |
标注代码分析
- 如果p节点的next节点指向自身,那么返回head节点;否则返回p的next节点。
ConcurrentLinkedQueue#HOPS
1
2
3
4
5
6/**
* We don't bother to update head or tail pointers if fewer than
* HOPS links from "true" location. We assume that volatile
* writes are significantly more expensive than volatile reads.
*/
private static final int HOPS = 1;
节点head\tail离true的距离小于HOPS时,不会去更新节点head\tail。
ConcurrentLinkedQueue#poll()
1 | public E poll() { |
标注代码分析
- 获取p节点上的元素item。
- 如果item不为null,尝试将p的item设置为null。
- 如果自旋次数大于HOPS,尝试更新头节点。
- 获取p的后继节点。(如果p的next指向自身,那么返回head节点)
- 如果p的后继节点为null,尝试将p设置为头节点,然后跳出循环。
- 没有成功获取元素,返回null。
ConcurrentLinkedQueue#updateHead()
1 | /** |
标注代码分析
- CAS将head设值p,将h的next指向自身。
ConcurrentLinkedQueue#peek()
1 | public E peek() { |
peek()只获取元素,不移除节点。
ConcurrentLinkedQueue#first()
1 | /** |
peek()逻辑基本一致。但之所以没有利用first来实现peek,而是单独写peek,是因为可以减少一次volatile读(result)。
ConcurrentLinkedQueue#isEmpty()
1 | /** |
ConcurrentLinkedQueue#size()
1 | /** |
isEmpty()、size()、contains()、remove()基于first()实现。
总结
ConcurrentLinkedQueue#iterator()
1 | /** |
由注释可以知道ConcurrentLinkedQueue的迭代器是弱一致。
ConcurrentLinkedQueue内部在插入或者移除元素时,不会即时设置头尾节点,而是有一个缓冲期(一个节点长的距离),这样能减少一些CAS操作;其次在设置头尾节点的时候,也不会CAS-Loop直到成功,只尝试一次,失败也没关系,下一次操作或者其他线程在操作时会帮忙推进头尾节点(将头尾指针指向正确位置)。