并发集合——ConcurrentSkipListMap 源码分析

2020年3月19日 | 作者 Siran | 9600字 | 阅读大约需要20分钟
归档于 并发编程 | 标签 #Map

问题

  • 什么是跳表?

  • ConcurrentSkipList是有序的吗?

  • ConcurrentSkipList是如何保证线程安全的?

  • ConcurrentSkipList插入、删除、查询元素的时间复杂度各是多少?

  • ConcurrentSkipList的索引具有什么特性?

  • 为什么Redis选择使用跳表而不是红黑树来实现有序集合?


简述

对于一个单链表来讲,即便链表中存储的数据是有序的,如果我们要想在其中查找某个数据,也只能从头到尾遍历链表,时间复杂度很高,是O(n)

如果下图中那样,对链表建立一级“索引”,每两个节点提取一个节点到上一级,我们把抽出来的那 一级叫作索引或索引层。图中的down表示down指针,指向下一级节点。 如果我们现在要查找某个节点,比如16

  • 我们可以先在最顶索引层遍历,当遍历到索引层中值为13的节点时

  • 我们发现下一个节点是17,那要查找的节点16肯定就在 这两个节点之间

  • 然后我们通过索引层节点的down指针,下降到原始链表这一层,继续遍历。这个时候,我们只需要再遍历2个节点,就可以找到值等于16的这个节点了。

  • 这样,原来如果要查找16,需要遍历10个节点,现在只需要遍历7个节点。

  • 我们在第一级索引的基础之上,每两个节点就抽出一个节点到第二级索引。现在我们再来查找16,只需要遍历6个节点了,需要遍历的节点数量又减少了。


源码分析

Map 类图

  • 继承 AbstractMap

  • 实现 ConcurrentNavigableMap


重要的内部类

//数据节点,典型的单链表结构
static final class Node<K,V> {
        final K key;

        // 注意:这里value的类型是Object,而不是V
        // 在删除元素的时候value会指向当前元素本身
        volatile Object value;
        volatile Node<K,V> next;

 
        Node(K key, Object value, Node<K,V> next) {
            this.key = key;
            this.value = value;
            this.next = next;
        }

        Node(Node<K,V> next) {
            this.key = null;
            this.value = this;  // 当前元素本身(marker)
            this.next = next;
        }
}

// 索引节点,存储着对应的Node值,及向下和向右的索引指针
static class Index<K,V> {
        final Node<K,V> node;
        final Index<K,V> down;
        volatile Index<K,V> right;

        Index(Node<K,V> node, Index<K,V> down, Index<K,V> right) {
            this.node = node;
            this.down = down;
            this.right = right;
        }
}

// 头索引节点,继承自Index,并扩展一个level字段,用于记录索引的层级
static final class HeadIndex<K,V> extends Index<K,V> {
        final int level;
        HeadIndex(Node<K,V> node, Index<K,V> down, Index<K,V> right, int level) {
            super(node, down, right);
            this.level = level;
        }
    }
  • Node:数据节点,存储数据的节点,典型的单链表结构
  • Index:索引节点,存储着对应的node值,及向下和向右的索引指针
  • HeadIndex:头索引节点,继承自Index,并扩展一个level字段,用于记录索引的层级

构造方法

public ConcurrentSkipListMap() {
        this.comparator = null;
        initialize();
    }

public ConcurrentSkipListMap(Comparator<? super K> comparator) {
        this.comparator = comparator;
        initialize();
    }

public ConcurrentSkipListMap(Map<? extends K, ? extends V> m) {
        this.comparator = null;
        initialize();
        putAll(m);
    }

public ConcurrentSkipListMap(SortedMap<K, ? extends V> m) {
        this.comparator = m.comparator();
        initialize();
        buildFromSorted(m);
    }

private static final Object BASE_HEADER = new Object();

//构造方法都调用了此方法
private void initialize() {
        keySet = null;
        entrySet = null;
        values = null;
        descendingMap = null;
        head = new HeadIndex<K,V>(new Node<K,V>(null, BASE_HEADER, null),
                                  null, null, 1);
    }

