JDK并发集合5:ReentrantLock(同步工具)

ReentrantLock

我们可以通过ReentrantLock来加深对AQS的理解。

内部API

ReentrantLock内部的同步器Sync继承于AQS。

abstract static class Sync extends AbstractQueuedSynchronizer {}
static final class FairSync extends Sync {}
static final class NonfairSync extends Sync {}

ReentrantLock本身实现了Lock接口,其内部加解锁时的核心API层实现如下:
f30c631c8ebbf820d3e8fcb6eee3c0ef18748

加锁

  1. 用户使用Lock来加锁
  2. lock里面调用到了ReentrantLock内部同步器Sync.lock(),由于它是抽象方法,所以最终ReentrantLock需要自己实现lock()方法。而ReentrantLock内部又有非公平的Sync实现NonfairSync以及公平的Sync实现FairSync。非公平和公平的区别是,是否一开始就去设置statue获取锁。最终都是调用到AQS的acquire方法。
  3. acquire是个模板方法,它会调用子类自己实现的tryAcquire。如果tryAcquire不成功则调用acquireQueued把当前节点自旋两次仍然获取不到锁后CAS入CLH队列。
  4. tryAcquire在NonfairSync和FairSync里面又有不同实现。前者首先尝试获取锁,如果失败则返回false。后者只有当同步队列中没有线程等待才去获取锁,获取不到则返回false。

解锁

  1. 用户使用unlock来解锁
  2. unlock里面调用到了AQS的release()
  3. release会调用ReentrantLock中Sync的tryRelease,即没有是否是公平锁的区别。
  4. tryRelease会减少status,如果减到0则设置exclusiveOwnerThread为null。

各变量

waitStatus

Node阻塞的时候会把上一个节点设置为SIGNAL,意味着需要唤醒下一个等待的线程。这样子在release的时候才会去unpark下一个没有被取消的node。

state

可重入锁,state表示重入的次数。当release减到0的时候,会unpark下一个被阻塞的线程。

  • 当state=0时,没有线程持有锁,exclusiveOwnerThread=null;
  • 当state=1时,有一个线程持有锁,exclusiveOwnerThread=该线程;
  • 当state>1时,说明该线程重入了该锁。

ExclusiveOwnerThread

排它锁。当前线程获取锁之后,把自己设置到exclusiveOwnerThread变量中。释放锁之后,会把exclusiveOwnerThread设置为null。

条件锁

有些场景下,线程获取到了锁之后,还需要等到某一些条件为真,这时候就需要用到条件锁了。典型实现比如ArrayBlockingQueue就是使用到了ReentrantLock来实现阻塞等待,它弹出一个元素需要“notEmpty”这个条件为真,插入一个元素需要“notFull”这个条件为真。

条件队列

除了AQS里面的同步队列,条件锁还是用到了条件队列ConditionObject
----

自己实现一个队列的原因是,这样子可以使用多个条件,不同的线程可以阻塞在不同的条件上面。

ConditionObject是实现在AQS里面的:firstWaiter是条件对立的头部,lastWaiter是条件对立的尾部。

public class ConditionObject implements Condition, java.io.Serializable {
    //每一个Condition对象上面,都阻塞了多个线程。因此,在ConditionObject内部有一个双向链表组成的队列
    //条件队列
    /** First node of condition queue. */
    private transient Node firstWaiter;
    /** Last node of condition queue. */
    private transient Node lastWaiter;
}

新建一个条件

//ReentrantLock
public Condition newCondition() {
    return sync.newCondition();
}
//Sync
final ConditionObject newCondition() {
    return new ConditionObject();
}
//AQS
public ConditionObject() { }

await等待

主要流程:

  1. 把当前节点放到条件队列里面
  2. 然后释放锁
  3. 之后在一个while循环里面检查是否已经在同步队列里面,没有的话就阻塞自己
  4. 被唤醒后(即条件已经出现)继续竞争锁。如果竞争不到,acquireQueued里面会阻塞。acquireQueued反悔的时候,此线程获取到锁。

