JDK1.6 LinkedBlockingQueue

介绍

LinkedBlockingQueue是一种基于单向链表实现的有界的(可选的,不指定默认int最大值)阻塞队列。队列中的元素遵循先入先出(FIFO)的规则。新元素插入到队列的尾部,从队列头部取出元素。(在并发程序中,基于链表实现的队列和基于数组实现的队列相比,往往具有更高的吞吐量,但性能稍差一些)

源码分析

LinkedBlockingQueue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
private static final long serialVersionUID = -6903933977591709194L;

/*
* A variant of the "two lock queue" algorithm. The putLock gates
* entry to put (and offer), and has an associated condition for
* waiting puts. Similarly for the takeLock. The "count" field
* that they both rely on is maintained as an atomic to avoid
* needing to get both locks in most cases. Also, to minimize need
* for puts to get takeLock and vice-versa, cascading notifies are
* used. When a put notices that it has enabled at least one take,
* it signals taker. That taker in turn signals others if more
* items have been entered since the signal. And symmetrically for
* takes signalling puts. Operations such as remove(Object) and
* iterators acquire both locks.
*
* .....
*/

/**
* Linked list node class
*/
static class Node<E> {
E item;
/**
* One of:
* - the real successor Node
* - this Node, meaning the successor is head.next
* - null, meaning there is no successor (this is the last node)
*/

Node<E> next;
Node(E x) { item = x; }
}

/** The capacity bound, or Integer.MAX_VALUE if none */
private final int capacity;

/** Current number of elements */
private final AtomicInteger count = new AtomicInteger(0);

/** Head of linked list */
private transient Node<E> head;

/** Tail of linked list */
private transient Node<E> last;

/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();

/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();

/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();

/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();

/**
* Signals a waiting take. Called only from put/offer (which do not
* otherwise ordinarily lock takeLock.)
*/
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}

/**
* Signals a waiting put. Called only from take/poll.
*/
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}

/**
* Creates a node and links it at end of queue.
* @param x the item
*/
private void enqueue(E x) {
// assert putLock.isHeldByCurrentThread();
last = last.next = new Node<E>(x);
}

/**
* Removes a node from head of queue.
* @return the node
*/
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}

/**
* Lock to prevent both puts and takes.
*/
void fullyLock() {
putLock.lock();
takeLock.lock();
}

/**
* Unlock to allow both puts and takes.
*/
void fullyUnlock() {
takeLock.unlock();
putLock.unlock();
}

/**
* Tells whether both locks are held by current thread.
*/
boolean isFullyLocked() {
return (putLock.isHeldByCurrentThread() &&
takeLock.isHeldByCurrentThread());
}


/**
* Creates a <tt>LinkedBlockingQueue</tt> with a capacity of
* {@link Integer#MAX_VALUE}.
*/
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}

/**
* Creates a <tt>LinkedBlockingQueue</tt> with the given (fixed) capacity.
*
* @param capacity the capacity of this queue
* @throws IllegalArgumentException if <tt>capacity</tt> is not greater
* than zero
*/
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}

/**
* Creates a <tt>LinkedBlockingQueue</tt> with a capacity of
* {@link Integer#MAX_VALUE}, initially containing the elements of the
* given collection,
* added in traversal order of the collection's iterator.
*
* @param c the collection of elements to initially contain
* @throws NullPointerException if the specified collection or any
* of its elements are null
*/
public LinkedBlockingQueue(Collection<? extends E> c) {
this(Integer.MAX_VALUE);
final ReentrantLock putLock = this.putLock;
putLock.lock(); // Never contended, but necessary for visibility
try {
int n = 0;
for (E e : c) {
if (e == null)
throw new NullPointerException();
if (n == capacity)
throw new IllegalStateException("Queue full");
enqueue(e);
++n;
}
count.set(n);
} finally {
putLock.unlock();
}
}
...
  • takeLock对take(), poll()的锁。putLock()对put(), offer()的锁。

LinkedBlockingQueue#put()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
/**
* Inserts the specified element at the tail of this queue, waiting if
* necessary for space to become available.
*
* @throws InterruptedException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
/*
* Note that count is used in wait guard even though it is
* not protected by lock. This works because count can
* only decrease at this point (all other puts are shut
* out by lock), and we (or some other waiting put) are
* signalled if it ever changes from
* capacity. Similarly for all other uses of count in
* other wait guards.
*/
// 1
while (count.get() == capacity) {
notFull.await();
}
enqueue(e);
c = count.getAndIncrement();
// 2
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
// 3
if (c == 0)
signalNotEmpty();
}

标注代码分析

  1. 检查当前容量,如果当前没有线程,则等待。
  2. c+1表示新增element,如果还没有达到capacity,唤醒notFull条件上等待的线程。
  3. c初始-1,由-1到0,队列是空的情况下插入了1个元素,唤醒notNull条件上等待的线程。

LinkedBlockingQueue#enqueue()

1
2
3
4
5
6
7
8
/**
* Creates a node and links it at end of queue.
* @param x the item
*/
private void enqueue(E x) {
// assert putLock.isHeldByCurrentThread();
last = last.next = new Node<E>(x);
}

LinkedBlockingQueue#signalNotEmpty()

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* Signals a waiting take. Called only from put/offer (which do not
* otherwise ordinarily lock takeLock.)
*/
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}

LinkedBlockingQueue#take()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}

LinkedBlockingQueue#dequeue()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* Removes a node from head of queue.
* @return the node
*/
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}

LinkedBlockingQueue#signalNotFull()

1
2
3
4
5
6
7
8
9
10
11
12
/**
* Signals a waiting put. Called only from take/poll.
*/
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}

take()代码原理和put()类似。

LinkedBlockingQueue#remove()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/**
* Removes a single instance of the specified element from this queue,
* if it is present. More formally, removes an element <tt>e</tt> such
* that <tt>o.equals(e)</tt>, if this queue contains one or more such
* elements.
* Returns <tt>true</tt> if this queue contained the specified element
* (or equivalently, if this queue changed as a result of the call).
*
* @param o element to be removed from this queue, if present
* @return <tt>true</tt> if this queue changed as a result of the call
*/
public boolean remove(Object o) {
if (o == null) return false;
fullyLock();
try {
for (Node<E> trail = head, p = trail.next;
p != null;
trail = p, p = p.next) {
if (o.equals(p.item)) {
unlink(p, trail);
return true;
}
}
return false;
} finally {
fullyUnlock();
}
}

LinkedBlockingQueue#fullyLock()

1
2
3
4
5
6
7
/**
* Lock to prevent both puts and takes.
*/
void fullyLock() {
putLock.lock();
takeLock.lock();
}

LinkedBlockingQueue#fullyUnlock()

1
2
3
4
5
6
7
/**
* Unlock to allow both puts and takes.
*/
void fullyUnlock() {
takeLock.unlock();
putLock.unlock();
}

保证原子性,做remove()操作,把take(),poll(),put(),offer()都加上锁。

评论

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×