在构造方法中都调用了initialize()方法

  • 初始化了一些属性,并创建了一个头索引节点,里面存储着一个数据节点,这个数据节点的值是空对象,且它的层级是1

  • 所以,初始化的时候,跳表中只有一个头索引节点,层级是1,数据节点是一个空对象downright都是null


put方法

添加元素

public V put(K key, V value) {
        // 不能存储value为null的元素
        // 因为value为null标记该元素被删除(后面会看到)
        if (value == null)
            throw new NullPointerException();
        //调用doPut()方法添加元素
        return doPut(key, value, false);
    }

private V doPut(K key, V value, boolean onlyIfAbsent) {
        //添加元素后存储在z中
        Node<K,V> z;             // added node
        // key也不能为null
        if (key == null)
            throw new NullPointerException();
        Comparator<? super K> cmp = comparator;
        //Part I:找到目标节点的位置并插入 这里的目标节点是数据节点,也就是最底层的那条链 自旋
        outer: for (;;) {
            // 寻找目标节点之前最近的一个索引对应的数据节点,存储在b中,b=before
            // 并把b的下一个数据节点存储在n中,n=next
            // 为了便于描述,我这里把b叫做当前节点,n叫做下一个节点
            for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
                //如果下一个节点不为空
                //就拿其key与目标节点的key比较,找到目标节点应该插入的位置
                if (n != null) {
                    //v=value,存储节点value值
                    //c=compare,存储两个节点比较的大小
                    Object v;
                    int c;
                    //n的下一个数据节点,也就是b的下一个节点的下一个节点(孙子节点)
                    Node<K,V> f = n.next;
                    //如果n不为b的下一个节点 说明有其它线程修改了数据,则跳出内层循环 也就是回到了外层循环自旋的位置,从头来过
                    if (n != b.next)               // inconsistent read
                        break;
                    //如果n的value值为空,说明该节点已删除
                    if ((v = n.value) == null) {   // n is deleted
                        //协助删除节点
                        n.helpDelete(b, f);
                        break;
                    }
                    //如果b的值为空或者v等于n,说明b已被删除
                    //这时候n就是marker节点,那b就是被删除的那个
                    if (b.value == null || v == n) // b is deleted
                        break;
                    //如果目标key与下一个节点的key大
                    //说明目标元素所在的位置还在下一个节点的后面
                    if ((c = cpr(cmp, key, n.key)) > 0) {
                       // 就把当前节点往后移一位
                       // 同样的下一个节点也往后移一位
                       // 再重新检查新n是否为空,它与目标key的关系
                        b = n;
                        n = f;
                        continue;
                    }
                    //如果比较时发现下一个节点的key与目标key相同
                    //说明链表中本身就存在目标节点
                    if (c == 0) {
                        //则用新值替换旧值,并返回旧值(onlyIfAbsent=false)
                        if (onlyIfAbsent || n.casValue(v, value)) {
                            @SuppressWarnings("unchecked") V vv = (V)v;
                            return vv;
                        }
                        //如果替换旧值时失败,说明其它线程先一步修改了值,从头来过
                        break; // restart if lost race to replace value
                    }
                    // else c < 0; fall through
                }
                // 有两种情况会到这里
                // 一是到链表尾部了,也就是n为null了
                // 二是找到了目标节点的位置,也就是上面的c<0
                // 新建目标节点,并赋值给z
                // 这里把n作为新节点的next
                // 如果到链表尾部了,n为null,这毫无疑问
                // 如果c<0,则n的key比目标key大,相妆于在b和n之间插入目标节点z
                z = new Node<K,V>(key, value, n);
                //原子更新b的下一个节点为目标节点z
                if (!b.casNext(n, z))
                    //如果更新失败,说明其它线程先一步修改了值,从头来过
                    break;         // restart if lost race to append to b
                //如果更新成功,跳出自旋状态
                break outer;
            }
        }
        //经过Part I,目标节点已经插入到有序链表中了
        //Part II:随机决定是否需要建立索引及其层次,如果需要则建立自上而下的索引
        //取个随机数
        int rnd = ThreadLocalRandom.nextSecondarySeed();
        // 0x80000001展开为二进制为10000000000000000000000000000001
        // 只有两头是1
        // 这里(rnd & 0x80000001) == 0
        // 相当于排除了负数(负数最高位是1),排除了奇数(奇数最低位是1)
        // 只有最高位最低位都不为1的数跟0x80000001做&操作才会为0
        // 也就是正偶数
        if ((rnd & 0x80000001) == 0) { // test highest and lowest bits
            //默认level为1,也就是只要到这里了就会至少建立一层索引
            int level = 1, max;
            //随机数从最低位的第二位开始,有几个连续的1则level就加几
            //因为最低位肯定是0,正偶数嘛
            //比如,1100110,level就加2
            while (((rnd >>>= 1) & 1) != 0)
                ++level;
            //用于记录目标节点建立的最高的那层索引节点
            Index<K,V> idx = null;
            //取头索引节点(这是最高层的头索引节点)
            HeadIndex<K,V> h = head;
            //如果生成的层数小于等于当前最高层的层级
            //也就是跳表的高度不会超过现有高度
            if (level <= (max = h.level)) {
                //从第一层开始建立一条竖直的索引链表
                //这条链表使用down指针连接起来
                //每个索引节点里面都存储着目标节点这个数据节点
                //最后idx存储的是这条索引链表的最高层节点
                for (int i = 1; i <= level; ++i)
                    idx = new Index<K,V>(z, idx, null);
            }
            else { // try to grow by one level
                //如果新的层数超过了现有跳表的高度  则最多只增加一层
                //比如现在只有一层索引,那下一次最多增加到两层索引,增加多了也没有意义
                level = max + 1; // hold in array and later pick the one to use
                // idxs用于存储目标节点建立的竖起索引的所有索引节点
                // 其实这里直接使用idx这个最高节点也是可以完成的
                // 只是用一个数组存储所有节点要方便一些
                // 注意,这里数组0号位是没有使用的
                @SuppressWarnings("unchecked")Index<K,V>[] idxs =
                    (Index<K,V>[])new Index<?,?>[level+1];
                // 从第一层开始建立一条竖的索引链表(跟上面一样,只是这里顺便把索引节点放到数组里面了)
                for (int i = 1; i <= level; ++i)
                    idxs[i] = idx = new Index<K,V>(z, idx, null);
                //自旋
                for (;;) {
                    //旧的最高层头索引节点
                    h = head;
                    //旧的最高层级
                    int oldLevel = h.level;
                    //再次检查,如果旧的最高层级已经不比新层级矮了
                    //说明有其它线程先一步修改了值,从头来过
                    if (level <= oldLevel) // lost race to add level
                        break;
                    //新的最高层头索引节点
                    HeadIndex<K,V> newh = h;
                    //头节点指向的数据节点
                    Node<K,V> oldbase = h.node;
                    //超出的部分建立新的头索引节点
                    for (int j = oldLevel+1; j <= level; ++j)
                        newh = new HeadIndex<K,V>(oldbase, newh, idxs[j], j);
                    //原子更新头索引节点
                    if (casHead(h, newh)) {
                        //h指向新的最高层头索引节点
                        h = newh;
                        //把level赋值为旧的最高层级的
                        //idx指向的不是最高的索引节点了
                        //而是与旧最高层平齐的索引节点
                        idx = idxs[level = oldLevel];
                        break;
                    }
                }
            }
            // find insertion points and splice in
            // 经过上面的步骤,有两种情况
            // 一是没有超出高度,新建一条目标节点的索引节点链
            // 二是超出了高度,新建一条目标节点的索引节点链,同时最高层头索引节点同样往上长
            // Part III:将新建的索引节点(包含头索引节点)与其它索引节点通过右指针连接在一起
            // 这时level是等于旧的最高层级的,自旋
            splice: for (int insertionLevel = level;;) {
                //h为最高头索引节点
                int j = h.level;
                //从头索引节点开始遍历
                //为了方便,这里叫q为当前节点,r为右节点,d为下节点,t为目标节点相应层级的索引
                for (Index<K,V> q = h, r = q.right, t = idx;;) {
                    //如果遍历到了最右边,或者最下边,
                    //也就是遍历到头了,则退出外层循环
                    if (q == null || t == null)
                        break splice;
                    //如果右节点不为空
                    if (r != null) {
                        //n是右节点的数据节点,为了方便,这里直接叫右节点的值
                        Node<K,V> n = r.node;
                        // compare before deletion check avoids needing recheck
                        //比较目标key与右节点的值
                        int c = cpr(cmp, key, n.key);
                        //如果右节点的值为空了,则表示此节点已删除
                        if (n.value == null) {
                            //则把右节点删除
                            if (!q.unlink(r))
                                //如果删除失败,说明有其它线程先一步修改了,从头来过
                                break;
                            //删除成功后重新取右节点
                            r = q.right;
                            continue;
                        }
                        //如果比较c>0,表示目标节点还要往右
                        if (c > 0) {
                            //则把当前节点和右节点分别右移
                            q = r;
                            r = r.right;
                            continue;
                        }
                    }

                    //到这里说明已经到当前层级的最右边了 这里实际是会先走第二个if
                    //第一个if j与insertionLevel相等了
                    //实际是先走的第二个if,j自减后应该与insertionLevel相等
                    if (j == insertionLevel) {
                        //这里是真正连右指针的地方
                        if (!q.link(r, t))
                            //连接失败,从头来过
                            break; // restart
                        //t节点的值为空,可能是其它线程删除了这个元素
                        if (t.node.value == null) {
                            //这里会去协助删除元素
                            findNode(key);
                            break splice;
                        }
                        //当前层级右指针连接完毕,向下移一层继续连接
                        //如果移到了最下面一层,则说明都连接完成了,退出外层循环
                        if (--insertionLevel == 0)
                            break splice;
                    }
                    //第二个if
                    //j先自减1,再与两个level比较
                    //j、insertionLevel和t(idx)三者是对应的,都是还未把右指针连好的那个层级
                    if (--j >= insertionLevel && j < level)
                        // t往下移
                        t = t.down;
                    //当前层级到最右边了
                    //那只能往下一层级去走了
                    //当前节点下移
                    //再取相应的右节点
                    q = q.down;
                    r = q.right;
                }
            }
        }
        return null;
    }

