JDK并发集合11:ConcurrentMap(并发容器)

ConcurrentMap

ConcurrentMap是一个接口,定义了能够支持并发访问的map集合,定义了以下几个方法(节选):

  • getOrDefault:获取不到则返回默认值;
  • putIfAbsent:如果没有这个key存在,则插入默认值;
  • computeIfAbsent:传入的是一个mappingFunction,如果key不存在则通过mappingFunction设置默认值;
  • computeIfPresent:传入的是一个mappingFunction,如果key存在则通过mappingFunction设置默认值;
  • compute:传入一个remappingFunction,对value进行remappingFunction运行,如果结果为空则删除key,如果不为空则写入并返回新值;例如创建或附加一个字符串 msg 到一个值映射map.compute(key, (k, v) -> (v == null) ? msg : v.concat(msg))
  • merge:传入一个remappingFunction,使用旧值和给定的value来计算新值,如果结果为空则删除key,如果不为空则写入并返回新值,这个函数用于组合一个键的多个值,例如创建或附加一个字符串 msg 到一个值映射map.merge(key, msg, String::concat)

ConcurrentHashMap

ConcurrentHashMap是HashMap的线程安全版本,它的特点是使用分段锁,在1.8中锁的粒度是槽位,即每个槽位一个锁。

ConcurrentHashMap1

属性

节点类型

ConcurrentHashMap的节点有三种,单个节点,链表,以及红黑树。初始的时候是单个节点,哈希冲突的节点以“拉链法”的形式放到节点后面作为链表,当节点数大于等于8,且table长度大于64的时候转为红黑树,小于等于6又会转回链表。

关于为什么选择8和6,有这一种解释:红黑树的插入复杂度O(lgN),查询复杂度O(lgN)。链表的插入复杂度为O(1),查询复杂度O(N),lg(8)=3,lg(6)=2.58,在6到8之间,红黑树和链表的插入复杂度+查询复杂度差不多,大于8的话红黑树占优势,小于6的话链表占优势。

负载因子

因为1.8是每一个哈希槽位一个锁,如果槽位内节点多了,则会出现多个线程竞争一个锁,导致性能下降。

然而在负载因子0.75的情况下,这不成一个问题。

负载因子默认是0.75,即数据量超过容量大小的0.75就需要扩容。

在理想的随机哈希情况下,容器中节点的频率(节点出现的频率在 hash 桶中)遵循泊松分布,其概率密度函数:
20181013121514354

参数λ是单位时间(或单位面积)内随机事件的平均发生次数。

对于0.75的默认调整阈值,泊松分布的概率质量函数中参数λ(事件发生的平均次数)的值约为0.5,尽管λ的值会因为load factor值的调整而产生较大变化。

那么每个桶内元素个数的概率:

0:    0.60653066
1:    0.30326533
2:    0.07581633
3:    0.01263606
4:    0.00157952
5:    0.00015795
6:    0.00001316
7:    0.00000094
8:    0.00000006

在随机哈希下,两个线程发生锁冲突的概率仅有1 / (8 * 桶中的元素个数)。其实笔者并没有理解这句话,但是按照上面桶内元素个数的概率,可以看出来在负载因子0.75的情况下,桶内元素个数在可接受范围内。

操作

初始化

插入数据的时候如果表为空,则会初始化数据。多个线程都进入到initTable的时候,CAS优先成功修改了sizeCtl那个线程进行初始化,其他线程一直等待。等到初始化完成,再把sizeCtl设置回去。

private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    while ((tab = table) == null || tab.length == 0) {
// 如果sizeCtl<0说明正在初始化或者扩容,让出CPU
        if ((sc = sizeCtl) < 0)
            Thread.yield(); // lost initialization race; just spin //sizeCtl=-1,自旋等待 http://www.51gjie.com/java/729.html
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {//关键:把sizeCtl设为-1
// 如果把sizeCtl原子更新为-1成功,则当前线程进入初始化
// 如果原子更新失败则说明有其它线程先一步进入初始化了,则进入下一次循环
// 如果下一次循环时还没初始化完毕,则sizeCtl<0进入上面if的逻辑让出CPU
// 如果下一次循环更新完毕了,则table.length!=0,退出循环
            try {
// 再次检查table是否为空,防止ABA问题
                if ((tab = table) == null || tab.length == 0) {
// 如果sc为0则使用默认值16
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    @SuppressWarnings("unchecked")
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];//初始化
                    table = tab = nt;
// 设置sc为数组长度的0.75倍
// n - (n >>> 2) = n - n/4 = 0.75n
// 可见这里装载因子和扩容门槛都是写死了的
// 这也正是没有threshold和loadFactor属性的原因
                    sc = n - (n >>> 2);//sizeCtl并非表示数组长度,所以初始化成功之后,就不再等于数组长度,而是n-(n>>>2)=n-n/4=0.75n,表示下一次扩容的阈值
                }
            } finally {
                sizeCtl = sc;//把sizeCtl再设回去
            }
            break;
        }
    }
    return tab;
}

put

put的时候有以下三个操作:

  1. 如果table为空,则调用initTable进行初始化
  2. 槽位为空则CAS为新的节点并返回
  3. 如果槽位正在迁移则协助迁移
  4. 最后对槽位加锁,插入数据。
  5. 检查是否需要扩容
public V put(K key, V value) {
    return putVal(key, value, false);
}

/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException();
    int hash = spread(key.hashCode());
// 要插入的元素所在桶的元素个数
    int binCount = 0;
