JDK并发集合3:一致性问题(基础篇)

JMM就是为了解决多线程环境下共享变量的一致性问题,那么一致性包含哪些内容呢?它包括可见性、有序性和原子性三方面。

原子性

一个操作是不可中断的,要么全部执行成功要么全部执行失败。

JMM模型中八种操作是原子性的。

可见性

所有线程都能看到共享内存的最新状态。

JMM是通过在变更修改后同步回主内存,在变量读取前从主内存刷新变量值来实现的,它是依赖主内存的。而当某个线程的工作内存对这个变量进行修改时并没有及时同步到主内存中,这种情况下该变量对于其他线程来说是不可见的。

这里需要注意几个关键字:

  • volatile:修改后立即刷回主内存,读取时从主内存读取
  • synchronized:对一个变量执行unlock之前,把变量同步回主内存
  • final:一旦在构造器中初始化完成,则其他线程都能看到这个变量

有序性

数据不相关的变量在并发的情况下,实际执行的结果和单线程的执行结果是一样的,不会因为重排序的问题导致结果不可预知。

之前谈到缓存在执行一致性时通知其他缓存是需要时间的,CPU会将这个指令的执行放入缓冲区,并执行下一条指令,这是内存系统的重排序。还有编译级别的重排序。依据JMM规范java是天然有序的,这种有序是在本线程内观察有序。如果在另一个线程中观察,所有的操作都是无序的。

happen-before

Java语言天然定义了一个“happen-before”原则,它是指如果操作A先行发生于操作B,那么操作A产生的影响能够被操作B感知到,这种影响包括修改了共享内存中变量的值、发送了消息、调用了方法等:

  • 如果一个操作happens-before另一个操作,那么第一个操作的执行结果将对第二个操作可见,而且第一个操作的执行顺序排在第二个操作之前。如果A happens-before B,那么Java内存模型将向程序员保证——A操作的结果将对B可见,且A的执行顺序排在B之前。
  • 两个操作之间存在happens-before关系,并不意味着Java平台的具体实现必须要按照happens-before关系指定的顺序来执行。如果重排序之后的执行结果,与按happens-before关系来执行的结果一致,那么这种重排序并不非法(也就是说,JMM允许这种重排序)。

happen-before定义了以下几个原则:

  • 程序次序规则:一个线程内,按照代码顺序,书写在前面的操作先行发生于书写在后面的操作。
  • 锁定规则:一个unlock操作先行发生于后面对同一个锁的lock操作.
  • volatile变量规则:对一个变量的写操作先行发生于后面对这个变量的读操作。如果一个线程先去写一个变量,然后另一个线程去进行读取,那么写入操作肯定会先行发生于读操作。
  • 传递规则:如果操作A先行发生于操作B,而操作B又先行发生于操作C,则可以得出操作A先行发生于操作C。
  • 线程启动规则:Thread对象的start()方法先行发生于此线程的每个一个动作
  • 线程中断规则:对线程interrupt()方法的调用先行发生于被中断线程的代码检测到中断事件的发生
  • 线程终结规则:线程中所有的操作都先行发生于线程的终止检测,我们可以通过Thread.join()方法结束、Thread.isAlive()的返回值手段检测到线程已经终止执行
  • 对象终结规则:一个对象的初始化完成先行发生于他的finalize()方法的开始

volatile

volatile可以说是Java虚拟机提供的最轻量级的同步机制了。

volatile关键字 :当变量的值被该关键字修饰后该值任何读写操作对于其他线程是立即可见的。并且被关键字修饰后的变量被禁止重排序。

即volatile有可见性和有序性的性质。

可见性

volatile关键字在编译的时候:

  • 写操作后添加Store Memory Barrier将工作内存中变量同步回主内存
  • 读操作前添加Load Memory Barrier从主内存读取变量

有序性

1195582-20180518142309854-1020734018
其中各种屏障的含义:

  • StoreStore屏障 该指令之前的写操作不能和该指令之后的写操作重排序.
  • StoreLoad屏障 该指令之前的写操作和该指令之后的读操作重排序.
  • LoadLoad屏障 该指令之前的读操作不能和该指令之后的读操作重排序.
  • LoadStore屏障 该指令之前的读操作不能和该指令之后的写操作重排序。