private Node<K,V> findPredecessor(Object key, Comparator<? super K> cmp) {
        //key不能为空
        if (key == null)
            throw new NullPointerException(); // don't postpone errors
        for (;;) {
            //从最高层头索引节点开始查找,先向右,再向下
            //直到找到目标位置之前的那个索引
            for (Index<K,V> q = head, r = q.right, d;;) {
                //如果右节点不为空
                if (r != null) {
                    //右节点对应的数据节点,为了方便,我们叫右节点的值
                    Node<K,V> n = r.node;
                    K k = n.key;
                    //如果右节点的value为空  说明其它线程把这个节点标记为删除了    则协助删除
                    if (n.value == null) {
                        if (!q.unlink(r))
                            //如果删除失败  说明其它线程先删除了,从头来过
                            break;           // restart
                        //删除之后重新读取右节点
                        r = q.right;         // reread r
                        continue;
                    }
                    //如果目标key比右节点还大,继续向右寻找
                    if (cpr(cmp, key, k) > 0) {
                        //往右移
                        q = r;
                        //重新取右节点
                        r = r.right;
                        continue;
                    }
                    //如果c<0,说明不能再往右
                }
                //到这里说明当前层级已经到最右了
                //两种情况:一是r==null,二是c<0
                //再从下一级开始找
                //如果没有下一级了,就返回这个索引对应的数据节点
                if ((d = q.down) == null)
                    return q.node;
                //往下移
                q = d;
                //重新取右节点
                r = d.right;
            }
        }
    }