这里为什么要判断是否在同步队列里面,因为条件为真之后的signal方法中,会把相关阻塞在该条件上的节点都挪到同步队列中,意思就是已经出现了“条件为真”的情况,那么就去竞争锁吧。当然,竞争到锁之后条件可能已经不为真了,所以需要自己再判断。

需要注意两个点:

  1. 同步队列和条件队列不一样
    • AQS中next是下一个节点,prev是上一个节点
    • ConditionObject中使用nextWaiter指向下一个节点,没有指向上一个节点
  2. waitStatus的变化
    • 在条件队列中是CONDITION(-2)
    • 挪到同步队列的时候,初始化为0
    • 需要阻塞的时候,设置上一个节点为SIGNAL(-1)
    • 已取消的节点的等待状态都会设置为CANCELLED(1)
public final void await() throws InterruptedException {
    if (Thread.interrupted())//收到中断信号则抛出异常
        throw new InterruptedException();
    //线程调用await()的时候,肯定已经先拿到了锁。所以,在addConditionWaiter()内部,对这个双向链表的操作不需要执行CAS操作
    Node node = addConditionWaiter();//加入Condition的等待队列(其中会清除cancelled节点)
    //完全释放当前线程获取的锁
    //因为锁是可重入的,所以这里要把获取的锁全部释放
    int savedState = fullyRelease(node);//阻塞在Condition之前必须先释放锁,否则会死锁
    int interruptMode = 0;
    //isOnSyncQueue(node)用于判断该Node是否在AQS的同步队列里面。初始的时候,Node只在Condition的队列里,而不在AQS的队列里。但执行notity操作的时候,会放进AQS的同步队列。
    //是否在同步队列中(ConditionObject:条件队列。AQS:同步队列)
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);//自己阻塞自己
        // 上面部分是调用await()时释放自己占有的锁,并阻塞自己等待条件的出现
        // *************************分界线*************************  //
        // 下面部分是条件已经出现,尝试去获取锁
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    //线程从wait中被唤醒后,必须用acquireQueued(node,savedState)函数重新拿锁
    //acquireQueued这个方法返回的时候,代表当前线程获取了锁,而且 state == savedState了。

    //该方法的返回值就是代表线程是否被中断,如果返回 true,说明被中断了,
    // 而且若是 interruptMode != THROW_IE,说明在 signal 之前就发生中断了,
    // 这里将 interruptMode 设置为 REINTERRUPT,用于待会重新中断,其实只是要把这个中断标志保留,留给开发人员用。

    // 尝试获取锁,注意第二个参数,这是上一章分析过的方法
    // 如果没获取到会再次阻塞
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)//重新拿锁
        interruptMode = REINTERRUPT;

    //这里突然冒出来个nextWaiter,事实上若是正常signal的话,我们前边代码里很清楚,first.nextWaiter = null,
    // 正常signal的话nextWaiter是null;这种不为null的情况实际上就是因为中断引发的,
    // 我们说了中断也会使得线程由条件队列进入阻塞队列,此时并没有对后边的条件节点进行处理,正是在这里处理的。

    //清除取消的节点
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    // 最后处理中断
    //线程中断相关
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);//被中断唤醒,向外抛出中断异常
}

// 将当前线程构造成条件节点加入condition的条件队列尾部,node即为构造的节点
private Node addConditionWaiter() {
    Node t = lastWaiter;
    // If lastWaiter is cancelled, clean out.
    // 如果条件队列的尾节点已取消,从头节点开始清除所有已取消的节点
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();//去掉cancelled的节点
        //重新获取尾节点
        t = lastWaiter;
    }
    //新建一个节点,它的等待状态是CONDITION
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    //如果尾节点为空,则把新节点赋值给头节点(相当于初始化队列)
    //否则把新节点赋值给尾节点的nextWaiter指针
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    //尾节点指向新节点
    lastWaiter = node;
    return node;
}

