JDK1.6 LinkedBlockingDeque

介绍

LinkedBlockingDeque是一种基于双向链表实现的有界的(可选的,不指定默认int最大值)阻塞双端队列。
双端队列一般适用于工作密取模式,即每个消费者都拥有自己的双端队列,如果某个消费者完成了自己队列的全部任务,可以到其他消费者双端队列尾部秘密获取任务来处理。

源码分析

LinkedBlockingDeque

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
public class LinkedBlockingDeque<E>
extends AbstractQueue<E>
implements BlockingDeque<E>, java.io.Serializable {

/*
* Implemented as a simple doubly-linked list protected by a
* single lock and using conditions to manage blocking.
*
* To implement weakly consistent iterators, it appears we need to
* keep all Nodes GC-reachable from a predecessor dequeued Node.
* That would cause two problems:
* - allow a rogue Iterator to cause unbounded memory retention
* - cause cross-generational linking of old Nodes to new Nodes if
* a Node was tenured while live, which generational GCs have a
* hard time dealing with, causing repeated major collections.
* However, only non-deleted Nodes need to be reachable from
* dequeued Nodes, and reachability does not necessarily have to
* be of the kind understood by the GC. We use the trick of
* linking a Node that has just been dequeued to itself. Such a
* self-link implicitly means to advance to head.
*/

/*
* We have "diamond" multiple interface/abstract class inheritance
* here, and that introduces ambiguities. Often we want the
* BlockingDeque javadoc combined with the AbstractQueue
* implementation, so a lot of method specs are duplicated here.
*/

private static final long serialVersionUID = -387911632671998426L;

/** Doubly-linked list node class */
static final class Node<E> {
/**
* The item, or null if this node has been removed.
*/
// 1
E item;

/**
* One of:
* - the real predecessor Node
* - this Node, meaning the predecessor is tail
* - null, meaning there is no predecessor
*/
// 2
Node<E> prev;

/**
* One of:
* - the real successor Node
* - this Node, meaning the successor is head
* - null, meaning there is no successor
*/
// 3
Node<E> next;
Node(E x, Node<E> p, Node<E> n) {
item = x;
prev = p;
next = n;
}
}

/** Pointer to first node */
// 4
transient Node<E> first;
/** Pointer to last node */
// 5
transient Node<E> last;
/** Number of items in the deque */
// 6
private transient int count;
/** Maximum number of items in the deque */
// 7
private final int capacity;
/** Main lock guarding all access */
// 8
final ReentrantLock lock = new ReentrantLock();
/** Condition for waiting takes */
// 9
private final Condition notEmpty = lock.newCondition();
/** Condition for waiting puts */
// 10
private final Condition notFull = lock.newCondition();

/**
* Creates a <tt>LinkedBlockingDeque</tt> with a capacity of
* {@link Integer#MAX_VALUE}.
*/
// 11
public LinkedBlockingDeque() {
this(Integer.MAX_VALUE);
}

/**
* Creates a <tt>LinkedBlockingDeque</tt> with the given (fixed) capacity.
*
* @param capacity the capacity of this deque
* @throws IllegalArgumentException if <tt>capacity</tt> is less than 1
*/
public LinkedBlockingDeque(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
}

标注代码分析

  1. 保存元素的域,如果为nul说明当前节点已被删除。
  2. 指向其前驱节点。如果指向自身,说明前面是队尾节点。如果为null,说明没有前驱节点。
  3. 指向其后继节点。如果指向自身,说明后面是队头节点。如果为null,说明没有后继节点。
  4. 指向队头节点。
  5. 指向队尾节点。
  6. 队列中元素数量。
  7. 队列最大容量。
  8. 队列中保护访问使用的锁。
  9. 获取元素的等待条件(队列非空)。
  10. 插入元素的等待条件(队列非满)。
  11. 不指定容量,默认为Integer.MAX_VALUE。

LinkedBlockingDeque#linkFirst()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
* Links e as first element, or returns false if full.
*/
private boolean linkFirst(E e) {
// assert lock.isHeldByCurrentThread();
// 1
if (count >= capacity)
return false;
Node<E> f = first;
// 2
Node<E> x = new Node<E>(e, null, f);
// 3
first = x;
if (last == null)
// 4
last = x;
else
// 5
f.prev = x;
// 6
++count;
// 7
notEmpty.signal();
return true;
}

标注代码分析

  1. 如果队列已满,返回false。
  2. 新建节点x,用来存放数据e,将e插入到队头节点前面。
  3. 然后将e设置为队头节点。
  4. 如果没有队尾节点,那么将x设置为队尾节点。
  5. 如果有队尾节点,那么将f的prev指向x,完成节点拼接。
  6. 累加当前元素计数。
  7. 有元素入队,唤醒在notEmpty上等待的获取元素的线程。

linkFirst()将一个元素插入到队头,并成为新的队头元素。

LinkedBlockingDeque#linkLast()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
* Links e as last element, or returns false if full.
*/
private boolean linkLast(E e) {
// assert lock.isHeldByCurrentThread();
if (count >= capacity)
return false;
Node<E> l = last;
// 1
Node<E> x = new Node<E>(e, l, null);
last = x;
if (first == null)
// 2
first = x;
else
// 3
l.next = x;
// 4
++count;
// 5
notEmpty.signal();
return true;
}

标注代码分析

  1. 新建节点x,用来存放数据e,将e插入到队尾节点后面。
  2. 如果没有队头节点,那么将x设置为队头节点。
  3. 如果有队头节点,那么将l的next指向x,完成节点拼接。
  4. 累加当前元素计数。
  5. 有元素入队,唤醒在notEmpty上等待的获取元素的线程。

linkLast()将一个元素插入到队尾,并成为新的队尾元素。

LinkedBlockingDeque#unlinkFirst()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/**
* Removes and returns first element, or null if empty.
*/
private E unlinkFirst() {
// assert lock.isHeldByCurrentThread();
// 1
Node<E> f = first;
// 2
if (f == null)
return null;
// 3
Node<E> n = f.next;
// 4
E item = f.item;
// 5
f.item = null;
// 6
f.next = f; // help GC
// 7
first = n;
if (n == null)
// 8
last = null;
else
// 9
n.prev = null;
// 10
--count;
// 11
notFull.signal();
// 12
return item;
}

标注代码分析

  1. 获取队头节点f。
  2. 如果没有队头节点,返回null。
  3. 获取f的后继节点。
  4. 获取f的元素item。
  5. 将f的item域置空。
  6. 将f的next域指向自身,帮助GC。
  7. 将f的后继节点n设置为新的队头节点。
  8. 如果n为空,说明队列为空了,把队尾节点也置空一下。
  9. 如果n不为空,现在n是队头节点,需要将其prev域置空。
  10. 递减当前元素计数。
  11. 有元素出队了,唤醒在notFull上等待的插入元素的线程。
  12. 返回元素item。

unlinkFirst()就是将现有的队头节点移除,并将其后继节点设置为新的队头节点,并返回移除的队头节点中保存的元素。

LinkedBlockingDeque#unlinkLast()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/**
* Removes and returns last element, or null if empty.
*/
private E unlinkLast() {
// assert lock.isHeldByCurrentThread();
// 1
Node<E> l = last;
// 2
if (l == null)
return null;
// 3
Node<E> p = l.prev;
// 4
E item = l.item;
// 5
l.item = null;
// 6
l.prev = l; // help GC
// 7
last = p;
if (p == null)
// 8
first = null;
else
// 9
p.next = null;
// 10
--count;
// 11
notFull.signal();
// 12
return item;
}

标注代码分析

  1. 获取队尾节点l。
  2. 如果没有队尾节点,返回null。
  3. 获取l的前驱节点。
  4. 获取l的元素item。
  5. 将l的item域置空。
  6. 将f的prev域指向自身,帮助GC。
  7. 将l的前驱节点p设置为新的队尾节点。
  8. 如果p为空,说明队列为空了,把队头节点也置空一下。
  9. 如果p不为空,现在p是队尾节点,需要将其next域置空。
  10. 递减当前元素计数
  11. 有元素出队,唤醒在notFull上等待的插入元素的线程。
  12. 返回元素item。

unlinkLast()就是将现有的队尾节点移除,并将其前驱节点设置为新的队尾节点,并返回移除的队尾节点中保存的元素。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/**
* Unlinks x
*/
void unlink(Node<E> x) {
// assert lock.isHeldByCurrentThread();
Node<E> p = x.prev;
Node<E> n = x.next;
if (p == null) {
// 1
unlinkFirst();
} else if (n == null) {
// 2
unlinkLast();
} else {
// 3
p.next = n;
// 4
n.prev = p;
// 5
x.item = null;
// Don't mess with x's links. They may still be in use by
// an iterator.
// 6
--count;
// 7
notFull.signal();
}
}

标注代码分析

  1. 如果x没有前驱节点,那么x就是队头节点,所以调用一下unlinkFirst就可以了。
  2. 如果x没有后继节点,那么x就是队尾节点,所以调用一下unlinkLast就可以了。
  3. 将x的前驱节点p的next指向x的后继节点n。
  4. 将x的后继节点n的prev指向x的前驱节点n。
  5. 置空x的item域。注意没有清理x本身的prev和next域,因为它们可能正在被某个迭代器使用中。
  6. 递减当前元素计数
  7. 有元素出队,唤醒在notFull上等待的插入元素的线程。

LinkedBlockingDeque#putFirst()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* @throws NullPointerException {@inheritDoc}
* @throws InterruptedException {@inheritDoc}
*/
public void putFirst(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 1
while (!linkFirst(e))
notFull.await();
} finally {
lock.unlock();
}
}

