JDK并发集合8:StampedLock(同步工具)

之前的ReentrantReadWriteLock读写锁比ReentrantLock的并发度高,但是读的时候还是需要加锁,即悲观锁。我们来看一种乐观锁的实现,可以进一步提高并发度。

乐观读

StampedLock具有三种模式:写模式、读模式、乐观读模式。它比ReentrantReadWriteLock多了一种“乐观读”的模式:

  • 读时不加锁
  • 使用的时候发现数据被修改再升级为“悲观读”
  • 相当于降低了“读”的地位,升高了“写”

我们来看使用的例子:

public class Point {
    private double x, y;
    private final StampedLock sl = new StampedLock();

    void move(double deltaX, double deltaY) {
        // 获取写锁,返回一个版本号(戳)
        long stamp = sl.writeLock();
        try {
            x += deltaX;
            y += deltaY;
        } finally {
            // 释放写锁,需要传入上面获取的版本号
            sl.unlockWrite(stamp);
        }
    }

    double distanceFromOrigin() {
        // 乐观读
        long stamp = sl.tryOptimisticRead();
        double currentX = x, currentY = y;
        // 验证版本号是否有变化
        if (!sl.validate(stamp)) {
            // 版本号变了,乐观读转悲观读
            stamp = sl.readLock();
            try {
                // 重新读取x、y的值
                currentX = x;
                currentY = y;
            } finally {
                // 释放读锁,需要传入上面获取的版本号
                sl.unlockRead(stamp);
            }
        }
        return Math.sqrt(currentX * currentX + currentY * currentY);
    }

    void moveIfAtOrigin(double newX, double newY) {
        // 获取悲观读锁
        long stamp = sl.readLock();
        try {
            while (x == 0.0 && y == 0.0) {
                // 转为写锁
                long ws = sl.tryConvertToWriteLock(stamp);
                // 转换成功
                if (ws != 0L) {
                    stamp = ws;
                    x = newX;
                    y = newY;
                    break;
                }
                else {
                    // 转换失败
                    sl.unlockRead(stamp);
                    // 获取写锁
                    stamp = sl.writeLock();
                }
            }
        } finally {
            // 释放锁
            sl.unlock(stamp);
        }
    }
}

从这个例子我们看出来StampedLock的特性:

  1. 写锁和ReentrantReadWriteLock使用方式一样;
  2. 读锁加了一种“乐观读”模式,读取后要使用的时候可以通过validate验证是否数据已经改变,如果改变了则升级为悲观读;
  3. 悲观读可以通过tryConvertToWriteLock转换为写锁,这在ReentrantReadWriteLock中是不能实现的;
  4. 我们后面会看到,乐观读的情况下是解决了ABA问题的,如果有其他线程在过程中获取了写锁又释放,则版本号会不一致,还是需要获取悲观读锁。

属性和方法

StampedLock没有组合AQS,而是自己实现了state和队列。

//state的高24位存储的是版本号,低8位存储的是是否有加锁,第8位存储的是写锁,低7位存储的是读锁被获取的次数,
// 因为只有第8位存储写锁的话,那么写锁只能被获取一次,也就不可能重入了。
private transient volatile long state;

state

StampedLock自己有一个state用于存储同步状态,它包含了写锁、读锁以及数据的版本号,版本号是用于解决ABA问题的。
StampedLock-state

tryOptimisticRead

tryOptimisticRead返回高25位的值,即包括了版本号以及写锁的位置。

// 读线程的个数占有低7位
private static final int LG_READERS = 7;
// 写线程个数所在的位置
private static final long WBIT  = 1L << LG_READERS;//第八位表示写锁,写锁只有一个bit,不可重入
// 读线程个数所在的位置
private static final long RBITS = WBIT - 1L;//最低的七位表示读锁
// 读线程个数的反数,高25位全部为1
private static final long SBITS = ~RBITS; // note overlap with ABITS
    
public long tryOptimisticRead() {
    long s;
    return (((s = state) & WBIT) == 0L) ? (s & SBITS) : 0L;
}

validate

validate的时候只要对比版本好和写锁跟一开始是不是一致就行了,并不需要关心读锁是否有被其他线程获取。