//协助删除元素 Node.class中的方法
void helpDelete(Node<K,V> b, Node<K,V> f) {
            /*
             * Rechecking links and then doing only one of the
             * help-out stages per call tends to minimize CAS
             * interference among helping threads.
             */
            //这里的调用者this==n,三者关系是b->n->f
            if (f == next && this == b.next) {
                //将n的值设置为null后,会先把n的下个节点设置为marker节点
                //这个marker节点的值是它自己
                //这里如果不是它自己说明marker失败了,重新marker
                if (f == null || f.value != f) // not already marked
                    casNext(f, new Node<K,V>(f));
                else
                    //marker过了,就把b的下个节点指向marker的下个节点
                    b.casNext(this, f.next);
            }
        }

//Index.class中的方法,删除succ节点
final boolean unlink(Index<K,V> succ) {
            //原子更新当前节点指向下一个节点的下一个节点  也就是删除下一个节点
            return node.value != null && casRight(succ, succ.right);
        }


//Index.class中的方法,在当前节点与succ之间插入newSucc节点
final boolean link(Index<K,V> succ, Index<K,V> newSucc) {
            //在当前节点与下一个节点中间插入一个节点
            Node<K,V> n = node;
            //新节点指向当前节点的下一个节点
            newSucc.right = succ;
            //原子更新当前节点的下一个节点指向新节点
            return n.value != null && casRight(succ, newSucc);
        }

