JDK并发集合6:ReentrantReadWriteLock(同步工具)

读写锁是一种适用于多读少写的场景的锁,多个读线程可以用时对资源进行读取,但是写线程则是排他的。在多读少写的场景下,对比使用ReentrantLock和synchronized,能极大地提高并发度。

读写锁的特性:

互斥

类结构

ReentrantReadWriteLock本身实现了ReadWriterLock,需要实现“获取读锁”readLock()和“获取写锁”writeLock()。

和ReentrantLock一样,ReentrantReadWriteLock本身有个继承了AQS的同步器Sync,Sync又分为公平锁FairSync和非公平锁NonFairSync。

读锁ReadLock和写锁WriteLock实现了Lock,内部的lock()、unlock()方法是通过传入的sync来实现的。

public class ReentrantReadWriteLock implements ReadWriteLock {
    abstract static class Sync extends AbstractQueuedSynchronizer {} 
    static final class FairSync extends Sync {}
    static final class NonfairSync extends Sync {}
    
    /** Inner class providing readlock */
    private final ReentrantReadWriteLock.ReadLock readerLock;
    /** Inner class providing writelock */
    private final ReentrantReadWriteLock.WriteLock writerLock;
    
    public static class ReadLock implements Lock {}
    public static class WriteLock implements Lock {}
    
    public ReentrantReadWriteLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
        readerLock = new ReadLock(this);
        writerLock = new WriteLock(this);
    }

    public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
    public ReentrantReadWriteLock.ReadLock  readLock()  { return readerLock; }
}

各个变量

内部的Sync继承于AQS,其也是用status来表示同步状态:

  • 高16位存储的是共享锁(读锁)被重入的次数
  • 低16位存储的是互斥锁(写锁)被重入的次数

因为不能同时CAS两个int,所以就用一个int来记录读写锁的状态。

readerLock和writerLock其实是同一把锁的两个视图,共用一个Sync对象。

读锁

lock

代码

// ReentrantReadWriteLock.ReadLock
public void lock() {
    sync.acquireShared(1);
}

//AQS
public final void acquireShared(int arg) {
    // 尝试获取共享锁(返回1表示成功,返回-1表示失败)
    if (tryAcquireShared(arg) < 0)
        // 失败了就可能要排队
        doAcquireShared(arg);
}

//ReentrantReadWriteLock
protected final int tryAcquireShared(int unused) {
    /*
     * Walkthrough:
     * 1. If write lock held by another thread, fail.
     * 2. Otherwise, this thread is eligible for
     *    lock wrt state, so ask if it should block
     *    because of queue policy. If not, try
     *    to grant by CASing state and updating count.
     *    Note that step does not check for reentrant
     *    acquires, which is postponed to full version
     *    to avoid having to check hold count in
     *    the more typical non-reentrant case.
     * 3. If step 2 fails either because thread
     *    apparently not eligible or CAS fails or count
     *    saturated, chain to version with full retry loop.
     */
    Thread current = Thread.currentThread();
// 状态变量的值
// 在读写锁模式下,高16位存储的是共享锁(读锁)被获取的次数,低16位存储的是互斥锁(写锁)被获取的次数
     int c = getState();
// 互斥锁的次数
// 如果其它线程获得了写锁,直接返回-1

    //有其他线程持有写锁
    if (exclusiveCount(c) != 0 &&
        getExclusiveOwnerThread() != current)
        return -1;
// 读锁被获取的次数
    int r = sharedCount(c);
// 下面说明此时还没有写锁,尝试去更新state的值获取读锁
// 读者是否需要排队(是否是公平模式)
    if (!readerShouldBlock() &&/*公平和非公平的区别*/
        r < MAX_COUNT &&
        compareAndSetState(c, c + SHARED_UNIT)) {
// 获取读锁成功
        if (r == 0) {//r之前等于0,说明这是第一个拿到读锁的线程
// 如果之前还没有线程获取读锁
// 记录第一个读者为当前线程
            firstReader = current;
// 第一个读者重入的次数为1
            firstReaderHoldCount = 1;
        } else if (firstReader == current) {
// 如果有线程获取了读锁且是当前线程是第一个读者
// 则把其重入次数加1
            firstReaderHoldCount++;
        } else {//不是第一个持有读锁的
            //cachedHoldCounter 代表的是最后一个获取读锁的线程的计数器。

// 如果有线程获取了读锁且当前线程不是第一个读者
// 则从缓存中获取重入次数保存器
            HoldCounter rh = cachedHoldCounter;
// 如果缓存不属性当前线程
// 再从ThreadLocal中获取
// readHolds本身是一个ThreadLocal,里面存储的是HoldCounter
            // 如果最后一个线程计数器是 null 或者不是当前线程,那么就新建一个 HoldCounter 对象
            if (rh == null || rh.tid != getThreadId(current))
// get()的时候会初始化rh
                 cachedHoldCounter = rh = readHolds.get();// 给当前线程新建一个 HoldCounter
             else if (rh.count == 0)// 如果不是 null,且 count 是 0,就将上个线程的 HoldCounter 覆盖本地的。
// 如果rh的次数为0,把它放到ThreadLocal中去
                 readHolds.set(rh);
// 重入的次数加1(初始次数为0)
            rh.count++;
        }
// 获取读锁成功,返回1
        return 1;
    }
    // 死循环获取读锁。包含锁降级策略。
// 通过这个方法再去尝试获取读锁(如果之前其它线程获取了写锁,一样返回-1表示失败)
    return fullTryAcquireShared(current);//上面拿读锁失败,进入这个函数不断自旋拿读锁
}