另外要说明的一点是,上面使用了内存屏障U.loadFence(),是因为在这行代码的下一行里面的stamp、SBITS变量不是volatile的,由此可以禁止其和前面的currentX=X,currentY=Y进行重排序。

public boolean validate(long stamp) {
    // 强制加入内存屏障,刷新数据
    U.loadFence();
    return (stamp & SBITS) == (state & SBITS);
}

结合来看,如果有个线程一开始获取了“乐观读”锁,那么在validate的时候:

  • 写锁被占有,validate为false
  • 读的过程中有其他线程占有写锁,写位变了,validate为false
  • 读的过程中有其他线程占有写锁又释放,版本号+1,validate为false
  • 读的过程中其他线程只占有读锁,validate为true

加解锁流程

在AQS中,当一个线程获取不到锁,则会立即加入阻塞队列,并且进入阻塞状态。但是在StampedLock中,会不断自旋,自旋到一定次数还获取不到锁的话才加入阻塞队列进入阻塞状态,其自旋次数定义如下:单核则不自旋

/** Number of processors, for spin control */
private static final int NCPU = Runtime.getRuntime().availableProcessors();

/** Maximum number of retries before enqueuing on acquisition */
private static final int SPINS = (NCPU > 1) ? 1 << 6 : 0;

/** Maximum number of retries before blocking at head on acquisition */
private static final int HEAD_SPINS = (NCPU > 1) ? 1 << 10 : 0;

/** Maximum number of retries before re-blocking */
private static final int MAX_HEAD_SPINS = (NCPU > 1) ? 1 << 16 : 0;

写锁

获取写锁

线程如果获取不到写锁,则会进入acquireWrite方法,进入阻塞队列,并进行自旋,如果自旋到一定次数还不能获取锁,则阻塞。

public long writeLock() {
    long s, next;  // bypass acquireWrite in fully unlocked case only
// ABITS = 255 = 1111 1111
// WBITS = 128 = 1000 0000
// state与ABITS如果等于0,尝试原子更新state的值加WBITS
// 如果成功则返回更新的值,如果失败调用acquireWrite()方法
    return ((((s = state) & ABITS) == 0L &&
             U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) ?
            next : acquireWrite(false, 0L));//acquireWrite进入阻塞队列,并进行自旋
}