写操作

  • volatile写和普通写不能交换位置。也就能保证volatile写的值是最新的值。
  • volatile写以及后来的读不能交换位置,也就是后来的读必须在volatile之后执行。

读操作

  • volatile读不能与之后的读操作和写操作交换位置。也就是说volatile读之后的读写操作都必须在volatile读之后完成。

例子

我们来看个例子:

public class Visibility {
    //轮流注释这两行
    public static  boolean a = true;
//    public static volatile boolean a = true;
    public static void main(String[] args) throws Exception {
        System.out.println("我开始了");
        ExecutorService executorService = Executors.newCachedThreadPool();
        //线程开始
        executorService.execute(() -> {
            while(a){

            }
            System.out.println("我退出了");
        });
        Thread.sleep(100);
        a = false;
        executorService.shutdown();
    }
}

分别注释和打开这两行:

public static  boolean a = true;
public static volatile boolean a = true;

可以注意到,如果不用volatile修饰变量a,则线程池无法结束。

编译

我们来看下volatile的编译实现:
volatile

可以看到编译后在volatile write后添加一个lock指令作为StoreLoad屏障来保证可见性和防止重排序的。

lock前缀确保处理器可以独享共享内存,锁住系统总线,让其他CPU不能访问总线,也就是不能访问系统内存,或者依赖缓存一致性来保证加锁操作的自动执行。然后处理器的缓存回写到内存,导致其他处理器的缓存无效,线程接下来将从主内存中读取共享变量,也就是MESI协议所做的事(相当于volatile读的内存语义)

HotSpot源码

我们也可以找下hotspot的源码:
volatile-hotspot
可以看到确实是通过加lock前缀来实现的fence。

CAS

CAS操作,CompareAndSwap,jdk中整个Atomic类的基础。

下面是整个Concurrent包的层次体系,我们将简单介绍一下Atomic,然后往上讲到并发容器。

--2020-10-23---1.13.13

悲观锁和乐观锁

悲观锁

使用者认为数据发生并发冲突的概率很大,所以拿出来前先上锁,修改完赋值回去后再解锁。

乐观锁

使用者认为数据发生并发冲突的概率很小,所以拿出来的时候不上锁,等到修改完赋值回去的时候再查看这个值是否改变,如果没有改变则直接赋值回去,如果变了则重新拿出来修改再赋值回去,直到修改成功。其中对比原值和赋值回去这两个操作需要在一个原子操作里面。

juc中的CAS是典型的乐观锁实现,在MySQL中也有类似的实现思路。

源码实现

我们先来看下AtomicInteger的源码。

public class AtomicInteger extends Number implements java.io.Serializable {
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long valueOffset;

    static {
        try {
            valueOffset = unsafe.objectFieldOffset
                (AtomicInteger.class.getDeclaredField("value"));
        } catch (Exception ex) { throw new Error(ex); }
    }

    private volatile int value;//封装了一个int变量,对其进行CAS操作

    public final int getAndIncrement() {
        return unsafe.getAndAddInt(this, valueOffset, 1);
    }
    
    public final int getAndDecrement() {
        return unsafe.getAndAddInt(this, valueOffset, -1);
    }
}

里面引用到了Unsafe类:

public final int getAndAddInt(Object o, long offset, int delta) {
    int v;
    do {
        v = getIntVolatile(o, offset);
    } while (!compareAndSwapInt(o, offset, v, v + delta));
    return v;
}

public final native boolean compareAndSwapInt(Object o, long offset,
                                              int expected,
                                              int x);

追踪到Hotspot源码那边:
unsafe.cpp

...//省略
{CC"compareAndSwapInt",  CC"("OBJ"J""I""I"")Z",      FN_PTR(Unsafe_CompareAndSwapInt)},
...//省略

UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x))
  UnsafeWrapper("Unsafe_CompareAndSwapInt");
  oop p = JNIHandles::resolve(obj);
  jint* addr = (jint *) index_oop_from_field_offset_long(p, offset);
  return (jint)(Atomic::cmpxchg(x, addr, e)) == e;
UNSAFE_END

可以看到调用到了Atomic::cmpxchg,其在linux下的实现如下:

inline jint     Atomic::cmpxchg    (jint     exchange_value, volatile jint*     dest, jint     compare_value) {
  int mp = os::is_MP();
  __asm__ volatile (LOCK_IF_MP(%4) "cmpxchgl %1,(%3)"
                    : "=a" (exchange_value)
                    : "r" (exchange_value), "a" (compare_value), "r" (dest), "r" (mp)
                    : "cc", "memory");
  return exchange_value;
}

可以看到编译出来的汇编是使用了cmpxchgl,这是内嵌汇编代码。cmpxchg是汇编命令,作用是比较并交换两个操作数,如果相等则交换。

cmpxchg

cmpxchg是汇编指令
作用:比较并交换操作数.
如:CMPXCHG r/m,r 将累加器AL/AX/EAX/RAX中的值与首操作数(目的操作数)比较,如果相等,第2操作数(源操作数)的值装载到首操作数,zf置1。如果不等, 首操作数的值装载到AL/AX/EAX/RAX并将zf清0
该指令只能用于486及其后继机型。第2操作数(源操作数)只能用8位、16位或32位寄存器。第1操作数(目地操作数)则可用寄存器或任一种存储器寻址方式。

这里可以看到,cmpxchgl之前会加上LOCK_IF_MP带上lock命令:
atomic_linux_x86.inline.hpp

// Adding a lock prefix to an instruction on MP machine
#define LOCK_IF_MP(mp) "cmp $0, " #mp "; je 1f; lock; 1: "

我们来看下CMPXCHG在intel指令手册中的说明:

Description
Compares the value in the AL, AX, EAX, or RAX register with the first operand (destination operand). If the two values are equal, the second operand (source operand) is loaded into the destination operand. Otherwise, the destination operand is loaded into the AL, AX, EAX or RAX register. RAX register is available only in 64-bit mode.
This instruction can be used with a LOCK prefix to allow the instruction to be executed atomically. To simplify the interface to the processor’s bus, the destination operand receives a write cycle without regard to the result of the comparison. The destination operand is written back if the comparison fails; otherwise, the source operand is written into the destination. (The processor never produces a locked read without also producing a locked write.)

可以看到,CMPXCHG对比寄存器中的值跟第一个操作数的值,如果相等,则把第二个操作数的值赋值到目标操作数中。然后我们还可以看到,这个命令需要一个LOCK前缀以保证被原子的执行。

我们再看看LOCK前缀在Intel指令手册中的说明:

Causes the processor’s LOCK# signal to be asserted during execution of the accompanying instruction (turns the instruction into an atomic instruction). In a multiprocessor environment, the LOCK# signal ensures that the processor has exclusive use of any shared memory while the signal is asserted.

即与操作码一起用,发出LOCK信号保证原子性。

LOCK

我们可以看到,volatile使用的时候其汇编是加了LOCK前缀的,以保证该操作能原子的进行,以保证该操作的可见性。而CMPXCHG也需要与LOCK前缀一起使用,以保证该操作的原子性。那么LOCK具体做了什么?

Intel系统开发手册中提到:

8.1.4 Effects of a LOCK Operation on Internal Processor Caches
For the Intel486 and Pentium processors, the LOCK# signal is always asserted on the bus during a LOCK operation, even if the area of memory being locked is cached in the processor.
For the P6 and more recent processor families, if the area of memory being locked during a LOCK operation is cached in the processor that is performing the LOCK operation as write-back memory and is completely contained in a cache line, the processor may not assert the LOCK# signal on the bus. Instead, it will modify the memory location internally and allow it’s cache coherency mechanism to ensure that the operation is carried out atomically. This operation is called “cache locking.” The cache coherency mechanism automatically prevents two or more processors that have cached the same area of memory from simultaneously modifying data in that area.