// 死循环,结合CAS使用(如果CAS失败,则会重新取整个桶进行下面的流程)
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();//分支一,整个数组初始化
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {//分支二:第i个元素初始化
            // 利用CAS去进行无锁线程安全操作,如果bin是空的
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
// 如果使用CAS插入元素时,发现已经有元素了,则进入下一次循环,重新操作
// 如果使用CAS插入元素成功,则break跳出循环,流程结束
                break;                   // no lock when adding to empty bin
        }
        else if ((fh = f.hash) == MOVED)//分支三:扩容
            tab = helpTransfer(tab, f);
        else {//分支四:放入元素
            V oldVal = null;
            synchronized (f) {//加锁
                if (tabAt(tab, i) == f) {
                    if (fh >= 0) {//链表
                        binCount = 1;
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    else if (f instanceof TreeBin) {//红黑树
                        Node<K,V> p;
                        binCount = 2;
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                       value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            if (binCount != 0) {//如果是链表,则上面的binCount会从1一直累加
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);//超出阈值,转换成红黑树
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    addCount(1L, binCount);//总元素个数累加1
    return null;
}

扩容

当链表长度到达8的时候,会调用treeifyBin尝试去转换为红黑树。如果数组长度小于阈值64,不做红黑树转换,直接扩容。

treeifyBin

private final void treeifyBin(Node<K,V>[] tab, int index) {
    Node<K,V> b; int n, sc;
    if (tab != null) {
        if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
            tryPresize(n << 1);//数组长度小于阈值64,不做红黑树转换,直接扩容
        else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
            synchronized (b) {//链表转换成红黑树
                if (tabAt(tab, index) == b) {
                    TreeNode<K,V> hd = null, tl = null;
                    for (Node<K,V> e = b; e != null; e = e.next) {
                        TreeNode<K,V> p =//遍历链表,构建红黑树
                            new TreeNode<K,V>(e.hash, e.key, e.val,
                                              null, null);
                        if ((p.prev = tl) == null)
                            hd = p;
                        else
                            tl.next = p;
                        tl = p;
                    }
                    setTabAt(tab, index, new TreeBin<K,V>(hd));
                }
            }
        }
    }
}

tryPresize

扩容使用tryPresize:

private final void tryPresize(int size) {
    int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
        tableSizeFor(size + (size >>> 1) + 1);//根据元素个数计算数组大小
    int sc;
    while ((sc = sizeCtl) >= 0) {
        Node<K,V>[] tab = table; int n;
        if (tab == null || (n = tab.length) == 0) {//hash表初始化,和上面初始化的时候一样
            n = (sc > c) ? sc : c;
            if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {
                    if (table == tab) {
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = nt;
                        sc = n - (n >>> 2);//即n-n/4=0.75n,下一次扩容的阈值
                    }
                } finally {
                    sizeCtl = sc;
                }
            }
        }
        else if (c <= sc || n >= MAXIMUM_CAPACITY)
            break;
        else if (tab == table) {
            int rs = resizeStamp(n);
            if (sc < 0) {//sc<0,说明多个线程正在进行并发扩容
                Node<K,V>[] nt;
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                    transferIndex <= 0)//扩容结束
                    break;
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                    transfer(tab, nt);//帮着扩容
            }
            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                         (rs << RESIZE_STAMP_SHIFT) + 2))
                transfer(tab, null);//第一次扩容
        }
    }
}

transfer

里面使用到了transfer来进行扩容:

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    int n = tab.length, stride;
    //计算步长
    // 将 length / 8 然后除以 CPU核心数。如果得到的结果小于 16,那么就使用 16。
    // 这里的目的是让每个 CPU 处理的桶一样多,避免出现转移任务不均匀的现象,如果桶较少的话,默认一个 CPU(一个线程)处理 16 个桶
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
        stride = MIN_TRANSFER_STRIDE; // subdivide range
    // 新的 table 尚未初始化
    if (nextTab == null) {            // initiating
// 如果nextTab为空,说明还没开始迁移
// 就新建一个新桶数组
        try {
// 新桶数组是原桶的两倍
            // 扩容  2 倍
            @SuppressWarnings("unchecked")
            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
            // 更新
            nextTab = nt;
        } catch (Throwable ex) {      // try to cope with OOME
            sizeCtl = Integer.MAX_VALUE;
            return;
        }
        // 更新成员变量
        nextTable = nextTab;
        // 更新转移下标,就是 老的 tab 的 length
        transferIndex = n;//初始的transferIndex为旧HashMap的数组长度
    }
// 新桶数组大小
    // 新 tab 的 length
    int nextn = nextTab.length;
