深入浅出集合框架(二):为并发而生的 ConcurrentHashMap

前言

ConcurrentHashMap 从 JDK 1.5 开始随 java.util.concurrent 包一起引入 JDK 中,主要为了解决 HashMap 线程不安全和 Hashtable 效率不高的问题。

HashMap 是我们日常开发中最常见的一种容器,根据键值对键的哈希值来确定值对键在集合中的存储位置,因此具有良好的存取和查找功能。但众所周知,它在高并发的情境下是线程不安全的。尤其是在 JDK 1.8 之前,rehash 的过程中采用头插法转移结点,高并发下,多个线程同时操作一条链表将直接导致闭链,易出现逆序且环形链表死循环问题,导致死循环并占满 CPU。JDK 1.8 以来,对 HashMap 的内部进行了很大的改进,采用数组 + 链表 + 红黑树的形式来进行数据的存储。rehash 的过程也进行了改动,基于复制的算法思想,不直接操作原链,而是定义了两条链表分别完成对原链的结点分离操作,在多线程的环境下,采用了尾插法,扩容后,新数组中的链表顺序依然与旧数组中的链表顺序保持一致,所有即使是多线程的情况下也是安全的。JDK 1.8 中的 HashMap 虽然不会导致死循环,但是因为 HashMap 多线程下内存不共享的问题,两个线程同时指向一个 hash 桶数组时,会导致数据覆盖的问题,所以 HashMap 是依旧是线程不安全的。

HashTable 是线程安全的容器,它在所有涉及到多线程的操作都加上了 synchronized 关键字来锁住整个 table,这就意味着所有的线程都在竞争一把锁,在多线程的环境下,它是安全的,但是无疑是效率低下的,因此 Hashtable 已经是 Java 中的遗留容器,已经不推荐使用。

因此在多线程条件下,需要满足线程安全,我们可使用 Collections.synchronizedMap 方法构造出一个同步的 Map,使 HashMap 具有线程安全的能力;或者直接使用线程安全的 ConcurrentHashMap。本篇文章将要介绍的 ConcurrentHashMap 是 HashMap 的并发版本,它是线程安全的,并且在高并发的情境下,性能优于 HashMap 很多。

背景

  • 线程不安全的 HashMap

由于 HashMap 是非线程安全的容器,遇到多线程操作同一容器的场景,可能会导致数据不一致: JDK 1.7 中 HashMap 采用了数组 + 链表的数据结构,有线程安全问题(统计不准确,丢失数据,环形链表死循环导致 Cpu 100%),JDK 1.8 中 HashMap 采用了数组 + 链表 + 红黑树的结构,有线程安全问题(统计不准确,丢失数据)。

  • 效率低下的 HashTable 容器

HashTable 容器使用 synchronized 关键字来保证线程安全,但在线程竞争激烈的情况下 HashTable 的效率非常低下。因为当一个线程访问 HashTable 的同步方法时,其他线程访问 HashTable 的同步方法时,可能会进入阻塞或轮询状态。如线程 1 使用 put 进行添加元素,线程 2 不但不能使用 put 方法添加元素,并且也不能使用 get 方法来获取元素,所以竞争越激烈效率越低。

JDK 1.7 基于分段锁的 ConcurrentHashMap

数据结构

HashTable 容器在竞争激烈的并发环境下表现出效率低下的原因,是因为所有访问 HashTable 的线程都必须竞争同一把锁,那假如容器里有多把锁,每一把锁用于锁容器其中一部分数据,那么当多线程访问容器里不同数据段的数据时,线程间就不会存在锁竞争,从而可以有效的提高并发访问效率,这就是 ConcurrentHashMap 所使用的锁分段技术。JDK1.7 中的 ConcurrentHashMap 的底层数据结构是数组 + 链表。与 HashMap 不同的是,ConcurrentHashMap 最外层不是一个大的数组,而是一个 Segment 的数组。每个 Segment 包含一个与 HashMap 数据结构差不多的链表数组。

JDK 1.7 中 ConcurrentHashMap 采用 Segment 分段锁的数据结构,首先将数据分成一段一段的存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据的时候,其他段的数据也能被其他线程访问。有些方法需要跨段,比如 size() 和 containsValue(),它们可能需要锁定整个表而而不仅仅是某个段,这需要按顺序锁定所有段,操作完毕后,又按顺序释放所有段的锁。这种做法,就称之为“分离锁”(lock striping)[1]

