/** * Creates a thread pool that reuses a fixed number of threads * operating off a shared unbounded queue. At any point, at most * <tt>nThreads</tt> threads will be active processing tasks. * If additional tasks are submitted when all threads are active, * they will wait in the queue until a thread is available. * If any thread terminates due to a failure during execution * prior to shutdown, a new one will take its place if needed to * execute subsequent tasks. The threads in the pool will exist * until it is explicitly {@link ExecutorService#shutdown shutdown}. * * @param nThreads the number of threads in the pool * @return the newly created thread pool * @throws IllegalArgumentException if <tt>nThreads <= 0</tt> */ publicstatic ExecutorService newFixedThreadPool(int nThreads){ returnnew ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
publicinterfaceCompletionService<V> { /** * Submits a value-returning task for execution and returns a Future * representing the pending results of the task. Upon completion, * this task may be taken or polled. * * @param task the task to submit * @return a Future representing pending completion of the task * @throws RejectedExecutionException if the task cannot be * scheduled for execution * @throws NullPointerException if the task is null */ // 1 Future<V> submit(Callable<V> task);
/** * Submits a Runnable task for execution and returns a Future * representing that task. Upon completion, this task may be * taken or polled. * * @param task the task to submit * @param result the result to return upon successful completion * @return a Future representing pending completion of the task, * and whose <tt>get()</tt> method will return the given * result value upon completion * @throws RejectedExecutionException if the task cannot be * scheduled for execution * @throws NullPointerException if the task is null */ // 2 Future<V> submit(Runnable task, V result);
/** * Retrieves and removes the Future representing the next * completed task, waiting if none are yet present. * * @return the Future representing the next completed task * @throws InterruptedException if interrupted while waiting */ // 3 Future<V> take()throws InterruptedException;
/** * Retrieves and removes the Future representing the next * completed task or <tt>null</tt> if none are present. * * @return the Future representing the next completed task, or * <tt>null</tt> if none are present */ // 4 Future<V> poll();
/** * Retrieves and removes the Future representing the next * completed task, waiting if necessary up to the specified wait * time if none are yet present. * * @param timeout how long to wait before giving up, in units of * <tt>unit</tt> * @param unit a <tt>TimeUnit</tt> determining how to interpret the * <tt>timeout</tt> parameter * @return the Future representing the next completed task or * <tt>null</tt> if the specified waiting time elapses * before one is present * @throws InterruptedException if interrupted while waiting */ // 5 Future<V> poll(long timeout, TimeUnit unit)throws InterruptedException; }
/** * The underlying map. Uses Boolean.TRUE as value for each * element. This field is declared final for the sake of thread * safety, which entails some ugliness in clone() */ privatefinal ConcurrentNavigableMap<E,Object> m;
/** * Constructs a new, empty set that orders its elements according to * their {@linkplain Comparable natural ordering}. */ publicConcurrentSkipListSet(){ m = new ConcurrentSkipListMap<E,Object>(); } ...
/** * Creates a new marker node. A marker is distinguished by * having its value field point to itself. Marker nodes also * have null keys, a fact that is exploited in a few places, * but this doesn't distinguish markers from the base-level * header node (head.node), which also has a null key. */ Node(Node<K,V> next) { this.key = null; this.value = this; this.next = next; } ``` 创建一个标记节点。key==null无法区分标记节点和base-level链表头节点,因为base-level链表头节点的key也是null。
/** * Nodes heading each level keep track of their level. */ staticfinalclassHeadIndex<K,V> extendsIndex<K,V> { finalint level; HeadIndex(Node<K,V> node, Index<K,V> down, Index<K,V> right, int level) { super(node, down, right); this.level = level; } }
头索引节点,只是增加level属性用来标示索引层级。
ConcurrentSkipListMap属性
1 2 3 4 5 6 7 8 9 10 11
/** * Special value used to identify base-level header */ // 1 privatestaticfinal Object BASE_HEADER = new Object();
/** * The topmost head index of the skiplist. */ // 2 privatetransientvolatile HeadIndex<K,V> head;
标注代码分析
用来定义base-level的头结点。
跳表最高层的head index。
ConcurrentSkipListMap构造方法
1 2 3 4 5 6 7 8
/** * Constructs a new, empty map, sorted according to the * {@linkplain Comparable natural ordering} of the keys. */ publicConcurrentSkipListMap(){ this.comparator = null; initialize(); }
keys排序,并且调用ConcurrentSkipListMap#initialize()。
ConcurrentSkipListMap#initialize()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/** * Initializes or resets state. Needed by constructors, clone, * clear, readObject. and ConcurrentSkipListSet.clone. * (Note that comparator must be separately initialized.) */ finalvoidinitialize(){ keySet = null; entrySet = null; values = null; descendingMap = null; randomSeed = seedGenerator.nextInt() | 0x0100; // ensure nonzero // 1 head = new HeadIndex<K,V>(new Node<K,V>(null, BASE_HEADER, null), null, null, 1); }
标注代码分析
生成头节点,该节点value是BASE_HEADER,level是1。
ConcurrentSkipListMap#put()
1 2 3 4 5
public V put(K key, V value){ if (value == null) thrownew NullPointerException(); return doPut(key, value, false); }
private V doPut(K kkey, V value, boolean onlyIfAbsent){ // 1 Comparable<? super K> key = comparable(kkey); for (;;) { // 2 Node<K,V> b = findPredecessor(key); Node<K,V> n = b.next; for (;;) { if (n != null) { // 3 Node<K,V> f = n.next; // 4 if (n != b.next) // inconsistent read break;; Object v = n.value; // 5 if (v == null) { // n is deleted n.helpDelete(b, f); break; } // 6 if (v == n || b.value == null) // b is deleted break; int c = key.compareTo(n.key); // 7 if (c > 0) { b = n; n = f; continue; } if (c == 0) { // 8 if (onlyIfAbsent || n.casValue(v, value)) return (V)v; else break; // restart if lost race to replace value } // else c < 0; fall through } // 9 Node<K,V> z = new Node<K,V>(kkey, value, n); if (!b.casNext(n, z)) break; // restart if lost race to append to b int level = randomLevel(); // 10 if (level > 0) insertIndex(z, level); returnnull; } } }
/** * Adds given index nodes from given level down to 1. * @param idx the topmost index node being inserted * @param h the value of head to use to insert. This must be * snapshotted by callers to provide correct insertion level * @param indexLevel the level of the index */ privatevoidaddIndex(Index<K,V> idx, HeadIndex<K,V> h, int indexLevel){ // Track next level to insert in case of retries int insertionLevel = indexLevel; Comparable<? super K> key = comparable(idx.node.key); if (key == null) thrownew NullPointerException();
// Similar to findPredecessor, but adding index nodes along // path to key. for (;;) { int j = h.level; Index<K,V> q = h; Index<K,V> r = q.right; Index<K,V> t = idx; for (;;) { if (r != null) { Node<K,V> n = r.node; // compare before deletion check avoids needing recheck int c = key.compareTo(n.key); if (n.value == null) { if (!q.unlink(r)) break; r = q.right; continue; } if (c > 0) { q = r; r = r.right; continue; } }
if (j == insertionLevel) { // Don't insert index if node already deleted if (t.indexesDeletedNode()) { findNode(key); // cleans up return; } if (!q.link(r, t)) break; // restart if (--insertionLevel == 0) { // need final deletion check before return if (t.indexesDeletedNode()) findNode(key); return; } }
if (--j >= insertionLevel && j < indexLevel) t = t.down; q = q.down; r = q.right; } } }
/** * Returns a base-level node with key strictly less than given key, * or the base-level header if there is no such node. Also * unlinks indexes to deleted nodes found along the way. Callers * rely on this side-effect of clearing indices to deleted nodes. * @param key the key * @return a predecessor of key */ private Node<K,V> findPredecessor(Comparable<? super K> key){ if (key == null) thrownew NullPointerException(); // don't postpone errors for (;;) { Index<K,V> q = head; Index<K,V> r = q.right; for (;;) { if (r != null) { Node<K,V> n = r.node; K k = n.key; if (n.value == null) { if (!q.unlink(r)) break; // restart r = q.right; // reread r continue; } if (key.compareTo(k) > 0) { q = r; r = r.right; continue; } } Index<K,V> d = q.down; if (d != null) { q = d; r = d.right; } else return q.node; } } }
/** * Returns a random level for inserting a new node. * Hardwired to k=1, p=0.5, max 31 (see above and * Pugh's "Skip List Cookbook", sec 3.4). * * This uses the simplest of the generators described in George * Marsaglia's "Xorshift RNGs" paper. This is not a high-quality * generator but is acceptable here. */ privateintrandomLevel(){ int x = randomSeed; x ^= x << 13; x ^= x >>> 17; randomSeed = x ^= x << 5; if ((x & 0x8001) != 0) // test highest and lowest bits return0; int level = 1; while (((x >>>= 1) & 1) != 0) ++level; return level; }
/** * Creates and adds index nodes for the given node. * @param z the node * @param level the level of the index */ privatevoidinsertIndex(Node<K,V> z, int level){ HeadIndex<K,V> h = head; int max = h.level;
if (level <= max) { Index<K,V> idx = null; for (int i = 1; i <= level; ++i) idx = new Index<K,V>(z, idx, null); addIndex(idx, h, level);
} else { // Add a new level /* * To reduce interference by other threads checking for * empty levels in tryReduceLevel, new levels are added * with initialized right pointers. Which in turn requires * keeping levels in an array to access them while * creating new head index nodes from the opposite * direction. */ level = max + 1; Index<K,V>[] idxs = (Index<K,V>[])new Index[level+1]; Index<K,V> idx = null; for (int i = 1; i <= level; ++i) idxs[i] = idx = new Index<K,V>(z, idx, null);
HeadIndex<K,V> oldh; int k; for (;;) { oldh = head; int oldLevel = oldh.level; if (level <= oldLevel) { // lost race to add level k = level; break; } HeadIndex<K,V> newh = oldh; Node<K,V> oldbase = oldh.node; for (int j = oldLevel+1; j <= level; ++j) newh = new HeadIndex<K,V>(oldbase, newh, idxs[j], j); if (casHead(oldh, newh)) { k = oldLevel; break; } } addIndex(idxs[k], oldh, k); } }
/** * Adds given index nodes from given level down to 1. * @param idx the topmost index node being inserted * @param h the value of head to use to insert. This must be * snapshotted by callers to provide correct insertion level * @param indexLevel the level of the index */ privatevoidaddIndex(Index<K,V> idx, HeadIndex<K,V> h, int indexLevel){ // Track next level to insert in case of retries int insertionLevel = indexLevel; Comparable<? super K> key = comparable(idx.node.key); if (key == null) thrownew NullPointerException();
// Similar to findPredecessor, but adding index nodes along // path to key. for (;;) { int j = h.level; Index<K,V> q = h; Index<K,V> r = q.right; Index<K,V> t = idx; for (;;) { if (r != null) { Node<K,V> n = r.node; // compare before deletion check avoids needing recheck int c = key.compareTo(n.key); if (n.value == null) { if (!q.unlink(r)) break; r = q.right; continue; } if (c > 0) { q = r; r = r.right; continue; } }
if (j == insertionLevel) { // Don't insert index if node already deleted if (t.indexesDeletedNode()) { findNode(key); // cleans up return; } if (!q.link(r, t)) break; // restart if (--insertionLevel == 0) { // need final deletion check before return if (t.indexesDeletedNode()) findNode(key); return; } }
if (--j >= insertionLevel && j < indexLevel) t = t.down; q = q.down; r = q.right; } } }
private Node<K,V> findNode(Comparable<? super K> key){ for (;;) { Node<K,V> b = findPredecessor(key); Node<K,V> n = b.next; for (;;) { if (n == null) returnnull; Node<K,V> f = n.next; if (n != b.next) // inconsistent read break; Object v = n.value; if (v == null) { // n is deleted n.helpDelete(b, f); break; } if (v == n || b.value == null) // b is deleted break; int c = key.compareTo(n.key); if (c == 0) return n; if (c < 0) returnnull; b = n; n = f; } } }
private V getUsingFindNode(Comparable<? super K> key){ /* * Loop needed here and elsewhere in case value field goes * null just as it is about to be returned, in which case we * lost a race with a deletion, so must retry. */ for (;;) { Node<K,V> n = findNode(key); if (n == null) returnnull; Object v = n.value; if (v != null) return (V)v; } }
final V doRemove(Object okey, Object value){ Comparable<? super K> key = comparable(okey); for (;;) { Node<K,V> b = findPredecessor(key); Node<K,V> n = b.next; for (;;) { if (n == null) returnnull; Node<K,V> f = n.next; if (n != b.next) // inconsistent read break; Object v = n.value; if (v == null) { // n is deleted n.helpDelete(b, f); break; } if (v == n || b.value == null) // b is deleted break; int c = key.compareTo(n.key); if (c < 0) returnnull; if (c > 0) { b = n; n = f; continue; } if (value != null && !value.equals(v)) returnnull; if (!n.casValue(v, null)) break; if (!n.appendMarker(f) || !b.casNext(n, f)) findNode(key); // Retry via findNode else { findPredecessor(key); // Clean index if (head.right == null) tryReduceLevel(); } return (V)v; } } }
ConcurrentSkipListMap#tryReduceLevel()
1 2 3 4 5 6 7 8 9 10 11 12 13 14
privatevoidtryReduceLevel(){ HeadIndex<K,V> h = head; HeadIndex<K,V> d; HeadIndex<K,V> e; if (h.level > 3 && (d = (HeadIndex<K,V>)h.down) != null && (e = (HeadIndex<K,V>)d.down) != null && e.right == null && d.right == null && h.right == null && casHead(h, d) && // try to set h.right != null) // recheck casHead(d, h); // try to backout }
publicbooleancontainsValue(Object value){ if (value == null) thrownew NullPointerException(); for (Node<K,V> n = findFirst(); n != null; n = n.next) { V v = n.getValidValue(); if (v != null && value.equals(v)) returntrue; } returnfalse; }
base_level链的第一个节点,然后一直往后找,比较value值。
ConcurrentSkipListMap#findFirst()
1 2 3 4 5 6 7 8 9 10 11
Node<K,V> findFirst(){ for (;;) { Node<K,V> b = head.node; Node<K,V> n = b.next; if (n == null) returnnull; if (n.value != null) return n; n.helpDelete(b, n.next); } }
Node<K,V> findLast(){ /* * findPredecessor can't be used to traverse index level * because this doesn't use comparisons. So traversals of * both levels are folded together. */ Index<K,V> q = head; for (;;) { Index<K,V> d, r; if ((r = q.right) != null) { if (r.indexesDeletedNode()) { q.unlink(r); q = head; // restart } else q = r; } elseif ((d = q.down) != null) { q = d; } else { Node<K,V> b = q.node; Node<K,V> n = b.next; for (;;) { if (n == null) return (b.isBaseHeader())? null : b; Node<K,V> f = n.next; // inconsistent read if (n != b.next) break; Object v = n.value; if (v == null) { // n is deleted n.helpDelete(b, f); break; } if (v == n || b.value == null) // b is deleted break; b = n; n = f; } q = head; // restart } } }
ConcurrentSkipListMap#lowerEntry()
1 2 3
public Map.Entry<K,V> lowerEntry(K key){ return getNear(key, LT); }
找到一个比给定key小的所有key里面最大的key对应的Entry,里面调用了getNear()。
ConcurrentSkipListMap#getNear()
1 2 3 4 5 6 7 8 9 10
AbstractMap.SimpleImmutableEntry<K,V> getNear(K key, int rel){ for (;;) { Node<K,V> n = findNear(key, rel); if (n == null) returnnull; AbstractMap.SimpleImmutableEntry<K,V> e = n.createSnapshot(); if (e != null) return e; } }
} /** * A node from which the first live (non-deleted) node (if any) * can be reached in O(1) time. * Invariants: * - all live nodes are reachable from head via succ() * - head != null * - (tmp = head).next != tmp || tmp != head * Non-invariants: * - head.item may or may not be null. * - it is permitted for tail to lag behind head, that is, for tail * to not be reachable from head! */ // 1 privatetransientvolatile Node<E> head = new Node<E>(null);
/** * A node from which the last node on list (that is, the unique * node with node.next == null) can be reached in O(1) time. * Invariants: * - the last node is always reachable from tail via succ() * - tail != null * Non-invariants: * - tail.item may or may not be null. * - it is permitted for tail to lag behind head, that is, for tail * to not be reachable from head! * - tail.next may or may not be self-pointing to tail. */ // 2 privatetransientvolatile Node<E> tail = head;
/** * Creates a {@code ConcurrentLinkedQueue} that is initially empty. */ publicConcurrentLinkedQueue(){}
/** * If the specified key is not already associated * with a value, associate it with the given value. * This is equivalent to * <pre> * if (!map.containsKey(key)) * return map.put(key, value); * else * return map.get(key);</pre> * except that the action is performed atomically. * ... * ... */ V putIfAbsent(K key, V value);