ConcurrentSkipListMap实现原理

跳表介绍

跳表是一种随机层次的数据结构,通过建立多级索引,实现以二分查找遍历一个有序链表。

20171219212454554

SkipList让已排序的数据分布在多层链表中,以0~1随机数决定一个数据的向上攀升与否,以时间换空间的一个算法。

SkipList的性质:

  1. 多层结构,level通过一定概率随机产生;
  2. 每一层都是一个有序链表;
  3. 最底层包含所有元素;
  4. 如果一个元素出现在第i层,则i层以下也包含这个元素;
  5. 每个节点包含两个指针,一个指向同一链表中的下一个元素,一个指向下面一层的元素。

性能对比

ConcurrentHashMap和ConcurrentSkipListMap对比,ConcurrentSkipListMap的存取时间是log(N),和线程数几乎无关。

使用建议

  • 非多线程下尽量使用TreeMap;
  • 并发读相对较低的情况下可以使用Collections.synchronizedSortedMap将TreeMap进行包装;
  • 对于高并发程序,应当使用ConcurrentSkipListMap提供更高的并发度。

基本操作

查找

20171219212920970

插入

20171219215037478

跳表的插入肯定会在底层增加一个节点,但是往上的层次是否需要增加节点则完全随机。SkipList通过概率保证整张表的节点分布均匀。

如果概率算得level大于当前表的最大level,则会新增一个level:
20171219215328464

删除

20171219215638922

前导知识预备

删除操作需要分几步走

  1. 找到待删除节点,并将其value属性值设置为null。
    整个过程基于CAS无锁。如果失败了,那么会通过循环再次尝试CAS。
+——+ +——+ +——+
… | b |——>| n |—–>| f | …
+——+ +——+ +——+
  1. 向待删除节点的next位置新增一个marker标记节点。
    整个过程也是基于CAS无锁。这一步用于分散下一步的并发冲突性。
   +------+       +------+      +------+       +------+
  ...  |   b  |------>|   n  |----->|marker|---->|   f  | ...
   +------+       +------+      +------+       +------+
  1. 断链操作。
    CAS式删除具体的节点,实际上也就是跳过待删节点,让待删节点的前驱节点直接越过本身指向待删节点的后继结点即可。

如果第二步marker添加失败,则不会有第三部,直接回退到第一步。
这一步成功后,n节点将作为内存中的一个游离节点,最终被GC掉。

       +------+                                    +------+
  ...  |   b  |----------------------->|   f  | ...
       +------+                                    +------+

基本成员属性

基本数据存储单元:

static final class Node<K,V> {
    final K key;
    volatile Object value;
    volatile Node<K,V> next;

    Node(K key, Object value, Node<K,V> next) {
        this.key = key;
        this.value = value;
        this.next = next;
    }
    //省略其它的一些基于当前结点的 CAS 方法
}

跳表的基本组成单元,包装了Node:

static class Index<K,V> {
    final Node<K,V> node;
    final Index<K,V> down;
    volatile Index<K,V> right;

    Index(Node<K,V> node, Index<K,V> down, Index<K,V> right) {
        this.node = node;
        this.down = down;
        this.right = right;
    }
    //省略其它的一些基于当前结点的 CAS 方法
}

头结点:level属性用于标识当前层次的序号。

static final class HeadIndex<K,V> extends Index<K,V> {
    final int level;
    HeadIndex(Node<K,V> node, Index<K,V> down, Index<K,V> right, int level) {
        super(node, down, right);
        this.level = level;
    }
}

整个跳表的头结点:通过它可以遍历访问整张跳表。

/**
 * The topmost head index of the skiplist.
 */
private transient volatile HeadIndex<K,V> head;

put的并发实现

//基本的 put 方法,向跳表中添加一个节点
public V put(K key, V value) {
    if (value == null)
        throw new NullPointerException();
    return doPut(key, value, false);
}

内部调用doPut

第一部分:向底层链表插入一个节点:

//第一部分
private V doPut(K key, V value, boolean onlyIfAbsent) {
    Node<K,V> z;
    //边界值判断,空的 key 自然是不允许插入的
    if (key == null)
        throw new NullPointerException();
    //拿到比较器的引用
    Comparator<? super K> cmp = comparator;
    for (;;) {
        //根据 key,找到待插入的位置
        //b 叫做前驱节点,将来作为新加入结点的前驱节点
        //n 叫做后继结点,将来作为新加入结点的后继结点
        //也就是说,新节点将插入在 b 和 n 之间
        for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
            //如果 n 为 null,那么说明 b 是链表的最尾端的结点,这种情况比较简单,直接构建新节点插入即可
            //否则走下面的判断体
            if (n != null) {
                Object v; int c;
                Node<K,V> f = n.next;
                //如果 n 不再是 b 的后继结点了,说明有其他线程向 b 后面添加了新元素
                //那么我们直接退出内循环,重新计算新节点将要插入的位置
                if (n != b.next)
                    break;
                //value =0 说明 n 已经被标识位待删除,其他线程正在进行删除操作
                //调用 helpDelete 帮助删除,并退出内层循环重新计算待插入位置
                if ((v = n.value) == null) { 
                    n.helpDelete(b, f);
                    break;
                }
                //b 已经被标记为待删除,前途结点 b 都丢了,可不得重新计算待插入位置吗
                if (b.value == null || v == n) 
                    break;
                //如果新节点的 key 大于 n 的 key 说明找到的前驱节点有误,按序往后挪一个位置即可
                //回到内层循环重新试图插入
                if ((c = cpr(cmp, key, n.key)) > 0) {
                    b = n;
                    n = f;
                    continue;
                }
                //新节点的 key 等于 n 的 key,这是一次 update 操作,CAS 更新即可
                //如果更新失败,重新进循环再来一次
                if (c == 0) {
                    if (onlyIfAbsent || n.casValue(v, value)) {
                        @SuppressWarnings("unchecked") V vv = (V)v;
                        return vv;
                    }
                    break; 
                }
            }
            //无论遇到何种问题,到这一步说明待插位置已经确定
            z = new Node<K,V>(key, value, n);
            if (!b.casNext(n, z))//完成底层的插入工作 
                break;      
            int level = randomLevel();//得到一个随机的level作为该key-value插入的最高level
            if (level > 0)
                insertIndex(z, level);//进行插入操作
            return null;
        }
    }

第二部分:根据level值,确认是否需要增加一层索引。如果不需要则构件号底层到level层的index节点的纵向引用。如果需要则新创建一层索引,完成head节点的指针转移,并构建好纵向的index节点引用。

//第二部分
/**
 * 获得一个随机的level值
 */
private int randomLevel(){
    int x = randomSeed;
    x^= x <<13;
    x^= x >>>17;
    randomSeed=x^=x<<5;
    if((x&0x8001)!=0)//test highest and lowest bits
        return 0;
    int level=1;
    while(((x>>>=1)&1)!=0)++level;
    return level;
}