private long acquireWrite(boolean interruptible, long deadline) {
// node为新增节点,p为尾节点(即将成为node的前置节点)
    WNode node = null, p;
    // 第一次自旋——入队
    //(1)如果头节点等于尾节点,说明没有其它线程排队,那就多自旋一会,看能不能尝试获取到写锁;
    //(2)否则,自旋次数为0,直接让其入队;
    for (int spins = -1;;) { // spin while enqueuing 入队时自旋
        long m, s, ns;
        if ((m = (s = state) & ABITS) == 0L) {
            if (U.compareAndSwapLong(this, STATE, s, ns = s + WBIT))
                return ns;//自旋的时候拿到锁了,函数返回
        }
// 如果自旋次数小于0,则计算自旋的次数
// 如果当前有写锁独占且队列无元素,说明快轮到自己了
// 就自旋就行了,如果自旋完了还没轮到自己才入队
// 则自旋次数为SPINS常量
// 否则自旋次数为0
        else if (spins < 0)
            spins = (m == WBIT && wtail == whead) ? SPINS : 0;
        else if (spins > 0) {
// 当自旋次数大于0时,当前这次自旋随机减一次自旋次数
            if (LockSupport.nextSecondarySeed() >= 0)//生成随机数
                --spins;//不断自旋,以一定的概率把spins值往下递减
        }
        else if ((p = wtail) == null) { // initialize queue
// 如果队列未初始化,新建一个空节点并初始化头节点和尾节点
            WNode hd = new WNode(WMODE, null);
            if (U.compareAndSwapObject(this, WHEAD, null, hd))
                wtail = hd;
        }
        else if (node == null)
// 如果新增节点还未初始化,则新建之,并赋值其前置节点为尾节点
            node = new WNode(WMODE, p);
        else if (node.prev != p)
// 如果尾节点有变化,则更新新增节点的前置节点为新的尾节点
            node.prev = p;
        else if (U.compareAndSwapObject(this, WTAIL, p, node)) {
// 尝试更新新增节点为新的尾节点成功,成功则退出循环
            p.next = node;
            break;//for循环唯一的break,CAS tail成功(成功加入队列尾部),才会退出for循环
        }
    }

    // 第二次自旋——阻塞并等待唤醒
    //(1)第三段自旋在第二段自旋内部;
    //(2)如果头节点等于前置节点,那就进入第三段自旋,不断尝试获取写锁;
    //(3)否则,尝试唤醒头节点中等待着的读线程;
    //(4)最后,如果当前线程一直都没有获取到写锁,就阻塞当前线程并等待被唤醒;
    for (int spins = -1;;) {
// h为头节点,np为新增节点的前置节点,pp为前前置节点,ps为前置节点的状态
        WNode h, np, pp; int ps;
        //如果发现自己刚好也在队列头部,说明队列中除了空的Head节点,就是当前线程了。
        // 此时,再进行新一轮的自旋,直到达到MAX_HEAD_SPINS次数,然后进入阻塞。
        if ((h = whead) == p) {// 如果头节点等于前置节点,说明快轮到自己了
            if (spins < 0)
// 初始化自旋次数
                spins = HEAD_SPINS;
            else if (spins < MAX_HEAD_SPINS)
// 增加自旋次数
                spins <<= 1;
// 第三次自旋,不断尝试获取写锁
            for (int k = spins;;) { // spin at head
                long s, ns;
                if (((s = state) & ABITS) == 0L) {//再次尝试拿锁
                    if (U.compareAndSwapLong(this, STATE, s,
                                             ns = s + WBIT)) {
// 尝试获取写锁成功,将node设置为新头节点并清除其前置节点(gc)
                        whead = node;
                        node.prev = null;
                        return ns;
                    }
                }
// 随机立减自旋次数,当自旋次数减为0时跳出循环再重试
                else if (LockSupport.nextSecondarySeed() >= 0 &&
                         --k <= 0)//不断自旋
                    break;
            }
        }
        else if (h != null) { // help release stale waiters 用于协助唤醒读节点的
// 这段代码很难进来,是用于协助唤醒读节点的
// 我是这么调试进来的:
// 起三个写线程,两个读线程
// 写线程1获取锁不要释放
// 读线程1获取锁,读线程2获取锁(会阻塞)
// 写线程2获取锁(会阻塞)
// 写线程1释放锁,此时会唤醒读线程1
// 在读线程1里面先不要唤醒读线程2
// 写线程3获取锁,此时就会走到这里来了
            WNode c; Thread w;
// 如果头节点的 cowai t链表(栈)不为空,唤醒里面的所有节点
            //cowait指针,用于串联起所有的读线程
            while ((c = h.cowait) != null) {//自己从阻塞中唤醒,然后唤醒cowait中的所有reader线程
                if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
                    (w = c.thread) != null)
                    U.unpark(w);
            }
        }
        // 如果头节点没有变化
        if (whead == h) {
            // 如果尾节点有变化,则更新
            if ((np = node.prev) != p) {
                if (np != null)
                    (p = np).next = node;   // stale
            }
            else if ((ps = p.status) == 0)// 如果尾节点状态为0,则更新成WAITING
                U.compareAndSwapInt(p, WSTATUS, 0, WAITING);
            else if (ps == CANCELLED) {// 如果尾节点状态为取消,则把它从链表中删除
                if ((pp = p.prev) != null) {
                    node.prev = pp;
                    pp.next = node;
                }
            }
            else {
                // 有超时时间的处理
                long time; // 0 argument to park means no timeout
                if (deadline == 0L)
                    time = 0L;
                else if ((time = deadline - System.nanoTime()) <= 0L)// 已超时,剔除当前节点
                    return cancelWaiter(node, node, false);
// 当前线程
                Thread wt = Thread.currentThread();
                U.putObject(wt, PARKBLOCKER, this);
// 把node的线程指向当前线程
                node.thread = wt;
                if (p.status < 0 && (p != h || (state & ABITS) != 0L) &&
                    whead == h && node.prev == p)// 阻塞当前线程
                    U.park(false, time);  // emulate LockSupport.park 进入阻塞状态,之后被另外一个线程release唤醒,接着往下执行这个for循环
                // *************分界线***************
                // 当前节点被唤醒后,清除线程
                node.thread = null;
                U.putObject(wt, PARKBLOCKER, null);
                // 如果中断了,取消当前节点
                if (interruptible && Thread.interrupted())
                    return cancelWaiter(node, node, true);
            }
        }
    }
}

