接下来我们正式进入JDK并发集合的章节。
Treiber Stack
Treiber Stack是一种可扩展的无锁栈,利用细粒度的并发原语CAS来实现的。在FutureTask中会用到。
入栈
- 单链表保存了各个元素
- 入栈时,首先创建一个newHead,把next指针指向top
- 通过CAS替换top为newHead,CAS的条件是top==oldHead
出栈
- 创建一个新的指针,指向top的next元素,假设其next元素为newHead
- CAS将top指向newHead,CAS的条件同样是top==oldHead
AQS
继承
继承于AbstractOwnableSynchronizer,这个类只有一个变量exclusiveOwnerThread,表示当前排他状态持有锁的线程。
原理
AQS是JDK中几乎所有锁和同步器的基础框架。它作为锁类的模板类,维护了一个队列以及一个state状态变量。它的队列使用双链表实现,用于保存等待锁排队的线程。state状态变量用于同步状态,以控制加锁解锁,该状态由子类来维护。基于AQS自己动手写一个锁非常简单,只需要实现AQS的几个方法即可。
AQS框架主要实现的是:
- 线程阻塞队列的维护
- 线程阻塞和唤醒
下面举个例子,在独占模式下,我们可以通过设置state为0为不被占有,state为1为被独占来实现独占锁。
在共享模式下,我们可以设置state大于0为可以共享,否则为不可以共享,即控制共享的次数。
实现
子类继承AQS,只需要实现以下几个方法即可:
tryAcquire //互斥模式:尝试获取锁
tryRelease //互斥模式:尝试释放锁
tryAcquireShared //共享模式:尝试获取锁
tryReleaseShared //共享模式:尝试释放锁
isHeldExclusively //当前线程是否独占锁
这里有个例子:
public class MyLockBaseOnAqs {
// 定义一个同步器,实现AQS类
private static class Sync extends AbstractQueuedSynchronizer {
// 实现tryAcquire(acquires)方法
@Override
public boolean tryAcquire(int acquires) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
// 实现tryRelease(releases)方法
@Override
protected boolean tryRelease(int releases) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
}
// 声明同步器
private final Sync sync = new Sync();
// 加锁
public void lock() {
sync.acquire(1);
}
// 解锁
public void unlock() {
sync.release(1);
}
private static int count = 0;
public static void main(String[] args) throws InterruptedException {
MyLockBaseOnAqs lock = new MyLockBaseOnAqs();
CountDownLatch countDownLatch = new CountDownLatch(1000);
IntStream.range(0, 1000).forEach(i -> new Thread(() -> {
lock.lock();
try {
IntStream.range(0, 10000).forEach(j -> {
count++;
});
} finally {
lock.unlock();
}
// System.out.println(Thread.currentThread().getName());
countDownLatch.countDown();
}, "tt-" + i).start());
countDownLatch.await();
System.out.println(count);
}
}
可以去掉lock.lock();
和lock.unlock();
再运行一次,数字不对。加上去数字就对了,就表明我们的锁实现的是正确的。
CLH队列
CLH列表是AQS中用于保存阻塞线程的双向链表,它的每个节点里面保存了因为等待锁而阻塞的线程。
我们来看一下这几个属性:
属性
waitStatus
当前被封装成Node结点的等待状态,共有4种取值CANCELLED、SIGNAL、CONDITION、PROPAGATE。
- CANCELLED:值为1,等待的节点被取消。
- SIGNAL:值为-1,等待被唤醒的节点,如果前继线程释放了同步锁或被取消,将会通知该后继结点的线程执行。
- CONDITION:值为-2,表示等待着condition的线程。其他线程调用了condition的signal方法后,CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁。
- PROPAGATE:值为-3,共享模式下,前驱节点不仅会唤醒其后继节点,同时也可能会唤醒后继的后继节点。
状态判断
状态 | 判断结果 | 说明 |
---|---|---|
waitStatue>0 | 取消状态 | 说明该线程中断或者等待超时,需要移除该线程 |
waitStatue=0 | 初始化状态 | 该节点尚未被初始化完成 |
waitStatue<0 | 有效状态 | 线程处于可以被唤醒的状态 |
prve next thread
- prev:前置节点的地址
- next:后置节点的地址
- thread:当前被阻塞的线程
nextWaiter
nextWaiter用于条件队列,是单向的,相当于复用了CLH队列,但是不使用其prev和next,而是用nextWaiter来连接,它指向下一个处于CONDITION状态的节点。
同时在同步队列中,nextWaiter还被复用与表示当前node是等待在共享还是独占模式中:
模式 | 含义 |
---|---|
SHARED | 表示线程以共享的模式等待锁 |
EXCLUSIVE | 表示线程正在以独占的方式等待锁 |
出入队
入队
入队因为是多线程可能调用的,所以需要使原子操作CAS。CAS将tail指向新的节点后,将原来的tail的next指向新的节点。
出队
出队因为是获取同步状态成功后的节点调用的,所以不需要CAS。首节点释放同步状态后,将会唤醒后续节点,而后继节点将会在获取同步状态成功时将自己设置为首节点。只需要将首节点设置为原来首节点的后续节点,并断开原来首节点的next即可。
CAS修改
Node的几个变量都是可以通过CAS来修改的,其中包装了方法:
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long stateOffset;
private static final long headOffset;
private static final long tailOffset;
private static final long waitStatusOffset;
private static final long nextOffset;
static {
try {
stateOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("state"));
headOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("head"));
tailOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
waitStatusOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("waitStatus"));
nextOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("next"));
} catch (Exception ex) { throw new Error(ex); }
}
同步状态state
state表示当前临界资源的获锁情况:
/**
* The synchronization state.
*/
//控制加锁解锁的状态变量
private volatile int state;
AQS提供了以下final方法给子类使用:
方法 | 描述 |
---|---|
protected final int getState() | 获取State的值 |
protected final void setState(int newState) | 设置State的值 |
protected final boolean compareAndSetState(int expect, int update) | 使用CAS方式更新State |
refer
https://segmentfault.com/a/1190000012463330
https://www.cnblogs.com/yanlong300/p/10953185.html