// 新建一个ForwardingNode类型的节点,并把新桶数组存储在里面
    // 创建一个 fwd 节点,用于占位。当别的线程发现这个槽位中是 fwd 类型的节点,则跳过这个节点。
    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
    // 首次推进为 true,如果等于 true,说明需要再次推进一个下标(i--),反之,如果是 false,那么就不能推进下标,需要将当前的下标处理完毕才能继续推进
    boolean advance = true;
    // 完成状态,如果是 true,就结束此方法。
    boolean finishing = false; // to ensure sweep before committing nextTab
    // 死循环,i 表示下标,bound 表示当前线程可以处理的当前桶区间最小下标
    //i为遍历的下标,bound为遍历的边界。如果成功拿到一个任务,则i = nextIndex-1,bound=nextIndex-stride;如果拿不到,则i=0,bound=0.
    for (int i = 0, bound = 0;;) {
        Node<K,V> f; int fh;
        // 如果当前线程可以向后推进;这个循环就是控制 i 递减。同时,每个线程都会进入这里取得自己需要转移的桶的区间

        //advance表示在从i=transferIndex-1遍历到bound位置的过程中,是否一直继续
        //三个分支都是advance=false,意味着若三个分支都不执行,才可能一直执行while循环。
        // 目的在于,当对transferIndex执行CAS操作不成功的时候,需要自旋,以期拿到一个stride的迁移任务

// 整个while循环就是在算i的值,过程太复杂,不用太关心
// i的值会从n-1依次递减,感兴趣的可以打下断点就知道了
// 其中n是旧桶数组的大小,也就是说i从15开始一直减到1这样去迁移元素

//i表示的是当前正在迁移哪个!!!
// bound表示的是当前线程可以处理的最小当前区间最小下标
        while (advance) {
            int nextIndex, nextBound;
            // 对 i 减一,判断是否大于等于 bound (正常情况下,如果大于 bound 不成立,说明该线程上次领取的任务已经完成了。那么,需要在下面继续领取任务)
            // 如果对 i 减一大于等于 bound(还需要继续做任务),或者完成了,修改推进状态为 false,不能推进了。任务成功后修改推进状态为 true。
            // 通常,第一次进入循环,i-- 这个判断会无法通过,从而走下面的 nextIndex 赋值操作(获取最新的转移下标)。
            // 其余情况都是:如果可以推进,将 i 减一,然后修改成不可推进。如果 i 对应的桶处理成功了,改成可以推进。
            if (--i >= bound || finishing)//对数组的遍历,通过这里的--i进行.如果成功执行了--i,就不用继续while循环了.因为每次advance只能前进一步
                advance = false;// 这里设置 false,是为了防止在没有成功处理一个桶的情况下却进行了推进
                //transferIndex小于0,整个HashMap完成
            else if ((nextIndex = transferIndex) <= 0) {
                // 这里的目的是:1. 当一个线程进入时,会选取最新的转移下标。2. 当一个线程处理完自己的区间时,如果还有剩余区间的没有别的线程处理。再次获取区间。
                // 如果小于等于0,说明没有区间了 ,i 改成 -1,推进状态变成 false,不再推进,表示,扩容结束了,当前线程可以退出了
                // 这个 -1 会在下面的 if 块里判断,从而进入完成状态判断
                i = -1;
                advance = false;// 这里设置 false,是为了防止在没有成功处理一个桶的情况下却进行了推进
            }// CAS 修改 transferIndex,即 length - 区间值,留下剩余的区间值供后面的线程使用
            //对transferIndex进行CAS操作,即为当前线程分配一个stride。cas操作成功则线程拿到一个stride的迁移任务,cas操作不成功,线程没抢到任务,会继续while循环,自旋
            else if (U.compareAndSwapInt
                     (this, TRANSFERINDEX, nextIndex,
                      nextBound = (nextIndex > stride ?
                                   nextIndex - stride : 0))) {
                bound = nextBound;// 这个值就是当前线程可以处理的最小当前区间最小下标
                i = nextIndex - 1;// 初次对i 赋值,这个就是当前线程可以处理的当前区间的最大下标
                advance = false;// 这里设置 false,是为了防止在没有成功处理一个桶的情况下却进行了推进,这样对导致漏掉某个桶。下面的 if (tabAt(tab, i) == f) 判断会出现这样的情况。
            }
        }// 如果 i < 0 (不在 tab 下标内,按照上面的判断,领取最后一段区间的线程扩容结束)
        //  如果 i >= tab.length(不知道为什么这么判断)
        //  如果 i + tab.length >= nextTable.length  (不知道为什么这么判断)
        // i已经越界,整个HashMap已经遍历完成
        if (i < 0 || i >= n || i + n >= nextn) {
// 如果一次遍历完成了
// 也就是整个map所有桶中的元素都迁移完成了
            int sc;
            if (finishing) {// 如果完成了扩容
// 如果全部迁移完成了,则替换旧桶数组
// 并设置下一次扩容门槛为新桶数组容量的0.75倍
                nextTable = null;// 删除成员变量
                table = nextTab;// 更新 table
                sizeCtl = (n << 1) - (n >>> 1);// 更新阈值
                return;
            }// 如果没完成
            if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
// 当前线程扩容完成,把扩容线程数-1
                if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
// 扩容完成两边肯定相等
                    return;
// 把finishing设置为true
// finishing为true才会走到上面的if条件
                finishing = advance = true;
// i重新赋值为n
// 这样会再重新遍历一次桶数组,看看是不是都迁移完成了
// 也就是第二次遍历都会走到下面的(fh = f.hash) == MOVED这个条件
                i = n; // recheck before commit
            }
        }
        else if ((f = tabAt(tab, i)) == null)//tab[i]迁移完毕,赋值一个forwardingNode
// 如果桶中无数据,直接放入ForwardingNode标记该桶已迁移
            advance = casTabAt(tab, i, null, fwd);
        else if ((fh = f.hash) == MOVED)//已经在迁移过程中
