AtomicInteger底层实现原理是什么?如何在自己的产品代码中应用CAS操作? - 《Java核心技术》笔记

AtomicInteger

AtomicInteger是对int的一个封装,利用CAS保证了原子性操作。

它依赖Unsafe提供的一些底层能力进行底层操作:

private static final jdk.internal.misc.Unsafe U = jdk.internal.misc.Unsafe.getUnsafe();
private static final long VALUE = U.objectFieldOffset(AtomicInteger.class, "value");
private volatile int value;
public final int getAndIncrement() {
    return U.getAndAddInt(this, VALUE, 1);
}

getAndAddInt需要返回原始数组,所以需要添加失败重试逻辑。

public final int getAndAddInt(Object o, long offset, int delta) {
    int v;
    do {
        v = getIntVolatile(o, offset);
    } while (!weakCompareAndSetInt(o, offset, v, v + delta));
    return v;
}

CompareAndSet之类的函数,返回值就是成功与否。

public final boolean compareAndSet(int expectedValue, int newValue)

Atomic*FieldUpdater

比AtomicLong等更加紧凑。

atomic包的LongAdder在高度竞争环境下,可能是比AtomicLong更佳的选择。尽管其本质是空间换时间。

Java9之后可以使用Variable Handle API:获取相应句柄,然后调用其CAS方法。

private static final VarHandle HANDLE = MethodHandles.lookup().findStaticVarHandle
        (AtomicBTreePartition.class, "lock");

private void acquireLock(){
    long t = Thread.currentThread().getId();
    while (!HANDLE.compareAndSet(this, 0L, t)){
        // 等待一会儿,数据库操作可能比较慢
        …
    }
}

CAS的副作用

常见的失败重试机制,都隐含着竞争情况是短暂的的假设。大多数场景下确实是只发生一次就获得了成功,但是总有意外情况,所以在有需要的时候,需要考虑限制自旋的次数,以免过度消耗CPU。

ABA问题

Java提供了AtomicStampedReference工具类,通过为引用简历类似版本号的方式,保证CAS的正确性。

AQS

详细可以参考http://yizhanggou.top/aqs/ ,这里做下总结。

将基础的同步相关操作抽象在AbstractQueuedSynchronizer中,利用AQS为我们构建同步结构提供范本。

AQS内部数据和方法

  • 一个volatile的证书成员表征状态,同时提供了setState和getState。private volatile int state;
  • 一个FIFO的等待线程队列,以实现多线程间竞争和等待。
  • 各种基于CAS的基础操作方法,以及各种期望具体同步结构去实现的acquire/release方法。

利用CAS去实现一个同步结构,至少要实现acquire和release,分别是获取资源的独占权,以及释放对这个资源的独占。

以ReentrantLock为例

内部通过扩展AQS实现了Sync类型,以AQS的state来反映锁的持有情况。

private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer { …}

acquire/release操作

public void lock() {
    sync.acquire(1);
}
public void unlock() {
    sync.release(1);
}

acquire实现在AQS内部:

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

首先看下tryAcquire,在ReentrantLock,tryAcquire逻辑是现在NonfairSync和FairSync中,而AQS内部tryAcquire仅仅是个接近为未实现的方法,需要实现者自己定义。

公平性:

public ReentrantLock() {
        sync = new NonfairSync(); // 默认是非公平的
    }
    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }

非公平的tryAcquire:在锁无人占有时,不检查是否有其他等待者。

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();// 获取当前 AQS 内部状态量
    if (c == 0) { // 0 表示无人占有,则直接用 CAS 修改状态位,
      if (compareAndSetState(0, acquires)) {// 不检查排队情况,直接争抢
          setExclusiveOwnerThread(current);  // 并设置当前线程独占锁
          return true;
      }
    } else if (current == getExclusiveOwnerThread()) { // 即使状态不是 0,也可能当前线程是锁持有者,因为这是再入锁
      int nextc = c + acquires;
      if (nextc < 0) // overflow
          throw new Error("Maximum lock count exceeded");
      setState(nextc);
      return true;
  }
  return false;
}

如果tryAcquire失败,则进入排队竞争阶段:如果当前节点的前面是头结点,则试图获取锁,一切顺利则成为新的头结点。否则,有必要则等待。

final boolean acquireQueued(final Node node, int arg) {
      boolean interrupted = false;
      try {
      for (;;) {// 循环
          final Node p = node.predecessor();// 获取前一个节点
          if (p == head && tryAcquire(arg)) { // 如果前一个节点是头结点,表示当前节点合适去 tryAcquire
              setHead(node); // acquire 成功,则设置新的头节点
              p.next = null; // 将前面节点对当前节点的引用清空
              return interrupted;
          }
          if (shouldParkAfterFailedAcquire(p, node)) // 检查是否失败后需要 park
              interrupted |= parkAndCheckInterrupt();
      }
       } catch (Throwable t) {
      cancelAcquire(node);// 出现异常,取消
      if (interrupted)
              selfInterrupt();
      throw t;
      }
}
comments powered by Disqus