ConcurrentHashMap 是由 Segment 数组结构和 HashEntry 数组结构组成。Segment 是一种可重入锁 ReentrantLock,在 ConcurrentHashMap 里扮演锁的角色,HashEntry 则用于存储键值对数据。一个 ConcurrentHashMap 里包含一个 Segment 数组,Segment 的结构和 HashMap 类似,是一种数组和链表结构, 一个 Segment 里包含一个 HashEntry 数组,每个 HashEntry 是一个链表结构的元素, 每个 Segment 守护者一个 HashEntry 数组里的元素, 当对 HashEntry 数组的数据进行修改时,必须首先获得它对应的 Segment 锁。

926638-20170809132445011-2033999443

存储结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class ConcurrentHashMap<K, V> extends AbstractMap<K, V>
implements ConcurrentMap<K, V>, Serializable {

// 将整个 hashmap 分成几个小的 map,每个 segment 都是一个锁;与 hashtable 相比,这么设计的目的是对于 put, remove 等操作,可以减少并发冲突,对不属于同一个片段的节点可以并发操作,大大提高了性能
final Segment<K,V>[] segments;

// 本质上 Segment 类就是一个小的 hashmap,里面 table 数组存储了各个节点的数据,继承了 ReentrantLock, 可以作为互拆锁使用
static final class Segment<K,V> extends ReentrantLock implements Serializable {

// count 用来统计该段数据的个数,它是 volatile,它用来协调修改和读取操作,以保证读取操作能够读取到几乎最新的修改。协调方式是这样的,每次修改操作做了结构上的改变,如增加 / 删除节点(修改节点的值不算结构上的改变),都要写 count 值,每次读取操作开始都要读取 count 的值。这利用了 Java 5 中对 volatile 语义的增强,对同一个 volatile 变量的写和读存在 happens-before 关系。
transient volatileint count;

// modCount 统计段结构改变的次数,主要是为了检测对多个段进行遍历过程中某个段是否发生改变
transient int modCount;

// threashold 用来表示需要进行 rehash 的界限值
transient int threshold;

// table 数组存储段中节点,每个数组元素是个 hash 链,用 HashEntry 表示。table 也是 volatile,这使得能够读取到最新的 table 值而不需要同步
transient volatile HashEntry<K,V>[] table;

// loadFactor 表示负载因子。
final float loadFactor;
}

// 基本节点,存储 Key,value 值,可以看到除了 value 不是 final 的,其它值都是 final 的,这意味着不能从 hash 链的中间或尾部添加或删除节点,因为这需要修改 next 引用值,所有的节点的修改只能从头部开始。对于 put 操作,可以一律添加到 Hash 链的头部。但是对于 remove 操作,可能需要从中间删除一个节点,这就需要将要删除节点的前面所有节点整个复制一遍,最后一个节点指向要删除结点的下一个结点。为了确保读操作能够看到最新的值,将 value 设置成 volatile,这避免了加锁。
static final class HashEntry<K,V> {
final int hash;
final K key;
volatile V value;
final HashEntry<K,V> next;
}
}

构造方法

它把区间按照并发级别(concurrentLevel),分成了若干个 segment。默认情况下内部按并发级别为 16 来创建。对于每个 segment 的容量,默认情况也是 16。concurrentLevel,segment 可以通过构造函数设定的。通过按位与的哈希算法来定位 segments 数组的索引,必须保证 segments 数组的长度是 2 的 N 次方(power-of-two size),所以必须计算出一个是大于或等于 concurrencyLevel 的最小的 2 的 N 次方值来作为 segments 数组的长度。假如concurrencyLevel等于14,15或16,ssize都会等于16,即容器里锁的个数也是16。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
// 校验参数
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
// 并发级别数大于最大Segment数量
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
// Find power-of-two sizes best matching arguments
int sshift = 0;
int ssize = 1;
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
this.segmentShift = 32 - sshift;
this.segmentMask = ssize - 1;
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
int c = initialCapacity / ssize;
if (c * ssize < initialCapacity)
++c;
int cap = MIN_SEGMENT_TABLE_CAPACITY;
while (cap < c)
cap <<= 1;
// create segments and segments[0]
Segment<K, V> s0 =
new Segment<K, V>(loadFactor, (int) (cap * loadFactor),
(HashEntry<K, V>[]) new HashEntry[cap]);
Segment<K, V>[] ss = (Segment<K, V>[]) new Segment[ssize];
UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
this.segments = ss;
}

f7e40dd35ad859e5cafbfd091341e14e061

构造方法

寻址方式

