跳表介绍
跳表是一种随机层次的数据结构,通过建立多级索引,实现以二分查找遍历一个有序链表。
SkipList让已排序的数据分布在多层链表中,以0~1随机数决定一个数据的向上攀升与否,以时间换空间的一个算法。
SkipList的性质:
- 多层结构,level通过一定概率随机产生;
- 每一层都是一个有序链表;
- 最底层包含所有元素;
- 如果一个元素出现在第i层,则i层以下也包含这个元素;
- 每个节点包含两个指针,一个指向同一链表中的下一个元素,一个指向下面一层的元素。
性能对比
ConcurrentHashMap和ConcurrentSkipListMap对比,ConcurrentSkipListMap的存取时间是log(N),和线程数几乎无关。
使用建议
- 非多线程下尽量使用TreeMap;
- 并发读相对较低的情况下可以使用Collections.synchronizedSortedMap将TreeMap进行包装;
- 对于高并发程序,应当使用ConcurrentSkipListMap提供更高的并发度。
基本操作
查找
插入
跳表的插入肯定会在底层增加一个节点,但是往上的层次是否需要增加节点则完全随机。SkipList通过概率保证整张表的节点分布均匀。
如果概率算得level大于当前表的最大level,则会新增一个level:
删除
前导知识预备
删除操作需要分几步走
- 找到待删除节点,并将其value属性值设置为null。
整个过程基于CAS无锁。如果失败了,那么会通过循环再次尝试CAS。
+——+ +——+ +——+
… | b |——>| n |—–>| f | …
+——+ +——+ +——+
- 向待删除节点的next位置新增一个marker标记节点。
整个过程也是基于CAS无锁。这一步用于分散下一步的并发冲突性。
+------+ +------+ +------+ +------+
... | b |------>| n |----->|marker|---->| f | ...
+------+ +------+ +------+ +------+
- 断链操作。
CAS式删除具体的节点,实际上也就是跳过待删节点,让待删节点的前驱节点直接越过本身指向待删节点的后继结点即可。
如果第二步marker添加失败,则不会有第三部,直接回退到第一步。
这一步成功后,n节点将作为内存中的一个游离节点,最终被GC掉。
+------+ +------+
... | b |----------------------->| f | ...
+------+ +------+
基本成员属性
基本数据存储单元:
static final class Node<K,V> {
final K key;
volatile Object value;
volatile Node<K,V> next;
Node(K key, Object value, Node<K,V> next) {
this.key = key;
this.value = value;
this.next = next;
}
//省略其它的一些基于当前结点的 CAS 方法
}
跳表的基本组成单元,包装了Node:
static class Index<K,V> {
final Node<K,V> node;
final Index<K,V> down;
volatile Index<K,V> right;
Index(Node<K,V> node, Index<K,V> down, Index<K,V> right) {
this.node = node;
this.down = down;
this.right = right;
}
//省略其它的一些基于当前结点的 CAS 方法
}
头结点:level属性用于标识当前层次的序号。
static final class HeadIndex<K,V> extends Index<K,V> {
final int level;
HeadIndex(Node<K,V> node, Index<K,V> down, Index<K,V> right, int level) {
super(node, down, right);
this.level = level;
}
}
整个跳表的头结点:通过它可以遍历访问整张跳表。
/**
* The topmost head index of the skiplist.
*/
private transient volatile HeadIndex<K,V> head;
put的并发实现
//基本的 put 方法,向跳表中添加一个节点
public V put(K key, V value) {
if (value == null)
throw new NullPointerException();
return doPut(key, value, false);
}
内部调用doPut
第一部分:向底层链表插入一个节点:
//第一部分
private V doPut(K key, V value, boolean onlyIfAbsent) {
Node<K,V> z;
//边界值判断,空的 key 自然是不允许插入的
if (key == null)
throw new NullPointerException();
//拿到比较器的引用
Comparator<? super K> cmp = comparator;
for (;;) {
//根据 key,找到待插入的位置
//b 叫做前驱节点,将来作为新加入结点的前驱节点
//n 叫做后继结点,将来作为新加入结点的后继结点
//也就是说,新节点将插入在 b 和 n 之间
for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
//如果 n 为 null,那么说明 b 是链表的最尾端的结点,这种情况比较简单,直接构建新节点插入即可
//否则走下面的判断体
if (n != null) {
Object v; int c;
Node<K,V> f = n.next;
//如果 n 不再是 b 的后继结点了,说明有其他线程向 b 后面添加了新元素
//那么我们直接退出内循环,重新计算新节点将要插入的位置
if (n != b.next)
break;
//value =0 说明 n 已经被标识位待删除,其他线程正在进行删除操作
//调用 helpDelete 帮助删除,并退出内层循环重新计算待插入位置
if ((v = n.value) == null) {
n.helpDelete(b, f);
break;
}
//b 已经被标记为待删除,前途结点 b 都丢了,可不得重新计算待插入位置吗
if (b.value == null || v == n)
break;
//如果新节点的 key 大于 n 的 key 说明找到的前驱节点有误,按序往后挪一个位置即可
//回到内层循环重新试图插入
if ((c = cpr(cmp, key, n.key)) > 0) {
b = n;
n = f;
continue;
}
//新节点的 key 等于 n 的 key,这是一次 update 操作,CAS 更新即可
//如果更新失败,重新进循环再来一次
if (c == 0) {
if (onlyIfAbsent || n.casValue(v, value)) {
@SuppressWarnings("unchecked") V vv = (V)v;
return vv;
}
break;
}
}
//无论遇到何种问题,到这一步说明待插位置已经确定
z = new Node<K,V>(key, value, n);
if (!b.casNext(n, z))//完成底层的插入工作
break;
int level = randomLevel();//得到一个随机的level作为该key-value插入的最高level
if (level > 0)
insertIndex(z, level);//进行插入操作
return null;
}
}
第二部分:根据level值,确认是否需要增加一层索引。如果不需要则构件号底层到level层的index节点的纵向引用。如果需要则新创建一层索引,完成head节点的指针转移,并构建好纵向的index节点引用。
//第二部分
/**
* 获得一个随机的level值
*/
private int randomLevel(){
int x = randomSeed;
x^= x <<13;
x^= x >>>17;
randomSeed=x^=x<<5;
if((x&0x8001)!=0)//test highest and lowest bits
return 0;
int level=1;
while(((x>>>=1)&1)!=0)++level;
return level;
}
//执行插入操作:如上图所示,有两种可能的情况:
//1.当level存在时,对level<=n都执行insert操作
//2.当level不存在(大于目前的最大level)时,首先添加新的level,然后在执行操作1
private void insertIndex(Node<K,V> z,int level) {
int max;
Index<K,V> idx = null;
HeadIndex<K,V> h = head;
//如果概率算得的 level 在当前跳表 level 范围内
//构建一个从 1 到 level 的纵列 index 结点引用
if (level <= (max = h.level)) {
for (int i = 1; i <= level; ++i)
idx = new Index<K,V>(z, idx, null);
addIndex(idx, h, level);//把最高level的idx传给addIndex方法
}
//否则需要新增一个 level 层
else {
level = max + 1;
Index<K,V>[] idxs =(Index<K,V>[])new Index<?,?>[level+1];
for (int i = 1; i <= level; ++i)
idxs[i] = idx = new Index<K,V>(z, idx, null);
for (;;) {
h = head;
int oldLevel = h.level;
//level 肯定是比 oldLevel 大一的,如果小了说明其他线程更新过表了
if (level <= oldLevel)
break;
HeadIndex<K,V> newh = h;
Node<K,V> oldbase = h.node;
//正常情况下,循环只会执行一次,如果由于其他线程的并发操作导致 oldLevel 的值不稳定,那么会执行多次循环体
for (int j = oldLevel+1; j <= level; ++j)
newh = new HeadIndex<K,V>(oldbase, newh, idxs[j], j);
//更新头指针
if (casHead(h, newh)) {
h = newh;
idx = idxs[level = oldLevel];
break;
}
}
addIndex(idxs[k], h, k);
}
第三部分:将我们的新节点在每个索引层都构建好前后的链接关系。
//第三部分
/**
* 在1~indexlevel层中插入数据
*/
private void addIndex(Index<K,V> idx, HeadIndex<K,V> h, int level) {
for (int insertionLevel = level;;) {
int j = h.level;
for (Index<K,V> q = h, r = q.right, t = idx;;) {
//其他线程并发操作导致头结点被删除,直接退出外层循环
//这种情况发生的概率很小,除非并发量实在太大
if (q == null || t == null)
break splice;
if (r != null) {
Node<K,V> n = r.node;
int c = cpr(cmp, key, n.key);
//如果 n 正在被其他线程删除,那么调用 unlink 去删除它
if (n.value == null) {
if (!q.unlink(r))
break;
//重新获取 q 的右结点,再次进入循环
r = q.right;
continue;
}
//c > 0 说明前驱结点定位有误,重新进入
if (c > 0) {
q = r;
r = r.right;
continue;
}
}
if (j == insertionLevel) {
//尝试着将 t 插在 q 和 r 之间,如果失败了,退出内循环重试
if (!q.link(r, t))
break; // restart
//如果插入完成后,t 结点被删除了,那么结束插入操作
if (t.node.value == null) {
findNode(key);
return;
}
// insertionLevel-- 处理底层链接
if (--insertionLevel == 0)
return;
}
//--j,j 应该与 insertionLevel 同步,它代表着我们创建的那个纵向的结点数组的索引
//并完成层次下移操作
if (--j >= insertionLevel && j < level)
t = t.down;
//至此,新节点在当前层次的前后引用关系已经被链接完成,现在处理下一层
q = q.down;
r = q.right;
}
}
}
return null;
}
图示
初始化跳表为:
第一部分,新增一个节点到最底层的链表上:
第二部分,假设概率得出一个level值为10,那么新增一层索引层:
第三部分,链接各个索引层次上的新节点
辅助函数
findPredecessor
根据给定的key,返回最合适的前驱节点。
首先从head节点开始在当前索引层上寻找最后一个比给定key小的节点,它就是我们需要的前驱节点,只需要返回它接即可。
private Node<K,V> findPredecessor(Object key, Comparator<? super K> cmp) {
if (key == null)
throw new NullPointerException();
for (;;) {
for (Index<K,V> q = head, r = q.right, d;;) {
//r 为空说明 head 后面并没有其他节点了
if (r != null) {
Node<K,V> n = r.node;
// r 节点处于待删除状态,那么尝试 unlink 它,失败了将重新进入循环再此尝试
//否则重新获取 q 的右结点并重新进入循环查找前驱节点
if (n.value == null) {
if (!q.unlink(r))
break; // restart
r = q.right; // reread r
continue;
}
//大于零说明当前位置上的 q 还不是我们要的前驱节点,继续往后找
if (cpr(cmp, key, k) > 0) {
q = r;
r = r.right;
continue;
}
}
//如果当前的 level 结束了或者 cpr(cmp, key, k) <= 0 会达到此位置
//往低层递归,如果没有低层了,那么当前的 q 就是最合适的前驱节点
//整个循环只有这一个出口,无论如何最终都会从此处结束方法
if ((d = q.down) == null)
return q.node;
//否则向低层递归并重置 q 和 r
q = d;
r = d.right;
}
}
}
helpDelete
检测到某个节点属性值为null时,会调用这个方法来删除该节点。
通过将b.next指向f,完成节点n的删除。
/*
一般的调用形式如下:
n.helpDelete(b, f);
*/
void helpDelete(Node<K,V> b, Node<K,V> f) {
if (f == next && this == b.next) {
if (f == null || f.value != f)
casNext(f, new Node<K,V>(f));
else
b.casNext(this, f.next);
}
}
总结:对于并发的处理,基本都是双层for循环+CAS无锁式更新。如果遇到竞争失利将推出里层循环重新进行尝试。
remove的并发实现
public V remove(Object key) {
return doRemove(key, null);
}
remove方法会有一堆判断:根据给定的key和value会判断是否存在与key对应的一个节点,也会判断和待删节点相关的前后节点是否正在被删除,其次才是删除的三大步骤,核心步骤还是将待删节点的value属性赋null以标记该节点无用了。
final V doRemove(Object key, Object value) {
if (key == null)
throw new NullPointerException();
Comparator<? super K> cmp = comparator;
outer: for (;;) {
//找到 key 的前驱节点
//因为删除不单单是根据 key 找到对应的结点,然后赋 null 就完事的,还要负责链接该结点前后的关联
for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
Object v; int c;
//目前 n 基本上就是我们要删除的结点,它为 null,那自然不用继续了,已经被删除了
if (n == null)
break outer;
Node<K,V> f = n.next;
//再次确认 n 还是不是 b 的后继结点,如果不是将退出里层循环重新进入
if (n != b.next)
break;
//如果有人正在删除 n,那么帮助它删除
if ((v = n.value) == null) {
n.helpDelete(b, f);
break;
}
//b 被删除了,重新定位前驱节点
if (b.value == null || v == n)
break;
//正常情况下,key 应该等于 n.key
//key 大于 n.key 说明我们要找的结点可能在 n 的后面,往后递归即可
//key 小于 n.key 说明 key 所代表的结点根本不存在
if ((c = cpr(cmp, key, n.key)) < 0)
break outer;
if (c > 0) {
b = n;
n = f;
continue;
}
//如果删除是根据键和值两个参数来删除的话,value 是不为 null 的
//这种情况下,如果 n 的 value 属性不等于我们传入的 value ,那么是不进行删除的
if (value != null && !value.equals(v))
break outer;
//下面三个步骤才是整个删除操作的核心,大致的逻辑我们也在上文提及过了,此处想必会容易理解些
//第一步,尝试将待删结点的 value 属性赋值 null,失败将退出重试
if (!n.casValue(v, null))
break;
//第二步和第三步如果有一步由于竞争失败,将调用 findNode 方法根据我们第一步的成果,也就是删除所有 value 为 null 的结点
if (!n.appendMarker(f) || !b.casNext(n, f))//casNext方法就是通过比较和设置b(前驱)的next节点的方式来实现删除操作
findNode(key); // 通过尝试findNode的方式继续find
//否则说明三个步骤都成功完成了
else {
findPredecessor(key, cmp);
//判断此次删除后是否导致某一索引层没有其他节点了,并适情况删除该层索引(如果head的right引用为空,则表示不存在该level)
if (head.right == null)
tryReduceLevel();
}
@SuppressWarnings("unchecked") V vv = (V)v;
return vv;
}
}
return null;
}
get的并发实现
public V get(Object key) {
return doGet(key);
}
private V doGet(Object key) {
if (key == null)
throw new NullPointerException();
Comparator<? super K> cmp = comparator;
//依然是双层循环来处理并发
outer: for (;;) {
for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
Object v; int c;
//以下的一些判断的作用已经描述了多次,此处不再赘述了
if (n == null)
break outer;
Node<K,V> f = n.next;
if (n != b.next)
break;
if ((v = n.value) == null) {
n.helpDelete(b, f);
break;
}
if (b.value == null || v == n)
break;
//c = 0 说明 n 就是我们要的结点
if ((c = cpr(cmp, key, n.key)) == 0) {
@SuppressWarnings("unchecked") V vv = (V)v;
return vv;
}
//c < 0 说明不存在这个 key 所对应的结点
if (c < 0)
break outer;
b = n;
n = f;
}
}
return null;
}