把整个插入过程分成三个部分

  • 第一部分找到需要插入元素的位置,并且插入

    • 寻找目标节点之前最近的一个索引对应的数据节点(数据节点都是在最底层的链表上)

    • 从这个数据节点开始往后遍历,直到找到目标节点应该插入的位置

    • 如果这个位置有元素,就更新其值(onlyIfAbsent=false)

    • 如果这个位置没有元素,就把目标节点插入

    • 至此,目标节点已经插入到最底层的数据节点链表中了

  • 第二部分随机决定是否需要建立索引及其层次,如果需要则建立自上而下的索引

    • 取个随机数rnd,计算(rnd & 0x80000001);

    • 如果不等于0,结束插入过程,也就是不需要创建索引,返回

    • 如果等于0,才进入创建索引的过程(只要正偶数才会等于0)

    • 计算 while(((rnd>>>=1)&1)!=0),决定层级数,level 从 1 开始

    • 如果算出来的层级不高于现有最高层级,则直接建立一条竖直的索引链表(只有down有值),并结束Part II

    • 如果算出来的层级高于现有最高层级,则新的层级只能比现有最高层级多1

    • 同样建立一条竖直的索引链表(只有down有值)

    • 将头索引也向上增加到相应的高度,结束Part II

    • 也就是说,如果层级不超过现有高度,只建立一条索引链,否则还要额外增加头索引链的高度

  • 第三部分:将新建的索引节点(包含头索引节点)与其它索引节点通过右指针连接在一起(补上right指针)

    • (1) 从最高层级的头索引节点开始,向右遍历,找到目标索引节点的位置;

    • (2) 如果当前层有目标索引,则把目标索引插入到这个位置,并把目标索引前一个索引向下移一个层级

    • (3) 如果当前层没有目标索引,则把目标索引位置前一个索引向下移一个层级

    • (4) 同样地,再向右遍历,寻找新的层级中目标索引的位置,回到第(2)步

    • (5) 依次循环找到所有层级目标索引的位置并把它们插入到横向的索引链表中;


添加元素举例

假设初始链表是这样 假如,我们现在要插入一个元素9。 (1)寻找目标节点之前最近的一个索引对应的数据节点,在这里也就是找到了5这个数据节点;

(2)从5开始向后遍历,找到目标节点的位置,也就是在8和12之间;

(3)插入9这个元素,Part I 结束;

然后,计算其索引层级,假如是3,也就是level=3

(1)建立竖直的down索引链表;

(2)超过了现有高度2,还要再增加head索引链的高度;

(3)至此,Part II 结束;

最后,把right指针补齐。

(1)从第3层的head往右找当前层级目标索引的位置;

(2)找到就把目标索引和它前面索引的right指针连上,这里前一个正好是head;

(3)然后前一个索引向下移,这里就是head下移;

