ConcurrentHashMap

背景:在多线程情况下,实用HashMap是线程不安全的。线程安全可以使用Hashtable,但是Hashtable的运行效率很低,之所以效率低下主要是因为其实现实用了synchronized关键字对put等操作进行了加锁,而synchronized关键字是对整个对象进行加锁,也就是说在进行put等修改Hash表的操作是,锁住了整个Hash表,从而使得其表现效率低下。所以最终诞生了ConcurrentHashMap

 

锁分段技术:HashTable容器在竞争激烈的并发环境下表现出效率低下的原因是所有访问HashTable的线程都必须竞争同一把锁。假如容器里有多把锁,每一把锁用于锁容器的一部分数据,那么当多线程访问容器里不同数据段数据时,线程间就不会存在锁竞争,从而可以有效提高并发访问效率,这就是ConcurrentHashMap所使用的锁分段技术。首先将数据分成一段一段地存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一个数据段的时候,其他段的诗句也能被其他线程访问

 

ConcurrentHashMap-JDK1.7

在JDK1.5-1.7版本,Java使用了分段锁机制实现了ConcurrentHashMap

简而言之,ConcurrentHashMap在对象中保存了一个Segment数组,即将整个Hash表划分为多个字段。而每个Segment元素,即每个分段则类似于一个Hashtable。这样,在执行put操作时会首先根据hash算法定位到元素属于哪个Segment,然后对该Segment加锁即可。因此ConcurrentHashMap在多线程并发变成中可以实现多线程put操作。

数据结构

整个ConcurrentHashMap由一个个Segment组成,Segment代表“部分”或“一段”的意思,所以很多地方都会将其描述为分段锁。简单理解就是,ConcurrentHashMap是由Segment数组结构和HashEntry数组结构组成。Segment是一种可重入锁,在ConcurrentHashMap里扮演锁的角色;HashEntry则用于存储键值对数据。结构图如下:

 

初始化

初始化方法是通过initialCapacity、loadFactor和concurrencyLevel等几个参数来初始化segment数组、段偏移量segmentShift、段掩码segmentMask和每个segment里的HashEntry数组来实现的。

  • concurrencyLevel:并行级别、并发数、segment数,怎么翻译不重要。默认值是16,也就是说ConcurrentHashMap有16个Segments,所以理论上,这个时候,最多支持同时有16个线程并发写,只要他们操作分别分布在不同的segment上。这个值可以在初始化的时候设置为其他值,但是一旦初始化之后,他是不可以扩容的。
  • initialCapacity:初始容量。这个值指的是整个ConcurrentHashMap的初始容量,实际操作的时候需要平均分给每个segment。
  • loadFactor:Segment数组不可以扩容,所以这个负载因子是给每个segment内部使用的。

 

初始化完成,我们得到了一个Segment数组
我们就当是用new ConcurrentHashMap()无参构造函数进行初始化的,那么初始化完成后:
  • Segment数组长度为16,不可以扩容
  • Segment[i]的默认大小为2,负载因子是0.75,得出值为1.5,取整后初始阈值为1,也就是当插入第一个元素不会触发扩容,插入第二个元素会进行扩容
  • 这里初始化了segment[0],其他位置还是null。初始化segment[0]是因为防止多个线程重复创建和数据错乱,保证线程安全
  • 当前segmentshift的值为 32 – 4 = 28,segmentMask为 16 – 1 = 15,姑且把他们简单翻译为移位数和掩码,这两个值主要作用是定位Segment。

 

定位Segment

根据前面的数据结构介绍可知,数据都是收到Segment分段锁保护的,所以插入或者获取元素的时候,必须要先定位到Segment。可以看到ConcurrentHashMap会首先使用Wang/Jenkins hash的变种算法对元素的hashCode进行一次再散列。

 
private static int hash(int h) {​
h += (h << 15) ^ 0xffffcd7d;​
h ^= (h >>> 10);​
h += (h << 3);​
h ^= (h >>> 6);​
h += (h << 2) + (h << 14);​
return h ^ (h >>> 16);​
}

