JDK1.6 ConcurrentHashMap

Introduction

ConcurrentHashMap is a thread-safe HashMap. Compared with Hashtable and Collections.synchronizedMap(), it offers better performance and scalability because it uses lock striping: the internal data is split into multiple segments, and each segment is locked independently instead of locking the whole map, which eliminates a great deal of unnecessary lock contention.

Source Code Analysis

Class structure diagram

ConcurrentMap#putIfAbsent()

/**
 * If the specified key is not already associated
 * with a value, associate it with the given value.
 * This is equivalent to
 * <pre>
 *   if (!map.containsKey(key))
 *       return map.put(key, value);
 *   else
 *       return map.get(key);</pre>
 * except that the action is performed atomically.
 * ...
 */
V putIfAbsent(K key, V value);

If the map already contains the given key, the value currently mapped to it is returned; otherwise the given key/value pair is inserted.
The whole operation is atomic; its logic is equivalent to the following code.

if (!map.containsKey(key))
    return map.put(key, value);
else
    return map.get(key);
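The check-then-act sequence above is racy if written by hand: two threads can both see the key as absent and both insert. putIfAbsent closes that race. A minimal usage sketch (the cacheOnce helper and its names are illustrative, not part of the JDK):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class PutIfAbsentDemo {
    // Hypothetical "first writer wins" cache.
    static final ConcurrentMap<String, String> cache =
            new ConcurrentHashMap<String, String>();

    static String cacheOnce(String key, String value) {
        // Atomic check-then-insert: returns the previous value,
        // or null if our insert won.
        String prev = cache.putIfAbsent(key, value);
        return prev == null ? value : prev;
    }

    public static void main(String[] args) {
        System.out.println(cacheOnce("k", "first"));  // first
        System.out.println(cacheOnce("k", "second")); // first: key already present
    }
}
```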

ConcurrentMap#remove()

/**
 * Removes the entry for a key only if currently mapped to a given value.
 * This is equivalent to
 * <pre>
 *   if (map.containsKey(key) &amp;&amp; map.get(key).equals(value)) {
 *       map.remove(key);
 *       return true;
 *   } else return false;</pre>
 * except that the action is performed atomically.
 * ...
 */
boolean remove(Object key, Object value);

If the map contains the given key and the mapped value equals the given value, the entry is removed.
The operation is atomic; its logic is equivalent to the following code.

if (map.containsKey(key) && map.get(key).equals(value)) {
    map.remove(key);
    return true;
} else return false;

ConcurrentMap#replace(K key, V oldValue, V newValue)

/**
 * Replaces the entry for a key only if currently mapped to a given value.
 * This is equivalent to
 * <pre>
 *   if (map.containsKey(key) &amp;&amp; map.get(key).equals(oldValue)) {
 *       map.put(key, newValue);
 *       return true;
 *   } else return false;</pre>
 * except that the action is performed atomically.
 * ...
 */
boolean replace(K key, V oldValue, V newValue);

If the map contains the given key and the mapped value equals the given oldValue, the value is replaced with newValue.
The operation is atomic; its logic is equivalent to the following code.

if (map.containsKey(key) && map.get(key).equals(oldValue)) {
    map.put(key, newValue);
    return true;
} else return false;

ConcurrentMap#replace(K key, V value)

/**
 * Replaces the entry for a key only if currently mapped to some value.
 * This is equivalent to
 * <pre>
 *   if (map.containsKey(key)) {
 *       return map.put(key, value);
 *   } else return null;</pre>
 * except that the action is performed atomically.
 * ...
 */
V replace(K key, V value);

If the map already contains the given key, the mapped value is replaced with the given value.
The operation is atomic; its logic is equivalent to the following code.

if (map.containsKey(key)) {
    return map.put(key, value);
} else return null;

ConcurrentHashMap fields

/**
* Mask value for indexing into segments. The upper bits of a
* key's hash code are used to choose the segment.
*/
// 1
final int segmentMask;

/**
* Shift value for indexing within segments.
*/
// 2
final int segmentShift;

/**
* The segments, each of which is a specialized hash table
*/
// 3
final Segment<K,V>[] segments;

transient Set<K> keySet;
transient Set<Map.Entry<K,V>> entrySet;
transient Collection<V> values;

Annotated code analysis

  1. Mask applied to the key's hash code to pick a segment from the segments array; the upper bits of the hash make the choice.
  2. Shift value for indexing within segments.
  3. The segments array; each Segment is itself a specialized hash table.

ConcurrentHashMap#Segment<K,V>

Segment structure diagram

ConcurrentHashMap#Segment<K,V> fields

/**
* The number of elements in this segment's region.
*/
// 1
transient volatile int count;

/**
* Number of updates that alter the size of the table. This is
* used during bulk-read methods to make sure they see a
* consistent snapshot: If modCounts change during a traversal
* of segments computing size or checking containsValue, then
* we might have an inconsistent view of state so (usually)
* must retry.
*/
// 2
transient int modCount;

/**
* The table is rehashed when its size exceeds this threshold.
* (The value of this field is always <tt>(int)(capacity *
* loadFactor)</tt>.)
*/
// 3
transient int threshold;

/**
* The per-segment table.
*/
transient volatile HashEntry<K,V>[] table;

/**
* The load factor for the hash table. Even though this value
* is same for all segments, it is replicated to avoid needing
* links to outer object.
* @serial
*/
// 4
final float loadFactor;

Annotated code analysis

  1. The number of elements in this segment. Read operations rely on volatile reads/writes of count for visibility, avoiding locks.
  2. Counts structural modifications; used to keep bulk read operations consistent. If modCount changes while size() or containsValue() is traversing the segments, the view may be inconsistent and the traversal (usually) must retry.
  3. The table is rehashed, and its elements redistributed, once its size exceeds this threshold, which is always capacity * loadFactor.
  4. The hash table's load factor.

ConcurrentHashMap#Segment<K,V> constructor

Segment(int initialCapacity, float lf) {
    loadFactor = lf;
    setTable(HashEntry.<K,V>newArray(initialCapacity));
}

Initializes the load factor and the HashEntry array, with capacity initialCapacity.
A hash table generally has an initial capacity and a load factor; once the number of elements reaches (capacity * loadFactor), a rehash() is triggered. Assuming the table resolves collisions by chaining: if the load factor is too large, the average chain in each bucket grows long and lookups slow down; if it is too small, too much memory is wasted. It is a time/space trade-off, and the right load factor depends on the actual workload.
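To make the trade-off concrete, this sketch computes the resize threshold the same way setTable() does; the class name and the sample values are illustrative.

```java
public class ThresholdDemo {
    // threshold = (int)(capacity * loadFactor): rehash is triggered once
    // the element count exceeds this value.
    static int threshold(int capacity, float loadFactor) {
        return (int) (capacity * loadFactor);
    }

    public static void main(String[] args) {
        // With the defaults (capacity 16, load factor 0.75) a segment
        // rehashes once it holds more than 12 elements.
        System.out.println(threshold(16, 0.75f)); // 12
    }
}
```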

ConcurrentHashMap#Segment<K,V>#setTable

/**
 * Sets table to new HashEntry array.
 * Call only while holding lock or in constructor.
 */
void setTable(HashEntry<K,V>[] newTable) {
    threshold = (int)(newTable.length * loadFactor);
    table = newTable;
}

Initializes threshold and the HashEntry<K,V>[] table.

ConcurrentHashMap#HashEntry

static final class HashEntry<K,V> {
    final K key;
    final int hash;
    volatile V value;
    final HashEntry<K,V> next;

    HashEntry(K key, int hash, HashEntry<K,V> next, V value) {
        this.key = key;
        this.hash = hash;
        this.next = next;
        this.value = value;
    }

    @SuppressWarnings("unchecked")
    static final <K,V> HashEntry<K,V>[] newArray(int i) {
        return new HashEntry[i];
    }
}

HashEntry#newArray() creates a HashEntry array of capacity i.
value is declared volatile, so the happens-before rules of the Java Memory Model guarantee its visibility.

ConcurrentHashMap#Constants

/**
* The default initial capacity for this table,
* used when not otherwise specified in a constructor.
*/
// 1
static final int DEFAULT_INITIAL_CAPACITY = 16;

/**
* The default load factor for this table, used when not
* otherwise specified in a constructor.
*/
// 2
static final float DEFAULT_LOAD_FACTOR = 0.75f;

/**
* The default concurrency level for this table, used when not
* otherwise specified in a constructor.
*/
// 3
static final int DEFAULT_CONCURRENCY_LEVEL = 16;

/**
* The maximum capacity, used if a higher value is implicitly
* specified by either of the constructors with arguments. MUST
* be a power of two <= 1<<30 to ensure that entries are indexable
* using ints.
*/
// 4
static final int MAXIMUM_CAPACITY = 1 << 30;

/**
* The maximum number of segments to allow; used to bound
* constructor arguments.
*/
// 5
static final int MAX_SEGMENTS = 1 << 16; // slightly conservative

/**
* Number of unsynchronized retries in size and containsValue
* methods before resorting to locking. This is used to avoid
* unbounded retries if tables undergo continuous modification
* which would make it impossible to obtain an accurate result.
*/
// 6
static final int RETRIES_BEFORE_LOCK = 2;

Annotated code analysis

  1. Default capacity: the length of each segment's hash table.
  2. Default load factor.
  3. Default concurrency level, which determines the number of segments (and thus each segment's table capacity).
  4. Maximum capacity of a Segment#table.
  5. Maximum number of segments allowed; bounds the constructor arguments.
  6. Number of unsynchronized attempts in size() and containsValue() before resorting to locking.

ConcurrentHashMap constructor

public ConcurrentHashMap(int initialCapacity,
                         float loadFactor, int concurrencyLevel) {
    if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();

    if (concurrencyLevel > MAX_SEGMENTS)
        concurrencyLevel = MAX_SEGMENTS;

    // Find power-of-two sizes best matching arguments
    int sshift = 0;
    int ssize = 1;
    while (ssize < concurrencyLevel) {
        ++sshift;
        // 1
        ssize <<= 1;
    }
    segmentShift = 32 - sshift;
    segmentMask = ssize - 1;
    this.segments = Segment.newArray(ssize);

    if (initialCapacity > MAXIMUM_CAPACITY)
        initialCapacity = MAXIMUM_CAPACITY;
    int c = initialCapacity / ssize;
    if (c * ssize < initialCapacity)
        ++c;
    int cap = 1;
    // 2
    while (cap < c)
        cap <<= 1;
    // 3
    for (int i = 0; i < this.segments.length; ++i)
        this.segments[i] = new Segment<K,V>(cap, loadFactor);
}

Annotated code analysis

  1. ssize ends up as the smallest power of two >= concurrencyLevel. If concurrencyLevel is 50, ssize is 64, segmentShift is 26, and segmentMask is 00000000 00000000 00000000 00111111.
  2. cap is the smallest power of two >= the total capacity divided evenly across the segments.
  3. Initializes segments[].
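The sizing loop can be checked in isolation. This sketch reproduces the constructor's computation and confirms the concurrencyLevel = 50 example above (the class and method names are illustrative):

```java
public class SegmentSizingDemo {
    // Mirrors the constructor's sizing loop: find the smallest power of two
    // >= concurrencyLevel, then derive segmentShift and segmentMask from it.
    // Returns { ssize, segmentShift, segmentMask }.
    static int[] size(int concurrencyLevel) {
        int sshift = 0;
        int ssize = 1;
        while (ssize < concurrencyLevel) {
            ++sshift;
            ssize <<= 1;
        }
        return new int[] { ssize, 32 - sshift, ssize - 1 };
    }

    public static void main(String[] args) {
        int[] r = size(50);
        // concurrencyLevel = 50 -> 64 segments, segmentShift = 26,
        // segmentMask = 0b111111 = 63
        System.out.println(r[0] + " " + r[1] + " " + r[2]); // 64 26 63
    }
}
```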

ConcurrentHashMap holds an array of segments. Each Segment is itself a hash table (HashEntry<K,V>[] table) and carries its own lock (it extends ReentrantLock). The inner table resolves hash collisions by chaining: each array slot holds a singly linked list of HashEntry nodes.

ConcurrentHashMap#put()

public V put(K key, V value) {
    if (value == null)
        throw new NullPointerException();
    // 1
    int hash = hash(key.hashCode());
    // 2
    return segmentFor(hash).put(key, hash, value, false);
}

Annotated code analysis

  1. Re-hash the key's hashCode; the resulting hash determines the segment.
  2. Delegate to Segment#put.

ConcurrentHashMap#hash()

private static int hash(int h) {
    // Spread bits to regularize both segment and index locations,
    // using variant of single-word Wang/Jenkins hash.
    h += (h << 15) ^ 0xffffcd7d;
    h ^= (h >>> 10);
    h += (h << 3);
    h ^= (h >>> 6);
    h += (h << 2) + (h << 14);
    return h ^ (h >>> 16);
}

key#hashCode is hashed a second time to spread its bits. Because the table lengths in ConcurrentHashMap are always powers of two, reducing the hash modulo the length discards the high bits entirely; two hashCodes that differ only in their high bits would always collide. This extra pass, a variant of the Wang/Jenkins hash, mixes the high bits down into the low ones.

ConcurrentHashMap#segmentFor()

/**
 * Returns the segment that should be used for key with given hash
 * @param hash the hash code for the key
 * @return the segment
 */
final Segment<K,V> segmentFor(int hash) {
    return segments[(hash >>> segmentShift) & segmentMask];
}

(hash >>> segmentShift) keeps only the upper bits of the hash, and & segmentMask turns them into an index into segments.
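A sketch of the selection using the default concurrency level of 16 (so segmentShift = 28 and segmentMask = 15); the hash values are made up to show that only the upper bits matter.

```java
public class SegmentForDemo {
    // Defaults for 16 segments: sshift = 4, so segmentShift = 32 - 4 = 28
    // and segmentMask = 16 - 1 = 15.
    static final int SEGMENT_SHIFT = 28;
    static final int SEGMENT_MASK = 15;

    static int segmentIndex(int hash) {
        // The top 4 bits of the spread hash pick the segment.
        return (hash >>> SEGMENT_SHIFT) & SEGMENT_MASK;
    }

    public static void main(String[] args) {
        // These two hashes share their low bits but land in
        // different segments because their upper bits differ.
        System.out.println(segmentIndex(0xA0000003)); // 10
        System.out.println(segmentIndex(0x30000003)); // 3
    }
}
```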

ConcurrentHashMap#Segment#put()

V put(K key, int hash, V value, boolean onlyIfAbsent) {
    // 1
    lock();
    try {
        int c = count;
        // 2
        if (c++ > threshold) // ensure capacity
            rehash();
        HashEntry<K,V>[] tab = table;
        // 3
        int index = hash & (tab.length - 1);
        HashEntry<K,V> first = tab[index];
        HashEntry<K,V> e = first;
        // 4
        while (e != null && (e.hash != hash || !key.equals(e.key)))
            e = e.next;

        V oldValue;
        // 5
        if (e != null) {
            oldValue = e.value;
            if (!onlyIfAbsent)
                e.value = value;
        }
        else {
            oldValue = null;
            // 6
            ++modCount;
            // 7
            tab[index] = new HashEntry<K,V>(key, hash, first, value);
            // 8
            count = c; // write-volatile
        }
        return oldValue;
    } finally {
        // 9
        unlock();
    }
}

Annotated code analysis

  1. Segment extends ReentrantLock; put() takes the lock, which is scoped to this Segment instance.
  2. If the count exceeds the resize threshold, rehash() grows the table.
  3. hash & (tab.length - 1) is an efficient way to take the modulus; it yields the bucket index.
  4. Walk the chain comparing keys; the loop ends when a matching key is found or e becomes null.
  5. A node with the same key exists: decide from onlyIfAbsent whether to overwrite the old value.
  6. No matching key was found, so a node will be added; bumping modCount lets ConcurrentHashMap#size(), #isEmpty() and #containsValue() detect concurrent structural changes and retry.
  7. A new HashEntry is prepended at tab[index].
  8. Writing count last publishes the change (volatile write visibility).
  9. Unlock.
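The masking trick in annotation 3 works only because table lengths are powers of two; this sketch checks it against the ordinary non-negative modulus (the helper names are illustrative).

```java
public class IndexDemo {
    // Bucket index as computed in put()/get(): a bit mask instead of %.
    static int indexFor(int hash, int length) {
        return hash & (length - 1);
    }

    // The general non-negative modulus, for comparison.
    static int mod(int hash, int length) {
        return ((hash % length) + length) % length;
    }

    public static void main(String[] args) {
        int length = 16; // table lengths are always powers of two
        for (int hash : new int[] { 5, 21, -1, 0x7fffffff }) {
            // For power-of-two lengths the two agree, even for
            // negative hashes, and the mask is a single instruction.
            System.out.println(indexFor(hash, length) == mod(hash, length));
        }
        System.out.println(indexFor(-1, 16)); // 15
    }
}
```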

ConcurrentHashMap#Segment#rehash()

void rehash() {
    HashEntry<K,V>[] oldTable = table;
    int oldCapacity = oldTable.length;
    if (oldCapacity >= MAXIMUM_CAPACITY)
        return;

    HashEntry<K,V>[] newTable = HashEntry.newArray(oldCapacity << 1);
    threshold = (int)(newTable.length * loadFactor);
    int sizeMask = newTable.length - 1;
    for (int i = 0; i < oldCapacity; i++) {
        // We need to guarantee that any existing reads of old Map can
        // proceed. So we cannot yet null out each bin.
        HashEntry<K,V> e = oldTable[i];
        // 1
        if (e != null) {
            HashEntry<K,V> next = e.next;
            int idx = e.hash & sizeMask;

            // Single node on list
            // 2
            if (next == null)
                newTable[idx] = e;

            else {
                // Reuse trailing consecutive sequence at same slot
                // 3
                HashEntry<K,V> lastRun = e;
                int lastIdx = idx;
                // 4
                for (HashEntry<K,V> last = next;
                     last != null;
                     last = last.next) {
                    int k = last.hash & sizeMask;
                    if (k != lastIdx) {
                        lastIdx = k;
                        lastRun = last;
                    }
                }
                // 5
                newTable[lastIdx] = lastRun;
                // 6
                // Clone all remaining nodes
                for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
                    int k = p.hash & sizeMask;
                    HashEntry<K,V> n = newTable[k];
                    newTable[k] = new HashEntry<K,V>(p.key, p.hash,
                                                     n, p.value);
                }
            }
        }
    }
    table = newTable;
}

Annotated code analysis

  1. Fetch the chain head at oldTable[i]; if the bucket is empty, move on.
  2. If e has no successor, move the single node into its new slot, whose index is e.hash & sizeMask.
  3. e has successors: tentatively treat e and its index as the start of the trailing run that can be reused in newTable.
  4. Scan the rest of the chain, tracking the last node at which the target index changes; after the loop, lastRun starts the longest trailing run of nodes that all map to the same new slot.
  5. Install that whole trailing run at newTable[lastIdx] without copying it.
  6. Clone the remaining nodes, from e up to (but not including) lastRun, into newTable.
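The lastRun scan (annotations 3-5) can be replayed on a toy chain; the Node class and the hash values below are illustrative.

```java
public class LastRunDemo {
    static final class Node {
        final int hash;
        final Node next;
        Node(int hash, Node next) { this.hash = hash; this.next = next; }
    }

    // Finds the earliest node after which every node maps to the same
    // slot of the doubled table, mirroring rehash()'s lastRun loop.
    static Node lastRun(Node e, int sizeMask) {
        Node lastRun = e;
        int lastIdx = e.hash & sizeMask;
        for (Node last = e.next; last != null; last = last.next) {
            int k = last.hash & sizeMask;
            if (k != lastIdx) {
                lastIdx = k;
                lastRun = last;
            }
        }
        return lastRun;
    }

    public static void main(String[] args) {
        // Bucket chain with hashes 0 -> 4 -> 1 -> 5 -> 9; new table length 4,
        // so sizeMask = 3. Nodes 1, 5 and 9 all map to slot 1: the run
        // starting at hash 1 is reused as-is; only 0 and 4 need cloning.
        Node chain = new Node(0, new Node(4,
                new Node(1, new Node(5, new Node(9, null)))));
        System.out.println(lastRun(chain, 3).hash); // 1
    }
}
```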

ConcurrentHashMap#get()

public V get(Object key) {
    int hash = hash(key.hashCode());
    return segmentFor(hash).get(key, hash);
}

ConcurrentHashMap#Segment#get()

V get(Object key, int hash) {
    // 1
    if (count != 0) { // read-volatile
        // 2
        HashEntry<K,V> e = getFirst(hash);
        while (e != null) {
            // 3
            if (e.hash == hash && key.equals(e.key)) {
                V v = e.value;
                if (v != null)
                    return v;
                // 4
                return readValueUnderLock(e); // recheck
            }
            // 5
            e = e.next;
        }
    }
    return null;
}

Annotated code analysis

  1. Volatile read of count for visibility; an empty segment returns null immediately.
  2. Fetch the head of the chain for this hash.
  3. Compare e#hash and e#key; on a match, return e#value.
  4. If the value reads as null, re-read it under the lock; a null can only be observed while a HashEntry in Segment#table is being initialized.
  5. No match: move on to e#next.

ConcurrentHashMap#Segment#getFirst()

/**
 * Returns properly casted first entry of bin for given hash.
 */
HashEntry<K,V> getFirst(int hash) {
    HashEntry<K,V>[] tab = table;
    return tab[hash & (tab.length - 1)];
}

Returns the head of the bucket in Segment#table for the given hash.

ConcurrentHashMap#Segment#readValueUnderLock()

/**
 * Reads value field of an entry under lock. Called if value
 * field ever appears to be null. This is possible only if a
 * compiler happens to reorder a HashEntry initialization with
 * its table assignment, which is legal under memory model
 * but is not known to ever occur.
 */
V readValueUnderLock(HashEntry<K,V> e) {
    lock();
    try {
        return e.value;
    } finally {
        unlock();
    }
}

As the comment explains, e#value can only appear to be null during HashEntry initialization, if the compiler reorders the entry's construction with its assignment into the table.

ConcurrentHashMap#Segment#remove()

V remove(Object key, int hash, Object value) {
    lock();
    try {
        int c = count - 1;
        HashEntry<K,V>[] tab = table;
        int index = hash & (tab.length - 1);
        HashEntry<K,V> first = tab[index];
        HashEntry<K,V> e = first;
        while (e != null && (e.hash != hash || !key.equals(e.key)))
            e = e.next;

        V oldValue = null;
        if (e != null) {
            V v = e.value;
            if (value == null || value.equals(v)) {
                oldValue = v;
                // All entries following removed node can stay
                // in list, but all preceding ones need to be
                // cloned.
                ++modCount;
                // 1
                HashEntry<K,V> newFirst = e.next;
                for (HashEntry<K,V> p = first; p != e; p = p.next)
                    newFirst = new HashEntry<K,V>(p.key, p.hash,
                                                  newFirst, p.value);
                // 2
                tab[index] = newFirst;
                count = c; // write-volatile
            }
        }
        return oldValue;
    } finally {
        unlock();
    }
}
(Diagram: case 1, the removed node e is the head of the bucket, so newFirst is simply e#next; case 2, e sits deeper in the chain, and every node in front of it is cloned onto newFirst.)

Annotated code analysis

  1. Two cases. Case 1: the while loop matches on the first node, so e == first == p; the for loop does not run, and e#next simply replaces e at tab[index]. Case 2: the match is found deeper in the chain (call it e1). Starting from p = first, each node before e1 is cloned: the clone keeps p's key, value and hash, but its next points at the current newFirst (initially e1#next). The loop stops when p reaches e1; newFirst is then the rebuilt prefix linked onto e1's tail, with the preceding nodes in reverse order.
  2. The rebuilt chain newFirst replaces the old chain at tab[index].
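Because next is final, the prefix must be cloned rather than re-linked in place. This toy version of the loop in annotation 1 shows the effect, including the reversal of the preceding nodes (the Node class is illustrative).

```java
import java.util.ArrayList;
import java.util.List;

public class RemoveCloneDemo {
    static final class Node {
        final String key;
        final Node next;
        Node(String key, Node next) { this.key = key; this.next = next; }
    }

    // Keep the tail after the removed node and clone the nodes in front
    // of it, exactly like Segment#remove().
    static Node remove(Node first, String key) {
        Node e = first;
        while (e != null && !e.key.equals(key))
            e = e.next;
        if (e == null)
            return first; // key not present
        Node newFirst = e.next;
        for (Node p = first; p != e; p = p.next)
            newFirst = new Node(p.key, newFirst); // clone onto the new head
        return newFirst;
    }

    static List<String> keys(Node n) {
        List<String> out = new ArrayList<String>();
        for (; n != null; n = n.next)
            out.add(n.key);
        return out;
    }

    public static void main(String[] args) {
        Node chain = new Node("A", new Node("B", new Node("C", new Node("D", null))));
        // Removing C keeps D, then clones B and A in front of it:
        // the preceding nodes end up reversed.
        System.out.println(keys(remove(chain, "C"))); // [B, A, D]
    }
}
```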

ConcurrentHashMap#size()

/**
 * Returns the number of key-value mappings in this map. If the
 * map contains more than <tt>Integer.MAX_VALUE</tt> elements, returns
 * <tt>Integer.MAX_VALUE</tt>.
 *
 * @return the number of key-value mappings in this map
 */
public int size() {
    final Segment<K,V>[] segments = this.segments;
    // 1
    long sum = 0;
    long check = 0;
    int[] mc = new int[segments.length];
    // Try a few times to get accurate count. On failure due to
    // continuous async changes in table, resort to locking.
    // 2
    for (int k = 0; k < RETRIES_BEFORE_LOCK; ++k) {
        check = 0;
        sum = 0;
        int mcsum = 0;
        // 3
        for (int i = 0; i < segments.length; ++i) {
            sum += segments[i].count;
            mcsum += mc[i] = segments[i].modCount;
        }
        // 4
        if (mcsum != 0) {
            for (int i = 0; i < segments.length; ++i) {
                check += segments[i].count;
                if (mc[i] != segments[i].modCount) {
                    check = -1; // force retry
                    break;
                }
            }
        }
        // 5
        if (check == sum)
            break;
    }
    // 6
    if (check != sum) { // Resort to locking all segments
        sum = 0;
        for (int i = 0; i < segments.length; ++i)
            segments[i].lock();
        for (int i = 0; i < segments.length; ++i)
            sum += segments[i].count;
        for (int i = 0; i < segments.length; ++i)
            segments[i].unlock();
    }
    if (sum > Integer.MAX_VALUE)
        return Integer.MAX_VALUE;
    else
        return (int)sum;
}

Annotated code analysis

  1. sum accumulates the count of every segment. check is the re-read sum used to validate the snapshot. mc[] records each segment's modCount.
  2. Up to RETRIES_BEFORE_LOCK (2) lock-free attempts are made; mcsum accumulates the modCounts.
  3. Loop over the segments, computing sum, mcsum and mc[].
  4. If mcsum != 0, at least one segment has ever been modified, so the counts are re-read into check. If any mc[i] != segments[i].modCount, a segment changed during the traversal (modCount only changes in put(), remove() and clear()); this pass is aborted, since finishing it would be meaningless.
  5. If the two passes agree (check == sum), the snapshot is consistent and the count is returned.
  6. If the lock-free passes never agree, every segment is locked, the counts are summed, and the segments are unlocked.
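The optimistic pass can be sketched with a stripped-down segment; the Runnable hook stands in for a concurrent writer and is purely illustrative, not part of the real code.

```java
public class SizeDemo {
    // A stripped-down segment: just the two fields size() reads.
    static final class Seg {
        volatile int count;
        int modCount;
        Seg(int count, int modCount) { this.count = count; this.modCount = modCount; }
    }

    // One optimistic pass of size(): snapshot the modCounts while summing
    // the counts, then re-check the modCounts. Returns -1 to signal
    // "inconsistent: retry, or lock all segments".
    static long trySize(Seg[] segments, Runnable betweenPasses) {
        int[] mc = new int[segments.length];
        long sum = 0;
        for (int i = 0; i < segments.length; ++i) {
            sum += segments[i].count;
            mc[i] = segments[i].modCount;
        }
        betweenPasses.run(); // a writer may sneak in here
        for (int i = 0; i < segments.length; ++i)
            if (mc[i] != segments[i].modCount)
                return -1; // structural change observed: snapshot invalid
        return sum;
    }

    public static void main(String[] args) {
        final Seg[] segs = { new Seg(3, 7), new Seg(5, 2) };
        Runnable noop = new Runnable() { public void run() { } };
        System.out.println(trySize(segs, noop)); // 8: stable snapshot

        Runnable concurrentPut = new Runnable() {
            public void run() { segs[0].count++; segs[0].modCount++; }
        };
        System.out.println(trySize(segs, concurrentPut)); // -1: retry needed
    }
}
```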

Summary

count

    All (synchronized) write operations should write to the "count" field after structurally changing any bin.

A bin is a HashEntry chain. count is written only after a structural change (an addition or a removal); its volatile write publishes the change to readers. HashEntry#value is itself volatile, so it is visible on its own: overwriting a value (as in ConcurrentHashMap#replace()) does not require rewriting count.

lock()

The lock granularity is a single Segment (one slice of the HashEntry table), so different segments can be operated on concurrently.