// 如果桶中第一个元素的hash值为MOVED
// 说明它是ForwardingNode节点
// 也就是该桶已迁移
            advance = true; // already processed
        else {//对tab[i]进行迁移操作,tab[i]可能是一个链表或者红黑树
// 锁定该桶并迁移元素
            synchronized (f) {
// 再次判断当前桶第一个元素是否有修改
// 也就是可能其它线程先一步迁移了元素
                if (tabAt(tab, i) == f) {
// 把一个链表分化成两个链表
// 规则是桶中各元素的hash与桶大小n进行与操作
// 等于0的放到低位链表(low)中,不等于0的放到高位链表(high)中
// 其中低位链表迁移到新桶中的位置相对旧桶不变
// 高位链表迁移到新桶中位置正好是其在旧桶的位置加n
// 这也正是为什么扩容时容量在变成两倍的原因
                    Node<K,V> ln, hn;
                    if (fh >= 0) {//链表
// 第一个元素的hash值大于等于0
// 说明该桶中元素是以链表形式存储的
// 这里与HashMap迁移算法基本类似
// 唯一不同的是多了一步寻找lastRun
// 这里的lastRun是提取出链表后面不用处理再特殊处理的子链表
// 比如所有元素的hash值与桶大小n与操作后的值分别为 0 0 4 4 0 0 0
// 则最后后面三个0对应的元素肯定还是在同一个桶中
// 这时lastRun对应的就是倒数第三个节点
                        int runBit = fh & n;
                        Node<K,V> lastRun = f;
                        for (Node<K,V> p = f.next; p != null; p = p.next) {
                            int b = p.hash & n;
                            //这个循环意味着在lastRun之后的所有元素,hash值都是一样的,记录下这个最后的位置
                            // 寻找最后一个hash值不等于runBit的元素
                            if (b != runBit) {
                                runBit = b;
                                lastRun = p;

                            }
                        }
                        if (runBit == 0) {//因为是 某个数 & n,所以结果要不就是n要不就是0.
// 看看最后这几个元素归属于低位链表还是高位链表
                            ln = lastRun;//如果最后一个runBit是0,即后面的全是0,即留在原来的链表
                            hn = null;
                        }
                        else {
                            hn = lastRun;
                            ln = null;
                        }
// 遍历链表,把hash&n为0的放在低位链表中
// 不为0的放在高位链表中
                        //从lastRun往后的所有节点,不需依次拷贝,而是直接链接到新的链表头部。从lastRun往前的所有节点,需要依次拷贝。
                        for (Node<K,V> p = f; p != lastRun; p = p.next) {
                            int ph = p.hash; K pk = p.key; V pv = p.val;
                            if ((ph & n) == 0)
                                ln = new Node<K,V>(ph, pk, pv, ln);
                            else
                                hn = new Node<K,V>(ph, pk, pv, hn);
                        }
                        //把tab[i]位置的链表或红黑树重新组装成两部分,
                        // 一部分链接到nextTab[i]的位置,一部分链接到nextTab[i+n]的位置,如图5-11所示。
                        // 然后把tab[i]的位置指向一个ForwardingNode节点。
// 低位链表的位置不变
                        setTabAt(nextTab, i, ln);
// 高位链表的位置是原位置加n
                        setTabAt(nextTab, i + n, hn);
// 标记当前桶已迁移
                        setTabAt(tab, i, fwd);
// advance为true,返回上面进行--i操作
                        advance = true;
                    }
                    else if (f instanceof TreeBin) {//红黑树,迁移方法和链表类似
// 如果第一个元素是树节点
// 也是一样,分化成两颗树
// 也是根据hash&n为0放在低位树中
// 不为0放在高位树中
                        TreeBin<K,V> t = (TreeBin<K,V>)f;
                        TreeNode<K,V> lo = null, loTail = null;
                        TreeNode<K,V> hi = null, hiTail = null;
                        int lc = 0, hc = 0;
// 遍历整颗树,根据hash&n是否为0分化成两颗树
                        for (Node<K,V> e = t.first; e != null; e = e.next) {
                            int h = e.hash;
                            TreeNode<K,V> p = new TreeNode<K,V>
                                (h, e.key, e.val, null, null);
                            if ((h & n) == 0) {
                                if ((p.prev = loTail) == null)
                                    lo = p;
                                else
                                    loTail.next = p;
                                loTail = p;
                                ++lc;
                            }
                            else {
                                if ((p.prev = hiTail) == null)
                                    hi = p;
                                else
                                    hiTail.next = p;
                                hiTail = p;
                                ++hc;
                            }
                        }
// 如果分化的树中元素个数小于等于6,则退化成链表
                        ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                            (hc != 0) ? new TreeBin<K,V>(lo) : t;
                        hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                            (lc != 0) ? new TreeBin<K,V>(hi) : t;
                        //把tab[i]位置的链表或红黑树重新组装成两部分,
                        // 一部分链接到nextTab[i]的位置,一部分链接到nextTab[i+n]的位置,如图5-11所示。
                        // 然后把tab[i]的位置指向一个ForwardingNode节点。
// 低位树的位置不变
                        setTabAt(nextTab, i, ln);
// 高位树的位置是原位置加n
                        setTabAt(nextTab, i + n, hn);
// 标记该桶已迁移
                        setTabAt(tab, i, fwd);
// advance为true,返回上面进行--i操作
                        advance = true;
                    }
                }
            }
        }
    }
}
  1. 扩容的基本原理如下图所示,首先建一个新的HashMap,其数组长度是旧数组长度的2倍,然后把旧的元素逐个迁移过来。
    ConcurrentHashMap
    上面的旧HashMap是扩容前的table,下面的新HashMap是扩容后的。扩容过程中,如果有线程调用put的时候发现正在扩容,则会加入协助扩容,即会有多个线程一起进行扩容;

  2. 多线程进行扩容,就需要解决并发冲突的问题。这里是采用任务划分的方式,每个线程分得一段槽位,这段槽位用步长(stride)来表示,如下图所示。transferIndex表示整个数组扩容的进度。
    stride

stride的计算公式如下:单核情况下等于数组长度,即不进行多线程扩容。多核情况下将 length / 8 然后除以 CPU核心数。如果得到的结果小于 16,那么就使用 16。这里的目的是让每个 CPU 处理的桶一样多,避免出现转移任务不均匀的现象,如果桶较少的话,默认一个 CPU(一个线程)处理 16 个桶

