/* * A variant of the "two lock queue" algorithm. The putLock gates * entry to put (and offer), and has an associated condition for * waiting puts. Similarly for the takeLock. The "count" field * that they both rely on is maintained as an atomic to avoid * needing to get both locks in most cases. Also, to minimize need * for puts to get takeLock and vice-versa, cascading notifies are * used. When a put notices that it has enabled at least one take, * it signals taker. That taker in turn signals others if more * items have been entered since the signal. And symmetrically for * takes signalling puts. Operations such as remove(Object) and * iterators acquire both locks. * * ..... */
/** * Linked list node class */ staticclassNode<E> { E item; /** * One of: * - the real successor Node * - this Node, meaning the successor is head.next * - null, meaning there is no successor (this is the last node) */
Node<E> next; Node(E x) { item = x; } }
/** The capacity bound, or Integer.MAX_VALUE if none */ privatefinalint capacity;
/** Current number of elements */ privatefinal AtomicInteger count = new AtomicInteger(0);
/** Head of linked list */ privatetransient Node<E> head;
/** Tail of linked list */ privatetransient Node<E> last;
/** Lock held by take, poll, etc */ privatefinal ReentrantLock takeLock = new ReentrantLock();
/** * Signals a waiting take. Called only from put/offer (which do not * otherwise ordinarily lock takeLock.) */ privatevoidsignalNotEmpty(){ final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { notEmpty.signal(); } finally { takeLock.unlock(); } }
/** * Signals a waiting put. Called only from take/poll. */ privatevoidsignalNotFull(){ final ReentrantLock putLock = this.putLock; putLock.lock(); try { notFull.signal(); } finally { putLock.unlock(); } }
/** * Creates a node and links it at end of queue. * @param x the item */ privatevoidenqueue(E x){ // assert putLock.isHeldByCurrentThread(); last = last.next = new Node<E>(x); }
/** * Removes a node from head of queue. * @return the node */ private E dequeue(){ // assert takeLock.isHeldByCurrentThread(); Node<E> h = head; Node<E> first = h.next; h.next = h; // help GC head = first; E x = first.item; first.item = null; return x; }
/** * Lock to prevent both puts and takes. */ voidfullyLock(){ putLock.lock(); takeLock.lock(); }
/** * Unlock to allow both puts and takes. */ voidfullyUnlock(){ takeLock.unlock(); putLock.unlock(); }
/** * Tells whether both locks are held by current thread. */ booleanisFullyLocked(){ return (putLock.isHeldByCurrentThread() && takeLock.isHeldByCurrentThread()); }
/** * Creates a <tt>LinkedBlockingQueue</tt> with a capacity of * {@link Integer#MAX_VALUE}. */ publicLinkedBlockingQueue(){ this(Integer.MAX_VALUE); }
/** * Creates a <tt>LinkedBlockingQueue</tt> with the given (fixed) capacity. * * @param capacity the capacity of this queue * @throws IllegalArgumentException if <tt>capacity</tt> is not greater * than zero */ publicLinkedBlockingQueue(int capacity){ if (capacity <= 0) thrownew IllegalArgumentException(); this.capacity = capacity; last = head = new Node<E>(null); }
/** * Creates a <tt>LinkedBlockingQueue</tt> with a capacity of * {@link Integer#MAX_VALUE}, initially containing the elements of the * given collection, * added in traversal order of the collection's iterator. * * @param c the collection of elements to initially contain * @throws NullPointerException if the specified collection or any * of its elements are null */ publicLinkedBlockingQueue(Collection<? extends E> c){ this(Integer.MAX_VALUE); final ReentrantLock putLock = this.putLock; putLock.lock(); // Never contended, but necessary for visibility try { int n = 0; for (E e : c) { if (e == null) thrownew NullPointerException(); if (n == capacity) thrownew IllegalStateException("Queue full"); enqueue(e); ++n; } count.set(n); } finally { putLock.unlock(); } } ...
/** * Inserts the specified element at the tail of this queue, waiting if * necessary for space to become available. * * @throws InterruptedException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ publicvoidput(E e)throws InterruptedException { if (e == null) thrownew NullPointerException(); // Note: convention in all put/take/etc is to preset local var // holding count negative to indicate failure unless set. int c = -1; final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { /* * Note that count is used in wait guard even though it is * not protected by lock. This works because count can * only decrease at this point (all other puts are shut * out by lock), and we (or some other waiting put) are * signalled if it ever changes from * capacity. Similarly for all other uses of count in * other wait guards. */ // 1 while (count.get() == capacity) { notFull.await(); } enqueue(e); c = count.getAndIncrement(); // 2 if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } // 3 if (c == 0) signalNotEmpty(); }
标注代码分析
检查当前容量,如果当前没有线程,则等待。
c+1表示新增element,如果还没有达到capacity,唤醒notFull条件上等待的线程。
c初始-1,由-1到0,队列是空的情况下插入了1个元素,唤醒notNull条件上等待的线程。
LinkedBlockingQueue#enqueue()
1 2 3 4 5 6 7 8
/** * Creates a node and links it at end of queue. * @param x the item */ privatevoidenqueue(E x){ // assert putLock.isHeldByCurrentThread(); last = last.next = new Node<E>(x); }
LinkedBlockingQueue#signalNotEmpty()
1 2 3 4 5 6 7 8 9 10 11 12 13
/** * Signals a waiting take. Called only from put/offer (which do not * otherwise ordinarily lock takeLock.) */ privatevoidsignalNotEmpty(){ final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { notEmpty.signal(); } finally { takeLock.unlock(); } }
public E take()throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { while (count.get() == 0) { notEmpty.await(); } x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }
LinkedBlockingQueue#dequeue()
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/** * Removes a node from head of queue. * @return the node */ private E dequeue(){ // assert takeLock.isHeldByCurrentThread(); Node<E> h = head; Node<E> first = h.next; h.next = h; // help GC head = first; E x = first.item; first.item = null; return x; }
LinkedBlockingQueue#signalNotFull()
1 2 3 4 5 6 7 8 9 10 11 12
/** * Signals a waiting put. Called only from take/poll. */ privatevoidsignalNotFull(){ final ReentrantLock putLock = this.putLock; putLock.lock(); try { notFull.signal(); } finally { putLock.unlock(); } }
/** * Removes a single instance of the specified element from this queue, * if it is present. More formally, removes an element <tt>e</tt> such * that <tt>o.equals(e)</tt>, if this queue contains one or more such * elements. * Returns <tt>true</tt> if this queue contained the specified element * (or equivalently, if this queue changed as a result of the call). * * @param o element to be removed from this queue, if present * @return <tt>true</tt> if this queue changed as a result of the call */ publicbooleanremove(Object o){ if (o == null) returnfalse; fullyLock(); try { for (Node<E> trail = head, p = trail.next; p != null; trail = p, p = p.next) { if (o.equals(p.item)) { unlink(p, trail); returntrue; } } returnfalse; } finally { fullyUnlock(); } }
LinkedBlockingQueue#fullyLock()
1 2 3 4 5 6 7
/** * Lock to prevent both puts and takes. */ voidfullyLock(){ putLock.lock(); takeLock.lock(); }
LinkedBlockingQueue#fullyUnlock()
1 2 3 4 5 6 7
/** * Unlock to allow both puts and takes. */ voidfullyUnlock(){ takeLock.unlock(); putLock.unlock(); }