有的时候博客内容会有变动,首发博客是最新的,其他博客地址可能会未同步,认准
https://blog.zysicyj.top
可点击链接
https://blog-1253652709.cos.ap-guangzhou.myqcloud.com//picgo/202401180921373.png解答疑问
HashMap比较普遍的实现方式是“数组+链表”,这种方式被称为“拉链法”。ConcurrentHashMap在这个基本原理之上进行了各种优化,因为JDK7和JDK8中实现区别较大,我们分开讨论。
JDK7中的实现方式
为了提高并发度,在JDK7中,一个HashMap被拆分为多个子HashMap。每一个子HashMap被称为一个Segment,多个线程操作多个Segment相互独立。
具体来说,每个segment都继承自ReentranLock,Segment的数量等于锁的数量,这些锁彼此之间都是独立的,即所谓的“分段锁”
/**
* 这里表示由多个segment组成
*/
final Segment<K,V>[] segments;
//继承自ReentrantLock,关于ReentrantLock后面细聊
static final class Segment<K,V> extends ReentrantLock implements Serializable {
构造函数分析
/**
* 创建一个具有指定初始容量、负载因子和并发级别的新的空 ConcurrentHashMap。
*
* @param initialCapacity 初始容量。实现会进行内部调整以容纳这么多元素。
* 用initialCapacity除以ssize就是每个segment的初始大小(会始终保证是2的幂次)
* @param loadFactor 负载因子阈值,用于控制何时进行调整。当每个桶的平均元素数超过此阈值时,可能会进行调整。
* 要注意的是segment的个数是不能扩容的,但是每个segment的内部可以扩展
* @param concurrencyLevel 预估的并发更新线程数。实现会进行内部调整以尽量容纳这么多线程。
* 这个值就是segment的数量,简单理解就是控制了并发度,该值在初始化之后就不能更改了
* 默认的segment值是16(会始终保证是2的幂次)
* @throws IllegalArgumentException 如果初始容量为负数或负载因子或并发级别为非正值。
*/
@SuppressWarnings("unchecked")
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
// 检查负载因子是否大于0,初始容量是否为非负值,并发级别是否为正值
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) {
throw new IllegalArgumentException();
}
// 如果并发级别大于最大段数,则将并发级别设为最大段数
if (concurrencyLevel > MAX_SEGMENTS) {
concurrencyLevel = MAX_SEGMENTS;
}
// 计算适合给定并发级别的2的幂大小
int sshift = 0;
int ssize = 1;
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1; // 等效于 ssize = ssize * 2;
}
// 设置段的移位值和掩码
this.segmentShift = 32 - sshift;
this.segmentMask = ssize - 1;
// 如果初始容量超过最大容量,则设置为最大容量
if (initialCapacity > MAXIMUM_CAPACITY) {
initialCapacity = MAXIMUM_CAPACITY;
}
// 计算每段的初始容量
int c = initialCapacity / ssize;
if (c * ssize < initialCapacity) {
++c; // 调整容量以确保能够容纳所有元素
}
// 找到大于等于c的最小2的幂值
int cap = MIN_SEGMENT_TABLE_CAPACITY;
while (cap < c) {
cap <<= 1; // 等效于 cap = cap * 2;
}
// 创建段和段表的第一个段
Segment<K, V> s0 = new Segment<K, V>(loadFactor, (int)(cap * loadFactor), (HashEntry<K, V>[]) new HashEntry[cap]);
Segment<K, V>[] ss = (Segment<K, V>[]) new Segment[ssize];
UNSAFE.putOrderedObject(ss, SBASE, s0); // 有序写入段表的第一个段
this.segments = ss;
}
详细说明
参数检查与异常抛出
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0):- 检查
loadFactor是否大于0。 - 检查
initialCapacity是否为非负值。 - 检查
concurrencyLevel是否为正值。 - 如果任意一个条件不满足,抛出
IllegalArgumentException。
- 检查
调整并发级别
if (concurrencyLevel > MAX_SEGMENTS):- 如果并发级别大于最大段数
MAX_SEGMENTS,将并发级别设置为MAX_SEGMENTS。
- 如果并发级别大于最大段数
计算段表大小和移位值
while (ssize < concurrencyLevel):- 通过循环计算最接近且不小于
concurrencyLevel的2的幂值ssize。 sshift用于记录移位操作的次数。
- 通过循环计算最接近且不小于
设置段移位值和掩码
this.segmentShift = 32 - sshift:- 设置段移位值,用于计算哈希值对应的段。
this.segmentMask = ssize - 1:- 设置段掩码,用于哈希值和段掩码进行位与操作得到段索引。
调整初始容量
if (initialCapacity > MAXIMUM_CAPACITY):- 如果初始容量大于最大容量
MAXIMUM_CAPACITY,将初始容量设置为MAXIMUM_CAPACITY。
- 如果初始容量大于最大容量
计算每段的初始容量
int c = initialCapacity / ssize:- 计算每段的初始容量
c。
- 计算每段的初始容量
if (c * ssize < initialCapacity):- 如果调整后的容量小于初始容量,增加
c以确保能够容纳所有元素。
- 如果调整后的容量小于初始容量,增加
找到大于等于
c的最小2的幂值int cap = MIN_SEGMENT_TABLE_CAPACITY:- 设置初始容量为最小段表容量
MIN_SEGMENT_TABLE_CAPACITY。
- 设置初始容量为最小段表容量
while (cap < c):- 通过循环找到大于等于
c的最小2的幂值cap。
- 通过循环找到大于等于
创建段和段表
Segment<K, V> s0 = new Segment<K, V>(loadFactor, (int)(cap * loadFactor), (HashEntry<K, V>[]) new HashEntry[cap]):- 创建段
s0,并初始化段表的第一个段。
- 创建段
Segment<K, V>[] ss = (Segment<K, V>[]) new Segment[ssize]:- 创建段表
ss,段表大小为ssize。
- 创建段表
UNSAFE.putOrderedObject(ss, SBASE, s0):- 有序写入段表的第一个段
s0。
- 有序写入段表的第一个段
赋值段表
this.segments = ss:- 将创建的段表赋值给
segments。
- 将创建的段表赋值给
这个构造函数通过对初始容量、负载因子和并发级别进行调整和计算,确保 ConcurrentHashMap 能够高效地处理并发操作,并适应给定的初始参数。
put函数分析
这个方法用于将指定的键映射到指定的值,并返回与键关联的先前值(如果存在的话)。如果键或值为 null,将抛出 NullPointerException。
/**
* 将指定的键映射到此表中的指定值。键和值均不能为 null。
*
* <p> 可以通过调用带有与原始键相等的键的 <tt>get</tt> 方法来检索该值。
*
* @param key 与指定值关联的键
* @param value 与指定键关联的值
* @return 与 <tt>key</tt> 关联的先前值,如果没有针对 <tt>key</tt> 的映射,则返回 <tt>null</tt>
* @throws NullPointerException 如果指定的键或值为 null
*/
@SuppressWarnings("unchecked")
public V put(K key, V value) {
Segment<K, V> s;
// 检查值是否为 null,如果是,则抛出 NullPointerException
if (value == null) {
throw new NullPointerException();
}
// 计算键的哈希值
int hash = hash(key);
// 根据哈希值计算段索引
int j = (hash >>> segmentShift) & segmentMask;
// 尝试获取指定段。如果段为空,则调用 ensureSegment 创建段
if ((s = (Segment<K, V>) UNSAFE.getObject
(segments, (j << SSHIFT) + SBASE)) == null) {
s = ensureSegment(j);
}
// 调用段的 put 方法,将键值对插入段中,并返回先前与键关联的值
return s.put(key, hash, value, false);
}
详细说明
参数检查
if (value == null):- 检查
value是否为 null。如果是,则抛出NullPointerException。
- 检查
计算哈希值
int hash = hash(key):- 计算键的哈希值。哈希函数
hash将键转换为整数哈希值。
- 计算键的哈希值。哈希函数
计算段索引
int j = (hash >>> segmentShift) & segmentMask:- 根据哈希值计算段索引。
hash >>> segmentShift将哈希值右移segmentShift位,然后与segmentMask进行按位与操作得到段索引。
- 根据哈希值计算段索引。
获取指定段
if ((s = (Segment<K, V>) UNSAFE.getObject(segments, (j << SSHIFT) + SBASE)) == null):- 使用
UNSAFE.getObject非易失性地尝试获取指定段。如果段为空,则调用ensureSegment方法创建段。 (j << SSHIFT) + SBASE计算段在segments数组中的偏移量。
- 使用
确保段存在
s = ensureSegment(j):- 如果段为空,调用
ensureSegment(j)方法确保段存在并返回创建的段。
- 如果段为空,调用
调用段的
put方法return s.put(key, hash, value, false):- 调用段的
put方法将键值对插入段中,并返回先前与键关联的值。
- 调用段的
辅助方法
hash 方法
该方法用于计算键的哈希值。在 ConcurrentHashMap 中,哈希函数通常会进行一些额外的混淆操作,以减少哈希冲突。
ensureSegment 方法
该方法用于确保段在段表中存在。如果指定索引的段为空,该方法将创建一个新的段并将其插入段表中。
总结
这个 put 方法通过计算键的哈希值、定位段索引、确保段存在,然后将键值对插入相应的段中,从而实现了键值对的安全并发插入。段的使用使得多个线程可以同时操作不同的段,从而提高了并发性能。
在上面的代码中没有加锁操作,因为锁是加在s.put内部的,也就是分段加锁。另外,多个线程可能同时调用ensureSegment对Segment[j]进行初始化,在这个函数的内部要避免重复初始化。
下面是对 ensureSegment 方法的详细说明及详细注释。这个方法用于确保在给定索引处的段存在。如果该段不存在,则创建它并通过 CAS 操作记录到段表中。
/**
* 返回给定索引的段,如果不存在则创建它并通过 CAS 操作记录到段表中。
*
* @param k 段的索引
* @return 对应索引的段
*/
@SuppressWarnings("unchecked")
private Segment<K, V> ensureSegment(int k) {
final Segment<K, V>[] ss = this.segments; // 获取当前的段表
long u = (k << SSHIFT) + SBASE; // 计算段在段表中的原始偏移量
Segment<K, V> seg;
// 尝试获取段,使用 volatile 读取确保可见性
if ((seg = (Segment<K, V>) UNSAFE.getObjectVolatile(ss, u)) == null) {
Segment<K, V> proto = ss[0]; // 使用段表的第一个段作为原型
int cap = proto.table.length; // 获取原型段的表大小
float lf = proto.loadFactor; // 获取原型段的负载因子
int threshold = (int) (cap * lf); // 计算扩容阈值
HashEntry<K, V>[] tab = (HashEntry<K, V>[]) new HashEntry[cap]; // 创建新的哈希表
// 再次检查段是否已经被其他线程创建
if ((seg = (Segment<K, V>) UNSAFE.getObjectVolatile(ss, u)) == null) {
Segment<K, V> s = new Segment<K, V>(lf, threshold, tab); // 创建新的段
while ((seg = (Segment<K, V>) UNSAFE.getObjectVolatile(ss, u)) == null) {
// 使用 CAS 操作将新的段设置到段表中
if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s)) {
break; // 成功设置段后退出循环
}
}
}
}
return seg; // 返回段
}
详细说明
获取当前的段表
final Segment<K, V>[] ss = this.segments;:- 获取当前的段表
segments。
- 获取当前的段表
计算段在段表中的原始偏移量
long u = (k << SSHIFT) + SBASE;:- 根据段的索引
k计算其在段表中的原始偏移量。SSHIFT和SBASE是常量,用于定位段在内存中的位置。
- 根据段的索引
尝试获取段
if ((seg = (Segment<K, V>) UNSAFE.getObjectVolatile(ss, u)) == null):- 使用
UNSAFE.getObjectVolatile方法尝试获取段,确保读取操作的可见性。如果段不存在,则继续进行段的创建。
- 使用
使用段表的第一个段作为原型
Segment<K, V> proto = ss[0];:- 获取段表的第一个段
proto作为原型,便于后续创建新的段。
- 获取段表的第一个段
获取原型段的表大小和负载因子
int cap = proto.table.length;:- 获取原型段的哈希表大小
cap。
- 获取原型段的哈希表大小
float lf = proto.loadFactor;:- 获取原型段的负载因子
lf。
- 获取原型段的负载因子
计算扩容阈值
int threshold = (int) (cap * lf);:- 计算扩容阈值
threshold。
- 计算扩容阈值
创建新的哈希表
HashEntry<K, V>[] tab = (HashEntry<K, V>[]) new HashEntry[cap];:- 创建新的哈希表
tab。
- 创建新的哈希表
再次检查段是否已经被其他线程创建
if ((seg = (Segment<K, V>) UNSAFE.getObjectVolatile(ss, u)) == null):- 再次使用
UNSAFE.getObjectVolatile检查段是否已经被其他线程创建。如果仍然为null,则继续创建段。
- 再次使用
创建新的段
Segment<K, V> s = new Segment<K, V>(lf, threshold, tab);:- 创建新的段
s,使用计算好的负载因子、阈值和哈希表。
- 创建新的段
使用 CAS 操作设置段
while ((seg = (Segment<K, V>) UNSAFE.getObjectVolatile(ss, u)) == null):- 在循环中再次检查段是否存在。如果仍然不存在,使用 CAS 操作将新的段设置到段表中。
if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s)):- 使用
UNSAFE.compareAndSwapObject方法进行 CAS 操作。如果成功设置段,则退出循环。
- 使用
返回段
return seg;:- 返回确保存在的段。
辅助方法
UNSAFE.getObjectVolatile
该方法用于以 volatile 方式读取对象,确保读取操作的可见性。
UNSAFE.compareAndSwapObject
该方法用于执行 CAS 操作,确保线程安全地设置段。
总结
这个 ensureSegment 方法通过确保段在段表中存在并且使用 CAS 操作进行段的创建和设置,从而实现了线程安全的段创建。这样可以确保多个线程可以安全地操作 ConcurrentHashMap,并且在需要时动态地扩展段表。
上面这个函数的目的是当segments[k]=null时对其进行初始化。由于多个线程可能同时调用该函数,UNSAFE.getObjectVolatile(ss,u)==null 出现了3次,在3个不同的时间点重复检查segments[k]==null。但即使如此,也不能完全保证避免重复初始化,所以最后需要执行一个CAS操作UNSAFE.compareAndSwapObject(ss,u,null,seg=s)保证只被初始化一次。保险起见,把这次CAS操作放在一个while循环里,保证出来的时候segments[k]一定不为空。
segments[j]被成功地初始化了,下面进入内部,了解如何把元素放进去。
下面是对 put 方法的详细说明及详细注释。这个方法用于将指定的键映射到指定的值,并返回与键关联的先前值(如果存在的话)。如果键或值为 null,将抛出 NullPointerException。
/**
* 将指定的键映射到此表中的指定值。键和值均不能为 null。
*
* @param key 键,与指定值关联
* @param hash 键的哈希值
* @param value 值,与指定键关联
* @param onlyIfAbsent 如果为 true,则仅当键尚未关联值时才插入
* @return 与键关联的先前值,如果没有,则返回 null
*/
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
// 尝试获取锁,如果未能获取锁,则调用 scanAndLockForPut 方法获取锁
HashEntry<K, V> node = tryLock() ? null : scanAndLockForPut(key, hash, value);
V oldValue;
try {
// 获取当前段的哈希表
HashEntry<K, V>[] tab = table;
// 计算键在哈希表中的索引
int index = (tab.length - 1) & hash;
// 获取索引处的第一个条目
HashEntry<K, V> first = entryAt(tab, index);
// 遍历链表以查找键
for (HashEntry<K, V> e = first;;) {
if (e != null) {
K k;
// 检查当前条目是否与给定键匹配
if ((k = e.key) == key || (e.hash == hash && key.equals(k))) {
oldValue = e.value; // 保存先前的值
if (!onlyIfAbsent) {
e.value = value; // 更新值
++modCount; // 修改计数加一
}
break;
}
e = e.next; // 移动到下一个条目
} else {
// 如果节点为空,则插入新条目
if (node != null) {
node.setNext(first); // 将新条目链接到现有链表的头部
} else {
node = new HashEntry<K, V>(hash, key, value, first); // 创建新的哈希条目
}
int c = count + 1; // 更新计数
if (c > threshold && tab.length < MAXIMUM_CAPACITY) {
rehash(node); // 需要扩容并重新哈希
} else {
setEntryAt(tab, index, node); // 将新条目设置到哈希表中
}
++modCount; // 修改计数加一
count = c; // 更新当前段中的条目数
oldValue = null; // 当前没有先前值
break;
}
}
} finally {
// 确保释放锁
unlock();
}
return oldValue; // 返回先前的值
}
详细说明
获取锁
HashEntry<K, V> node = tryLock() ? null : scanAndLockForPut(key, hash, value);:- 尝试获取段的锁。如果获取失败,则调用
scanAndLockForPut方法获取锁。
- 尝试获取段的锁。如果获取失败,则调用
获取当前段的哈希表
HashEntry<K, V>[] tab = table;:- 获取当前段的哈希表
table。
- 获取当前段的哈希表
计算键在哈希表中的索引
int index = (tab.length - 1) & hash;:- 计算键的哈希值在哈希表中的索引。
获取索引处的第一个条目
HashEntry<K, V> first = entryAt(tab, index);:- 获取索引
index处的第一个条目。
- 获取索引
遍历链表查找键
for (HashEntry<K, V> e = first;;) { ... }:- 遍历链表以查找是否存在给定键的条目。
if (e != null) { ... } else { ... }:- 如果当前条目不为空,则检查键是否匹配。
if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { ... }:- 如果找到匹配的键,则更新值并返回先前的值。
e = e.next;:- 移动到下一个条目。
else { ... }:- 如果当前条目为空,则创建新条目并插入到链表中。
if (node != null) node.setNext(first);:- 如果
node不为空,将其链接到现有链表的头部。
- 如果
else node = new HashEntry<K, V>(hash, key, value, first);:- 否则创建新的哈希条目。
int c = count + 1;:- 更新段中的条目计数。
if (c > threshold && tab.length < MAXIMUM_CAPACITY) rehash(node);:- 如果条目数超过阈值且哈希表未达到最大容量,则需要扩容并重新哈希。
else setEntryAt(tab, index, node);:- 将新条目设置到哈希表中。
++modCount;:- 修改计数加一。
count = c;:- 更新当前段中的条目数。
oldValue = null;:- 当前没有先前值。
释放锁
finally { unlock(); }:- 确保在操作完成后释放锁。
返回先前的值
return oldValue;:- 返回与键关联的先前值(如果有)。
辅助方法
tryLock 方法
该方法尝试获取段的锁。如果成功,则返回 true,否则返回 false。
scanAndLockForPut 方法
该方法用于在获取锁失败时进行扫描并尝试获取
总结
- 在Segment里面有2个count变量,count与modCount,前者表示元素的个数,后者表示修改的次数。当待put的元素、key值或者hash值和链表中某个节点相等时,不会重复插入新节点。此时再进一步判断,如果onlyIfAbsent=true,则什么都不做;如果onlyIfAbsent=false,则修改该节点的value,同时modCount累加一次。
- 进入上面的else分支,说明已经遍历到了链表尾部,并且没有发现与key值或者hash值相等的节点,此时在链表头部插入1个新节点,并把table[index]赋值为该节点。因为first就是链表头部,所以直接把node的next指针指向first就可以了。
- 在函数的开始,加锁的时候,进行了一次优化。如果tryLock()成功,即拿到锁,则进入下面的代码;如果tryLock()不成功,也不能闲着,那进入scanAndLockForPut(key,hash,value)做什么呢?
下面是对 scanAndLockForPut 方法的详细说明及详细注释。这个方法用于在尝试获取锁的过程中扫描包含给定键的节点,如果未找到则创建并返回一个新节点。返回时,确保锁已被持有。
/**
* 在尝试获取锁的过程中扫描包含给定键的节点,如果未找到则创建并返回一个新节点。
* 返回时,确保锁已被持有。与大多数方法不同,此方法中的 equals 调用未被筛选:
* 由于遍历速度无关紧要,因此我们可以通过此方法来预热相关代码和访问。
*
* @param key 键
* @param hash 键的哈希值
* @param value 值
* @return 如果未找到键则返回一个新节点,否则返回 null
*/
private HashEntry<K, V> scanAndLockForPut(K key, int hash, V value) {
// 获取哈希值对应的第一个条目
HashEntry<K, V> first = entryForHash(this, hash);
HashEntry<K, V> e = first;
HashEntry<K, V> node = null;
int retries = -1; // 在定位节点时为负数
// 尝试获取锁
while (!tryLock()) {
HashEntry<K, V> f; // 用于重新检查下面的第一个条目
if (retries < 0) {
// 如果当前条目为空
if (e == null) {
if (node == null) // 如果节点为空,推测性地创建节点
node = new HashEntry<K, V>(hash, key, value, null);
retries = 0;
}
// 如果找到匹配的键
else if (key.equals(e.key))
retries = 0;
// 移动到下一个条目
else
e = e.next;
}
// 如果重试次数超过最大重试次数
else if (++retries > MAX_SCAN_RETRIES) {
lock(); // 强制获取锁
break;
}
// 如果重试次数为偶数且第一个条目已改变
else if ((retries & 1) == 0 && (f = entryForHash(this, hash)) != first) {
e = first = f; // 如果条目已改变,则重新遍历
retries = -1;
}
}
return node; // 返回新创建的节点(如果有)
}
详细说明
获取哈希值对应的第一个条目
HashEntry<K, V> first = entryForHash(this, hash);:- 根据哈希值获取对应的第一个条目。
初始化变量
HashEntry<K, V> e = first;:- 初始化变量
e,指向第一个条目。
- 初始化变量
HashEntry<K, V> node = null;:- 初始化变量
node,用于存储新创建的节点。
- 初始化变量
int retries = -1;:- 初始化重试计数器
retries为 -1,用于定位节点。
- 初始化重试计数器
尝试获取锁
while (!tryLock()) { ... }:- 尝试获取锁,如果未能获取锁则进入循环。
检查当前条目
if (retries < 0) { ... }:- 如果
retries小于 0,表示正在定位节点。 if (e == null) { ... } else if (key.equals(e.key)) { ... } else { ... }:- 如果当前条目为空,且
node为空,则推测性地创建节点。 - 如果找到匹配的键,则设置
retries为 0。 - 否则移动到下一个条目。
- 如果当前条目为空,且
- 如果
处理重试次数超过最大重试次数
else if (++retries > MAX_SCAN_RETRIES) { lock(); break; }:- 如果重试次数超过
MAX_SCAN_RETRIES,则强制获取锁并退出循环。
- 如果重试次数超过
重新遍历条目
else if ((retries & 1) == 0 && (f = entryForHash(this, hash)) != first) { ... }:- 如果重试次数为偶数且第一个条目已改变,则重新遍历。
返回新节点
return node;:- 返回新创建的节点(如果有)。
辅助方法
entryForHash 方法
该方法用于根据哈希值获取对应的第一个条目。
/**
* 根据哈希值获取对应的第一个条目。
*
* @param segment 当前段
* @param hash 键的哈希值
* @return 对应的第一个条目
*/
private HashEntry<K, V> entryForHash(Segment<K, V> segment, int hash) {
HashEntry<K, V>[] tab = segment.table;
return tab[(tab.length - 1) & hash];
}
总结
scanAndLockForPut 方法通过在获取锁的过程中扫描包含给定键的节点,如果未找到则创建并返回一个新节点。返回时确保锁已被持有。这种方式确保了在高并发环境下对 ConcurrentHashMap 的高效操作。
上面的函数看似复杂,实则只是做了两件事:一是拿不到锁,不立即阻塞,而是先自旋,若自旋到一定次数仍未拿到锁,再调用lock()阻塞;二是在自旋的过程中遍历了链表,若发现没有重复的节点,则提前新建一个节点,为后面再插入节省时间。


