/* * Implemented as a simple doubly-linked list protected by a * single lock and using conditions to manage blocking. * * To implement weakly consistent iterators, it appears we need to * keep all Nodes GC-reachable from a predecessor dequeued Node. * That would cause two problems: * - allow a rogue Iterator to cause unbounded memory retention * - cause cross-generational linking of old Nodes to new Nodes if * a Node was tenured while live, which generational GCs have a * hard time dealing with, causing repeated major collections. * However, only non-deleted Nodes need to be reachable from * dequeued Nodes, and reachability does not necessarily have to * be of the kind understood by the GC. We use the trick of * linking a Node that has just been dequeued to itself. Such a * self-link implicitly means to advance to head. */
/* * We have "diamond" multiple interface/abstract class inheritance * here, and that introduces ambiguities. Often we want the * BlockingDeque javadoc combined with the AbstractQueue * implementation, so a lot of method specs are duplicated here. */
/** Doubly-linked list node class */ staticfinalclassNode<E> { /** * The item, or null if this node has been removed. */ // 1 E item;
/** * One of: * - the real predecessor Node * - this Node, meaning the predecessor is tail * - null, meaning there is no predecessor */ // 2 Node<E> prev;
/** * One of: * - the real successor Node * - this Node, meaning the successor is head * - null, meaning there is no successor */ // 3 Node<E> next; Node(E x, Node<E> p, Node<E> n) { item = x; prev = p; next = n; } }
/** Pointer to first node */ // 4 transient Node<E> first; /** Pointer to last node */ // 5 transient Node<E> last; /** Number of items in the deque */ // 6 privatetransientint count; /** Maximum number of items in the deque */ // 7 privatefinalint capacity; /** Main lock guarding all access */ // 8 final ReentrantLock lock = new ReentrantLock(); /** Condition for waiting takes */ // 9 privatefinal Condition notEmpty = lock.newCondition(); /** Condition for waiting puts */ // 10 privatefinal Condition notFull = lock.newCondition();
/** * Creates a <tt>LinkedBlockingDeque</tt> with a capacity of * {@link Integer#MAX_VALUE}. */ // 11 publicLinkedBlockingDeque(){ this(Integer.MAX_VALUE); }
/** * Creates a <tt>LinkedBlockingDeque</tt> with the given (fixed) capacity. * * @param capacity the capacity of this deque * @throws IllegalArgumentException if <tt>capacity</tt> is less than 1 */ publicLinkedBlockingDeque(int capacity){ if (capacity <= 0) thrownew IllegalArgumentException(); this.capacity = capacity; }
/** * Unlinks x */ voidunlink(Node<E> x){ // assert lock.isHeldByCurrentThread(); Node<E> p = x.prev; Node<E> n = x.next; if (p == null) { // 1 unlinkFirst(); } elseif (n == null) { // 2 unlinkLast(); } else { // 3 p.next = n; // 4 n.prev = p; // 5 x.item = null; // Don't mess with x's links. They may still be in use by // an iterator. // 6 --count; // 7 notFull.signal(); } }
标注代码分析
如果x没有前驱节点,那么x就是队头节点,所以调用一下unlinkFirst就可以了。
如果x没有后继节点,那么x就是队尾节点,所以调用一下unlinkLast就可以了。
将x的前驱节点p的next指向x的后继节点n。
将x的后继节点n的prev指向x的前驱节点n。
置空x的item域。注意没有清理x本身的prev和next域,因为它们可能正在被某个迭代器使用中。
递减当前元素计数
有元素出队,唤醒在notFull上等待的插入元素的线程。
LinkedBlockingDeque#putFirst()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/** * @throws NullPointerException {@inheritDoc} * @throws InterruptedException {@inheritDoc} */ publicvoidputFirst(E e)throws InterruptedException { if (e == null) thrownew NullPointerException(); final ReentrantLock lock = this.lock; lock.lock(); try { // 1 while (!linkFirst(e)) notFull.await(); } finally { lock.unlock(); } }
标注代码分析
如果插入元素到队头失败,在notFull条件上等待。
LinkedBlockingDeque#putLast()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/** * @throws NullPointerException {@inheritDoc} * @throws InterruptedException {@inheritDoc} */ publicvoidputLast(E e)throws InterruptedException { if (e == null) thrownew NullPointerException(); final ReentrantLock lock = this.lock; lock.lock(); try { // 1 while (!linkLast(e)) notFull.await(); } finally { lock.unlock(); } }
标注代码分析
如果插入元素到队尾失败,在notFull条件上等待。
LinkedBlockingDeque#takeFirst()
1 2 3 4 5 6 7 8 9 10 11 12 13
public E takeFirst()throws InterruptedException { final ReentrantLock lock = this.lock; lock.lock(); try { E x; // 1 while ( (x = unlinkFirst()) == null) notEmpty.await(); return x; } finally { lock.unlock(); } }
标注代码分析
如果从队头获取并删除元素失败,在notEmpty条件上等待。
LinkedBlockingDeque#takeLast()
1 2 3 4 5 6 7 8 9 10 11 12 13
public E takeLast()throws InterruptedException { final ReentrantLock lock = this.lock; lock.lock(); try { E x; // 1 while ( (x = unlinkLast()) == null) notEmpty.await(); return x; } finally { lock.unlock(); } }
标注代码分析
如果从队尾获取并删除元素失败,在notEmpty条件上等待。
LinkedBlockingDeque#iterator()
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/** * Returns an iterator over the elements in this deque in proper sequence. * The elements will be returned in order from first (head) to last (tail). * The returned <tt>Iterator</tt> is a "weakly consistent" iterator that * will never throw {@link ConcurrentModificationException}, * and guarantees to traverse elements as they existed upon * construction of the iterator, and may (but is not guaranteed to) * reflect any modifications subsequent to construction. * * @return an iterator over the elements in this deque in proper sequence */ public Iterator<E> iterator(){ returnnew Itr(); }