这里有两个大的for循环,第一个for循环用于把线程包装为节点放入队列的尾部,一边放还一边尝试CAS获取锁,如果获取到了则直接返回,如果获取不到则会一直自旋知道加入队列尾部。
到了第二个for循环,则当前节点已经在队列尾部。如果这时候发现当前节点也在头部,则会一直自旋获取锁,直到达到MAX_HEAD_SPINS次,然后进入阻塞。当其他线程调用release之后,本线程则会从“分界线”那里被唤醒继续往下走。

释放写锁

  1. 更改state的值,把“写”位置0,高24位(版本号)加1。
  2. 唤醒下一个等待着的节点。
public void unlockWrite(long stamp) {
    WNode h;
    if (state != stamp || (stamp & WBIT) == 0L)
        throw new IllegalMonitorStateException();
    // 这行代码实际有两个作用:
    // 1. 更新版本号加1
    // 2. 释放写锁
    // stamp + WBIT实际会把state的第8位置为0,也就相当于释放了写锁
    // 同时会进1,也就是高24位整体加1了
    state = (stamp += WBIT) == 0L ? ORIGIN : stamp;//释放锁,把state回归原位
    // 如果头节点不为空,并且状态不为0,调用release方法唤醒它的下一个节点
    if ((h = whead) != null && h.status != 0)//唤醒队列头部的第一个节点
        release(h);
}

private void release(WNode h) {
    if (h != null) {
        WNode q; Thread w;
        // 将其状态改为0
        U.compareAndSwapInt(h, WSTATUS, WAITING, 0);
        // 如果头节点的下一个节点为空或者其状态为已取消
        if ((q = h.next) == null || q.status == CANCELLED) {
            // 从尾节点向前遍历找到一个可用的节点
            for (WNode t = wtail; t != null && t != h; t = t.prev)
                if (t.status <= 0)
                    q = t;
        }
        // 唤醒q节点所在的线程
        if (q != null && (w = q.thread) != null)
            U.unpark(w);
    }
}

读锁

获取读锁

如果没有写锁占用,则原子更新读锁次数+1,否则进入acquireRead排队。

public long readLock() {
    long s = state, next;  // bypass acquireRead on common uncontended case
// 没有写锁占用,并且读锁被获取的次数未达到最大值
// 尝试原子更新读锁被获取的次数加1
// 如果成功直接返回,如果失败调用acquireRead()方法
    return ((whead == wtail && (s & ABITS) < RFULL &&
             U.compareAndSwapLong(this, STATE, s, next = s + RUNIT)) ?
            next : acquireRead(false, 0L));
}