对于读操作,通过 Key 哈希值的高 N 位对 Segment 个数取模从而得到该 Key 应该属于哪个 Segment,接着如同操作 HashMap 一样操作这个 Segment。Segment 继承自 ReentrantLock,所以我们可以很方便的对每一个 Segment 上锁。对于写操作,并不要求同时获取所有 Segment 的锁,因为那样相当于锁住了整个 Map。它会先获取该键值对所在的 Segment 的锁,获取成功后就可以像操作一个普通的 HashMap 一样操作该 Segment,并保证该 Segment 的安全性。为了保证不同的值均匀分布到不同的 Segment,需要通过如下方法计算哈希值。

1
2
3
4
5
6
7
8
9
10
11
12
13
private int hash(Object k) {
int h = hashSeed;
if ((0 != h) && (k instanceof String)) {
return sun.misc.Hashing.stringHash32((String) k);
}
h ^= k.hashCode();
h += (h << 15) ^ 0xffffcd7d;
h ^= (h >>> 10);
h += (h << 3);
h ^= (h >>> 6);
h += (h << 2) + (h << 14);
return h ^ (h >>> 16);
}

同步方式

Segment 继承自 ReentrantLock,所以我们可以很方便的对每一个 Segment 上锁。

对于读操作,获取 Key 所在的 Segment 时,需要保证可见性。具体实现上可以使用 volatile 关键字,也可使用锁。但使用锁开销太大,而使用 volatile 时每次写操作都会让所有 CPU 内缓存无效,也有一定开销。ConcurrentHashMap 使用如下方法保证可见性,取得最新的 Segment。

1
2
3
4
private Segment<K, V> segmentForHash(int h) {
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
return (Segment<K, V>) UNSAFE.getObjectVolatile(segments, u);
}

对于写操作,并不要求同时获取所有 Segment 的锁,因为那样相当于锁住了整个 Map。它会先获取该 Key-Value 对所在的 Segment 的锁,获取成功后就可以像操作一个普通的 HashMap 一样操作该 Segment,并保证该 Segment 的安全性。
同时由于其它 Segment 的锁并未被获取,因此理论上可支持 concurrencyLevel(等于 Segment 的个数)个线程安全的并发读写。

获取锁时,并不直接使用 lock 来获取,因为该方法获取锁失败时会挂起(参考可重入锁)。事实上,它使用了自旋锁,如果 tryLock 获取锁失败,说明锁被其它线程占用,此时通过循环再次以 tryLock 的方式申请锁。如果在循环过程中该 Key 所对应的链表头被修改,则重置 retry 次数。如果 retry 次数超过一定值,则使用 lock 方法申请锁。

这里使用自旋锁是因为自旋锁的效率比较高,但是它消耗 CPU 资源比较多,因此在自旋次数超过阈值时切换为互斥锁。

size 操作

put、remove 和 get 操作只需要关心一个 Segment,而 size 操作需要遍历所有的 Segment 才能算出整个 Map 的大小。一个简单的方案是,先锁住所有 Sgment,计算完后再解锁。但这样做,在做 size 操作时,不仅无法对 Map 进行写操作,同时也无法进行读操作,不利于对 Map 的并行操作。

为更好支持并发操作,ConcurrentHashMap 会在不上锁的前提逐个 Segment 计算 3 次 size,如果某相邻两次计算获取的所有 Segment 的更新次数(每个 Segment 都与 HashMap 一样通过 modCount 跟踪自己的修改次数,Segment 每修改一次其 modCount 加一)相等,说明这两次计算过程中无更新操作,则这两次计算出的总 size 相等,可直接作为最终结果返回。如果这三次计算过程中 Map 有更新,则对所有 Segment 加锁重新计算 Size。该计算方法代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public int size() {
final Segment<K,V>[] segments = this.segments;
int size;
boolean overflow; // true if size overflows 32 bits
long sum; // sum of modCounts
long last = 0L; // previous sum
int retries = -1; // first iteration isn't retry
try {
for (;;) {
if (retries++ == RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
ensureSegment(j).lock(); // force creation
}
sum = 0L;
size = 0;
overflow = false;
for (int j = 0; j < segments.length; ++j) {
Segment<K,V> seg = segmentAt(segments, j);
if (seg != null) {
sum += seg.modCount;
int c = seg.count;
if (c < 0 || (size += c) < 0)
overflow = true;
}
}
if (sum == last)
break;
last = sum;
}
} finally {
if (retries > RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
segmentAt(segments, j).unlock();
}
}
return overflow ? Integer.MAX_VALUE : size;
}