之所以要进行再扩列,目的是减少散列冲突,是元素能够均匀地分布在不同的Segment上,从而提高容器的存取效率

ConcurrentHashMap通过以下散列算法定位segment

final Segment<K,V> segmentFor(int hash) {​
return segments[(hash >>> segmentShift) & segmentMask];​
}

 

ConcurrentHashMap的操作

get操作

Segment的get操作实现非常简单和高效。先经过一次再散列,然后使用这个散列值通过散列运算定位到segment,再通过散列算法定位到元素。

public V get(Object key) {​
int hash = hash(key.hashCode());​
return segmentFor(hash).get(key, hash);​
}

get操作的高效之处在于整个get过程不需要加锁。我们直到HashTable容器的get方法是需要加锁的。那么ConcurrentHashMap的get操作是如何做到不加锁也不出问题的呢?原因是他的get方法里将要使用的共享变量都定义成volatile类型,如用于统计当前Segment大小的count字段和用于存储值的HashEntry的value

 transient volatile int count;​
volatile V value;

在定位元素的代码里我们可以发现,定位HashEntry和定位Segment的散列算法虽然一样,都与数组的长度-1再像“与”,但是相“与”的值不一样,定位Segment使用的是元素的hashcode通过再散列后得到的值的高位,而定位HashEntry直接使用的是再散列后的值。

 

 (hash >>> segmentShift) & segmentMask // 定位Segment所使用的hash算法 ​
int index = hash & (tab.length - 1); // 定位HashEntry所使用的hash算法

put操作

由于put方法里需要对共享变量进行写入操作,所以为了线程安全,再操作共享变量时必须加锁。put方法首先定位到Segment,然后在Segment里进行插入操作。插入操作需要经历两个步骤,第一步判断是否需要对Segment里的HashEntry数组进行扩容,第二步定位添加元素的位置,然后将其放在HashEntry数组里。

是否需要扩容:在插入元素前会先判断Segment里的HashEntry数组是否超过容量,如果超过阈值,则对数组进行扩容。也就是说,对比于HashMap,HashMap是插入元素后判断元素是否已经到达容,到了就进行扩容,而ConcurrentHashMap则是插入元素前先判断当前数组是否超过阈值,超过则扩容

如何扩容:在扩容的时候,首先会创建一个容量是原来容量两倍的数组,然后将原数组的元素进行再散列后插入到新的数组里,为了高效,ConcurrentHashMap不会对整个容器进行扩容,只会对某个segment进行扩容。

size操作

如果要统计整个ConcurrentHashMap里元素的大小,就必须统计所有Segment里元素的大小后求和,Segement里的全局变量count是一个volatile变量,那么再多线程场景下,是不是直接把所有Segment的count相加就可以得到整个ConcurrentHashMap大小了呢?不是!虽然相加时可以获取每个Segment的count的最新值,但是可能累加前使用的count在累加过程发生变化,那么统计结果就不再准确。所以最安全的做法是在统计size的时候把所有segment的put、remove、clean方法全部锁住,但这种做法显然非常低效。

因为在累加count操作过程中,之前累加过的count发生变化的几率非常小,所以ConcurrentHashMap的做法是先尝试三次通过不锁住Segmen的方法来统计各个Segment大小,如果统计过程中,容器的count发生了变化,则再采用加锁的方式来统计所有Segment的大小

那么ConcurrentHashMap是如何判断在统计的时候容器是否发生了变化呢?使用modCount变量,在put、remove、clean方法里操作元素前都会将变量modCount进行加1,那么在统计size前后比较modCount是否发生变化,从而得知容器的大小是否发生变化。

 

ConcurrentHashMap-JDK1.8

简介

在JDK1.7之前,ConcurrentHashMap是通过分段锁机制来实现的,所以其最大并发度受Segment的个数限制。因此,在JDK1.8中,ConcurrentHashMap的实现原理摒弃了这种设计,而是选择了与HashMap类似的数组+链表+红黑树的方式实现,而加锁则采用CAS的synchronized实现。