//执行插入操作:如上图所示,有两种可能的情况:
//1.当level存在时,对level<=n都执行insert操作
//2.当level不存在(大于目前的最大level)时,首先添加新的level,然后在执行操作1
private void insertIndex(Node<K,V> z,int level) {
        int max;
        Index<K,V> idx = null;
        HeadIndex<K,V> h = head;
        //如果概率算得的 level 在当前跳表 level 范围内
        //构建一个从 1 到 level 的纵列 index 结点引用
        if (level <= (max = h.level)) {
            for (int i = 1; i <= level; ++i)
                idx = new Index<K,V>(z, idx, null);
            addIndex(idx, h, level);//把最高level的idx传给addIndex方法 
        }
        //否则需要新增一个 level 层
        else { 
            level = max + 1; 
            Index<K,V>[] idxs =(Index<K,V>[])new Index<?,?>[level+1];
            for (int i = 1; i <= level; ++i)
                idxs[i] = idx = new Index<K,V>(z, idx, null);
            for (;;) {
                h = head;
                int oldLevel = h.level;
                //level 肯定是比 oldLevel 大一的,如果小了说明其他线程更新过表了
                if (level <= oldLevel) 
                    break;
                HeadIndex<K,V> newh = h;
                Node<K,V> oldbase = h.node;
                //正常情况下,循环只会执行一次,如果由于其他线程的并发操作导致 oldLevel 的值不稳定,那么会执行多次循环体
                for (int j = oldLevel+1; j <= level; ++j)
                    newh = new HeadIndex<K,V>(oldbase, newh, idxs[j], j);
                //更新头指针
                if (casHead(h, newh)) {
                    h = newh;
                    idx = idxs[level = oldLevel];
                    break;
                }
            }
            addIndex(idxs[k], h, k);
        }

第三部分:将我们的新节点在每个索引层都构建好前后的链接关系。

//第三部分
/** 
* 在1~indexlevel层中插入数据  
*/  
private void addIndex(Index<K,V> idx, HeadIndex<K,V> h, int level) {
    for (int insertionLevel = level;;) {
            int j = h.level;
            for (Index<K,V> q = h, r = q.right, t = idx;;) {
                //其他线程并发操作导致头结点被删除,直接退出外层循环
                //这种情况发生的概率很小,除非并发量实在太大
                if (q == null || t == null)
                    break splice;
                if (r != null) {
                    Node<K,V> n = r.node;
                    int c = cpr(cmp, key, n.key);
                    //如果 n 正在被其他线程删除,那么调用 unlink 去删除它
                    if (n.value == null) {
                        if (!q.unlink(r))
                            break;
                        //重新获取 q 的右结点,再次进入循环
                        r = q.right;
                        continue;
                    }
                    //c > 0 说明前驱结点定位有误,重新进入
                    if (c > 0) {
                        q = r;
                        r = r.right;
                        continue;
                    }
                }
                if (j == insertionLevel) {
                    //尝试着将 t 插在 q 和 r 之间,如果失败了,退出内循环重试
                    if (!q.link(r, t))
                        break; // restart
                    //如果插入完成后,t 结点被删除了,那么结束插入操作
                    if (t.node.value == null) {
                        findNode(key);
                        return;
                    }
                    // insertionLevel-- 处理底层链接
                    if (--insertionLevel == 0)
                        return;
                }
                //--j,j 应该与 insertionLevel 同步,它代表着我们创建的那个纵向的结点数组的索引
                //并完成层次下移操作
                if (--j >= insertionLevel && j < level)
                    t = t.down;
                //至此,新节点在当前层次的前后引用关系已经被链接完成,现在处理下一层
                q = q.down;
                r = q.right;
            }
        }
    }
    return null;
}

图示

初始化跳表为:
20171222100053434

第一部分,新增一个节点到最底层的链表上:
20171222100029636

第二部分,假设概率得出一个level值为10,那么新增一层索引层:
20171222100001896

第三部分,链接各个索引层次上的新节点
20171222100001896-1

辅助函数

findPredecessor

根据给定的key,返回最合适的前驱节点。

首先从head节点开始在当前索引层上寻找最后一个比给定key小的节点,它就是我们需要的前驱节点,只需要返回它接即可。

private Node<K,V> findPredecessor(Object key, Comparator<? super K> cmp) {
    if (key == null)
        throw new NullPointerException(); 
    for (;;) {
        for (Index<K,V> q = head, r = q.right, d;;) {
            //r 为空说明 head 后面并没有其他节点了
            if (r != null) {
                Node<K,V> n = r.node;
                // r 节点处于待删除状态,那么尝试 unlink 它,失败了将重新进入循环再此尝试
                //否则重新获取 q 的右结点并重新进入循环查找前驱节点
                if (n.value == null) {
                    if (!q.unlink(r))
                        break;           // restart
                    r = q.right;         // reread r
                    continue;
                }
                //大于零说明当前位置上的 q 还不是我们要的前驱节点,继续往后找
                if (cpr(cmp, key, k) > 0) {
                    q = r;
                    r = r.right;
                    continue;
                }
            }
            //如果当前的 level 结束了或者 cpr(cmp, key, k) <= 0 会达到此位置
            //往低层递归,如果没有低层了,那么当前的 q 就是最合适的前驱节点
            //整个循环只有这一个出口,无论如何最终都会从此处结束方法
            if ((d = q.down) == null)
                return q.node;
           //否则向低层递归并重置 q 和 r
            q = d;
            r = d.right;
        }
    }
}

helpDelete

检测到某个节点属性值为null时,会调用这个方法来删除该节点。

通过将b.next指向f,完成节点n的删除。