private long acquireRead(boolean interruptible, long deadline) {
    // node为新增节点,p为尾节点
    WNode node = null, p;
    // 第一段自旋——入队
    for (int spins = -1;;) {
        WNode h;
        // 如果头节点等于尾节点
        // 说明没有排队的线程了,快轮到自己了,直接自旋不断尝试获取读锁
        if ((h = whead) == (p = wtail)) {
            // 第二段自旋——不断尝试获取读锁
            for (long m, s, ns;;) {
                // 尝试获取读锁,如果成功了直接返回版本号
                if ((m = (s = state) & ABITS) < RFULL ?
                    U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) :
                    (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L))// 如果读线程个数达到了最大值,会溢出,返回的是0
                    return ns;
                else if (m >= WBIT) {
                    // m >= WBIT表示有其它线程先一步获取了写锁
                    if (spins > 0) {
                        // 随机立减自旋次数
                        if (LockSupport.nextSecondarySeed() >= 0)
                            --spins;
                    }
                    else {
                        // 如果自旋次数为0了,看看是否要跳出循环
                        if (spins == 0) {
                            WNode nh = whead, np = wtail;
                            if ((nh == h && np == p) || (h = nh) != (p = np))
                                break;
                        }
                        spins = SPINS;
                    }
                }
            }
        }
        // 如果尾节点为空,初始化头节点和尾节点
        if (p == null) { // initialize queue
            WNode hd = new WNode(WMODE, null);
            if (U.compareAndSwapObject(this, WHEAD, null, hd))
                wtail = hd;
        }
        else if (node == null)
            // 如果新增节点为空,初始化之
            node = new WNode(RMODE, p);
        else if (h == p || p.mode != RMODE) {
            // 如果头节点等于尾节点或者尾节点不是读模式
            // 当前节点入队
            if (node.prev != p)
                node.prev = p;
            else if (U.compareAndSwapObject(this, WTAIL, p, node)) {
                p.next = node;
                break;
            }
        }
        else if (!U.compareAndSwapObject(p, WCOWAIT,
                                         node.cowait = p.cowait, node))
            // 接着上一个elseif,这里肯定是尾节点为读模式了
            // 将当前节点加入到尾节点的cowait中,这是一个栈
            // 上面的CAS成功了是不会进入到这里来的
            // 所以cowait连着的是一串读线程?
            node.cowait = null;
        else {
            // 第三段自旋——阻塞当前线程并等待被唤醒
            for (;;) {
                WNode pp, c; Thread w;
                // 如果头节点不为空且其cowait不为空,协助唤醒其中等待的读线程
                if ((h = whead) != null && (c = h.cowait) != null &&
                    U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
                    (w = c.thread) != null) // help release
                    U.unpark(w);
                // 如果头节点等于前前置节点或者等于前置节点或者前前置节点为空
                // 这同样说明快轮到自己了
                if (h == (pp = p.prev) || h == p || pp == null) {
                    long m, s, ns;
                    // 第四段自旋——又是不断尝试获取锁
                    do {
                        if ((m = (s = state) & ABITS) < RFULL ?
                            U.compareAndSwapLong(this, STATE, s,
                                                 ns = s + RUNIT) :
                            (m < WBIT &&
                             (ns = tryIncReaderOverflow(s)) != 0L))
                            return ns;
                    } while (m < WBIT);// 只有当前时刻没有其它线程占有写锁就不断尝试
                }
                // 如果头节点未曾改变且前前置节点也未曾改
                // 阻塞当前线程
                if (whead == h && p.prev == pp) {
                    long time;
                    // 如果前前置节点为空,或者头节点等于前置节点,或者前置节点已取消
                    // 从第一个for自旋开始重试
                    if (pp == null || h == p || p.status > 0) {
                        node = null; // throw away
                        break;
                    }
                    // 超时检测
                    if (deadline == 0L)
                        time = 0L;
                    else if ((time = deadline - System.nanoTime()) <= 0L)
                        // 如果超时了,取消当前节点
                        return cancelWaiter(node, p, false);
                    // 当前线程
                    Thread wt = Thread.currentThread();
                    U.putObject(wt, PARKBLOCKER, this);
                    // 设置进node中
                    node.thread = wt;
                    // 检测之前的条件未曾改变
                    if ((h != pp || (state & ABITS) == WBIT) &&
                        whead == h && p.prev == pp)
                        // 阻塞当前线程并等待被唤醒
                        U.park(false, time);
                    // 唤醒之后清除线程
                    node.thread = null;
                    U.putObject(wt, PARKBLOCKER, null);
                    // 如果中断了,取消当前节点
                    if (interruptible && Thread.interrupted())
                        return cancelWaiter(node, p, true);
                }
            }
        }
    }

    // 只有第一个读线程会走到下面的for循环处,参考上面第一段自旋中有一个break,当第一个读线程入队的时候break出来的

    // 第五段自旋——跟上面的逻辑差不多,只不过这里单独搞一个自旋针对第一个读线程

    for (int spins = -1;;) {
        WNode h, np, pp; int ps;
        // 如果头节点等于尾节点,说明快轮到自己了
        // 不断尝试获取读锁
        if ((h = whead) == p) {
            // 设置自旋次数
            if (spins < 0)
                spins = HEAD_SPINS;
            else if (spins < MAX_HEAD_SPINS)
                spins <<= 1;
            // 第六段自旋——不断尝试获取读锁
            for (int k = spins;;) { // spin at head
                long m, s, ns;
                // 不断尝试获取读锁
                if ((m = (s = state) & ABITS) < RFULL ?
                    U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) :
                    (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) {
                    // 获取到了读锁
                    WNode c; Thread w;
                    whead = node;
                    node.prev = null;
                    // 唤醒当前节点中所有等待着的读线程
                    // 因为当前节点是第一个读节点,所以它是在队列中的,其它读节点都是挂这个节点的cowait栈中的
                    while ((c = node.cowait) != null) {
                        if (U.compareAndSwapObject(node, WCOWAIT,
                                                   c, c.cowait) &&
                            (w = c.thread) != null)
                            U.unpark(w);
                    }
                    // 返回版本号
                    return ns;
                }
                // 如果当前有其它线程占有着写锁,并且没有自旋次数了,跳出当前循环
                else if (m >= WBIT &&
                         LockSupport.nextSecondarySeed() >= 0 && --k <= 0)
                    break;
            }
        }
        else if (h != null) {
            // 如果头节点不等于尾节点且不为空且其为读模式,协助唤醒里面的读线程
            WNode c; Thread w;
            while ((c = h.cowait) != null) {
                if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
                    (w = c.thread) != null)
                    U.unpark(w);
            }
        }
        // 如果头节点未曾变化
        if (whead == h) {
            // 更新前置节点及其状态等
            if ((np = node.prev) != p) {
                if (np != null)
                    (p = np).next = node;   // stale
            }
            else if ((ps = p.status) == 0)
                U.compareAndSwapInt(p, WSTATUS, 0, WAITING);
            else if (ps == CANCELLED) {
                if ((pp = p.prev) != null) {
                    node.prev = pp;
                    pp.next = node;
                }
            }
            else {
                // 第一个读节点即将进入阻塞
                long time;
                // 超时设置
                if (deadline == 0L)
                    time = 0L;
                else if ((time = deadline - System.nanoTime()) <= 0L)
                    // 如果超时了取消当前节点
                    return cancelWaiter(node, node, false);
                Thread wt = Thread.currentThread();
                U.putObject(wt, PARKBLOCKER, this);
                node.thread = wt;
                if (p.status < 0 &&
                    (p != h || (state & ABITS) == WBIT) &&
                    whead == h && node.prev == p)
                    // 阻塞第一个读节点并等待被唤醒
                    U.park(false, time);
                node.thread = null;
                U.putObject(wt, PARKBLOCKER, null);
                if (interruptible && Thread.interrupted())
                    return cancelWaiter(node, node, true);
            }
        }
    }
}

