JDK1.6 ConcurrentLinkedQueue

介绍

  • ConcurrentLinkedQueue是一种基于单向链表实现的无界的线程安全队列。队列中的元素遵循先入先出(FIFO)的规则。新元素插入到队列的尾部,从队列头部取出元素。
  • ConcurrentLinkedQueue内部采用一种wait-free(无等待)算法来实现。

    源码分析

    ConcurrentLinkedQueue

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
    implements Queue<E>, java.io.Serializable {
    private static final long serialVersionUID = 196745693267521676L;

    /*
    * ...略
    */

    private static class Node<E> {
    private volatile E item;
    private volatile Node<E> next;

    Node(E item) {
    // Piggyback on imminent casNext()
    lazySetItem(item);
    }

    E getItem() {
    return item;
    }

    boolean casItem(E cmp, E val) {
    return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
    }

    void setItem(E val) {
    item = val;
    }

    void lazySetItem(E val) {
    UNSAFE.putOrderedObject(this, itemOffset, val);
    }

    void lazySetNext(Node<E> val) {
    UNSAFE.putOrderedObject(this, nextOffset, val);
    }

    Node<E> getNext() {
    return next;
    }

    boolean casNext(Node<E> cmp, Node<E> val) {
    return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
    }

    // Unsafe mechanics

    private static final sun.misc.Unsafe UNSAFE =
    sun.misc.Unsafe.getUnsafe();
    private static final long nextOffset =
    objectFieldOffset(UNSAFE, "next", Node.class);
    private static final long itemOffset =
    objectFieldOffset(UNSAFE, "item", Node.class);

    }
    /**
    * A node from which the first live (non-deleted) node (if any)
    * can be reached in O(1) time.
    * Invariants:
    * - all live nodes are reachable from head via succ()
    * - head != null
    * - (tmp = head).next != tmp || tmp != head
    * Non-invariants:
    * - head.item may or may not be null.
    * - it is permitted for tail to lag behind head, that is, for tail
    * to not be reachable from head!
    */
    // 1
    private transient volatile Node<E> head = new Node<E>(null);

    /**
    * A node from which the last node on list (that is, the unique
    * node with node.next == null) can be reached in O(1) time.
    * Invariants:
    * - the last node is always reachable from tail via succ()
    * - tail != null
    * Non-invariants:
    * - tail.item may or may not be null.
    * - it is permitted for tail to lag behind head, that is, for tail
    * to not be reachable from head!
    * - tail.next may or may not be self-pointing to tail.
    */
    // 2
    private transient volatile Node<E> tail = head;


    /**
    * Creates a {@code ConcurrentLinkedQueue} that is initially empty.
    */
    public ConcurrentLinkedQueue() {}

标注代码分析

  1. 节点head可以指向live nodes(live节点);head != null,head.item == null||head.item != null;节点head有可能无法指向节点tail。
  2. tail != null;节点tail是最后1个节点;节点head有可能无法指向节点tail;tail.next可能指向自己或者不指向自己。

ConcurrentLinkedQueue#add()

1
2
3
4
5
6
7
8
9
/**
* Inserts the specified element at the tail of this queue.
*
* @return {@code true} (as specified by {@link Collection#add})
* @throws NullPointerException if the specified element is null
*/
public boolean add(E e) {
return offer(e);
}

ConcurrentLinkedQueue#offer()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/**
* Inserts the specified element at the tail of this queue.
*
* @return {@code true} (as specified by {@link Queue#offer})
* @throws NullPointerException if the specified element is null
*/
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
Node<E> n = new Node<E>(e);
retry:
for (;;) {
Node<E> t = tail;
Node<E> p = t;
for (int hops = 0; ; hops++) {
// 1
Node<E> next = succ(p);
if (next != null) {
// 2
if (hops > HOPS && t != tail)
continue retry;
// 3
p = next;
} else if (p.casNext(null, n)) {// 4
// 5
if (hops >= HOPS)
casTail(t, n); // Failure is OK.
return true;
} else {// 6
p = succ(p);
}
}
}
}

标注代码分析

  1. 获取p的后继节点。(如果p的next指向自身,返回head节点)
  2. 如果自旋次数大于HOPS,且t不是尾节点,跳出2层循环重试。
  3. 如果自旋字数小于HOPS或者t是尾节点,将p指向next。
  4. 如果next为null,尝试将p的next节点设置为n,然后自旋。
  5. 如果设置成功且自旋次数大于HOPS,尝试将n设置为尾节点,失败也没关系。
  6. 如果第5步尝试将p的next节点设置为n失败,那么将p指向p的后继节点,然后自旋。

没有竞争

初始状态时head和tail都指向一个节点,这时来了一个新节点n1,代码走向上面注释第1行,得到的next为null,然后走向第5行,然后将p(也就是tail节点)的next节点设置为n1,成功返回。注意这里并没有调整tail指针,head和tail还是指向之前的节点。然后再来一个新节点n2,还是走向第1行,得到的next是n1,然后走向第8行,将p指向n1,自旋一次,又来到第1行,得到的next为null,然后走向第5行,将p(也就是n1)的next节点设置为n2。由于已经自旋一次,所以这时还会走向第6行,将tail指向n2。

有竞争

假设在注释第5行竞争失败,那么会走向第8行,将p向队尾推进一步,然后重试;如果重试了多次后(超过一次),在注释第一行获取的next仍然不为空,且同时尾节点也被其他线程推进了,那说明当前节点离尾节点太远了,跳出循环,重新定位尾节点然后再试,这样可以减少自旋次数。

ConcurrentLinkedQueue#succ()

1
2
3
4
5
6
7
8
9
10
/**
* Returns the successor of p, or the head node if p.next has been
* linked to self, which will only be true if traversing with a
* stale pointer that is now off the list.
*/
final Node<E> succ(Node<E> p) {
Node<E> next = p.getNext();
// 1
return (p == next) ? head : next;
}

标注代码分析

  1. 如果p节点的next节点指向自身,那么返回head节点;否则返回p的next节点。

    ConcurrentLinkedQueue#HOPS

    1
    2
    3
    4
    5
    6
    /**
    * We don't bother to update head or tail pointers if fewer than
    * HOPS links from "true" location. We assume that volatile
    * writes are significantly more expensive than volatile reads.
    */
    private static final int HOPS = 1;

节点head\tail离true的距离小于HOPS时,不会去更新节点head\tail。

ConcurrentLinkedQueue#poll()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public E poll() {
Node<E> h = head;
Node<E> p = h;
for (int hops = 0; ; hops++) {
// 1
E item = p.getItem();
// 2
if (item != null && p.casItem(item, null)) {
if (hops >= HOPS) {
Node<E> q = p.getNext();
// 3
updateHead(h, (q != null) ? q : p);
}
return item;
}
// 4
Node<E> next = succ(p);
// 5
if (next == null) {
updateHead(h, p);
break;
}
p = next;
}
// 6
return null;
}

标注代码分析

  1. 获取p节点上的元素item。
  2. 如果item不为null,尝试将p的item设置为null。
  3. 如果自旋次数大于HOPS,尝试更新头节点。
  4. 获取p的后继节点。(如果p的next指向自身,那么返回head节点)
  5. 如果p的后继节点为null,尝试将p设置为头节点,然后跳出循环。
  6. 没有成功获取元素,返回null。

ConcurrentLinkedQueue#updateHead()

1
2
3
4
5
6
7
8
9
/**
* Try to CAS head to p. If successful, repoint old head to itself
* as sentinel for succ(), below.
*/
final void updateHead(Node<E> h, Node<E> p) {
if (h != p && casHead(h, p))
// 1
h.lazySetNext(h);
}

标注代码分析

  1. CAS将head设值p,将h的next指向自身。

ConcurrentLinkedQueue#peek()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public E peek() {
Node<E> h = head;
Node<E> p = h;
E item;
for (;;) {
item = p.getItem();
if (item != null)
break;
Node<E> next = succ(p);
if (next == null) {
break;
}
p = next;
}
updateHead(h, p);
return item;
}

