如何保证集合是线程安全的? ConcurrentHashMap如何实现高效地线程安全? - 《java核心技术》笔记

简述

问:为什么需要ConcurrentHashMap?
答:因为Hashtable本身比较低效,它在所有的put、get、size方法上面加上了“synchronized”。而Collections提供的同步包装器,只是将“this”作为互斥的mutex。

早期的ConcurrentHashMap基于:

  • 分段锁,将内部进行分段(Segment),里面是HashEntry的数组;
  • HashEntry内部使用volatile的value字段保证可见性,也利用了不可变对象的机制以改进利用Unsafe提供的底层能力,比如volatile access;

d45bcf9a34da2ef1ef335532b0198bd9

简略源码

get源码

重点是读取Segment的时候使用了UNSAFE.getObjectVolatile使用volatile的加载语义从对象的指定偏移量处获取变量的引用。

public V get(Object key) {
    Segment<K,V> s; // manually integrate access methods to reduce overhead
    HashEntry<K,V>[] tab;
    int h = hash(key.hashCode());
    // 利用位操作替换普通数学运算
    long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
    // 以 Segment 为单位,进行定位
    // 利用 Unsafe 直接进行 volatile access
    if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
        (tab = s.table) != null) {
       // 省略
      }
    return null;
}

put源码

通过二次哈希避免哈希冲突,然后以Unsafe调用方法,直接获取相应的Segment,然后进行线程安全的put操作。其中getObject的用法:获得给定对象的指定地址偏移量的值,与此类似操作还有:getInt,getDouble,getLong,getChar等。

public V put(K key, V value) {
    Segment<K,V> s;
    if (value == null)
        throw new NullPointerException();
    // 二次哈希,以保证数据的分散性,避免哈希冲突
    int hash = hash(key.hashCode());
    int j = (hash >>> segmentShift) & segmentMask;
    if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
         (segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
        s = ensureSegment(j);
    return s.put(key, hash, value, false);
}

其最后一句的核心逻辑:

final V put(K key, int hash, V value, boolean onlyIfAbsent) {
    // scanAndLockForPut 会去查找是否有 key 相同 Node
    // 无论如何,确保获取锁
    HashEntry<K,V> node = tryLock() ? null :
        scanAndLockForPut(key, hash, value);
    V oldValue;
    try {
        HashEntry<K,V>[] tab = table;
        int index = (tab.length - 1) & hash;
        HashEntry<K,V> first = entryAt(tab, index);
        for (HashEntry<K,V> e = first;;) {
            if (e != null) {
                K k;
                // 更新已有 value...
            }
            else {
                // 放置 HashEntry 到特定位置,如果超过阈值,进行 rehash
                // ...
            }
        }
    } finally {
        unlock();
    }
    return oldValue;
}
  • ConcurrentHashMap会获取再入锁,以保持数据一致性。Segment本身扩展于ReentrantLock,所以在并发修改期间,相应Segment是被锁定的;
  • 扩容的时候,单独对Segment进行扩容。

scanAndLockForPut

while循环每执行一次,都会尝试获取锁,成功则会返回。retries 初始值设为-1是为了遍历当前hash对应桶的链表,找到则停止遍历,未找到则会预创建一个节点;同时,如果头节点发生变化,则会重新进行遍历,直到自旋次数大于MAX_SCAN_RETRIES,使用lock加锁,获取锁失败则会进入等待队列。

为什么scanAndLockForPut中要遍历一次链表?

前面已经提过scanAndLockForPut使用自旋次数受限制的自旋锁进行优化加锁的方式,此外,遍历一次链表也是一种优化方法,主要是尽可能使当前链表中的节点进入CPU高速缓存,提高缓存命中率,以便获取锁定后的遍历速度更快。实际上加锁后并没有使用已经找到的节点,因为它们必须在锁定下重新获取,以确保更新的顺序一致性,但是遍历一次后通常可以更快地重新定位。这是一种预热优化的方式,scanAndLock中也使用了该优化方式。

private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
    HashEntry<K,V> first = entryForHash(this, hash);//根据key的hash值查找头节点
    HashEntry<K,V> e = first;
    HashEntry<K,V> node = null;
    int retries = -1; // negative while locating node
    while (!tryLock()) {//尝试获取锁,成功则直接返回,失败则开始自旋
        HashEntry<K,V> f; // 用于后续重新检查头节点
        if (retries < 0) {
            if (e == null) {//结束遍历节点
                if (node == null) // 创建节点
                    node = new HashEntry<K,V>(hash, key, value, null);
                retries = 0;
            }
            else if (key.equals(e.key))//找到节点,结束遍历
                retries = 0;
            else
                e = e.next;
        }
        else if (++retries > MAX_SCAN_RETRIES) {//达到最大尝试次数
            lock();//进入加锁方法,失败则会进入排队,阻塞当前线程
            break;
        }
        else if ((retries & 1) == 0 &&
                 (f = entryForHash(this, hash)) != first) {
            e = first = f; // 头节点变化,需要重新遍历,说明有新节点加入或被移除
            retries = -1;
        }
    }
    return node;
}

size