Java 8 基于 CAS 的 ConcurrentHashMap

数据结构

JDK 1.7 中 ConcurrentHashMap 为实现并行访问,引入了 Segment 这一结构,实现了分段锁,理论上最大并发度与 Segment 个数相等。JDK 1.8 中 ConcurrentHashMap 为进一步提高并发性,摒弃了分段锁的方案,而是直接使用一个大的数组。

JDK 1.8 中 ConcurrentHashMap 参考了 JDK 1.8 HashMap 的实现,采用了数组 + 链表 + 红黑树的实现方式进行数据存储,提高哈希碰撞下的寻址性能,在链表长度超过一定阈值(8)时将链表(寻址时间复杂度为 O(N))转换为红黑树(寻址时间复杂度为 O(log(N)));进一步提高并发性,取消了基于 Segment 的分段锁思想,改用 CAS[2] + synchronized 控制并发操作,在某些方面提升了性能。对于读操作,通过 Key 的哈希值与数组长度取模确定该 Key 在数组中的索引。对于写操作,如果 Key 对应的数组元素为 null,则通过 CAS 操作将其设置为当前值。如果 Key 对应的数组元素不为 null,则对该元素使用 synchronized 关键字申请锁,然后进行操作。

926638-20170809132741792-1171090777

存储结构

Node :Node 是最核心的内部类,它包装了 key-value 键值对,所有插入 ConcurrentHashMap 的数据都包装在这里面。它与 HashMap 中的定义很相似,但是但是有一些差别它对 value 和 next 属性设置了 volatile 同步锁,它不允许调用 setValue 方法直接改变 Node 的 value 域,它增加了 find 方法辅助 map.get() 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
// 带有同步锁的 value[value 和 next 都会在扩容时发生变化,所以加上 volatile 来保持可见性和禁止重排序]
volatile V val;
// 带有同步锁的 next 指针
volatile ConcurrentHashMap.Node<K,V> next;

Node(int hash, K key, V val, ConcurrentHashMap.Node<K,V> next) {
this.hash = hash;
this.key = key;
this.val = val;
this.next = next;
}

public final K getKey() { return key; }
public final V getValue() { return val; }
public final int hashCode() { return key.hashCode() ^ val.hashCode(); }
public final String toString(){ return key + "=" + val; }
// 不允许直接改变 value 的值
public final V setValue(V value) {
throw new UnsupportedOperationException();
}

public final boolean equals(Object o) {
Object k, v, u; Map.Entry<?,?> e;
return ((o instanceof Map.Entry) &&
(k = (e = (Map.Entry<?,?>)o).getKey()) != null &&
(v = e.getValue()) != null &&
(k == key || k.equals(key)) &&
(v == (u = val) || v.equals(u)));
}

/**
* Virtualized support for map.get(); overridden in subclasses.
*/
ConcurrentHashMap.Node<K,V> find(int h, Object k) {
ConcurrentHashMap.Node<K,V> e = this;
if (k != null) {
do {
K ek;
if (e.hash == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
} while ((e = e.next) != null);
}
return null;
}
}

TreeNode:树节点类,当链表长度过长的时候,会转换为 TreeNode。但是与 HashMap 不相同的是,它并不是直接转换为红黑树,而是把这些结点包装成 TreeNode 放在 TreeBin 对象中,由 TreeBin 完成对红黑树的包装。而且 TreeNode 在 ConcurrentHashMap 集成自 Node 类,而并非 HashMap 中的集成自 LinkedHashMap.Entry<K,V> 类,也就是说 TreeNode 带有 next 指针,这样做的目的是方便基于 TreeBin 的访问。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
/**
* Nodes for use in TreeBins
*/
static final class TreeNode<K,V> extends ConcurrentHashMap.Node<K,V> {
// 存储当前节点的父节点
ConcurrentHashMap.TreeNode<K,V> parent; // red-black tree links
// 存储当前节点的左孩子
ConcurrentHashMap.TreeNode<K,V> left;
// 存储当前节点的右孩子
ConcurrentHashMap.TreeNode<K,V> right;
// 存储当前节点的前一个节点
ConcurrentHashMap.TreeNode<K,V> prev; // needed to unlink next upon deletion
// 存储当前节点的颜色(红、黑)
boolean red;

TreeNode(int hash, K key, V val, ConcurrentHashMap.Node<K,V> next,
ConcurrentHashMap.TreeNode<K,V> parent) {
super(hash, key, val, next);
this.parent = parent;
}

ConcurrentHashMap.Node<K,V> find(int h, Object k) {
return findTreeNode(h, k, null);
}

/**
* Returns the TreeNode (or null if not found) for the given key
* starting at given root.
*/
final ConcurrentHashMap.TreeNode<K,V> findTreeNode(int h, Object k, Class<?> kc) {
if (k != null) {
ConcurrentHashMap.TreeNode<K,V> p = this;
do {
int ph, dir; K pk; ConcurrentHashMap.TreeNode<K,V> q;
ConcurrentHashMap.TreeNode<K,V> pl = p.left, pr = p.right;
if ((ph = p.hash) > h)
p = pl;
else if (ph < h)
p = pr;
else if ((pk = p.key) == k || (pk != null && k.equals(pk)))
return p;
else if (pl == null)
p = pr;
else if (pr == null)
p = pl;
else if ((kc != null ||
(kc = comparableClassFor(k)) != null) &&
(dir = compareComparables(kc, k, pk)) != 0)
p = (dir < 0) ? pl : pr;
else if ((q = pr.findTreeNode(h, k, kc)) != null)
return q;
else
p = pl;
} while (p != null);
}
return null;
}
}