(4)再往右找目标索引的位置;

(5)找到了就把right指针连上,这里前一个是3的索引;

(6)然后3的索引下移;

(7)再往右找目标索引的位置;

(8)找到了就把right指针连上,这里前一个是5的索引;

(9)然后5下移,到底了,Part III 结束,整个插入过程结束;


remove 方法

删除元素

public V remove(Object key) {
        return doRemove(key, null);
    }

final V doRemove(Object key, Object value) {
        //key不为空
        if (key == null)
            throw new NullPointerException();
        Comparator<? super K> cmp = comparator;
        outer: for (;;) {
            //寻找目标节点之前的最近的索引节点对应的数据节点
            //为了方便,这里叫b为当前节点,n为下一个节点,f为下下个节点
            for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
                Object v; int c;
                //整个链表都遍历完了也没找到目标节点,退出外层循环
                if (n == null)
                    break outer;
                //下下个节点
                Node<K,V> f = n.next;
                //再次检查
                //如果n不是b的下一个节点了
                //说明有其它线程先一步修改了,从头来过
                if (n != b.next)                    // inconsistent read
                    break;
                //如果下个节点的值奕为null了
                //说明有其它线程标记该元素为删除状态了
                if ((v = n.value) == null) {        // n is deleted
                    //协助删除
                    n.helpDelete(b, f);
                    break;
                }
                //如果b的值为空或者v等于n,说明b已被删除
                //这时候n就是marker节点,那b就是被删除的那个
                if (b.value == null || v == n)      // b is deleted
                    break;
                //如果c<0,说明没找到元素,退出外层循环
                if ((c = cpr(cmp, key, n.key)) < 0)
                    break outer;
                //如果c>0,说明还没找到,继续向右找
                if (c > 0) {
                    //当前节点往后移
                    b = n;
                    //下一个节点往后移
                    n = f;
                    continue;
                }
                //c=0,说明n就是要找的元素
                //如果value不为空且不等于找到元素的value,不需要删除,退出外层循环
                if (value != null && !value.equals(v))
                    break outer;
                //如果value为空,或者相等  原子标记n的value值为空
                if (!n.casValue(v, null))
                    //如果删除失败,说明其它线程先一步修改了,从头来过
                    break;
                // P.S.到了这里n的值肯定是设置成null了
                // 关键!!!!
                // 让n的下一个节点指向一个market节点
                // 这个market节点的key为null,value为marker自己,next为n的下个节点f
                // 或者让b的下一个节点指向下下个节点
                // 注意:这里是或者||,因为两个CAS不能保证都成功,只能一个一个去尝试
                // 这里有两层意思:
                // 一是如果标记market成功,再尝试将b的下个节点指向下下个节点,如果第二步失败了,进入条件,如果成功了就不用进入条件了
                // 二是如果标记market失败了,直接进入条件
                if (!n.appendMarker(f) || !b.casNext(n, f))
                    // 通过findNode()重试删除(里面有个helpDelete()方法)
                    findNode(key);                  // retry via findNode
                else {
                    //上面两步操作都成功了,才会进入这里,不太好理解,上面两个条件都有非"!"操作
                    //说明节点已经删除了,通过findPredecessor()方法删除索引节点
                    //findPredecessor()里面有unlink()操作
                    findPredecessor(key, cmp);      // clean index
                    //如果最高层头索引节点没有右节点,则跳表的高度降级
                    if (head.right == null)
                        tryReduceLevel();
                }
                //返回删除的元素值
                @SuppressWarnings("unchecked") V vv = (V)v;
                return vv;
            }
        }
        return null;
    }

(1)寻找目标节点之前最近的一个索引对应的数据节点(数据节点都是在最底层的链表上);

(2)从这个数据节点开始往后遍历,直到找到目标节点的位置;

(3)如果这个位置没有元素,直接返回null,表示没有要删除的元素;

(4)如果这个位置有元素,先通过 n.casValue(v,null)原子更新把其value设置为null;

(5)通过 n.appendMarker(f)在当前元素后面添加一个marker元素标记当前元素是要删除的元素;