if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
    stride = MIN_TRANSFER_STRIDE; // subdivide range

transferIndex记录了扩容的进度,初始值为n,从大到小缩减,每次减stride个位置,最终减至n<=0,表示整个扩容完成:

  • [0,transferIndex-1] 表示还没有分配到线程扩容的部分
  • [transfexIndex,n-1] 表示已经分配给某个线程进行扩容

transfer过程中修改transferIndex的代码:

else if (U.compareAndSwapInt
         (this, TRANSFERINDEX, nextIndex,
          nextBound = (nextIndex > stride ?
                       nextIndex - stride : 0))) {
  1. 扩容过程中需要访问数据的时候,有些数据已经在新的数组,有些还是在旧的,ConcurrentHashMap使用ForwardingNode标记该槽位已经迁移完成,如果访问到该槽位,则转发到新的数组。
    ConcurrentHashMap--

  2. 在迁移过程中还做了链表优化。因为数组长度是2的次方,扩容后的长度是原数组的两倍,那么原来槽位的数据,假设为i,扩容后要么在i,要么在“table.length+i”处。那么扩容的过程就是把tab[i]的链表或者红黑树组成两个部分,一部分链接到i,一部分连接到tabke.length+i。

同时链表还可以做一个优化:找到链表后面哈希值一致的一段,挂到高位或者地位,剩下的逐个判断:

for (Node<K,V> p = f.next; p != null; p = p.next) {
    int b = p.hash & n;
    //这个循环意味着在lastRun之后的所有元素,hash值都是一样的,记录下这个最后的位置
    // 寻找最后一个hash值不等于runBit的元素
    if (b != runBit) {
        runBit = b;
        lastRun = p;

    }
}
if (runBit == 0) {//因为是 某个数 & n,所以结果要不就是n要不就是0.
// 看看最后这几个元素归属于低位链表还是高位链表
    ln = lastRun;//如果最后一个runBit是0,即后面的全是0,即留在原来的链表
    hn = null;
}
else {
    hn = lastRun;
    ln = null;
}
// 遍历链表,把hash&n为0的放在低位链表中
// 不为0的放在高位链表中
//从lastRun往后的所有节点,不需依次拷贝,而是直接链接到新的链表头部。从lastRun往前的所有节点,需要依次拷贝。
for (Node<K,V> p = f; p != lastRun; p = p.next) {
    int ph = p.hash; K pk = p.key; V pv = p.val;
    if ((ph & n) == 0)
        ln = new Node<K,V>(ph, pk, pv, ln);
    else
        hn = new Node<K,V>(ph, pk, pv, hn);
}
//把tab[i]位置的链表或红黑树重新组装成两部分,
// 一部分链接到nextTab[i]的位置,一部分链接到nextTab[i+n]的位置,如图5-11所示。
// 然后把tab[i]的位置指向一个ForwardingNode节点。
// 低位链表的位置不变
setTabAt(nextTab, i, ln);
// 高位链表的位置是原位置加n
setTabAt(nextTab, i + n, hn);
// 标记当前桶已迁移
setTabAt(tab, i, fwd);

sizeCtl

回头再看看tryPresize,这里有个关键的变量sizeCtl,控制着扩容的进度。把tryPresize函数贴回来这里:

private final void tryPresize(int size) {
    int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
        tableSizeFor(size + (size >>> 1) + 1);//根据元素个数计算数组大小
    int sc;
    while ((sc = sizeCtl) >= 0) {
        Node<K,V>[] tab = table; int n;
        if (tab == null || (n = tab.length) == 0) {//hash表初始化,和上面初始化的时候一样
            n = (sc > c) ? sc : c;
            if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {
                    if (table == tab) {
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = nt;
                        sc = n - (n >>> 2);//即n-n/4=0.75n,下一次扩容的阈值
                    }
                } finally {
                    sizeCtl = sc;
                }
            }
        }
        else if (c <= sc || n >= MAXIMUM_CAPACITY)
            break;
        else if (tab == table) {
            int rs = resizeStamp(n);
            if (sc < 0) {//sc<0,说明多个线程正在进行并发扩容
                Node<K,V>[] nt;
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                    transferIndex <= 0)//扩容结束
                    break;
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                    transfer(tab, nt);//帮着扩容
            }
            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                         (rs << RESIZE_STAMP_SHIFT) + 2))
                transfer(tab, null);//第一次扩容
        }
    }
}

sizeCtl各值:

  1. -1,表示table正在初始化
  2. < -1,高16位存储着扩容邮戳,扩容邮戳计算方式如下,低16位存储着扩容线程数加1,即(1+nThreads),表示有n个线程正在一起扩容
static final int resizeStamp(int n) {
    return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
}
public static int numberOfLeadingZeros(int i) {
    // HD, Figure 5-6
    if (i == 0)
        return 32;
    int n = 1;
    if (i >>> 16 == 0) { n += 16; i <<= 16; }
    if (i >>> 24 == 0) { n +=  8; i <<=  8; }
    if (i >>> 28 == 0) { n +=  4; i <<=  4; }
    if (i >>> 30 == 0) { n +=  2; i <<=  2; }
    n -= i >>> 31;
    return n;
}
  1. 0,默认值,后续在真正初始化的时候使用默认容量
  2. > 0,初始化或扩容完成后下一次的扩容门槛