构造方法

concurrencyLevel,能够同时更新 ConccurentHashMap 且不产生锁竞争的最大线程数,在 JDK 1.8 之前实际上就是 ConcurrentHashMap 中的分段锁个数,即 Segment[] 的数组长度。在 JDK 1.8 里,仅仅是为了兼容旧版本而保留,唯一的作用就是保证构造 map 时初始容量不小于 concurrencyLevel。

1
2
3
4
5
6
7
8
public ConcurrentHashMap(int initialCapacity) {
if (initialCapacity < 0)
throw new IllegalArgumentException();
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
this.sizeCtl = cap;
}

基本属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// node 数组最大容量:2^30=1073741824
private static final int MAXIMUM_CAPACITY = 1 << 30;
// 默认初始值,必须是 2 的幕数
private static final int DEFAULT_CAPACITY = 16;
// 数组可能最大值,需要与 toArray()相关方法关联
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
// 并发级别,遗留下来的,为兼容以前的版本
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
// 负载因子
private static final float LOAD_FACTOR = 0.75f;
// 链表转红黑树阀值 > 8 链表转换为红黑树
static final int TREEIFY_THRESHOLD = 8;
// 树转链表阀值,小于等于 6(tranfer 时,lc、hc=0 两个计数器分别 ++ 记录原 bin、新 binTreeNode 数量,<=UNTREEIFY_THRESHOLD 则 untreeify(lo))
static final int UNTREEIFY_THRESHOLD = 6;
static final int MIN_TREEIFY_CAPACITY = 64;
private static final int MIN_TRANSFER_STRIDE = 16;
private static int RESIZE_STAMP_BITS = 16;
// 2^15-1,help resize 的最大线程数
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
// 32-16=16,sizeCtl 中记录 size 大小的偏移量
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
// forwarding nodes 的 hash 值
static final int MOVED = -1;
// 树根节点的 hash 值
static final int TREEBIN = -2;
// ReservationNode 的 hash 值
static final int RESERVED = -3;
// 可用处理器数量
static final int NCPU = Runtime.getRuntime().availableProcessors();
// 存放 node 的数组
transient volatile Node<K, V>[] table;
/* 控制标识符,用来控制 table 的初始化和扩容的操作,不同的值有不同的含义
* 当为负数时:-1 代表正在初始化,-N 代表有 N-1 个线程正在 进行扩容
* 当为 0 时:代表当时的 table 还没有被初始化
* 当为正数时:表示初始化或者下一次进行扩容的大小
*/
private transient volatile int sizeCtl;

寻址方式

JDK 1.8 的 ConcurrentHashMap 同样是通过 Key 的哈希值与数组长度取模确定该 Key 在数组中的索引。同样为了避免不太好的 Key 的 hashCode 设计,它通过如下方法计算得到 Key 的最终哈希值。不同的是,JDK 1.8 的 ConcurrentHashMap 作者认为引入红黑树后,即使哈希冲突比较严重,寻址效率也足够高,所以作者并未在哈希值的计算上做过多设计,只是将 Key 的 hashCode 值与其高 16 位作异或并保证最高位为 0(从而保证最终结果为正整数)。

1
2
3
static final int spread(int h) {
return (h ^ (h >>> 16)) & HASH_BITS;
}

同步方式