读锁的获取过程一共有六段自旋:

  1. 读节点进来都是先判断是头节点如果等于尾节点,说明快轮到自己了,就不断地尝试获取读锁,如果成功了就返回;
  2. 如果头节点不等于尾节点,这里就会让当前节点入队,这里入队又分成了两种;
  3. 一种是首个读节点入队,它是会排队到整个队列的尾部,然后跳出第一段自旋;
  4. 另一种是非第一个读节点入队,它是进入到首个读节点的cowait栈中,所以更确切地说应该是入栈;
  5. 不管是入队还入栈后,都会再次检测头节点是不是等于尾节点了,如果相等,则会再次不断尝试获取读锁;
  6. 如果头节点不等于尾节点,那么才会真正地阻塞当前线程并等待被唤醒;
  7. 上面说的首个读节点其实是连续的读线程中的首个,如果是两个读线程中间夹了一个写线程,还是老老实实的排队。

释放读锁

将state的低7位减1,当减为0的时候说明完全释放了读锁,就唤醒下一个排队的线程。

//将state的低7位减1,当减为0的时候说明完全释放了读锁,就唤醒下一个排队的线程。
public void unlockRead(long stamp) {
    long s, m; WNode h;
    for (;;) {
        // 检查版本号
        if (((s = state) & SBITS) != (stamp & SBITS) ||
            (stamp & ABITS) == 0L || (m = s & ABITS) == 0L || m == WBIT)
            throw new IllegalMonitorStateException();
        // 读线程个数正常
        if (m < RFULL) {
            // 释放一次读锁
            if (U.compareAndSwapLong(this, STATE, s, s - RUNIT)) {
                // 如果读锁全部都释放了,且头节点不为空且状态不为0,唤醒它的下一个节点
                if (m == RUNIT && (h = whead) != null && h.status != 0)
                    release(h);
                break;
            }
        }
        else if (tryDecReaderOverflow(s) != 0L)
            // 读线程个数溢出检测
            break;
    }
}
private void release(WNode h) {
    if (h != null) {
        WNode q; Thread w;
        // 将其状态改为0
        U.compareAndSwapInt(h, WSTATUS, WAITING, 0);
        // 如果头节点的下一个节点为空或者其状态为已取消
        if ((q = h.next) == null || q.status == CANCELLED) {
            // 从尾节点向前遍历找到一个可用的节点
            for (WNode t = wtail; t != null && t != h; t = t.prev)
                if (t.status <= 0)
                    q = t;
        }
        // 唤醒q节点所在的线程
        if (q != null && (w = q.thread) != null)
            U.unpark(w);
    }
}