Intel486和Pentium中,即使需要被锁住的数据已经被缓存在cache中,也需要发送LOCK锁住总线。如果是P6和更高的处理器,如果同时满足以下三个条件:

  1. 需要被锁住的内存区域已经被缓存再cpu cache中了
  2. 被完全包含在一个cache line中
  3. 内存同步策略是write-back
    那么处理器可能不会发送LOCK锁住总线,它会在本地cache中修改数据,然后通过缓存一致性机制来保证操作的原子性。这种操作被称为“缓存锁”。缓存一致性机制能保证多核不会同时修改同一块被缓存的内存区域。

关于write-back,我们再看看Intel系统开发手册的解释:

Write-back (WB) — Writes and reads to and from system memory are cached. Reads come from cache lines on cache hits; read misses cause cache fills. Speculative reads are allowed. Write misses cause cache line fills (in processor families starting with the P6 family processors), and writes are performed entirely in the cache, when possible. Write combining is allowed. The write-back memory type reduces bus traffic by eliminating many unnecessary writes to system memory. Writes to a cache line are not immediately forwarded to system memory; instead, they are accumulated in the cache. The modified cache lines are written to system memory later, when a write-back operation is performed. Write-back operations are triggered when cache lines need to be deallocated, such as when new cache lines are being allocated in a cache that is already full. They also are triggered by the mechanisms used to maintain cache consistency. This type of cache-control provides the best performance, but it requires that all devices that access system memory on the system bus be able to snoop memory accesses to insure system memory and cache coherency.

即write-back情况下,写操作是写到缓存行,直到这个缓存行被写回内存,通常是需要分配新的缓存行的时候才需要把脏数据写回内存。

我们再看看一下这篇资料

In the days of Intel 486 processors, the lock prefix used to assert a lock on the bus along with a large hit in performance. Starting with the Intel Pentium Pro architecture, the bus lock is transformed into a cache lock. A lock will still be asserted on the bus in the most modern architectures if the lock resides in uncacheable memory or if the lock extends beyond a cache line boundary splitting cache lines. Both of these scenarios are unlikely, so most lock prefixes will be transformed into cache lock which is much less expensive.

以及VTune的资料

Intel processors provide a LOCK# signal that is asserted automatically during certain critical memory operations to lock the system bus or equivalent link. While this output signal is asserted, requests from other processors or bus agents for control of the bus are blocked. This metric measures the ratio of bus cycles, during which a LOCK# signal is asserted on the bus. The LOCK# signal is asserted when there is a locked memory access due to uncacheable memory, locked operation that spans two cache lines, and page-walk from an uncacheable page table.

即当代的CPU,当缓存写回策略为write-back的且数据被缓存在一个缓存行里面的时候,都会走缓存锁,而不会发LOCK信号到总线上锁住总线。

我们知道当CPU要修改缓存的数据的时候,需要发送invalidate信号。每个CPU中又有store buffer和invalidate queue,在发送和接收invalidate信号的时候不是立即就处理的。那么如果多个CPU同时要修改一块数据的时候,当前都是write-back,他们是怎么保持一致性的呢?例如两个CPU都要处理数据,都发送了invalidate,但是他们都被扔到了invalidate queue里面,对方的CPU都没有去处理,那么都同时修改了数据,状态为M。

我们来看看这两份资料:
MESI protocol

if a MESI message needs to be sent regarding a cache line in the invalidate queue then wait until the line is invalidated

Cache Coherency

How the bus is used

  • broadcast medium
  • entire coherency operation is atomic wrt other processors
    • keep-the-bus protocol: master holds the bus until the entire operation has completed
    • split-transaction buses:
      • request & response are different phases
      • state value that indicates that an operation is in progress
      • do not initiate another operation for a cache block that has
        one in progress

可以看到,当CPU要修改数据的时候,如果它的invalidate queue里面有该cache,那么需要处理完该信息之后才能发送MESI信息。