如果不进行同步,简单的计算Segment的总值,可能会因为并发put而导致结果不正确,但是直接锁定所有Segment进行计算的话,又会十分昂贵,所以ConcurrentHashMap实现是通过重试机制(RETRIES_BEFORE_LOCK,指定重试次数2)来试图获得可靠性,如果没有监控到发生变化(通过对比Segment.modCount)就直接返回,否则获取锁进行操作。

Java8后的变化

  • 总体结构上,与HashMap相似,同样是大的桶(bucket)数组,内部也是一个个链表结构(bin),同步的粒度要更细致一些;
  • 内部仍然有Segment定义,但是仅仅只是为了保证序列化时的兼容性,不再有任何结构上的用处。
  • 因为不用Segment,初始化操作大大简化,修改为lazy-load形式,避免初始开销;
  • 数据存储利用volatile来保证可见性;
  • 使用CAS等操作,在特定场景进行无所并发操作;
  • 使用Unsafe、LongAdder之类底层手段,进行极端情况的优化。
  • 锁加在链表头上,这个是思路上的突破。

数据存储

可以发现key是final的,因为在生命周期中一个条目的key不可能改变。val声明为volatile,保证可见性。

static class Node<K,V> implements Map.Entry<K,V> {
    final int hash;
    final K key;
    volatile V val;
    volatile Node<K,V> next;
    // … 
}

put

注意,在同步逻辑上,使用的是synchronized,而不是通常建议的ReentrantLock之类,因为现代JDK中,synchronized已经被不断优化,可以不用过分担心性能差异,相比于ReentrantLock也可以减少内存消耗。(https://my.oschina.net/pingpangkuangmo/blog/817973)

ReentrantLock和synchronized区别:https://juejin.im/post/5bc87409f265da0ad701da35

final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException();
    int hash = spread(key.hashCode());
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh; K fk; V fv;
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            // 如果 bin 是空的,不需要锁定,直接利用 CAS 去进行无锁线程安全操作
            // 如果CAS失败,则有其他节点已经插入,继续下一步
            if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value)))
                break; 
        }
        // 如果bin不为空,且bin的hash值为-1,
        // 则有其他线程在执行扩容操作,帮助他们一起扩容,提高性能
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        else if (onlyIfAbsent // 不加锁,进行检查
                 && fh == hash
                 && ((fk = f.key) == key || (fk != null && key.equals(fk)))
                 && (fv = f.val) != null)
            return fv;
        else {//如果没有在扩容
            V oldVal = null;
            synchronized (f) {//注意!!!这里加锁是在链表的根节点
                   // 细粒度的同步修改操作... 
                }
            }
            // Bin 超过阈值,进行树化
            if (binCount != 0) {
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    addCount(1L, binCount);
    return null;
}

与此同时,更多细节使用Unsafe进行优化,比如tabAt使用了getObjectAcquire,避免了间接调用的开销。

static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
    return (Node<K,V>)U.getObjectAcquire(tab, ((long)i << ASHIFT) + ABASE);
}

HashEntry<K,V>[] tab = table有什么好处?

从Segment源码可知,table被声明为volatile,为了保证内存可见性,table上的修改都必须立即更新到主存,volatile写实际是具有一定开销的。由于put中的代码都在加锁区执行,锁既能保证可见性,也能保证原子性,因此,不需要针对table进行volatile写,将table引用赋值给局部变量以实现编译、运行时的优化。

initTable

初始化操作都在initTable。这是一个典型的CAS使用场景,利用volatile的sizeCtl作为互斥手段:如果发现竞争性的初始化,则spin在那里等待条件恢复。否则利用CAS设置排他标志,如果成功了则进行初始化。否则重试。

private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    while ((tab = table) == null || tab.length == 0) {
        // 如果发现冲突,进行 spin 等待
        if ((sc = sizeCtl) < 0)
            Thread.yield(); 
        // CAS 成功返回 true,则进入真正的初始化逻辑
        else if (U.compareAndSetInt(this, SIZECTL, sc, -1)) {
            try {
                if ((tab = table) == null || tab.length == 0) {
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    @SuppressWarnings("unchecked")
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    table = tab = nt;
                    sc = n - (n >>> 2);
                }
            } finally {
                sizeCtl = sc;
            }
            break;
        }
    }
    return tab;
}

补充知识:
sizeCtl:默认为0,用来控制table的初始化和扩容操作.它的数值有以下含义

  • -1 :代表table正在初始化,其他线程应该交出CPU时间片,退出
  • -N: 表示正有N-1个线程执行扩容操作
  • 0: 如果table已经初始化,代表table容量,默认为table大小的0.75,如果还未初始化,代表需要初始化的大小

size

final long sumCount() {
    CounterCell[] as = counterCells; CounterCell a;
    long sum = baseCount;
    if (as != null) {
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
}

其中CounterCell的部分:

static final class CounterCell {
    volatile long value;
    CounterCell(long x) { value = x; }
}

基于LongAdder,是一种jvm利用空间换取更高效率的方法。

参考:https://zhuanlan.zhihu.com/p/27149377
https://my.oschina.net/7001/blog/896587

comments powered by Disqus