在第一次扩容的时候,sizeCtl会被设置成一个很大的负数U.compareAndSwapInt(this,SIZECTL,sc,(rs <<RESIZE_STAMP_SHIFT)+2);之后每一个线程扩容的时候,sizeCtl 就加1,U.compareAndSwapInt(this,SIZECTL,sc,sc+1),表示有多少个线程正在进行扩容。待扩容完成之后,sizeCtl减1。

get

查询操作不加锁,所以是非强一致性的:

  1. 计算元素所在桶
  2. 如果不存在则返回
  3. 如果第一个元素命中则返回
  4. 查找链表或红黑树
public V get(Object key) {
    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
// 计算hash
    int h = spread(key.hashCode());
// 如果元素所在的桶存在且里面有元素
    if ((tab = table) != null && (n = tab.length) > 0 &&
        (e = tabAt(tab, (n - 1) & h)) != null) {
// 如果第一个元素就是要找的元素,直接返回
        if ((eh = e.hash) == h) {
            if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                return e.val;
        }
        else if (eh < 0)
// hash小于0,说明是树或者正在扩容
// 使用find寻找元素,find的寻找方式依据Node的不同子类有不同的实现方式
            return (p = e.find(h, key)) != null ? p.val : null;
// 遍历整个链表寻找元素
        while ((e = e.next) != null) {
            if (e.hash == h &&
                ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }
    return null;
}

remove

删除元素:

  1. 计算hash
  2. 如果桶不存在则返回
  3. 如果正在扩容则协助扩容
  4. 锁槽位,查找节点、链表或红黑树,找到了删除
  5. 最后数组元素大小减一
public boolean remove(Object key, Object value) {
    if (key == null)
        throw new NullPointerException();
    return value != null && replaceNode(key, null, value) != null;
}
final V replaceNode(Object key, V value, Object cv) {
// 计算hash
    int hash = spread(key.hashCode());
// 自旋
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        if (tab == null || (n = tab.length) == 0 ||
            (f = tabAt(tab, i = (n - 1) & hash)) == null)
// 如果目标key所在的桶不存在,跳出循环返回null
            break;
        else if ((fh = f.hash) == MOVED)
// 如果正在扩容中,协助扩容
            tab = helpTransfer(tab, f);
        else {
            V oldVal = null;
// 标记是否处理过
            boolean validated = false;
            synchronized (f) {
// 再次验证当前桶第一个元素是否被修改过
                if (tabAt(tab, i) == f) {
                    if (fh >= 0) {
// fh>=0表示是链表节点
                        validated = true;
// 遍历链表寻找目标节点
                        for (Node<K,V> e = f, pred = null;;) {
                            K ek;
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
// 找到了目标节点
                                V ev = e.val;
// 检查目标节点旧value是否等于cv
                                if (cv == null || cv == ev ||
                                    (ev != null && cv.equals(ev))) {
                                    oldVal = ev;
                                    if (value != null)
// 如果value不为空则替换旧值
                                        e.val = value;
                                    else if (pred != null)
// 如果前置节点不为空
// 删除当前节点
                                        pred.next = e.next;
                                    else
// 如果前置节点为空
// 说明是桶中第一个元素,删除之
                                        setTabAt(tab, i, e.next);
                                }
                                break;
                            }
                            pred = e;
// 遍历到链表尾部还没找到元素,跳出循环
                            if ((e = e.next) == null)
                                break;
                        }
                    }
                    else if (f instanceof TreeBin) {
// 如果是树节点
                        validated = true;
                        TreeBin<K,V> t = (TreeBin<K,V>)f;
                        TreeNode<K,V> r, p;
// 遍历树找到了目标节点
                        if ((r = t.root) != null &&
                            (p = r.findTreeNode(hash, key, null)) != null) {
                            V pv = p.val;
// 检查目标节点旧value是否等于cv
                            if (cv == null || cv == pv ||
                                (pv != null && cv.equals(pv))) {
                                oldVal = pv;
                                if (value != null)
// 如果value不为空则替换旧值
                                    p.val = value;
                                else if (t.removeTreeNode(p))
// 如果value为空则删除元素
// 如果删除后树的元素个数较少则退化成链表
// t.removeTreeNode(p)这个方法返回true表示删除节点后树的元素个数较少
                                    setTabAt(tab, i, untreeify(t.first));
                            }
                        }
                    }
                }
            }
// 如果处理过,不管有没有找到元素都返回
            if (validated) {
// 如果找到了元素,返回其旧值
                if (oldVal != null) {
// 如果要替换的值为空,元素个数减1
                    if (value == null)
                        addCount(-1L, -1);
                    return oldVal;
                }
                break;
            }
        }
    }
// 没找到元素返回空
    return null;
}

size

在remove中可以看到,ConcurrentHashMap是通过addCount来改变数组元素大小的。
每次添加/删除元素后,元素数量加1/减1,并判断是否达到扩容门槛,达到了则进行扩容或协助扩容。

把数组的大小存储根据不同的线程存储到不同的段上(也是分段锁的思想),并且有一个baseCount,优先更新baseCount,如果失败了再更新不同线程对应的段,这样可以保证尽量小的减少冲突。总结一句话:先尝试把数量加到baseCount上,如果失败再加到分段的CounterCell上