即,当CPU当前有个S状态的cache,该CPU需要对它进行写入,那么会首先占用CPU Cache之间的Ring Bus发送invalidate message。Ring Bus被占用住的期间,其他CPU需要等待。发完后其他CPU会立即回复ack,然后Ring Bus解锁。然后另一个CPU也想要发送invalidate message的时候发现该cache在自己的invalidate queue中,那么就需要处理自己的invalidate queue的信息,然后该cache被invalidate了。最后只有一个CPU的cache能够变为M,其他都变为I了。

Unsafe

原子更新基本类型、引用类型:AtomicXXX

我们回到Unsafe类。在Unsafe中,提供了三种类型的操作:int、long、Object,实现了以下几个类:

  1. AtomicInteger:使用int
  2. AtomicLong:使用long
  3. AtomicBoolean:使用int
  4. AtomicReference:通过泛型使用Object
  5. AtomicStampedReference:原子更新引用类型,解决ABA问题,把数据和版本号封装在一个object里面。
  6. AtomicMarkableReference:和AtomicStampedReference原理一样,其版本号是boolean。

原子更新对象中的字段:AtomicXXXFieldUpdater

如果你不能修改类的源码,但是又想要类变量有CAS的功能,那么可以使用Unsafe类的AtomicXXXFieldUpdater。比如我们先来AtomicIntegerFieldUpdater的实现。

其构造函数式protected的,必须通过newUpdater获取一个AtomicIntegerFieldUpdater:

protected AtomicIntegerFieldUpdater() {
}
@CallerSensitive
public static <U> AtomicIntegerFieldUpdater<U> newUpdater(Class<U> tclass,
                                                          String fieldName) {
    return new AtomicIntegerFieldUpdaterImpl<U>
        (tclass, fieldName, Reflection.getCallerClass());
}

AtomicIntegerFieldUpdaterImpl(final Class<T> tclass,
                              final String fieldName,
                              final Class<?> caller) {
    final Field field;
    final int modifiers;
    try {
        field = AccessController.doPrivileged(
            new PrivilegedExceptionAction<Field>() {
                public Field run() throws NoSuchFieldException {
                    return tclass.getDeclaredField(fieldName);
                }
            });
        modifiers = field.getModifiers();
        sun.reflect.misc.ReflectUtil.ensureMemberAccess(
            caller, tclass, null, modifiers);
        ClassLoader cl = tclass.getClassLoader();
        ClassLoader ccl = caller.getClassLoader();
        if ((ccl != null) && (ccl != cl) &&
            ((cl == null) || !isAncestor(cl, ccl))) {
          sun.reflect.misc.ReflectUtil.checkPackageAccess(tclass);
        }
    } catch (PrivilegedActionException pae) {
        throw new RuntimeException(pae.getException());
    } catch (Exception ex) {
        throw new RuntimeException(ex);
    }

    Class<?> fieldt = field.getType();
    if (fieldt != int.class)
        throw new IllegalArgumentException("Must be integer type");

    if (!Modifier.isVolatile(modifiers))
        throw new IllegalArgumentException("Must be volatile type");

    this.cclass = (Modifier.isProtected(modifiers) &&
                   caller != tclass) ? caller : null;
    this.tclass = tclass;
    offset = unsafe.objectFieldOffset(field);
}

newUpdater需要传入要修改的类和变量的名字,然后通过反射获取这个类的成员变量,再包装为一个AtomicIntegerFieldUpdater变量。

修改的时候:原理也是一样,调用了compare and swap。

public boolean compareAndSet(T obj, int expect, int update) {
    if (obj == null || obj.getClass() != tclass || cclass != null) fullCheck(obj);
    return unsafe.compareAndSwapInt(obj, offset, expect, update);
}

限制条件:要使用AtomicIntegerFieldUpdater修改变量,则该变量必须是volatile修饰的int类型,在其构造函数中可以看到:

Class<?> fieldt = field.getType();
if (fieldt != int.class)
    throw new IllegalArgumentException("Must be integer type");

if (!Modifier.isVolatile(modifiers))
    throw new IllegalArgumentException("Must be volatile type");

AtomicLongFieldUpdater、AtomicReferenceFieldUpdater也是同理。