final int fullyRelease(Node node) {//先释放锁
    boolean failed = true;
    try {
// 获取状态变量的值,重复获取锁,这个值会一直累加
// 所以这个值也代表着获取锁的次数
        int savedState = getState();
// 一次性释放所有获得的锁
        if (release(savedState)) {//其实就是修改state啊什么的
            failed = false;
// 返回获取锁的次数
            return savedState;
        } else {
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}

// 判断node是否处于阻塞队列
final boolean isOnSyncQueue(Node node) {//当前node是否在sync队列中
    // 阻塞队列中的节点状态不是CONDITION,prev是null说明也不在阻塞队列

// 如果等待状态是CONDITION,或者前一个指针为空,返回false
// 说明还没有移到AQS的队列中
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;

// 如果next指针有值,说明已经移到AQS的队列中了

    // 此时node的状态必不为CONDITION且node.prev不为null
    //如果next不为空,肯定在阻塞队列;

// 从AQS的尾节点开始往前寻找看是否可以找到当前节点,找到了也说明已经在AQS的队列中了
    if (node.next != null) // If has successor, it must be on queue
        return true;
    /*
     * node.prev can be non-null, but not yet on queue because
     * the CAS to place it on queue can fail. So we have to
     * traverse from tail to make sure it actually made it.  It
     * will always be near the tail in calls to this method, and
     * unless the CAS failed (which is unlikely), it will be
     * there, so we hardly ever traverse much.
     */
    // 从队尾往前找node,找到的话返回true,否则返回false
    return findNodeFromTail(node);//从尾部往回搜索node,找到则返回true。意图是当前node是否在队列中。
}

signal通知

signal通知条件已经出现。

主要流程是:

  1. 从队列的头结点获取一个非取消状态的节点
  2. 将节点从条件队列转移到阻塞队列
  3. 之后应用程序会调用unlock,unlock之后之前阻塞的线程就会执行await的“分界线”下面的代码。
//唤醒线程,转移到阻塞队列
public final void signal() {
// 如果不是当前线程占有着锁,调用这个方法抛出异常
// 说明signal()也要在获取锁之后执行
    if (!isHeldExclusively())//只有持有锁的线程,才有资格调用signal()
        throw new IllegalMonitorStateException();
// 条件队列的头节点
    Node first = firstWaiter;
    // 唤醒条件队列中第一个节点,等待最久
// 如果有等待条件的节点,则通知它条件已成立
    if (first != null)
        doSignal(first);
}

private void doSignal(Node first) {//唤醒队列中的第一个线程
    // while循环,若first节点迁移不成功,选择first后第一个节点进行转移
    do {
// 移到条件队列的头节点往后一位
        // 将 firstWaiter 指向 first 节点后面的第一个,因为 first马上要被迁移到阻塞队列
        // 若将 first 移除后,后面没有节点在等待了,那么需要将 lastWaiter 置为 null
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        // 相当于把头节点从队列中出队
        first.nextWaiter = null;
        // 转移节点到AQS队列中
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

// 将节点从条件队列转移到阻塞队列(从condition的条件队列转移到AQS的同步队列)
final boolean transferForSignal(Node node) {
// 把节点的状态更改为0,也就是说即将移到AQS队列中
// 如果失败了,说明节点已经被改成取消状态了
// 返回false,通过上面的循环可知会寻找下一个可用节点
    /*
     * If cannot change waitStatus, the node has been cancelled.
     */
    // cas失败说明其他线程完成了转移,返回继续转移下个节点;成功的话,waitStatus置为0,上一节说过,阻塞队列节点初始状态时waitstatus为0
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    /*
     * Splice onto queue and try to set waitStatus of predecessor to
     * indicate that thread is (probably) waiting. If cancelled or
     * attempt to set waitStatus fails, wake up to resync (in which
     * case the waitStatus can be transiently and harmlessly wrong).
     */

    // 将该节点自旋加入阻塞队列尾部,p是加入阻塞队列后的前驱节点

    //从队列中取出firstWait,唤醒它。在通过调用unpark唤醒它之前,先用enq(node)函数把这个Node放入AQS的锁对应的阻塞队列中。
    // 也正因为如此,才有了await()函数里面的判断条件while (!isOnSyncQueue(node)),这个判断条件被满足,说明await线程不是被中断,而是被unpark唤醒的。

// 调用AQS的入队方法把节点移到AQS的队列中
// 注意,这里enq()的返回值是node的上一个节点,也就是旧尾节点

    Node p = enq(node);//先把Node放入互斥锁的同步队列里,再调用下面的unpark

// 上一个节点的等待状态
    int ws = p.waitStatus;
// 如果上一个节点已取消了,或者更新状态为SIGNAL失败(也是说明上一个节点已经取消了)
// 则直接唤醒当前节点对应的线程
    // ws<=0且cas成功的话直接返回true;否则unpark唤醒线程返回true,总之到了这一步迁移已经完成
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
// 如果更新上一个节点的等待状态为SIGNAL成功了
// 则返回true,这时上面的循环不成立了,退出循环,也就是只通知了一个节点
// 此时当前节点还是阻塞状态
// 也就是说调用signal()的时候并不会真正唤醒一个节点
// 只是把节点从条件队列移到AQS队列中
    return true;
}

signalAll

signalAll与signal流程一致,区别就是signalAll会把所有阻塞等待在该条件的节点都挪到同步队列里面去。

条件锁流程

上面的流程可以归结为下面这张图:

-----

jdk8中的bug

在使用await和signal的时候,需要先lock才能调用。jdk8的视线中,在signal和signalAll的时候在方法一开始就调用了isHeldExclusively来做持有锁的判断,但是await没有。如果在非正确的情况下调用await,会出现问题。

在没有持有锁的情况下调用await,首先await调用addConditionWaiter把节点入队列,之后fullyRelease释放锁的时候,因为没有持有锁所以会抛出IllegalMonitorStateException,但是这时候节点已经进入了同步队列。这就会产生一个问题,当addConditionWaiter的时候,修改firstWaiter和nextWaiter的时候node会被排挤出队列,在不出现中断的情况下,会导致该线程一直被阻塞,没有得到被挪到同步队列的机会。

private Node addConditionWaiter() {
    Node t = lastWaiter;
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    Node node = new Node(Thread.currentThread(), Node.CONDITION);

    // 存在竞争时将会导致节点入队出错
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}

来看一个例子:持有锁和没有锁的线程同时调用await。
定位到“注释/打开lock”这里,分别打开和注释掉lock。打开的时候输出是正确的队列里节点数2000,如果注释掉的话,队列里面节点数就不是2000了。

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.CountDownLatch;

public class ConditionObjectTest2 {
    public static void main(String[] args) throws Exception {
        final int loop = 1000;
        final ReentrantLock lock = new ReentrantLock();
        final Condition cond = lock.newCondition();
        final CountDownLatch done = new CountDownLatch(loop);

        for (int i = 0; i < loop; i++) {
            new Thread(() -> {
                lock.lock();
                try {
                    done.countDown();
                    cond.await();
                } catch (InterruptedException unexpected) {
                    throw new AssertionError("interrupted?");
                } finally {
                    lock.unlock();
                }
            }, "good-" + i).start();

            new Thread(() -> {
                //注释/打开lock
                lock.lock();
                try {
                    cond.awaitUninterruptibly(); // not holding lock?
//                    cond.await();
                    throw new AssertionError("should throw");
//                } catch (InterruptedException unexpected) {
//                    throw new AssertionError("interrupted?");
                } catch (IllegalMonitorStateException expected) {
//                    expected.printStackTrace();
                } finally {
                    lock.unlock();
                }
            }, "evil-" + i).start();
        }

        done.await();
        for (int prev = -1;; Thread.yield()) {
            lock.lock();
            try {
                int qLength = lock.getWaitQueueLength(cond);
                if (qLength != prev) {
                    System.err.println(qLength);
                    prev = qLength;
                }
                if (qLength == loop) {
                    cond.signalAll();
                    break;
                }
            } finally {
                lock.unlock();
            }
        }
    }
}

这个bug:https://bugs.java.com/bugdatabase/view_bug.do?bug_id=JDK-8187408
已经在JDK10中得到了修复,我们来看下JDK12的代码:

private Node addConditionWaiter() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node t = lastWaiter;
    // If lastWaiter is cancelled, clean out.
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }

    Node node = new Node(Node.CONDITION);

    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}

ReentrantLock VS synchronized

使用对比

我们来看下这两个的对比:

功能 ReentrantLock synchronized
可重入 支持 支持
非公平 支持(默认) 支持
加锁/解锁方式 需要手动加锁、解锁,一般使用try..finally..保证锁能够释放 手动加锁,无需刻意解锁
按key锁 不支持,比如按用户id加锁 支持,synchronized加锁时需要传入一个对象
公平锁 支持,new ReentrantLock(true) 不支持
中断 支持,lockInterruptibly() 不支持
尝试加锁 支持,tryLock() 不支持
超时锁 支持,tryLock(timeout, unit) 不支持
获取当前线程获取锁的次数 支持,getHoldCount() 不支持
获取等待的线程 支持,getWaitingThreads() 不支持
检测是否被当前线程占有 支持,isHeldByCurrentThread() 不支持
检测是否被任意线程占有 支持,isLocked() 不支持
条件锁 可支持多个条件,condition.await(),condition.signal(),condition.signalAll() 只支持一个,obj.wait(),obj.notify(),obj.notifyAll()

性能对比

synchronized的性能大概是ReentrantLock的两倍。

public class ReentrantLockVsSynchronizedTest {
    public static AtomicInteger a = new AtomicInteger(0);
    public static LongAdder b = new LongAdder();
    public static int c = 0;
    public static int d = 0;
    public static int e = 0;

    public static final ReentrantLock fairLock = new ReentrantLock(true);
    public static final ReentrantLock unfairLock = new ReentrantLock();


    public static void main(String[] args) throws InterruptedException {
        System.out.println("-------------------------------------");
        testAll(1, 100000);
        System.out.println("-------------------------------------");
        testAll(2, 100000);
        System.out.println("-------------------------------------");
        testAll(4, 100000);
        System.out.println("-------------------------------------");
        testAll(6, 100000);
        System.out.println("-------------------------------------");
        testAll(8, 100000);
        System.out.println("-------------------------------------");
        testAll(10, 100000);
        System.out.println("-------------------------------------");
        testAll(50, 100000);
        System.out.println("-------------------------------------");
        testAll(100, 100000);
        System.out.println("-------------------------------------");
        testAll(200, 100000);
        System.out.println("-------------------------------------");
        testAll(500, 100000);
        System.out.println("-------------------------------------");
//        testAll(1000, 1000000);
        System.out.println("-------------------------------------");
        testAll(500, 10000);
        System.out.println("-------------------------------------");
        testAll(500, 1000);
        System.out.println("-------------------------------------");
        testAll(500, 100);
        System.out.println("-------------------------------------");
        testAll(500, 10);
        System.out.println("-------------------------------------");
        testAll(500, 1);
        System.out.println("-------------------------------------");
    }

    public static void testAll(int threadCount, int loopCount) throws InterruptedException {
        testAtomicInteger(threadCount, loopCount);
        testLongAdder(threadCount, loopCount);
        testSynchronized(threadCount, loopCount);
        testReentrantLockUnfair(threadCount, loopCount);
//        testReentrantLockFair(threadCount, loopCount);
    }

    public static void testAtomicInteger(int threadCount, int loopCount) throws InterruptedException {
        long start = System.currentTimeMillis();

        CountDownLatch countDownLatch = new CountDownLatch(threadCount);
        for (int i = 0; i < threadCount; i++) {
            new Thread(() -> {
                for (int j = 0; j < loopCount; j++) {
                    a.incrementAndGet();
                }
                countDownLatch.countDown();
            }).start();
        }

        countDownLatch.await();

        System.out.println("testAtomicInteger: result=" + a.get() + ", threadCount=" + threadCount + ", loopCount=" + loopCount + ", elapse=" + (System.currentTimeMillis() - start));
    }

    public static void testLongAdder(int threadCount, int loopCount) throws InterruptedException {
        long start = System.currentTimeMillis();

        CountDownLatch countDownLatch = new CountDownLatch(threadCount);
        for (int i = 0; i < threadCount; i++) {
            new Thread(() -> {
                for (int j = 0; j < loopCount; j++) {
                    b.increment();
                }
                countDownLatch.countDown();
            }).start();
        }

        countDownLatch.await();

        System.out.println("testLongAdder: result=" + b.sum() + ", threadCount=" + threadCount + ", loopCount=" + loopCount + ", elapse=" + (System.currentTimeMillis() - start));
    }

    public static void testReentrantLockFair(int threadCount, int loopCount) throws InterruptedException {
        long start = System.currentTimeMillis();

        CountDownLatch countDownLatch = new CountDownLatch(threadCount);
        for (int i = 0; i < threadCount; i++) {
            new Thread(() -> {
                for (int j = 0; j < loopCount; j++) {
                    fairLock.lock();
                    // 消除try的性能影响
//                    try {
                    c++;
//                    } finally {
                    fairLock.unlock();
//                    }
                }
                countDownLatch.countDown();
            }).start();
        }

        countDownLatch.await();

        System.out.println("testReentrantLockFair: result=" + c + ", threadCount=" + threadCount + ", loopCount=" + loopCount + ", elapse=" + (System.currentTimeMillis() - start));
    }

    public static void testReentrantLockUnfair(int threadCount, int loopCount) throws InterruptedException {
        long start = System.currentTimeMillis();

        CountDownLatch countDownLatch = new CountDownLatch(threadCount);
        for (int i = 0; i < threadCount; i++) {
            new Thread(() -> {
                for (int j = 0; j < loopCount; j++) {
                    unfairLock.lock();
                    // 消除try的性能影响
//                    try {
                    d++;
//                    } finally {
                    unfairLock.unlock();
//                    }
                }
                countDownLatch.countDown();
            }).start();
        }

        countDownLatch.await();

        System.out.println("testReentrantLockUnfair: result=" + d + ", threadCount=" + threadCount + ", loopCount=" + loopCount + ", elapse=" + (System.currentTimeMillis() - start));
    }

    public static void testSynchronized(int threadCount, int loopCount) throws InterruptedException {
        long start = System.currentTimeMillis();

        CountDownLatch countDownLatch = new CountDownLatch(threadCount);
        for (int i = 0; i < threadCount; i++) {
            new Thread(() -> {
                for (int j = 0; j < loopCount; j++) {
                    synchronized (ReentrantLockVsSynchronizedTest.class) {
                        e++;
                    }
                }
                countDownLatch.countDown();
            }).start();
        }

        countDownLatch.await();

        System.out.println("testSynchronized: result=" + e + ", threadCount=" + threadCount + ", loopCount=" + loopCount + ", elapse=" + (System.currentTimeMillis() - start));
    }

}

refer

https://segmentfault.com/a/1190000014751308
https://www.cnblogs.com/micrari/p/7219751.html

comments powered by Disqus