对于写操作,如果 Key 对应的数组元素为 null,则通过 CAS 操作将其设置为当前值。如果 Key 对应的数组元素(也即链表表头或者树的根元素)不为 null,则对该元素使用 synchronized 关键字申请锁,然后进行操作。如果该 put 操作使得当前链表长度超过一定阈值,则将该链表转换为树,从而提高寻址效率。

对于读操作,由于数组被 volatile 关键字修饰,因此不用担心数组的可见性问题。同时每个元素是一个 Node 实例(Java 7 中每个元素是一个 HashEntry),它的 Key 值和 hash 值都由 final 修饰,不可变更,无须关心它们被修改后的可见性问题。而其 Value 及对下一个元素的引用由 volatile 修饰,可见性也有保障。

1
2
3
4
5
6
static class Node<K, V> implements Map.Entry<K, V> {
final int hash;
final K key;
volatile V val;
volatile Node<K, V> next;
}

对于 Key 对应的数组元素的可见性,由 Unsafe 的 getObjectVolatile 方法保证。它是对 tab[i] 进行原子性的读取,因为我们知道 putVal 等对 table 的桶操作是有加锁的,那么一般情况下我们对桶的读也是要加锁的,但是我们这边为什么不需要加锁呢?因为我们用了 Unsafe[3] 的 getObjectVolatile,因为 table 是 volatile 类型,所以对 tab[i] 的原子请求也是可见的。因为如果同步正确的情况下,根据 happens-before 原则,对 volatile 域的写入操作 happens-before 于每一个后续对同一域的读操作。所以不管其他线程对 table 链表或树的修改,都对 get 读取可见。

1
2
3
4
// 获得数组中位置i上的节点
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}

size 操作

put 方法和 remove 方法都会通过 addCount 方法维护 Map 的 size。size 方法通过 sumCount 获取由 addCount 方法维护的 Map 的 size。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public int size() {
long n = sumCount();
return ((n < 0L) ? 0 :
(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
(int)n);
}

final long sumCount() {
// 变化的数量
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}

源码分析 ConcurrentHashMap

initTable()方法

该方法的核心思想就是,只允许一个线程对表进行初始化,如果不巧有其他线程进来了,那么会让其他线程交出 CPU 等待下次系统调度。这样,保证了表同时只会被一个线程初始化。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
// 如果表为空才进行初始化操作
while ((tab = table) == null || tab.length == 0) {
// sizeCtl 小于零说明已经有线程正在进行初始化操作,当前线程应该放弃 CPU 的使用
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
// 否则说明还未有线程对表进行初始化,那么本线程就来做这个工作,CAS 方法把 sizectl 置为-1,表示本线程正在进行初始化
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
// 保险起见,再次判断下表是否为空
try {
if ((tab = table) == null || tab.length == 0) {
// sc 大于零说明容量已经初始化了,否则使用默认容量 16
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
// 根据容量构建数组
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
// 将这个数组赋值给 table,table 是 volatile 的
table = tab = nt;
// 计算阈值,等效于 n*0.75
sc = n - (n >>> 2);
}
} finally {
// 设置阈值
sizeCtl = sc;
}
break;
}
}
return tab;
}

get(Object key)方法

get 方法比较简单,给定一个 key 来确定 value 的时候,必须满足两个条件 key 相同 、hash 值相同,对于节点可能在链表或树上的情况,需要分别去查找。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
// 计算 hash 值
int h = spread(key.hashCode());
// 根据 hash 值确定节点位置
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
// 如果搜索到的节点 key 与传入的 key 相同且不为 null, 直接返回这个节点
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
// 如果 eh<0 说明这个节点在树上,调用树的 find 方法寻找
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
// 否则遍历链表 找到对应的值并返回
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}

ConcurrentHashMap 的 get 操作的流程很简单,可以分为三个步骤来描述:

  • 计算 hash 值,定位到该 table 索引位置,如果是首节点符合就返回
  • 如果遇到扩容的时候,会调用标志正在扩容节点 ForwardingNode 的 find 方法,查找该节点,匹配就返回
  • 以上都不符合的话,就往下遍历节点,匹配就返回,否则最后就返回 null

put(K key, V value)方法

假设 table 已经初始化完成,put 操作采用 CAS+synchronized 实现并发插入或更新操作,具体实现如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
public V put(K key, V value) {
return putVal(key, value, false);
}