数据结构:结构上和Java8的HashMap基本上一样,不过要保证线程安全性,所以在源码上确实要更复杂。

图中的节点有三种类型

  • 第一种是,空着的位置表示当前还没有元素来填充。
  • 第二种是和HashMap非常类似的拉链法结构,在每一个槽中会首先填入第一个节点,但是后续如果计算出相同的Hash值们就用链表的形式往后进行延伸
  • 第三种是红黑树结构。

当第二种情况的链表长度达到某一个阈值(默认为8),且同时容量小于等于64(否则有限扩容数组)的时候,ConcurrentHashMap便会把这个链表从链表形式转化成红黑树的形式,目的是进一步提高他的查找性能

源码分析

Node节点:

 static class Node<K,V> implements Map.Entry<K,V> {​
​
final int hash;​
​
final K key;​
​
volatile V val;​
​
volatile Node<K,V> next;​
​
// ...​
​
}

可以看出,每个Node里面是key-value的形式,并且把value用volatile修饰,以便保证可见性,同事内部还有一个指向下一个节点的next指针

put方法

put方法的核心是putVal方法,这个方法需要逐步阅读,所以把一些重要步骤的解读直接作为主食放入了源码中

 final V putVal(K key, V value, boolean onlyIfAbsent) {​
​
if (key == null || value == null) {​
throw new NullPointerException();​
}​
​
//计算 hash 值​
int hash = spread(key.hashCode());​
int binCount = 0;​
​
for (Node<K, V>[] tab = table; ; ) {​
Node<K, V> f;​
int n, i, fh;​
​
//如果数组是空的,就进行初始化​
if (tab == null || (n = tab.length) == 0) {​
tab = initTable();​
}​
​
// 找该 hash 值对应的数组下标​
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {​
//如果该位置是空的,就用 CAS 的方式放入新值​
if (casTabAt(tab, i, null,​
new Node<K, V>(hash, key, value, null))) {​
break;​
}​
}​
​
//hash值等于 MOVED 代表在扩容​
else if ((fh = f.hash) == MOVED) {​
tab = helpTransfer(tab, f);​
}​
​
//槽点上是有值的情况​
else {​
V oldVal = null;​
//用 synchronized 锁住当前槽点,保证并发安全​
synchronized (f) {​
if (tabAt(tab, i) == f) {​
//如果是链表的形式​
if (fh >= 0) {​
binCount = 1;​
//遍历链表​
for (Node<K, V> e = f; ; ++binCount) {​
K ek;​
return null;​
}

总结:方法中会逐步根据当前槽点是未初始化、空、扩容、链表、红黑树等不同情况做出不同的处理

get方法

 public V get(Object key) {​
​
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;​
//计算 hash 值​
int h = spread(key.hashCode());​
//如果整个数组是空的,或者当前槽点的数据是空的,说明 key 对应的 value 不存在,直接返回 null​
if ((tab = table) != null && (n = tab.length) > 0 &&​
(e = tabAt(tab, (n - 1) & h)) != null) {​
//判断头结点是否就是我们需要的节点,如果是则直接返回​
if ((eh = e.hash) == h) {​
if ((ek = e.key) == key || (ek != null && key.equals(ek)))​
return e.val;​
}​
​
//如果头结点 hash 值小于 0,说明是红黑树或者正在扩容,就用对应的 find 方法来查找​
else if (eh < 0)​
return (p = e.find(h, key)) != null ? p.val : null;​
//遍历链表来查找​
while ((e = e.next) != null) {​
if (e.hash == h &&​
((ek = e.key) == key || (ek != null && key.equals(ek))))​
return e.val;​
}​
}​
return null;​
}

总结一下get的过程:

  1. 计算Hash
  2. 值,并由此值找到对应的槽点
  3. 如果数组是空的或者该位置为null,那么直接返回null
  4. 如果该位置处的节点刚好是我们需要的,直接返回该节点的值
  5. 如果该位置节点是红黑树或者正在扩容,那就用find方法继续查找
  6. 如果是链表,就进行遍历链表查找
暂无评论

发送评论 编辑评论


				
|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇
下一篇