//AQS
private void doAcquireShared(int arg) {
// 进入AQS的队列中
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
// 当前节点的前一个节点
            final Node p = node.predecessor();
// 如果前一个节点是头节点(说明是第一个排队的节点)
            if (p == head) {
// 再次尝试获取读锁
                int r = tryAcquireShared(arg);
// 如果成功了
                if (r >= 0) {
// 头节点后移并传播
// 传播即唤醒后面连续的读节点
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
// 没获取到读锁,阻塞并等待被唤醒
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
//AQS
private void setHeadAndPropagate(Node node, int propagate) {
    // h为旧的头节点
    Node h = head; // Record old head for check below
    // 设置当前节点为新头节点
    setHead(node);
    /*
     * Try to signal next queued node if:
     *   Propagation was indicated by caller,
     *     or was recorded (as h.waitStatus either before
     *     or after setHead) by a previous operation
     *     (note: this uses sign-check of waitStatus because
     *      PROPAGATE status may transition to SIGNAL.)
     * and
     *   The next node is waiting in shared mode,
     *     or we don't know, because it appears null
     *
     * The conservatism in both of these checks may cause
     * unnecessary wake-ups, but only when there are multiple
     * racing acquires/releases, so most need signals now or soon
     * anyway.
     */

// 如果旧的头节点或新的头节点为空或者其等待状态小于0(表示状态为SIGNAL/PROPAGATE)
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
// 需要传播
// 取下一个节点
        Node s = node.next;
// 如果下一个节点为空,或者是需要获取读锁的节点
        if (s == null || s.isShared())
// 唤醒下一个节点
            doReleaseShared();
    }
}

//AQS
// 这个方法只会唤醒一个节点
// 行为跟方法名有点不符,实际是唤醒下一个节点
private void doReleaseShared() {
    /*
     * Ensure that a release propagates, even if there are other
     * in-progress acquires/releases.  This proceeds in the usual
     * way of trying to unparkSuccessor of head if it needs
     * signal. But if it does not, status is set to PROPAGATE to
     * ensure that upon release, propagation continues.
     * Additionally, we must loop in case a new node is added
     * while we are doing this. Also, unlike other uses of
     * unparkSuccessor, we need to know if CAS to reset status
     * fails, if so rechecking.
     */
    //调用风暴 https://segmentfault.com/a/1190000016447307
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
// 如果头节点状态为SIGNAL,说明要唤醒下一个节点
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
// 唤醒下一个节点
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
// 把头节点的状态改为PROPAGATE成功才会跳到下面的if
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
// 如果唤醒后head没变,则跳出循环
        if (h == head)                   // loop if head changed
            break;
    }
}

流程

  1. 先尝试获取读锁;
  2. 如果成功了直接结束;
  3. 如果失败了(已经有线程持有写锁),进入doAcquireShared()方法;
  4. doAcquireShared()方法中首先会生成一个新节点并进入AQS队列中;
  5. 如果头节点正好是当前节点的上一个节点,再次尝试获取锁;
  6. 如果成功了,则设置头节点为新节点(头结点后移),并传播;这里会传递唤醒直到不是SIGNAL
  7. 传播即唤醒下一个读节点(如果下一个节点是(共享即)读节点的话);
  8. 如果头节点不是当前节点的上一个节点或者5失败,则阻塞当前线程等待被唤醒;
  9. 唤醒之后继续走5的逻辑;

连续唤醒:这里会有一个连续唤醒读节点的逻辑:在doAcquireShared里面,每个被唤醒的节点都会走setHeadAndPropagate去设置头结点并传播,依次往复。
--lock

unlock

代码

//ReentrantReadWriteLock.ReadLock
public void unlock() {
    sync.releaseShared(1);
}

//AQS
public final boolean releaseShared(int arg) {
// 如果尝试释放成功了,就唤醒下一个节点
    if (tryReleaseShared(arg)) {
// 这个方法实际是唤醒下一个节点
        doReleaseShared();
        return true;
    }
    return false;
}

//ReentrantReadWriteLock.Sync
protected final boolean tryReleaseShared(int unused) {
    Thread current = Thread.currentThread();
    if (firstReader == current) {
// 如果第一个读者(读线程)是当前线程
// 就把它重入的次数减1
// 如果减到0了就把第一个读者置为空
        // assert firstReaderHoldCount > 0;
        if (firstReaderHoldCount == 1)
            firstReader = null;
        else
            firstReaderHoldCount--;
    } else {
// 如果第一个读者不是当前线程
// 一样地,把它重入的次数减1
        HoldCounter rh = cachedHoldCounter;
        if (rh == null || rh.tid != getThreadId(current))
            rh = readHolds.get();
        int count = rh.count;
        if (count <= 1) {
            readHolds.remove();
            if (count <= 0)
                throw unmatchedUnlockException();
        }
        --rh.count;
    }
    for (;;) {
// 共享锁获取的次数减1
// 如果减为0了说明完全释放了,才返回true
        int c = getState();
        int nextc = c - SHARED_UNIT;
        if (compareAndSetState(c, nextc))
            // Releasing the read lock has no effect on readers,
            // but it may allow waiting writers to proceed if
            // both read and write locks are now free.
            return nextc == 0;
    }
}

// 这个方法只会唤醒一个节点
// 行为跟方法名有点不符,实际是唤醒下一个节点
private void doReleaseShared() {
    /*
     * Ensure that a release propagates, even if there are other
     * in-progress acquires/releases.  This proceeds in the usual
     * way of trying to unparkSuccessor of head if it needs
     * signal. But if it does not, status is set to PROPAGATE to
     * ensure that upon release, propagation continues.
     * Additionally, we must loop in case a new node is added
     * while we are doing this. Also, unlike other uses of
     * unparkSuccessor, we need to know if CAS to reset status
     * fails, if so rechecking.
     */
    //调用风暴 https://segmentfault.com/a/1190000016447307
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
// 如果头节点状态为SIGNAL,说明要唤醒下一个节点
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
// 唤醒下一个节点
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
// 把头节点的状态改为PROPAGATE成功才会跳到下面的if
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
// 如果唤醒后head没变,则跳出循环
        if (h == head)                   // loop if head changed
            break;
    }
}