(6)通过 b.casNext(n,f)尝试删除元素;

(7)如果上面两步中的任意一步失败了都通过 findNode(key)中的 n.helpDelete(b,f)再去不断尝试删除;

(8)如果上面两步都成功了,再通过 findPredecessor(key,cmp)中的 q.unlink(r)删除索引节点;

(9)如果head的right指针指向了null,则跳表高度降级;


删除元素举例

假如初始跳表如下图所示,我们要删除9这个元素。 (1)找到9这个数据节点;

(2)把9这个节点的value值设置为null;

(3)在9后面添加一个marker节点,标记9已经删除了;

(4)让8指向12;

(5)把索引节点与它前一个索引的right断开联系;

(6)跳表高度降级;

至于,为什么要有(2)(3)(4)这么多步骤呢,因为多线程下如果直接让8指向12,如果其它线程先一步在9和12间插入了一个元素10呢,这时候就不对了。

所以这里搞了三步来保证多线程下操作的正确性。

如果第(2)步失败了,则直接重试;

如果第(3)或(4)步失败了,因为第(2)步是成功的,则通过helpDelete()不断重试去删除;

其实helpDelete()里面也是不断地重试(3)和(4);

只有这三步都正确完成了,才能说明这个元素彻底被删除了。


get方法

获取元素

public V get(Object key) {
        return doGet(key);
    }

private V doGet(Object key) {
        if (key == null)
            throw new NullPointerException();
        Comparator<? super K> cmp = comparator;
        outer: for (;;) {
            //寻找目标节点之前最近的索引对应的数据节点
            //为了方便,这里叫b为当前节点,n为下个节点,f为下下个节点
            for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
                //如果链表到头还没找到元素,则跳出外层循环
                Object v;
                int c;
                if (n == null)
                    break outer;
                //下下个节点
                Node<K,V> f = n.next;
                //如果不一致读,从头来过
                if (n != b.next)                // inconsistent read
                    break;
                //如果n的值为空,说明节点已被其它线程标记为删除
                if ((v = n.value) == null) {    // n is deleted
                    //协助删除,再重试
                    n.helpDelete(b, f);
                    break;
                }
                //如果b的值为空或者v等于n,说明b已被删除
                //这时候n就是marker节点,那b就是被删除的那个
                if (b.value == null || v == n)  // b is deleted
                    break;
                // 如果c==0,说明找到了元素,就返回元素值
                if ((c = cpr(cmp, key, n.key)) == 0) {
                    @SuppressWarnings("unchecked") V vv = (V)v;
                    return vv;
                }
                //如果c<0,说明没找到元素
                if (c < 0)
                    break outer;
                //如果c>0,说明还没找到,继续寻找 当前节点往后移
                b = n;
                //下一个节点往后移
                n = f;
            }
        }
        return null;
    }

(1)寻找目标节点之前最近的一个索引对应的数据节点(数据节点都是在最底层的链表上);

(2)从这个数据节点开始往后遍历,直到找到目标节点的位置;

(3)如果这个位置没有元素,直接返回null,表示没有找到元素;

(4)如果这个位置有元素,返回元素的value值;


总结

  • 跳表是可以实现二分查找的有序链表

  • 每个元素的插入时会随机生成一个level,用于维持平衡,避免复杂度退化以及查找、插入、删除操作性能下降

  • 最低层包含所有的元素(level=0)

  • 如果一个元素出现在level(x),那么它肯定出现在x以下的所有level中

  • 每个索引节点包含两个指针,一个向下,一个向右

  • 跳表查询、插入、删除的时间复杂度为O(log n),与平衡二叉树接近

  • Redis选择使用跳表而不是红黑树来实现有序集合

    • Redis中的有序集合支持 插入、删除、查找、有序输出所有元素、查找区间内所有元素。
    • 插入、删除、查找、有序输出所有元素,这几个操作红黑树也可以完成,时间复杂度和跳表一样
    • 但是查找区间内所有的元素,红黑树的效率没有跳表来的高,跳表可以做到O(logn)的时间复杂度定位区间的起点,然后在原始链表中顺序往后遍历就可以了。