peek()只获取元素,不移除节点。

ConcurrentLinkedQueue#first()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/**
* Returns the first live (non-deleted) node on list, or null if none.
* This is yet another variant of poll/peek; here returning the
* first node, not element. We could make peek() a wrapper around
* first(), but that would cost an extra volatile read of item,
* and the need to add a retry loop to deal with the possibility
* of losing a race to a concurrent poll().
*/
Node<E> first() {
Node<E> h = head;
Node<E> p = h;
Node<E> result;
for (;;) {
E item = p.getItem();
if (item != null) {
result = p;
break;
}
Node<E> next = succ(p);
if (next == null) {
result = null;
break;
}
p = next;
}
updateHead(h, p);
return result;
}

peek()逻辑基本一致。但之所以没有利用first来实现peek,而是单独写peek,是因为可以减少一次volatile读(result)。

ConcurrentLinkedQueue#isEmpty()

1
2
3
4
5
6
7
8
/**
* Returns {@code true} if this queue contains no elements.
*
* @return {@code true} if this queue contains no elements
*/
public boolean isEmpty() {
return first() == null;
}

ConcurrentLinkedQueue#size()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
* Returns the number of elements in this queue. If this queue
* contains more than {@code Integer.MAX_VALUE} elements, returns
* {@code Integer.MAX_VALUE}.
*
* <p>Beware that, unlike in most collections, this method is
* <em>NOT</em> a constant-time operation. Because of the
* asynchronous nature of these queues, determining the current
* number of elements requires an O(n) traversal.
*
* @return the number of elements in this queue
*/
public int size() {
int count = 0;
for (Node<E> p = first(); p != null; p = succ(p)) {
if (p.getItem() != null) {
// Collections.size() spec says to max out
if (++count == Integer.MAX_VALUE)
break;
}
}
return count;
}

isEmpty()、size()、contains()、remove()基于first()实现。

总结

ConcurrentLinkedQueue#iterator()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* Returns an iterator over the elements in this queue in proper sequence.
* The returned iterator is a "weakly consistent" iterator that
* will never throw {@link java.util.ConcurrentModificationException
* ConcurrentModificationException},
* and guarantees to traverse elements as they existed upon
* construction of the iterator, and may (but is not guaranteed to)
* reflect any modifications subsequent to construction.
*
* @return an iterator over the elements in this queue in proper sequence
*/
public Iterator<E> iterator() {
return new Itr();
}

由注释可以知道ConcurrentLinkedQueue的迭代器是弱一致。
ConcurrentLinkedQueue内部在插入或者移除元素时,不会即时设置头尾节点,而是有一个缓冲期(一个节点长的距离),这样能减少一些CAS操作;其次在设置头尾节点的时候,也不会CAS-Loop直到成功,只尝试一次,失败也没关系,下一次操作或者其他线程在操作时会帮忙推进头尾节点(将头尾指针指向正确位置)。

评论

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×