AtomicInteger
AtomicInteger是对int的一个封装,利用CAS保证了原子性操作。
它依赖Unsafe提供的一些底层能力进行底层操作:
private static final jdk.internal.misc.Unsafe U = jdk.internal.misc.Unsafe.getUnsafe();
private static final long VALUE = U.objectFieldOffset(AtomicInteger.class, "value");
private volatile int value;
public final int getAndIncrement() {
return U.getAndAddInt(this, VALUE, 1);
}
getAndAddInt需要返回原始数组,所以需要添加失败重试逻辑。
public final int getAndAddInt(Object o, long offset, int delta) {
int v;
do {
v = getIntVolatile(o, offset);
} while (!weakCompareAndSetInt(o, offset, v, v + delta));
return v;
}
CompareAndSet之类的函数,返回值就是成功与否。
public final boolean compareAndSet(int expectedValue, int newValue)
Atomic*FieldUpdater
比AtomicLong等更加紧凑。
atomic包的LongAdder在高度竞争环境下,可能是比AtomicLong更佳的选择。尽管其本质是空间换时间。
Java9之后可以使用Variable Handle API:获取相应句柄,然后调用其CAS方法。
private static final VarHandle HANDLE = MethodHandles.lookup().findStaticVarHandle
(AtomicBTreePartition.class, "lock");
private void acquireLock(){
long t = Thread.currentThread().getId();
while (!HANDLE.compareAndSet(this, 0L, t)){
// 等待一会儿,数据库操作可能比较慢
…
}
}
CAS的副作用
常见的失败重试机制,都隐含着竞争情况是短暂的的假设。大多数场景下确实是只发生一次就获得了成功,但是总有意外情况,所以在有需要的时候,需要考虑限制自旋的次数,以免过度消耗CPU。
ABA问题
Java提供了AtomicStampedReference工具类,通过为引用简历类似版本号的方式,保证CAS的正确性。
AQS
详细可以参考http://yizhanggou.top/aqs/ ,这里做下总结。
将基础的同步相关操作抽象在AbstractQueuedSynchronizer中,利用AQS为我们构建同步结构提供范本。
AQS内部数据和方法
- 一个volatile的证书成员表征状态,同时提供了setState和getState。
private volatile int state;
- 一个FIFO的等待线程队列,以实现多线程间竞争和等待。
- 各种基于CAS的基础操作方法,以及各种期望具体同步结构去实现的acquire/release方法。
利用CAS去实现一个同步结构,至少要实现acquire和release,分别是获取资源的独占权,以及释放对这个资源的独占。
以ReentrantLock为例
内部通过扩展AQS实现了Sync类型,以AQS的state来反映锁的持有情况。
private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer { …}
acquire/release操作
public void lock() {
sync.acquire(1);
}
public void unlock() {
sync.release(1);
}
acquire实现在AQS内部:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
首先看下tryAcquire,在ReentrantLock,tryAcquire逻辑是现在NonfairSync和FairSync中,而AQS内部tryAcquire仅仅是个接近为未实现的方法,需要实现者自己定义。
公平性:
public ReentrantLock() {
sync = new NonfairSync(); // 默认是非公平的
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
非公平的tryAcquire:在锁无人占有时,不检查是否有其他等待者。
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();// 获取当前 AQS 内部状态量
if (c == 0) { // 0 表示无人占有,则直接用 CAS 修改状态位,
if (compareAndSetState(0, acquires)) {// 不检查排队情况,直接争抢
setExclusiveOwnerThread(current); // 并设置当前线程独占锁
return true;
}
} else if (current == getExclusiveOwnerThread()) { // 即使状态不是 0,也可能当前线程是锁持有者,因为这是再入锁
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
如果tryAcquire失败,则进入排队竞争阶段:如果当前节点的前面是头结点,则试图获取锁,一切顺利则成为新的头结点。否则,有必要则等待。
final boolean acquireQueued(final Node node, int arg) {
boolean interrupted = false;
try {
for (;;) {// 循环
final Node p = node.predecessor();// 获取前一个节点
if (p == head && tryAcquire(arg)) { // 如果前一个节点是头结点,表示当前节点合适去 tryAcquire
setHead(node); // acquire 成功,则设置新的头节点
p.next = null; // 将前面节点对当前节点的引用清空
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node)) // 检查是否失败后需要 park
interrupted |= parkAndCheckInterrupt();
}
} catch (Throwable t) {
cancelAcquire(node);// 出现异常,取消
if (interrupted)
selfInterrupt();
throw t;
}
}