流程

  1. 将共享锁总共被获取的次数减1;
  2. 如果共享锁获取的次数减为0了,说明共享锁完全释放了,那就唤醒下一个节点;

如下图,ABC三个节点各获取了一次共享锁,三者释放的顺序分别为ACB,那么最后B释放共享锁的时候tryReleaseShared()才会返回true,进而才会唤醒下一个节点D。
--unlock

写锁

lock

代码

//ReentrantReadWriteLock.WriteLock
public void lock() {
    sync.acquire(1);
}

//AQS
public final void acquire(int arg) {
// 先尝试获取锁
// 如果失败,则会进入队列中排队,后面的逻辑跟ReentrantLock一模一样了
    if (!tryAcquire(arg) &&
        //注意addWaiter()这里传入的节点模式为独占模式
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

//ReentrantReadWriteLock.Sync
protected final boolean tryAcquire(int acquires) {
    /*
     * Walkthrough:
     * 1. If read count nonzero or write count nonzero
     *    and owner is a different thread, fail.
     * 2. If count would saturate, fail. (This can only
     *    happen if count is already nonzero.)
     * 3. Otherwise, this thread is eligible for lock if
     *    it is either a reentrant acquire or
     *    queue policy allows it. If so, update state
     *    and set owner.
     */
    Thread current = Thread.currentThread();
    int c = getState();
// 互斥锁被获取的次数
    int w = exclusiveCount(c);//写线程只能有一个,但写线程可以多次重入。
    if (c != 0) {//c!=0说明有读线程或者写线程持有锁
        // (Note: if c != 0 and w == 0 then shared count != 0)
        //w==0说明有读线程持有锁,w!=0但是持有写锁的不是自己。

// 如果c!=0且w==0,说明共享锁被获取的次数不为0
// 这句话整个的意思就是
// 如果共享锁被获取的次数不为0,或者被其它线程获取了互斥锁(写锁)
// 那么就返回false,获取写锁失败
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        // Reentrant acquire
// 到这里说明当前线程已经获取过写锁,这里是重入了,直接把state加1即可
        setState(c + acquires);
        return true;
    }
    //c==0,开始抢占

// 如果c等于0,就尝试更新state的值(非公平模式writerShouldBlock()返回false)
// 如果失败了,说明获取写锁失败,返回false
// 如果成功了,说明获取写锁成功,把自己设置为占有者,并返回true
    if (writerShouldBlock() ||
        !compareAndSetState(c, c + acquires))
        return false;
    setExclusiveOwnerThread(current);
    return true;
}

流程

  1. 尝试获取锁;
  2. 如果有读者占有着读锁,尝试获取写锁失败;
  3. 如果有其它线程占有着写锁,尝试获取写锁失败;
  4. 如果是当前线程占有着写锁,尝试获取写锁成功,state值加1;
  5. 如果没有线程占有着锁(state==0),当前线程尝试更新state的值,成功了表示尝试获取锁成功,否则失败;
  6. 尝试获取锁失败以后,进入同步队列排队,等待被唤醒;

unlock

代码

//ReentrantReadWriteLock.WriteLock
public void unlock() {
    sync.release(1);
}

//AQS
public final boolean release(int arg) {
// 如果尝试释放锁成功(完全释放锁)
// 就尝试唤醒下一个节点
    // 调用AQS实现类的tryRelease()方法释放锁
    if (tryRelease(arg)) {
        Node h = head;
// 如果头节点不为空,且等待状态不是0,就唤醒下一个节点
// 还记得waitStatus吗?
// 在每个节点阻塞之前会把其上一个节点的等待状态设为SIGNAL(-1)
// 所以,SIGNAL的准确理解应该是唤醒下一个等待的线程
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

//RentrantReadWriteLock.Sync
protected final boolean tryRelease(int releases) {
// 如果写锁不是当前线程占有着,抛出异常
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
// 状态变量的值减1
    int nextc = getState() - releases;
// 是否完全释放锁
    boolean free = exclusiveCount(nextc) == 0;
    if (free)
        setExclusiveOwnerThread(null);
// 设置状态变量的值
    setState(nextc);//写锁是排他的,直接设置
// 如果完全释放了写锁,返回true
    return free;
}

流程

  1. 先尝试释放锁,即状态变量state的值减1;
  2. 如果减为0了,说明完全释放了锁;
  3. 完全释放了锁才唤醒下一个等待的节点;

公平锁 VS 非公平锁

  • 非公平锁:为了防止写线程“饿死”
    • 头结点为独占模式:只能被独占线程抢占
    • 头结点为共享模式:可以被独占和共享锁线程抢占
  • 公平锁:使用AQS的等待队列排队
comments powered by Disqus