/**
* Implementation for put and putIfAbsent
*/
final V putVal(K key, V value, boolean onlyIfAbsent) {
// 不允许 key 或 value 为 null
if (key == null || value == null) throw new NullPointerException();
// 计算 hash 值,两次 hash,减少 hash 冲突,可以均匀分布
int hash = spread(key.hashCode());
int binCount = 0;
// 死循环 何时插入成功 何时跳出;因为如果其他线程正在修改 tab,那么尝试就会失败,所以这边要加一个 for 循环,不断的尝试
for (Node<K, V>[] tab = table; ; ) {
Node<K, V> f;
int n, i, fh;
// 如果 table 为空的话,初始化 table,属于懒汉模式初始化
if (tab == null || (n = tab.length) == 0)
tab = initTable();
// 根据 hash 值计算出在 table 里面的位置
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// 如果这个位置没有值,通过 CAS 操作将其设置为当前值,不需要加锁
if (casTabAt(tab, i, null,
new Node<K, V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
// 当遇到表连接点时,需要进行整合表的操作
else if ((fh = f.hash) == MOVED)
// 由于检测到当前哈希表正在扩容,于是让当前线程去协助扩容
tab = helpTransfer(tab, f);
else {
V oldVal = null;
// 这个地方设计非常的巧妙,内置锁 synchronized 锁住了 f,因为 f 是指定特定的tab[i]的;结点上锁,这里的结点可以理解为 hash 值相同组成的链表的头结点
synchronized (f) {
if (tabAt(tab, i) == f) {
// fh >= 0 说明这个节点是一个链表的节点 不是树的节点
if (fh >= 0) {
binCount = 1;
// 在这里遍历链表所有的结点
for (Node<K, V> e = f; ; ++binCount) {
K ek;
// 如果 hash 值和 key 值相同 则修改对应结点的 value 值[put 操作和 putIfAbsent 操作业务实现]
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K, V> pred = e;
// 如果遍历到了最后一个结点,那么就证明新的节点需要插入,使用尾插法把它插入在链表尾部
if ((e = e.next) == null) {
pred.next = new Node<K, V>(hash, key,
value, null);
break;
}
}
}
// 如果这个节点是树节点,就按照树的方式插入值
else if (f instanceof TreeBin) {
Node<K, V> p;
binCount = 2;
if ((p = ((TreeBin<K, V>) f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
// 如果链表长度已经达到临界值 8 就需要把链表转换为树结构
if (binCount >= TREEIFY_THRESHOLD)
// 转化为红黑树
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
// 将当前 ConcurrentHashMap 的元素数量 + 1,CAS 式更新 baseCount,并判断是否需要扩容
addCount(1L, binCount);
return null;
}

ConcurrentHashMap 的 put 操作的流程就是对当前的 table 进行无条件自循环直到 put 成功,可以分成以下七步流程来概述:

  • 如果为 null 直接抛空指针异常
  • 如果没有初始化就先调用 initTable() 方法来进行初始化过程
  • 如果没有 hash 冲突就直接 casTabAt() 无锁插入
  • 如果还在进行扩容操作就 helpTransfer() 帮助其扩容
  • 如果存在 hash 冲突,就加锁来保证线程安全,这里有两种情况,一种是链表形式就直接遍历到尾端插入,一种是红黑树就按照红黑树结构插入,
  • 如果该链表的数量大于阈值 8,就要先treeifyBin()转换成黑红树的结构[若table length < 64,直接 tryPresize,两倍table.length,不转树],break 跳出循环
  • 如果添加成功就调用 addCount() 方法统计 size,并且检查是否需要扩容

总结

ConcurrentHashMap 与 HashMap 的区别

  • 线程安全性不同:ConcurrentHashMap 线程安全,而 HashMap 非线程安全
  • key 和 value 是否允许 null 值:HashMap 允许 Key 和 Value 为 null,而 ConcurrentHashMap 不允许 Key 和 Value 为 null
  • 是否允许遍历时修改集合:HashMap 不允许通过 Iterator 遍历的同时通过 HashMap 修改,而 ConcurrentHashMap 允许该行为,并且该更新对后续的遍历可见。

JDK 1.7 中 ConcurrentHashMap 和 JDK 1.8 中 ConcurrentHashMap 的实现区別?

其实可以看出 JDK 1.8 版本的 ConcurrentHashMap 的数据结构已经接近 HashMap,相对而言,ConcurrentHashMap 只是增加了同步的操作来控制并发,从 JDK 1.7 版本的 ReentrantLock + Segment + HashEntry,到 JDK 1.8 版本中 synchronized + CAS + HashEntry + 红黑树,相对而言,JDK 1.7 中 ConcurrentHashMap 和 JDK 1.8 中 ConcurrentHashMap 的实现区別总结如下:

  • 底层数据结构不同:JDK 1.7 中 ConcurrentHashMap 采用 Segment 分段锁的数据结构,JDK 1.8 中 ConcurrentHashMap 采用数组 + 链表 + 红黑树的数据结构。
  • 保证线程安全机制不同:JDK 1.7 中 ConcurrentHashMap 采用 segment 的分段锁机制实现线程安全,其中 segment 继承自 ReentrantLock。JDK 1.8 中 ConcurrentHashMap 采用 CAS+Synchronized 机制实现线程安全。
  • 锁的粒度不同:JDK 1.7 中 ConcurrentHashMap 是对需要进行数据操作的 Segment 加锁,JDK 1.8 中 ConcurrentHashMap 是对每个数组元素加锁(Node)。
  • 查询的时间复杂度不同:定位结点的 hash 算法简化会带来弊端,Hash 冲突加剧,因此在链表节点数量大于 8 时,会将链表转化为红黑树进行存储。查询元素的时间复杂度从原来的遍历链表 O(n),变成遍历红黑树 O(logN)。

参考博文

[1]. Java 进阶(六)从 ConcurrentHashMap 的演进看 Java 多线程核心技术
[2]. ConcurrentHashMap 原理分析(1.7 与 1.8)
[3]. 为并发而生的 ConcurrentHashMap(Java 8)


注脚

[1]. 分离锁(lock striping):分拆锁 (lock spliting) 就是若原先的程序中多处逻辑都采用同一个锁,但各个逻辑之间又相互独立,就可以拆 (Spliting) 为使用多个锁,每个锁守护不同的逻辑。分拆锁有时候可以被扩展,分成可大可小加锁块的集合,并且它们归属于相互独立的对象,这样的情况就是分离锁(lock striping)。(摘自《Java 并发编程实践》)

[2]. CAS:CAS 是 compare and swap 的缩写,即我们所说的比较并替换,是用于实现多线程同步的原子指令。CAS 是一种基于锁的操作,是乐观锁。
在 Java 中锁分为乐观锁和悲观锁。悲观锁是将资源锁住,线程一旦得到锁,会导致其它所有需要锁的线程挂起,等一个之前获得锁的线程释放锁之后,下一个线程才可以访问。而乐观锁采取了一种宽泛的态度,通过某种方式不加锁来处理资源,比如通过给记录加 version 来获取数据,性能较悲观锁有很大的提高。CAS 操作包含三个基本操作数 —— 内存位置(V)、预期原值(A)和新值 (B)。更新一个变量的时候,只有当变量的预期原值(A)和内存位置(V)当中的实际值相同时,才会将内存位置(V)对应的值修改为新值 (B)。CAS 是通过无限循环来获取数据的,若果在第一轮循环中,a 线程获取地址里面的值被 b 线程修改了,那么 a 线程需要自旋,到下次循环才有可能机会执行。CAS 可以有效的提升并发的效率,但同时也会引入 ABA 问题。如线程 1 从内存位置(V)中取出预期原值(A),这时候另一个线程 2 也从内存位置(V)中取出预期原值(A),并且线程 2 进行了一些操作将内存位置(V)中的值变成了新值 (B),然后线程 2 又将内存位置(V)中的数据变成预期原值(A),这时候线程 1 进行 CAS 操作发现内存位置(V)中仍然是预期原值(A),然后线程 1 操作成功。虽然线程 1 的 CAS 操作成功,但是整个过程就是有问题的。比如链表的头在变化了两次后恢复了原值,但是不代表链表就没有变化。解决方式,在对象中额外再增加一个标记来标识对象是否有过变更【AtomicMarkableReference(通过引入一个 boolean 来反映中间有没有变过)、AtomicStampedReference(通过引入一个 int 来累加来反映中间有没有变过)】。

[3]. Unsafe:Unsafe 是 Java 留给开发者的后门,用于直接操作系统内存且不受 Jvm 管辖,实现类似 C++ 风格的操作。Java 不能直接访问操作系统底层,而是通过本地方法来访问,Unsafe 类提供了硬件级别的原子操作。Oracle 官方一般不建议开发者使用 Unsafe 类,因为正如这个类的类名一样,它并不安全,使用不当会造成内存泄露。Unsafe 类在 sun.misc 包下,不属于 Java 标准。很多 Java 的基础类库,包括一些被广泛使用的高性能开发库都是基于 Unsafe 类开发,比如 Netty、Hadoop、Kafka 等。

谢谢你长得那么好看,还打赏我!😘
0%