标注代码分析

  1. 如果插入元素到队头失败,在notFull条件上等待。

LinkedBlockingDeque#putLast()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* @throws NullPointerException {@inheritDoc}
* @throws InterruptedException {@inheritDoc}
*/
public void putLast(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 1
while (!linkLast(e))
notFull.await();
} finally {
lock.unlock();
}
}

标注代码分析

  1. 如果插入元素到队尾失败,在notFull条件上等待。

LinkedBlockingDeque#takeFirst()

1
2
3
4
5
6
7
8
9
10
11
12
13
public E takeFirst() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E x;
// 1
while ( (x = unlinkFirst()) == null)
notEmpty.await();
return x;
} finally {
lock.unlock();
}
}

标注代码分析

  1. 如果从队头获取并删除元素失败,在notEmpty条件上等待。

LinkedBlockingDeque#takeLast()

1
2
3
4
5
6
7
8
9
10
11
12
13
public E takeLast() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E x;
// 1
while ( (x = unlinkLast()) == null)
notEmpty.await();
return x;
} finally {
lock.unlock();
}
}

标注代码分析

  1. 如果从队尾获取并删除元素失败,在notEmpty条件上等待。

LinkedBlockingDeque#iterator()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* Returns an iterator over the elements in this deque in proper sequence.
* The elements will be returned in order from first (head) to last (tail).
* The returned <tt>Iterator</tt> is a "weakly consistent" iterator that
* will never throw {@link ConcurrentModificationException},
* and guarantees to traverse elements as they existed upon
* construction of the iterator, and may (but is not guaranteed to)
* reflect any modifications subsequent to construction.
*
* @return an iterator over the elements in this deque in proper sequence
*/
public Iterator<E> iterator() {
return new Itr();
}

根据注释,可以知道迭代器是弱一致性,支持双向迭代。类似LinkedBlockingDeque#descendingIterator()也是弱一致性,支持双向迭代器。与iterator()的差别在返回顺序。

LinkedBlockingDeque#Itr

1
2
3
4
5
/** Forward iterator */
private class Itr extends AbstractItr {
Node<E> firstNode() { return first; }
Node<E> nextNode(Node<E> n) { return n.next; }
}

评论

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×