原子更新数组中元素:AtomicXXXArray

针对数组中的某个元素进行原子操作。
以AtomicIntegerArray为例子,其getAndIncrement操作:

public final int getAndIncrement(int i) {
    return getAndAdd(i, 1);
}
public final int getAndAdd(int i, int delta) {
    return unsafe.getAndAddInt(array, checkedByteOffset(i), delta);
}

private long checkedByteOffset(int i) {
    if (i < 0 || i >= array.length)
        throw new IndexOutOfBoundsException("index " + i);

    return byteOffset(i);
}

private static long byteOffset(int i) {
    return ((long) i << shift) + base;
}

byteOffset是用于把下标i转换成当前数组对应的第i个元素的内存地址。
其中shift和base的取值:base表示数组的首地址的位置,scale表示一个数组元素的大小,那么i的偏移量应该等于base + i * scale

为了优化性能,这里使用了移位操作来计算。numberOfLeadingZero是获取int值前面有多少个0,31-Integer.numberOfLeadingZeros(scale)表示scale中从最低位数起1的位置(scale是2的整数方次),所以偏移量的计算 base + i * scale可以表示为base + (i << shift)

private static final int base = unsafe.arrayBaseOffset(int[].class);

static {
    int scale = unsafe.arrayIndexScale(int[].class);
    if ((scale & (scale - 1)) != 0)
        throw new Error("data type scale not a power of two");
    shift = 31 - Integer.numberOfLeadingZeros(scale);
}

public static int numberOfLeadingZeros(int i) {
    // HD, Figure 5-6
    if (i == 0)
        return 32;
    int n = 1;
    if (i >>> 16 == 0) { n += 16; i <<= 16; }
    if (i >>> 24 == 0) { n +=  8; i <<=  8; }
    if (i >>> 28 == 0) { n +=  4; i <<=  4; }
    if (i >>> 30 == 0) { n +=  2; i <<=  2; }
    n -= i >>> 31;
    return n;
}

Unsafe中是调用到了对Object的CAS,即array转换为Object,数组元素转换为offset调用的CAS。

public final int getAndAddInt(Object o, long offset, int delta) {
    int v;
    do {
        v = getIntVolatile(o, offset);
    } while (!compareAndSwapInt(o, offset, v, v + delta));
    return v;
}

AtomicLongArray和AtomicReferenceArray同理。

高性能原子类

JDK8新增了对于原子操作Long和Double的高性能原子类LongAdder、LongAccumulator和DoubleAdder、DoubleAccumulator,它们采用分段的思想,把不同的线程hash到不同的段上去更新,统计的时候再全部加起来。

--2020-10-24---1.59.51

LongAdder原理

我们先来看LongAdder是怎么add的:

public void add(long x) {
    Cell[] as; long b, v; int m; Cell a;
    if ((as = cells) != null || !casBase(b = base, b + x)) {
        boolean uncontended = true;
        if (as == null || (m = as.length - 1) < 0 ||
            (a = as[getProbe() & m]) == null ||
            !(uncontended = a.cas(v = a.value, v + x)))
            longAccumulate(x, null, uncontended);
    }
}

首先尝试CAS 父类Striped64的base变量,如果不成功,则计算线程的probe,hash到数组cells中,尝试cas其值。如果数组修改都失败了,则调用父类Striped64的longAccumulate,该函数在一个无限循环中做了以下几件事情:

  1. 如果cells不为null
    • 如果数组的目标位置为null且cellsBusy为0,则cas cellsBusy为1后创建该Cell,成功则退出
    • 尝试cas 数组目标位置的值,成功则退出
    • 如果上面都不通过,尝试扩容
  2. 如果cell为null,cas cellsBusy后尝试初始化
  3. 否则,依然尝试cas base,成功则退出

即在无限循环中尝试更新base或者cells,直到成功。

Striped64.class

transient volatile long base;
transient volatile Cell[] cells;

