页面加载中
博客快捷键
按住 Shift 键查看可用快捷键
ShiftK
开启/关闭快捷键功能
ShiftA
打开/关闭中控台
ShiftD
深色/浅色显示模式
ShiftS
站内搜索
ShiftR
随机访问
ShiftH
返回首页
ShiftL
友链页面
ShiftP
关于本站
ShiftI
原版/本站右键菜单
松开 Shift 键或点击外部区域关闭
互动
最近评论
暂无评论
标签
寻找感兴趣的领域
暂无标签
    0
    文章
    0
    标签
    8
    分类
    10
    评论
    128
    功能
    深色模式
    标签
    JavaScript12TypeScript8React15Next.js6Vue10Node.js7CSS5前端20
    互动
    最近评论
    暂无评论
    标签
    寻找感兴趣的领域
    暂无标签
      0
      文章
      0
      标签
      8
      分类
      10
      评论
      128
      功能
      深色模式
      标签
      JavaScript12TypeScript8React15Next.js6Vue10Node.js7CSS5前端20
      未知歌曲
      未播放
      ♪ 暂无歌词 ♪
      随便逛逛
      博客分类
      文章标签
      复制地址
      深色模式
      AnHeYuAnHeYu
      Search⌘K
      博客
        暂无其他文档

        并发容器之ConCurrentHashMap(三)

        JDK8中ConcurrentHashMap取消分段锁,采用单个HashMap结构,并引入红黑树优化性能。当链表长度超过8且容量大于64时转为红黑树,节点少于6时转回链表。锁粒度细化到头节点,并发度等于Node数组长度,同时支持并发扩容,既降低Hash冲突又提升并发性能。

        June 17, 202419 分钟 阅读5 次阅读

        参考资料:JAVA并发实现原理:JDK源码剖析 作者:余春龙

        接下来我们说说JDK8中的实现。JDK8中比较重要的两个变化如下: 1. 取消了分段锁,所有数据存放在一个大的HashMap中 2. 引入了红黑树

        这里我们再回顾一下JDK7的实现原理图,对比JDK8实现原理

        JDK7的实现原理图 JDK8实现原理图 如果头节点是Node类型,则尾随它的就是一个普通的链表;如果头节点是TreeNode类型,它的后面就是一颗红黑树,TreeNode是Node的子类。

        红黑树

        在JDK 8中,HashMap的实现做了一些优化,其中之一是引入了链表和红黑树之间的相互转换规则。这一改变主要是为了提升在哈希冲突高的情况下的性能。在JDK 7及以前,HashMap的桶(bucket)中存储的是链表,这在哈希冲突较多时会导致性能退化为O(n)。JDK 8通过在适当的条件下将链表转换为红黑树来提升性能,使得最坏情况下的时间复杂度从O(n)降低到O(log n)。

        在JDK 7中的分段锁,有三个好处: 1. 减少Hash冲突,避免一个槽里有太多元素。 2. 提高读和写的并发度。段与段之间相互独立。 3. 提供扩容的并发度。扩容的时候,不是整个ConcurrentHashMap 一起扩容,而是每个Segment独立扩容。 针对这三个好处,我们来看一下在JDK 8中相应的处理方式: 1. 使用红黑树,当一个槽里有很多元素时,其查询和更新速度会比链表快很多,Hash冲突的问题由此得到较好的解决。 2. 加锁的粒度,并非整个ConcurrentHashMap,而是对每个头节点分别加锁,即并发度,就是Node数组的长度,初始长度为16,和在JDK 7中初始Segment的个数相同。 3. 并发扩容,这是难度最大的。在JDK 7中,一旦Segment的个数在初始化的时候确立,不能再更改,并发度被固定。之后只是在每个Segment内部扩容,这意味着每个Segment独立扩容,互不影响,不存在并发扩容的问题。但在JDK 8中,相当于只有1个Segment,当一个线程要扩容Node数组的时候,其他线程还要读写,因此处理过程很复杂,后面会详细分析. 由上述对比可以总结出来:JDK 8的实现一方面降低了Hash冲突,另一方面也提升了并发度。

        以下是链表和红黑树之间相互转换的规则:

        链表转为红黑树的条件

        1. 链表长度达到阈值: 当一个桶中的链表长度超过阈值(默认值为8)时,会触发链表向红黑树的转换。这个阈值在HashMap的源码中由TREEIFY_THRESHOLD常量定义,默认值为8。

        2. HashMap的大小超过最小树化容量: 当HashMap的容量(size)超过最小树化容量(默认值为64)时,链表才会转换为红黑树。这个阈值在HashMap的源码中由MIN_TREEIFY_CAPACITY常量定义,默认值为64。

        如果HashMap的容量没有超过MIN_TREEIFY_CAPACITY,即使桶中的链表长度超过了TREEIFY_THRESHOLD,HashMap也只会进行扩容(resize)而不会进行树化。

        红黑树转为链表的条件

        当红黑树中的节点数量减少到一定阈值(默认值为6)时,会将红黑树转换回链表。这个阈值在HashMap的源码中由UNTREEIFY_THRESHOLD常量定义,默认值为6。这是为了避免当节点较少时维护红黑树的额外开销。

        源码分析

        下面是HashMap源码中与链表和红黑树转换相关的部分代码:

        // 转换链表为红黑树的阈值
        static final int TREEIFY_THRESHOLD = 8;
        
        // 转换红黑树为链表的阈值
        static final int UNTREEIFY_THRESHOLD = 6;
        
        // 最小树化容量
        static final int MIN_TREEIFY_CAPACITY = 64;
        
        void treeifyBin(Node<K,V>[] tab, int hash) {
            int n, index; Node<K,V> e;
            if (tab == null || (n = tab.length) < MIN_TREEIFY_CAPACITY)
                resize();
            else if ((e = tab[index = (n - 1) & hash]) != null) {
                TreeNode<K,V> hd = null, tl = null;
                do {
                    TreeNode<K,V> p = replacementTreeNode(e, null);
                    if (tl == null)
                        hd = p;
                    else {
                        p.prev = tl;
                        tl.next = p;
                    }
                    tl = p;
                } while ((e = e.next) != null);
                tab[index] = hd;
                if (hd != null)
                    hd.treeify(tab);
            }
        }
        
        final void untreeify(HashMap.Node<K,V>[] tab) {
            HashMap.Node<K,V> b;
            // 将红黑树转换回链表
            for (TreeNode<K,V> e = first; e != null; e = e.next) {
                HashMap.Node<K,V> newNode = newNode(e.hash, e.key, e.value, null);
                if (b == null)
                    tab[index] = newNode;
                else
                    b.next = newNode;
                b = newNode;
            }
        }
        

        在上面的代码中,treeifyBin方法负责将链表转换为红黑树,而untreeify方法则负责将红黑树转换为链表。TREEIFY_THRESHOLD和UNTREEIFY_THRESHOLD分别控制了链表和红黑树之间的转换阈值,而MIN_TREEIFY_CAPACITY则确保只有在HashMap的容量足够大时才进行树化操作。

        通过这些规则和优化,JDK 8中的HashMap能够在面对大量哈希冲突时保持较高的性能,从而提升整体效率。

        下面我们详细的一步步分析

        构造函数分析

        // 指定初始容量和负载因子的构造函数
        public HashMap(int initialCapacity, float loadFactor) {
            // 检查初始容量是否合法
            if (initialCapacity < 0)
                throw new IllegalArgumentException("Illegal initial capacity: " + initialCapacity);
            // 如果初始容量大于最大容量,则将初始容量设置为最大容量
            if (initialCapacity > MAXIMUM_CAPACITY)
                initialCapacity = MAXIMUM_CAPACITY;
            // 检查负载因子是否合法
            if (loadFactor <= 0 || Float.isNaN(loadFactor))
                throw new IllegalArgumentException("Illegal load factor: " + loadFactor);
            this.loadFactor = loadFactor; // 设置负载因子
            this.threshold = tableSizeFor(initialCapacity); // 设置扩容阈值
        }
        
        // 计算大于或等于cap的最小2的幂
        static final int tableSizeFor(int cap) {
            int n = cap - 1; // 减一是为了在n正好是2的幂时不增加
            n |= n >>> 1; // 处理高1位
            n |= n >>> 2; // 处理高2位
            n |= n >>> 4; // 处理高4位
            n |= n >>> 8; // 处理高8位
            n |= n >>> 16; // 处理高16位
            // 如果结果小于0,则返回1;如果结果大于最大容量,则返回最大容量;否则返回n+1
            return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
        }
        

        结论: 1. 设定的阈值并不是真实的阈值,他会自动计算最接近你给的容量的2的幂次为容量 2. 所有的容量都是2的幂次

        初始化

        在上面的构造函数里只计算了数组的初始大小,并没有对数组进行初始化。当多个线程都往里面放入元素的时候,再进行初始化。这就存在一个问题:多个线程重复初始化。下面看一下是如何处理的。

        /**
         * 初始化表,使用记录在 sizeCtl 中的大小。
         * @return 初始化后的桶数组
         */
        private final Node<K,V>[] initTable() {
            Node<K,V>[] tab; // 声明桶数组
            int sc; // 声明 sizeCtl 临时变量
            while ((tab = table) == null || tab.length == 0) { // 如果桶数组为空或长度为0
                if ((sc = sizeCtl) < 0) // 如果 sizeCtl 小于0,表示正在初始化
                    Thread.yield(); // 让出CPU时间片,避免自旋等待
                else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { // 尝试将 sizeCtl 置为-1,表示正在初始化
                    try {
                        if ((tab = table) == null || tab.length == 0) { // 再次检查桶数组是否为空
                            int n = (sc > 0) ? sc : DEFAULT_CAPACITY; // 初始化容量为 sizeCtl 的值或默认容量
                            @SuppressWarnings("unchecked")
                            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; // 创建新的桶数组
                            table = tab = nt; // 将新桶数组赋值给 table
                            sc = n - (n >>> 2); // 计算新的 sizeCtl 值,即容量的0.75倍
                        }
                    } finally {
                        sizeCtl = sc; // 恢复 sizeCtl 的值
                    }
                    break; // 退出循环
                }
            }
            return tab; // 返回初始化后的桶数组
        }
        

        详细解释

        1. 变量声明:

          • Node<K,V>[] tab:声明桶数组。
          • int sc:声明临时变量,用于存储sizeCtl的值。
        2. 循环检查和初始化:

          • while ((tab = table) == null || tab.length == 0):检查桶数组是否为空或长度为0。如果是,则进入循环。
          • if ((sc = sizeCtl) < 0):将sizeCtl的值赋给sc并检查是否小于0。如果sizeCtl小于0,表示当前有其他线程正在初始化桶数组,调用Thread.yield()让出CPU时间片,避免自旋等待。
          • else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)):尝试将sizeCtl的值从sc设置为-1,表示当前线程正在初始化。如果成功,进入try块。
        3. 实际初始化逻辑:

          • if ((tab = table) == null || tab.length == 0):再次检查桶数组是否为空或长度为0。
          • int n = (sc > 0) ? sc : DEFAULT_CAPACITY:如果sizeCtl大于0,则使用sizeCtl的值作为初始容量;否则,使用默认容量DEFAULT_CAPACITY。
          • @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]:创建新的桶数组nt。
          • table = tab = nt:将新创建的桶数组赋值给table和tab。
          • sc = n - (n >>> 2):计算新的sizeCtl值,即容量的0.75倍。
        4. 恢复sizeCtl值并退出循环:

          • finally { sizeCtl = sc; }:在finally块中将sizeCtl恢复为新计算的值。
          • break:退出循环。
        5. 返回初始化后的桶数组:

          • return tab:返回初始化后的桶数组。

        通过上面的代码可以看到,多个线程的竞争是通过对sizeCtl进行CAS操作实现的。如果某个线程成功地把sizeCtl 设置为-1,它就拥有了初始化的权利,进入初始化的代码模块,等到初始化完成,再把sizeCtl设置回去;其他线程则一直执行while循环,自旋等待,直到数组不为null,即当初始化结束时,退出整个函数。因为初始化的工作量很小,所以此处选择的策略是让其他线程一直等待,而没有帮助其初始化。\

        put实现分析

        好的,下面是详细注释后的put和putVal方法的代码:

        put 方法

        /**
         * 将指定的键与指定的值映射到此表中。
         * 键和值都不能为空。
         *
         * <p>可以通过调用 {@code get} 方法并传入与原始键相等的键来检索该值。
         *
         * @param key 与指定值相关联的键
         * @param value 与指定键相关联的值
         * @return 与 {@code key} 关联的先前值,如果没有该键的映射关系,则返回 {@code null}
         * @throws NullPointerException 如果指定的键或值为空
         */
        public V put(K key, V value) {
            return putVal(key, value, false); // 调用 putVal 方法实现
        }
        

        putVal 方法

        /**
         * 实现 put 和 putIfAbsent 的方法
         * @param key 与指定值相关联的键
         * @param value 与指定键相关联的值
         * @param onlyIfAbsent 如果为 true,则仅当键未被映射时才插入值
         * @return 与 {@code key} 关联的先前值,如果没有该键的映射关系,则返回 {@code null}
         */
        final V putVal(K key, V value, boolean onlyIfAbsent) {
            // 键和值不能为空
            if (key == null || value == null) throw new NullPointerException();
            // 计算键的哈希值
            int hash = spread(key.hashCode());
            int binCount = 0;
            // 进入循环,处理桶数组
            for (Node<K,V>[] tab = table;;) {
                Node<K,V> f; int n, i, fh;
                // 如果桶数组为空或长度为0,则初始化桶数组
                if (tab == null || (n = tab.length) == 0)
                    tab = initTable();
                // 计算索引,如果对应位置为空,则使用 CAS 操作插入新节点
                else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                    if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
                        break; // 添加到空桶时不需要锁定
                }
                // 如果哈希值为 MOVED,表示正在扩容,协助迁移
                else if ((fh = f.hash) == MOVED)
                    tab = helpTransfer(tab, f);
                else {
                    V oldVal = null;
                    synchronized (f) { // 锁定桶中的第一个节点
                        if (tabAt(tab, i) == f) {
                            if (fh >= 0) {
                                binCount = 1;
                                // 遍历链表
                                for (Node<K,V> e = f;; ++binCount) {
                                    K ek;
                                    if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) {
                                        oldVal = e.val;
                                        if (!onlyIfAbsent)
                                            e.val = value;
                                        break;
                                    }
                                    Node<K,V> pred = e;
                                    if ((e = e.next) == null) {
                                        pred.next = new Node<K,V>(hash, key, value, null);
                                        break;
                                    }
                                }
                            }
                            // 如果是树节点,插入或更新值
                            else if (f instanceof TreeBin) {
                                Node<K,V> p;
                                binCount = 2;
                                if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) {
                                    oldVal = p.val;
                                    if (!onlyIfAbsent)
                                        p.val = value;
                                }
                            }
                        }
                    }
                    // 如果桶中节点数不为0,则判断是否需要树化,并返回旧值
                    if (binCount != 0) {
                        if (binCount >= TREEIFY_THRESHOLD)
                            treeifyBin(tab, i);
                        if (oldVal != null)
                            return oldVal;
                        break;
                    }
                }
            }
            // 更新计数,并进行必要的扩容
            addCount(1L, binCount);
            return null;
        }
        

        详细解释

        1. put方法:

          • 调用putVal方法来实现put操作。
        2. putVal方法:

          • 参数检查:
            • 检查键和值是否为空,如果为空,则抛出NullPointerException。
          • 哈希计算:
            • 计算键的哈希值。
          • 循环处理桶数组:
            • 进入循环,处理桶数组。
            • 如果桶数组为空或长度为0,则初始化桶数组。
            • 计算索引,如果对应位置为空,则使用CAS操作插入新节点。
            • 如果哈希值为MOVED,表示正在扩容,协助迁移。
            • 如果桶中第一个节点的哈希值大于等于0,表示是链表,遍历链表,查找或插入节点。
            • 如果是树节点,调用putTreeVal方法处理树节点。
          • 同步处理:
            • 锁定桶中的第一个节点,防止并发修改。
          • 树化处理:
            • 判断链表长度是否超过阈值,超过则树化。
          • 更新计数:
            • 调用addCount方法更新计数,并进行必要的扩容。

        这段代码的主要功能是将键值对插入到ConcurrentHashMap中,并在必要时进行扩容和树化处理,以提高性能和存储效率。

        上面的for循环有4个大的分支: 第1个分支,是整个数组的初始化,前面已讲; 第2个分支,是所在的槽为空,说明该元素是该槽的第一个元素,直接新建一个头节点,然后返回; 第3个分支,说明该槽正在进行扩容,帮助其扩容; 第4个分支,就是把元素放入槽内。槽内可能是一个链表,也可能是一棵红黑树,通过头节点的类型可以判断是哪一种。第4个分支是包裹在synchronized (f)里面的,f对应的数组下标位置的头节点,意味着每个数组元素有一把锁,并发度等于数组的长度。

        上面的binCount表示链表的元素个数,当这个数目超过TREEIFY_THRESHOLD=8时,把链表转换成红黑树,也就是treeifyBin(tab,i)函数。但在这个函数内部,不一定需要进行红黑树转换,可能只做扩容操作,所以接下来从扩容讲起。

        最后更新于 June 17, 2024
        On this page
        暂无目录