变异的CLH队列

StampedLock也需要有一个队列保存阻塞的线程,但是它不是依靠AQS的同步队列来实现的,而是自建另一个双向链表。

/** Wait nodes */
static final class WNode {
    volatile WNode prev;
    volatile WNode next;
    // 读线程所用的链表(实际是一个栈结果)
    volatile WNode cowait;    // list of linked readers
    // 阻塞的线程
    volatile Thread thread;   // non-null while possibly parked
    // 状态
    volatile int status;      // 0, WAITING, or CANCELLED
    // 读模式还是写模式
    final int mode;           // RMODE or WMODE
    WNode(int m, WNode p) { mode = m; prev = p; }
}

/** Head of CLH queue */
// 队列的头节点
private transient volatile WNode whead;
/** Tail (last) of CLH queue */
// 队列的尾节点
private transient volatile WNode wtail;

其CLH队列中还有一个cowait保存着连续的读线程。
--2020-10-13---10.02.01

在注释中我们可以看到,之所以用cowait,是因为StampedLock只有七位可以来保存读线程,每个读需要占用一个序列号。如果组织成cowait形式,则整个cowait链只需要一个序列号。

Conceptually, the primary state of the lock includes a sequence number that is odd when write-locked and even otherwise. However, this is offset by a reader count that is non-zero when read-locked. The read count is ignored when validating "optimistic" seqlock-reader-style stamps. Because we must use a small finite number of bits (currently 7) for readers, a supplementary reader overflow word is used when the number of readers exceeds the count field. We do this by treating the max reader count value (RBITS) as a spinlock protecting overflow updates.
Waiters use a modified form of CLH lock used in AbstractQueuedSynchronizer (see its internal documentation for a fuller account), where each node is tagged (field mode) as either a reader or writer. Sets of waiting readers are grouped (linked) under a common node (field cowait) so act as a single node with respect to most CLH mechanics.
By virtue of the queue structure, wait nodes need not actually carry sequence numbers;
we know each is greater than its predecessor. This simplifies the scheduling policy to a mainly-FIFO scheme that incorporates elements of Phase-Fair locks (see Brandenburg & Anderson, especially http://www.cs.unc.edu/~bbb/diss/).
In particular, we use the phase-fair anti-barging rule: If an incoming reader arrives while read lock is held but there is a queued writer, this incoming reader is queued. (This rule is responsible for some of the complexity of method acquireRead, but without it, the lock becomes highly unfair.)
Method release does not (and sometimes cannot) itself wake up cowaiters. This is done by the primary thread, but helped by any other threads with nothing better to do in methods acquireRead and acquireWrite.

流程

我们来举个例子看一下出入队的流程。

  1. ThreadA调用writeLock获取写锁。cas成功直接返回。
    stampedLock1

  2. ThreadB调用readLock获取读锁。这时候因为有写线程占用了锁,所以获取读锁失败。则调用acquireRead入队。因为当前队列为空,则acquireRead中会初始化头结点(WNode模式),然后将自身包装成一个读结点,插入队尾,并修改前置节点status为-1,表示WAITING, 表示当前线程要去睡被阻塞了,前驱节点释放锁之后需要唤醒当前节点。

stampedLock2

  1. ThreadC调用readLock获取读锁。ThreadC被包装为节点加入等待队列后,是链接到ThreadB结点的cowait栈指针中的。

注意:读结点的cowait字段其实构成了一个栈,入栈的过程其实是个“头插法”插入单链表的过程。比如,再来个ThreadX读结点,则cowait链表结构为:ThreadB - > ThreadX -> ThreadC。最终唤醒读结点时,将从栈顶开始。

stampedLock3

  1. ThreadD调用writeLock获取写锁。获取失败,调用acquireWrite方法入队。首先自旋的尝试获取写锁,获取成功后,就直接返回;否则,会将当前线程包装成一个写结点,插入到等待队列。

stampedLock4

  1. ThreadE调用readLock获取读锁。获取失败,调用acquireRead方法,在该方法的第一个自旋中,会将ThreadE加入等待队列。

stampedLock5

  1. ThreadA调用unlockWrite释放写锁。通过CAS操作,修改State成功后,会调用release方法唤醒等待队列的队首结点。这时候队首节点是ThreadB。

stampedLock6

  1. ThreadB被唤醒后继续向下执行。ThreadB被唤醒后,会从原阻塞处继续向下执行,然后开始下一次自旋。第二次自旋时,ThreadB发现写锁未被占用,则成功获取到读锁,然后从栈顶(ThreadB的cowait指针指向的结点)开始唤醒栈中所有线程。

stampedLock7

  1. ThreadC被唤醒后,继续执行,并进入下一次自旋,下一次自旋时,会成功获取到读锁。当读锁的数量变为0时才会调用release方法,所以当BC释放锁后,ThreadD被唤醒后继续向下执行。ThreadD会从原阻塞处继续向下执行,并在下一次自旋中获取到写锁,然后返回。

stampedLock8

  1. ThreadD调用unlockWrite释放写锁。会调用unlockWrite唤醒队首结点(ThreadE)。ThreadE被唤醒后会从原阻塞处继续向下执行,但由于ThreadE是个读结点,所以同时会唤醒cowait栈中的所有读结点。最终,等待队列的结构如下

stampedLock9

StampedLock VS ReentrantReadWriteLock

  1. 两者都有获取读锁、获取写锁、释放读锁、释放写锁的方法,这是相同点;
  2. 两者的结构基本类似,都是使用state + CLH队列;
  3. 前者的state分成三段,高24位存储版本号、低7位存储读锁被获取的次数、第8位存储写锁被获取的次数;
  4. 后者的state分成两段,高16位存储读锁被获取的次数,低16位存储写锁被获取的次数;
  5. 前者的CLH队列可以看成是变异的CLH队列,连续的读线程只有首个节点存储在队列中,其它的节点存储的首个节点的cowait栈中;
  6. 后者的CLH队列是正常的CLH队列,所有的节点都在这个队列中;
  7. 前者获取锁的过程中有判断首尾节点是否相同,也就是是不是快轮到自己了,如果是则不断自旋,所以适合执行短任务;
  8. 后者获取锁的过程中非公平模式下会做有限次尝试;
  9. 前者只有非公平模式,一上来就尝试获取锁;
  10. 前者唤醒读锁是一次性唤醒连续的读锁的,而且其它线程还会协助唤醒;
  11. 后者是一个接着一个地唤醒的;
  12. 前者有乐观读的模式,乐观读的实现是通过判断state的高25位是否有变化来实现的;
  13. 前者各种模式可以互转,类似tryConvertToXxx()方法;
  14. 前者写锁不可重入,后者写锁可重入;
  15. 前者无法实现条件锁,后者可以实现条件锁;
    stampedlockVS

refer

https://zhuanlan.zhihu.com/p/159178009
https://segmentfault.com/a/1190000015808032

comments powered by Disqus