final void longAccumulate(long x, LongBinaryOperator fn,
                          boolean wasUncontended) {
    int h;
    if ((h = getProbe()) == 0) {
        ThreadLocalRandom.current(); // force initialization
        h = getProbe();
        wasUncontended = true;
    }
    boolean collide = false;                // True if last slot nonempty
    for (;;) {
        Cell[] as; Cell a; int n; long v;
        if ((as = cells) != null && (n = as.length) > 0) {
            if ((a = as[(n - 1) & h]) == null) {
                if (cellsBusy == 0) {       // Try to attach new Cell
                    Cell r = new Cell(x);   // Optimistically create
                    if (cellsBusy == 0 && casCellsBusy()) {
                        boolean created = false;
                        try {               // Recheck under lock
                            Cell[] rs; int m, j;
                            if ((rs = cells) != null &&
                                (m = rs.length) > 0 &&
                                rs[j = (m - 1) & h] == null) {
                                rs[j] = r;
                                created = true;
                            }
                        } finally {
                            cellsBusy = 0;
                        }
                        if (created)
                            break;
                        continue;           // Slot is now non-empty
                    }
                }
                collide = false;
            }
            else if (!wasUncontended)       // CAS already known to fail
                wasUncontended = true;      // Continue after rehash
            else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                         fn.applyAsLong(v, x))))
                break;
            else if (n >= NCPU || cells != as)
                collide = false;            // At max size or stale
            else if (!collide)
                collide = true;
            else if (cellsBusy == 0 && casCellsBusy()) {
                try {
                    if (cells == as) {      // Expand table unless stale
                        Cell[] rs = new Cell[n << 1];
                        for (int i = 0; i < n; ++i)
                            rs[i] = as[i];
                        cells = rs;
                    }
                } finally {
                    cellsBusy = 0;
                }
                collide = false;
                continue;                   // Retry with expanded table
            }
            h = advanceProbe(h);
        }
        else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
            boolean init = false;
            try {                           // Initialize table
                if (cells == as) {
                    Cell[] rs = new Cell[2];
                    rs[h & 1] = new Cell(x);
                    cells = rs;
                    init = true;
                }
            } finally {
                cellsBusy = 0;
            }
            if (init)
                break;
        }
        else if (casBase(v = base, ((fn == null) ? v + x :
                                    fn.applyAsLong(v, x))))
            break;                          // Fall back on using base
    }
}

我们可以看到LongAdder的基本原理:把要更新的数拆成一个base变量和一个Cells数组,如果base变量更新不成功,即意味着多线程更新压力大,则把多线程更新的压力分散到每个cell单元里面去。
--2020-10-24---2.15.25

其sum函数就是计算base+cells的值:

public long sum() {
    Cell[] as = cells; Cell a;
    long sum = base;
    if (as != null) {
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
}

DoubleAdder和LongAdder同理,使用的也是long类型。

LongAccumulator

LongAccumulator实现原理和LongAdder类似,但是可以自定义二元操作符并传入一个初始值:

public LongAccumulator(LongBinaryOperator accumulatorFunction,
                       long identity) {
    this.function = accumulatorFunction;
    base = this.identity = identity;
}

操作符的左值为base变量或者Cells[]中元素的当前值,右值可类比于LongAdder的add函数传入的x。我们来看其accumulate函数:

public void accumulate(long x) {
    Cell[] as; long b, v, r; int m; Cell a;
    if ((as = cells) != null ||
        (r = function.applyAsLong(b = base, x)) != b && !casBase(b, r)) {
        boolean uncontended = true;
        if (as == null || (m = as.length - 1) < 0 ||
            (a = as[getProbe() & m]) == null ||
            !(uncontended =
              (r = function.applyAsLong(v = a.value, x)) == v ||
              a.cas(v, r)))
            longAccumulate(x, function, uncontended);
    }
}

与LongAdder的区别就是目标值计算的方式不一样。其他流程都是一样的。

DoubleAdder与DoubleAccumulator也是使用的long类型。

refer

https://www.cnblogs.com/yanlong300/p/9050733.html
https://blog.csdn.net/lotluck/article/details/78793468
https://yemablog.com/posts/cache-locking
https://yemablog.com/posts/cache-locking-2

comments powered by Disqus