/*
   一般的调用形式如下:
   n.helpDelete(b, f);
*/
void helpDelete(Node<K,V> b, Node<K,V> f) {
    if (f == next && this == b.next) {
       if (f == null || f.value != f) 
            casNext(f, new Node<K,V>(f));
        else
            b.casNext(this, f.next);
    }
}

总结:对于并发的处理,基本都是双层for循环+CAS无锁式更新。如果遇到竞争失利将推出里层循环重新进行尝试。

remove的并发实现

public V remove(Object key) {
    return doRemove(key, null);
}

remove方法会有一堆判断:根据给定的key和value会判断是否存在与key对应的一个节点,也会判断和待删节点相关的前后节点是否正在被删除,其次才是删除的三大步骤,核心步骤还是将待删节点的value属性赋null以标记该节点无用了。

final V doRemove(Object key, Object value) {
    if (key == null)
        throw new NullPointerException();
    Comparator<? super K> cmp = comparator;
    outer: for (;;) {
        //找到 key 的前驱节点
        //因为删除不单单是根据 key 找到对应的结点,然后赋 null 就完事的,还要负责链接该结点前后的关联
        for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
            Object v; int c;
            //目前 n 基本上就是我们要删除的结点,它为 null,那自然不用继续了,已经被删除了
            if (n == null)
                break outer;
            Node<K,V> f = n.next;
            //再次确认 n 还是不是 b 的后继结点,如果不是将退出里层循环重新进入
            if (n != b.next)               
                break;
            //如果有人正在删除 n,那么帮助它删除
            if ((v = n.value) == null) {     
                n.helpDelete(b, f);
                break;
            }
            //b 被删除了,重新定位前驱节点
            if (b.value == null || v == n)     
                break;
            //正常情况下,key 应该等于 n.key
            //key 大于 n.key 说明我们要找的结点可能在 n 的后面,往后递归即可
            //key 小于 n.key 说明 key 所代表的结点根本不存在
            if ((c = cpr(cmp, key, n.key)) < 0)
                break outer;
            if (c > 0) {
                b = n;
                n = f;
                continue;
            }
            //如果删除是根据键和值两个参数来删除的话,value 是不为 null 的
            //这种情况下,如果 n 的 value 属性不等于我们传入的 value ,那么是不进行删除的
            if (value != null && !value.equals(v))
                break outer;
            //下面三个步骤才是整个删除操作的核心,大致的逻辑我们也在上文提及过了,此处想必会容易理解些
            //第一步,尝试将待删结点的 value 属性赋值 null,失败将退出重试
            if (!n.casValue(v, null))
                break;
            //第二步和第三步如果有一步由于竞争失败,将调用 findNode 方法根据我们第一步的成果,也就是删除所有 value 为 null 的结点
            if (!n.appendMarker(f) || !b.casNext(n, f))//casNext方法就是通过比较和设置b(前驱)的next节点的方式来实现删除操作  
                findNode(key);  // 通过尝试findNode的方式继续find  
            //否则说明三个步骤都成功完成了   
            else {
                findPredecessor(key, cmp);  
                //判断此次删除后是否导致某一索引层没有其他节点了,并适情况删除该层索引(如果head的right引用为空,则表示不存在该level)
                if (head.right == null)
                    tryReduceLevel();
            }
            @SuppressWarnings("unchecked") V vv = (V)v;
            return vv;
        }
    }
    return null;
}

get的并发实现

public V get(Object key) {
    return doGet(key);
}
private V doGet(Object key) {
    if (key == null)
        throw new NullPointerException();
    Comparator<? super K> cmp = comparator;
    //依然是双层循环来处理并发
    outer: for (;;) {
        for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
            Object v; int c;
            //以下的一些判断的作用已经描述了多次,此处不再赘述了
            if (n == null)
                break outer;
            Node<K,V> f = n.next;
            if (n != b.next)           
                break;
            if ((v = n.value) == null) {    
                n.helpDelete(b, f);
                break;
            }
            if (b.value == null || v == n)  
                break;
            //c = 0 说明 n 就是我们要的结点
            if ((c = cpr(cmp, key, n.key)) == 0) {
                @SuppressWarnings("unchecked") V vv = (V)v;
                return vv;
            }
            //c < 0 说明不存在这个 key 所对应的结点
            if (c < 0)
                break outer;
            b = n;
            n = f;
        }
    }
    return null;
}
comments powered by Disqus