JDK并发集合4:AQS原理(同步工具)

接下来我们正式进入JDK并发集合的章节。

Treiber Stack

Treiber Stack是一种可扩展的无锁栈,利用细粒度的并发原语CAS来实现的。在FutureTask中会用到。

入栈

ts--

  1. 单链表保存了各个元素
  2. 入栈时,首先创建一个newHead,把next指针指向top
  3. 通过CAS替换top为newHead,CAS的条件是top==oldHead

出栈

ts---1

  1. 创建一个新的指针,指向top的next元素,假设其next元素为newHead
  2. CAS将top指向newHead,CAS的条件同样是top==oldHead

AQS

继承

继承于AbstractOwnableSynchronizer,这个类只有一个变量exclusiveOwnerThread,表示当前排他状态持有锁的线程。

原理

AQS是JDK中几乎所有锁和同步器的基础框架。它作为锁类的模板类,维护了一个队列以及一个state状态变量。它的队列使用双链表实现,用于保存等待锁排队的线程。state状态变量用于同步状态,以控制加锁解锁,该状态由子类来维护。基于AQS自己动手写一个锁非常简单,只需要实现AQS的几个方法即可。

AQS框架主要实现的是:

  • 线程阻塞队列的维护
  • 线程阻塞和唤醒

下面举个例子,在独占模式下,我们可以通过设置state为0为不被占有,state为1为被独占来实现独占锁。

AQS----

在共享模式下,我们可以设置state大于0为可以共享,否则为不可以共享,即控制共享的次数。
AQS-----2

实现

子类继承AQS,只需要实现以下几个方法即可:

tryAcquire //互斥模式:尝试获取锁
tryRelease //互斥模式:尝试释放锁
tryAcquireShared //共享模式:尝试获取锁
tryReleaseShared //共享模式:尝试释放锁
isHeldExclusively //当前线程是否独占锁

这里有个例子:

public class MyLockBaseOnAqs {

    // 定义一个同步器,实现AQS类
    private static class Sync extends AbstractQueuedSynchronizer {
        // 实现tryAcquire(acquires)方法
        @Override
        public boolean tryAcquire(int acquires) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
        // 实现tryRelease(releases)方法
        @Override
        protected boolean tryRelease(int releases) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }
    }

    // 声明同步器
    private final Sync sync = new Sync();

    // 加锁
    public void lock() {
        sync.acquire(1);
    }

    // 解锁
    public void unlock() {
        sync.release(1);
    }


    private static int count = 0;

    public static void main(String[] args) throws InterruptedException {
        MyLockBaseOnAqs lock = new MyLockBaseOnAqs();

        CountDownLatch countDownLatch = new CountDownLatch(1000);

        IntStream.range(0, 1000).forEach(i -> new Thread(() -> {
            lock.lock();

            try {
                IntStream.range(0, 10000).forEach(j -> {
                    count++;
                });
            } finally {
                lock.unlock();
            }
//            System.out.println(Thread.currentThread().getName());
            countDownLatch.countDown();
        }, "tt-" + i).start());

        countDownLatch.await();

        System.out.println(count);
    }
}

可以去掉lock.lock();lock.unlock();再运行一次,数字不对。加上去数字就对了,就表明我们的锁实现的是正确的。

CLH队列

CLH列表是AQS中用于保存阻塞线程的双向链表,它的每个节点里面保存了因为等待锁而阻塞的线程。

CLH

我们来看一下这几个属性:

属性

waitStatus

当前被封装成Node结点的等待状态,共有4种取值CANCELLED、SIGNAL、CONDITION、PROPAGATE。

  • CANCELLED:值为1,等待的节点被取消。
  • SIGNAL:值为-1,等待被唤醒的节点,如果前继线程释放了同步锁或被取消,将会通知该后继结点的线程执行。
  • CONDITION:值为-2,表示等待着condition的线程。其他线程调用了condition的signal方法后,CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁。
  • PROPAGATE:值为-3,共享模式下,前驱节点不仅会唤醒其后继节点,同时也可能会唤醒后继的后继节点。

状态判断

状态 判断结果 说明
waitStatue>0 取消状态 说明该线程中断或者等待超时,需要移除该线程
waitStatue=0 初始化状态 该节点尚未被初始化完成
waitStatue<0 有效状态 线程处于可以被唤醒的状态

prve next thread

  • prev:前置节点的地址
  • next:后置节点的地址
  • thread:当前被阻塞的线程

nextWaiter

nextWaiter用于条件队列,是单向的,相当于复用了CLH队列,但是不使用其prev和next,而是用nextWaiter来连接,它指向下一个处于CONDITION状态的节点。

同时在同步队列中,nextWaiter还被复用与表示当前node是等待在共享还是独占模式中:

模式 含义
SHARED 表示线程以共享的模式等待锁
EXCLUSIVE 表示线程正在以独占的方式等待锁

出入队

入队

入队因为是多线程可能调用的,所以需要使原子操作CAS。CAS将tail指向新的节点后,将原来的tail的next指向新的节点。

1817013129-5cbd6a7a93674_articlex

出队

出队因为是获取同步状态成功后的节点调用的,所以不需要CAS。首节点释放同步状态后,将会唤醒后续节点,而后继节点将会在获取同步状态成功时将自己设置为首节点。只需要将首节点设置为原来首节点的后续节点,并断开原来首节点的next即可。

2751180000-5cbd6b431be55_articlex

CAS修改

Node的几个变量都是可以通过CAS来修改的,其中包装了方法:

private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long stateOffset;
private static final long headOffset;
private static final long tailOffset;
private static final long waitStatusOffset;
private static final long nextOffset;

static {
    try {
        stateOffset = unsafe.objectFieldOffset
            (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
        headOffset = unsafe.objectFieldOffset
            (AbstractQueuedSynchronizer.class.getDeclaredField("head"));
        tailOffset = unsafe.objectFieldOffset
            (AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
        waitStatusOffset = unsafe.objectFieldOffset
            (Node.class.getDeclaredField("waitStatus"));
        nextOffset = unsafe.objectFieldOffset
            (Node.class.getDeclaredField("next"));

    } catch (Exception ex) { throw new Error(ex); }
}

同步状态state

state表示当前临界资源的获锁情况:

/**
 * The synchronization state.
 */
//控制加锁解锁的状态变量
private volatile int state;

AQS提供了以下final方法给子类使用:

方法 描述
protected final int getState() 获取State的值
protected final void setState(int newState) 设置State的值
protected final boolean compareAndSetState(int expect, int update) 使用CAS方式更新State

refer

https://segmentfault.com/a/1190000012463330
https://www.cnblogs.com/yanlong300/p/10953185.html

comments powered by Disqus