private final void addCount(long x, int check) {
    CounterCell[] as; long b, s;
// 这里使用的思想跟LongAdder类是一模一样的(后面会讲)
// 把数组的大小存储根据不同的线程存储到不同的段上(也是分段锁的思想)
// 并且有一个baseCount,优先更新baseCount,如果失败了再更新不同线程对应的段
// 这样可以保证尽量小的减少冲突
// 先尝试把数量加到baseCount上,如果失败再加到分段的CounterCell上
    if ((as = counterCells) != null ||
        !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
        CounterCell a; long v; int m;
        boolean uncontended = true;
// 如果as为空
// 或者长度为0
// 或者当前线程所在的段为null
// 或者在当前线程的段上加数量失败
        if (as == null || (m = as.length - 1) < 0 ||
            (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
            !(uncontended =
              U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
// 如果as为空
// 或者长度为0
// 或者当前线程所在的段为null
// 或者在当前线程的段上加数量失败
            fullAddCount(x, uncontended);
            return;
        }
        if (check <= 1)
            return;
// 计算元素个数
        s = sumCount();
    }
    if (check >= 0) {
        Node<K,V>[] tab, nt; int n, sc;
// 如果元素个数达到了扩容门槛,则进行扩容
// 注意,正常情况下sizeCtl存储的是扩容门槛,即容量的0.75倍
        while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
               (n = tab.length) < MAXIMUM_CAPACITY) {
// rs是扩容时的一个邮戳标识
            int rs = resizeStamp(n);
            if (sc < 0) {
// sc<0说明正在扩容中
                /**1.第一个判断 sc右移RESIZE_STAMP_SHIFT位,也就是比较高ESIZE_STAMP_BITS位生成戳和rs是否相等
                 * 相等则代表是同一个n,是在同一容量下进行的扩容,
                 *  2.第二个和第三个判断 判断当前帮助扩容线程数是否已达到MAX_RESIZERS最大扩容线程数
                 *  3.第四个和第五个判断 为了确保transfer()方法初始化完毕
                 */
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                    transferIndex <= 0)
                    break;
// 扩容未完成,则当前线程加入迁移元素中
// 并把扩容线程数加1
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                    transfer(tab, nt);
            }
            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                         (rs << RESIZE_STAMP_SHIFT) + 2))
// 这里是触发扩容的那个线程进入的地方
// sizeCtl的高16位存储着rs这个扩容邮戳
// sizeCtl的低16位存储着扩容线程数加1,即(1+nThreads)
// 所以官方说的扩容时sizeCtl的值为 -(1+nThreads)是错误的
// 进入迁移元素
                transfer(tab, null);
// 重新计算元素个数
            s = sumCount();
        }
    }
}

获取元素大小:

public int size() {
// 调用sumCount()计算元素个数
    long n = sumCount();
    return ((n < 0L) ? 0 :
            (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
            (int)n);
}
final long sumCount() {
    CounterCell[] as = counterCells; CounterCell a;
    long sum = baseCount;
    if (as != null) {
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
}

类似LongAddr的思想:

  • 修改size
    • baseCount:先尝试修改基础计数
    • counterCells:竞争激烈,则根据线程哈希值找到分段位置,修改计数
  • 获取size
    • baseCount和counterCells加和

性能优化总结

  1. ConcurrentHashMap使用CAS+自旋,减少上下文切换
  2. 分段锁可以减少线程争用
  3. CounterCell,分段存储元素个数,减少多线程同时更新一个字段带来的低效;
  4. @sun.misc.Contended (CounterCell上的注解)避免伪共享;
  5. 多线程协同扩容,加快扩容速度

ConcurrentSkipListMap

ConcurrentHashMap是key无序的并发hash容器,ConcurrentSkipListMap则是key有序的并发容器,与名字一样,它使用跳表实现。

至于既然都使用了树了,为什么不用搜索树,这样子搜起来会快一些,而是使用了跳表实现,Doug Lea的解释是:

Given the use of tree-like index nodes, you might wonder why this doesn't use some kind of search tree instead, which would support somewhat faster search operations. The reason is that there are no known efficient lock-free insertion and deletion algorithms for search trees. The immutability of the "down" links of index nodes (as opposed to mutable "left" fields in true trees) makes this tractable using only CAS operations.

即对于搜索树我们没有找到一种lock-free的插入和删除的算法。
那么意思就是skiplist可以做到lock-free的插入和删除。
我们来看一下它是怎么做到的。

无锁链表

之前我们看到的无锁链表,都是在head、tail处进行CAS,所以没有问题。但是现在我们需要在链表中间进行插入、删除,我们来看一下这会发生什么问题。

问题

  1. 链表插入
    首先是链表插入,插入的时候使用CAS:如下图所示,在10的后面插入20,首先20的next指向30,然后CAS使得10的next指向20。
    ConcurrentSkipList--

  2. 链表删除
    链表删除的时候如下图所示:删除10,只需要把10的上一个节点的next指针CAS为10的下一个节点即可。
    ConcurrentSkipList---1

  3. 并发的时候
    我们来看下并发的时候会有什么问题:如果两个线程同时操作,一个删除节点10,一个要在节点10后面插入节点20,这两个都是CAS操作,这时候就会发现20也被删除了。
    ConcurrentSkipList---2

究其原因,在删除10的时候,实际操作的是10的前驱,10是没有操作的。那么在10后面CAS插入20的时候,当前线程并不知道10正在被删除。

解决方案

解决方法就是加一个mark节点,删除10的时候,把10的next指针指向mark节点,mark成软删除。如下图所示.
ConcurrentSkipList-----

  1. 首先10指向mark节点,软删除.之后再找机会物理删除
  2. 这时候线程要在10后面插入20,CAS的时候发现10指向了一个mark节点,则失败返回查找其他插入点。

跳表

跳表其实就是多层的链表。

remove

假设现在b -> n -> f,

  1. 插入的时候:b.casNext(n,z),n为旧的next节点,z为插入节点
  2. 删除的时候:
    • 删除n之前,n.appendMarker(f)指向一个mark节点
    • 之后找机会物理删除: b.casNext(n, f)
  3. 插入mark节点后,如果再往被删除节点后面插入节点则可判断该节点是否指向mark节点,即已经被删除。如果指向mark节点,则n指向一个marker节点,成功了才把n弹出b.casNext(n,z)会报错
final V doRemove(Object key, Object value) {
// key不为空
    if (key == null)
        throw new NullPointerException();
    Comparator<? super K> cmp = comparator;
// 自旋
    outer: for (;;) {//b -> n -> f
// 寻找目标节点之前的最近的索引节点对应的数据节点
// 为了方便,这里叫b为当前节点,n为下一个节点,f为下下个节点
        for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
            Object v; int c;
// 整个链表都遍历完了也没找到目标节点,退出外层循环
            if (n == null)
                break outer;
// 下下个节点
            Node<K,V> f = n.next;
// 再次检查
// 如果n不是b的下一个节点了
// 说明有其它线程先一步修改了,从头来过
            if (n != b.next)                    // inconsistent read//不一致读,重新开始
                break;
// 如果下个节点的值奕为null了
// 说明有其它线程标记该元素为删除状态了
            if ((v = n.value) == null) {        // n is deleted//n已经被删除了,执行删除的清理逻辑
// 协助删除
                n.helpDelete(b, f);
                break;
            }
// 如果b的值为空或者v等于n,说明b已被删除
// 这时候n就是marker节点,那b就是被删除的那个
            if (b.value == null || v == n)      // b is deleted//b已经被删除了,重新开始
                break;
// 如果c<0,说明没找到元素,退出外层循环
            if ((c = cpr(cmp, key, n.key)) < 0)//要删除的元素小于n,说明没找到要删除的元素,返回null
                break outer;
// 如果c>0,说明还没找到,继续向右找
            if (c > 0) {//要删除的元素大于n,[b,n]后移一个位置,重新找
// 当前节点往后移
                b = n;
// 下一个节点往后移
                n = f;
                continue;
            }
// c=0,说明n就是要找的元素
// 如果value不为空且不等于找到元素的value,不需要删除,退出外层循环
            //没找到要删除的元素(k,v)。key相等,但value不匹配,返回null
            if (value != null && !value.equals(v))
                break outer;
// 如果value为空,或者相等
// 原子标记n的value值为空
            //要删除的元素等于n。执行下面一系列的删除逻辑
            if (!n.casValue(v, null))
// 如果删除失败,说明其它线程先一步修改了,从头来过
                break;
// P.S.到了这里n的值肯定是设置成null了

// 关键!!!!
// 让n的下一个节点指向一个market节点
// 这个market节点的key为null,value为marker自己,next为n的下个节点f
// 或者让b的下一个节点指向下下个节点
// 注意:这里是或者||,因为两个CAS不能保证都成功,只能一个一个去尝试
// 这里有两层意思:
// 一是如果标记market成功,再尝试将b的下个节点指向下下个节点,如果第二步失败了,进入条件,如果成功了就不用进入条件了
// 二是如果标记market失败了,直接进入条件
            //添加删除标记
            if (!n.appendMarker(f) || !b.casNext(n, f))
// 通过findNode()重试删除(里面有个helpDelete()方法)
                findNode(key);                  // retry via findNode
            else {
// 上面两步操作都成功了,才会进入这里,不太好理解,上面两个条件都有非"!"操作
// 说明节点已经删除了,通过findPredecessor()方法删除索引节点
// findPredecessor()里面有unlink()操作
                findPredecessor(key, cmp);      // clean index
// 如果最高层头索引节点没有右节点,则跳表的高度降级
                if (head.right == null)
                    tryReduceLevel();
            }
// 返回删除的元素值
            @SuppressWarnings("unchecked") V vv = (V)v;
            return vv;
        }
    }
    return null;
}

假设我们要删除9:
--2020-10-17---11.55.01

  1. 找到9,设置value为null
  2. CAS 9的next为marker,
  3. CAS 9的前驱的next为9原来的next
  4. 把索引节点和前驱节点断开联系
  5. 减少层高
    --2020-10-17---11.55.10

其他操作与跳表无异,比如put是遍历到插入位置的前驱节点,用概率算出层高,如果层比原来的还高就再建一层索引。get是从上往下遍历到位置。

插入元素

初始状态:
--2020-10-17---12.02.12
假设要插入9,找到目标节点之前最近的一个索引对应的数据节点,插入9.
--2020-10-17---12.02.16
计算层高。这里假设是3,则建立垂直的down链表,再增加head索引链的高度。
--2020-10-17---12.02.34
最后把right指针补齐。从第3层的head往右找当前层级目标索引的位置,找到就把目标索引和它前面索引的right指针连上。继续往下移,往右找,知道最底层。
--2020-10-17---12.02.40

查找元素

初始状态
--2020-10-17---12.07.12-1

查找路径

ConcurrentSkipListSet

ConcurrentSkipListSet只是对ConcurrentSkipListMap的简单封装。

public class ConcurrentSkipListSet<E>
    extends AbstractSet<E>
    implements NavigableSet<E>, Cloneable, java.io.Serializable {
    private final ConcurrentNavigableMap<E,Object> m;
    public ConcurrentSkipListSet() {
        m = new ConcurrentSkipListMap<E,Object>();
    }
    public boolean add(E e) {
        return m.putIfAbsent(e, Boolean.TRUE) == null;
    }
}

refer

https://blog.csdn.net/reliveIT/article/details/82960063
https://www.codeleading.com/article/21271536235/
http://huyan.couplecoders.tech/java/源码阅读/java集合/2019/05/13/Map接口在1.8版本新增的几个方法/

comments powered by Disqus