根据上面的总结手写一个SkipList

public class SkipList {
    //跳表中 层级最高16层
    private final static int MAX_LEVEL = 16;
    //当前层数
    private int levelCount = 1;
    //头节点
    private Node head = new Node(MAX_LEVEL);

    class Node {
        private int data = -1;

        /**
         * 表示当前节点位置的下一个节点所有层的数据,从上层切换到下层,就是数组下标-1,
         * forwards[3]表示当前节点在第三层的下一个节点。
         */
        private Node forwards[];

        public Node(int level) {
            forwards = new Node[level];
        }

        @Override
        public String toString() {
            StringBuilder builder = new StringBuilder();
            builder.append("{ data: ");
            builder.append(data);
            builder.append("; levels: ");
            builder.append(" }");
            return builder.toString();
        }
    }

    //获取元素的方法
    public Node find(int value) {
        Node p = head;
        //从最当前跳表中最高层往下开始遍历
        for (int i = levelCount - 1; i >= 0; i--) {
            //找到在当前层要获取元素的前一个的位置
            while (p.forwards[i] != null && p.forwards[i].data < value) {
                p = p.forwards[i];
            }
        }
        //找到直接返回
        if (p.forwards[0] != null && p.forwards[0].data == value)
            return p.forwards[0];
        else {
            return null;
        }
    }

    //插入一个元素
    public void insert(int value) {
        int level = head.forwards[0] == null ? 1 : randomLevel();
        // 每次只增加一层,如果条件满足
        if (level > levelCount) {
            level = ++levelCount;
        }
        Node newNode = new Node(level);
        newNode.data = value;
        Node p = head;
        // 从最大层开始查找,找到前一节点,通过--i,移动到下层再开始查找
        for (int i = levelCount - 1; i >= 0; --i) {
            while (p.forwards[i] != null && p.forwards[i].data < value) {
                // 找到前一节点
                p = p.forwards[i];
            }
            // levelCount 会 > level,所以加上判断
            if (level > i) {
                if (p.forwards[i] == null) {
                    p.forwards[i] = newNode;
                } else {
                    Node next = p.forwards[i];
                    p.forwards[i] = newNode;
                    newNode.forwards[i] = next;
                }
            }

        }
    }

    //删除操作
    public void delete(int value) {
        Node[] update = new Node[levelCount];
        Node p = head;
        //从最当前跳表中最高层往下开始遍历
        for (int i = levelCount - 1; i >= 0; i--) {
            while (p.forwards[i] != null && p.forwards[i].data < value){
                p = p.forwards[i];
            }
            update[i] = p;
        }
        //找到并删除该节点
        if(p.forwards[0] != null && p.forwards[0].data == value){
            for (int i = levelCount - 1; i >= 0 ; i--) {
                if(update[i].forwards[i] != null && update[i].forwards[i].data == value){
                    update[i].forwards[i] = update[i].forwards[i].forwards[i];
                }
            }
        }
    }

    private int randomLevel() {
        Random r = new Random();
        int level = 1;
        for (int i = 1; i < MAX_LEVEL; ++i) {
            if (r.nextInt() % 2 == 1) {
                level++;
            }
        }
        return level;
    }

    /**
     * 打印所有数据
     */
    public void printAll_beautiful() {
        Node p = head;
        Node[] c = p.forwards;
        Node[] d = c;
        int maxLevel = c.length;
        for (int i = maxLevel - 1; i >= 0; i--) {
            do {
                System.out.print((d[i] != null ? d[i].data : null) + ":" + i + "-------");
            } while (d[i] != null && (d = d[i].forwards)[i] != null);
            System.out.println();
            d = c;
        }
    }

    public static void main(String[] args) {
        SkipList list = new SkipList();

        list.insert(1);
        list.insert(2);
        list.insert(6);
        list.insert(7);
        list.insert(8);
        list.insert(3);
        list.insert(4);
        list.insert(5);
        System.out.println();
        list.printAll_beautiful();
    }
}

参考