JDK并发集合13:其他队列(并发容器)

ConcurrentLinkedQueue

ConcurrentLinkedQueue没有实现BlockingQueue接口,所以不是阻塞队列,不能用于线程池中,但是它是线程安全的。它由一个无界的单向链表实现。

属性

head头指针,tail尾指针。head/tail的更新可能落后于节点的入队和出队,因为它不是直接对head/tail指针进行CAS操作的,而是对Node中的item进行操作。

private transient volatile Node<E> head;
private transient volatile Node<E> tail;

内部节点:只有next,是单向链表。

private static class Node<E> {
    volatile E item;
    volatile Node<E> next;
    ...//省略
}

初始化

初始化的时候,head和tail都指向null节点。没有容量,是无界的。

--2020-10-18---9.46.28

//在队列初始化时, head 节点存储的元素为空,tail 节点等于 head 节点。
public ConcurrentLinkedQueue() {
    head = tail = new Node<E>(null);
}
public ConcurrentLinkedQueue(Collection<? extends E> c) {
// 遍历c,并把它元素全部添加到单链表中
    Node<E> h = null, t = null;
    for (E e : c) {
        checkNotNull(e);
        Node<E> newNode = new Node<E>(e);
        if (h == null)
            h = t = newNode;
        else {
            t.lazySetNext(newNode);
            t = newNode;
        }
    }
    if (h == null)
        h = t = new Node<E>(null);
    head = h;
    tail = t;
}

入队

每次在队尾增加2个节点后,才移动一次tail节点。

假设线程1入队item2:
--2020-10-18---9.48.45

线程1入队item2

初始的时候,队列中有1个节点item1,tail指向该节点,假设线程1要入队item2节点:

  1. 初始的时候p=tail,q=p.next=NULL
  2. 对p的next执行CAS操作,追加item2,成功之后,p=tail。所以if (p != t)casTail(t, newNode);这一段没有执行,直接返回。此时tail指针没有变化。

线程2入队item3

之后,假设线程2要入队item3节点,如下图所示
--2020-10-18---9.50.32

  1. 初始的时候p=tail,q=p.next;
  2. q !=NULL且p != q,走第三个分支,p后移一位,重新回到循环开头,q=p.next;
  3. 这时候q==NULL,对p的next执行CAS操作,入队item3节点。
  4. if (p != t)casTail(t, newNode);成立,执行casTail(t, newNode);,tail指向新节点item3,即后移2个位置,到达队列尾部。

总结

  1. 即使tail指针没有移动,只要对p的next指针成功进行CAS操作,就算成功入队列;
  2. 只有当p !=tail的时候,才会后移tail指针。也就是说,每连续追加2个节点,才后移1次tail指针。即使CAS失败也没关系,可以由下1个线程来移动tail指针。

上面步骤可以归结为:

  • p去找真正的尾结点,然后CAS next
  • 如果tail != p,则casTail到真正尾结点

代码

public boolean add(E e) {
    return offer(e);
}
//当一个线程入列一个数据时,会将该数据封装成一个 Node 节点,并先获取到队列的队尾节点,
// 当确定此时队尾节点的 next 值为 null 之后,再通过 CAS 将新队尾节点的 next 值设为新节点。
// 此时 p != t,也就是设置 next 值成功,然后再通过 CAS 将队尾节点设置为当前节点即可。
public boolean offer(E e) {
    checkNotNull(e);
    //创建入队节点
    final Node<E> newNode = new Node<E>(e);
//t,p为尾节点,默认相等,采用失败即重试的方式,直到入队成功
    for (Node<E> t = tail, p = t;;) {
        //获取队尾节点的下一个节点
        Node<E> q = p.next;
        //如果q为null,则代表p就是队尾节点
        if (q == null) {
            //将入列节点设置为当前队尾节点的next节点
            // p is last node
            if (p.casNext(null, newNode)) {//关键点:是对tail的next指针而不是对tail指针执行CAS操作
                // Successful CAS is the linearization point
                // for e to become an element of this queue,
                // and for newNode to become "live".
                //判断tail节点和p节点距离达到两个节点
                //如果tail不是尾节点则将入队节点设置为tail。
                // 如果失败了,那么说明有其他线程已经把tail移动过
                if (p != t) // hop two nodes at a time
                    casTail(t, newNode);  // Failure is OK.//每入两个节点,后移一次tail指针,失败也没有关系
                return true;
            }
            // Lost CAS race to another thread; re-read next
        }
        // 如果p节点等于p的next节点,则说明p节点和q节点都为空,表示队列刚初始化,所以返回
        else if (p == q)
            // We have fallen off list.  If tail is unchanged, it
            // will also be off-list, in which case we need to
            // jump to head, from which all live nodes are always
            // reachable.  Else the new tail is a better bet.
            p = (t != (t = tail)) ? t : head;   //已经到达队列尾部
        else
            // Check for tail updates after two hops.
            p = (p != t && t != (t = tail)) ? t : q;//定位队列真正的对尾节点   //后移p指针
    }
}

出队

出队的时候,把item置为null之后出队原来的item。

假设初始的时候head指向空节点,队列中有item1、item2、item3三个节点。
--2020-10-18---10.05.22

出队元素

  1. p=head,head的item为null,p节点的下一个节点(q=p.next)不为null,且p!=q,走最后一个分支,p后移一位,使得p==q;
  2. 重新回到循环开头,这时候p指向的元素不为null,则CAS置换为null。这时候if (p != h)该条件成立,执行updateHead(h, ((q = p.next) != null) ? q : p),让header跳过两个null节点,指向非null的header;

总结:

  1. 出队列的判断并非观察tail 指针的位置,而是依赖于head 指针后续的节点是否为NULL这一条件;
  2. 只要对节点的item 执行CAS操作,置为NULL成功,则出队列成功。即使head指针没有成功移动,也可以由下1个线程继续完成。
//首先获取 head 节点,并判断 item 是否为 null,如果为空,则表示已经有一个线程刚刚进行了出列操作,
// 然后更新 head 节点;如果不为空,则使用 CAS 操作将 head 节点设置为 null,
// CAS 就会成功地直接返回节点元素,否则还是更新 head 节点。
public E poll() {
    // 设置起始点
    restartFromHead:
    for (;;) {
        //p获取head节点
        for (Node<E> h = head, p = h, q;;) {
            //获取头节点元素
            E item = p.item;
//如果头节点元素不为null,通过cas设置p节点引用的元素为null
            if (item != null && p.casItem(item, null)) {//关键点:在出队列的时候,并没有移动head指针,而是把item置为了null
                // Successful CAS is the linearization point
                // for item to be removed from this queue.
                if (p != h) // hop two nodes at a time//每生产两个null节点,才把head指针后移2位
                    updateHead(h, ((q = p.next) != null) ? q : p);
                return item;
            }
            //如果p节点的下一个节点为null,则说明这个队列为空,更新head结点
            else if ((q = p.next) == null) {
                updateHead(h, p);
                return null;
            }
            //节点出队失败,重新跳到restartFromHead来进行出队
            else if (p == q)
                continue restartFromHead;
            else
                p = q;
        }
    }
}

判空

因为head/tail指针有滞后性,所以需要从头遍历队列直到查找非null的节点来判空。

public boolean isEmpty() {
    return first() == null;
}

//寻找第一个不为NULL的节点
/**
 * 这个方法和poll/peek方法差不多,只不过返回的是Node而不是元素。
 *
 * 之所以peek方法没有复用first方法的原因有2点
 * 1. 会增加一次volatile读
 * 2. 有可能会因为和poll方法的竞争,导致出现非期望的结果。
 *    比如first返回的node非null,里面的item也不是null。
 *    但是等到poll方法返回从first方法拿到的node的item的时候,item已经被poll方法CAS为null了。
 *    那这个问题只能再peek中增加重试,这未免代价太高了。
 *
 * 这就是first和peek代码没有复用的原因。
 */
Node<E> first() {
    restartFromHead:
    for (;;) {//从head开始遍历,获取第一个不为null的节点
        for (Node<E> h = head, p = h, q;;) {
            //头节点是否有元素
            boolean hasItem = (p.item != null);
            //头节点有元素或当前链表只有一个节点
            if (hasItem || (q = p.next) == null) {//注意,这里是整个节点==null,而不是item==null。所以这里会一直循环直到找到item非null的节点。或者next就是null了
                updateHead(h, p);
                return hasItem ? p : null;//头节点有值返回节点,否则返回null
            }
            else if (p == q)
                continue restartFromHead;
            else
                p = q;
        }
    }
}

LinkedTransferQueue

LinkedTransferQueue是阻塞队列,它是SynchronousQueue、LinkedBlockingQueue和ConcurrentLinkedQueue三者的结合体,综合了这三者的方法。

--2020-10-19---8.56.30

LinkedTransferQueue实现了TransferQueue接口,这个接口继承于BlockingQueue,所以LinkedTransferQueue是带阻塞功能的。

存储结构

LinkedTransferQueue使用的数据结构是dual queue,翻译为双重队列。队列中的节点只有一种,要不全是数据节点,要不全是请求节点。

取元素的时候跟头结点比对,如果头结点是数据节点则CAS其item来匹配,如果是请求节点则入队尾。

放元素的时候跟头结点比对,如果头结点是请求节点则CAS item匹配,如果是数据节点则入队尾。

示例:
--2020-10-19---9.22.27

属性

头结点、尾结点。

放取元素的几种方式:

  • NOW:立即返回,用于非超时的poll()和tryTransfer()方法中
  • ASYNC:异步,不会阻塞,用于放元素时,因为内部使用无界单链表存储元素,不会阻塞放元素的过程
  • SYNC:同步,调用的时候如果没有匹配到会阻塞直到匹配到为止
  • TIMED:超时,用于有超时的poll()和tryTransfer()方法中
/** head of the queue; null until first enqueue */
transient volatile Node head;

/** tail of the queue; null until first append */
private transient volatile Node tail;

/*
 * Possible values for "how" argument in xfer method.
 */
private static final int NOW   = 0; // for untimed poll, tryTransfer
private static final int ASYNC = 1; // for offer, put, add
private static final int SYNC  = 2; // for transfer, take
private static final int TIMED = 3; // for timed poll, tryTransfer

节点类:

  • isData:是否是数据节点
  • 元素的值
  • 下一个节点
  • 当前阻塞的线程
static final class Node {
    final boolean isData;   // false if this is a request node
    volatile Object item;   // initially non-null if isData; CASed to match
    volatile Node next;
    volatile Thread waiter; // null until waiting

构造器

可以看出来没有容量限制。

/**
 * Creates an initially empty {@code LinkedTransferQueue}.
 */
public LinkedTransferQueue() {
}

/**
 * Creates a {@code LinkedTransferQueue}
 * initially containing the elements of the given collection,
 * added in traversal order of the collection's iterator.
 *
 * @param c the collection of elements to initially contain
 * @throws NullPointerException if the specified collection or any
 *         of its elements are null
 */
public LinkedTransferQueue(Collection<? extends E> c) {
    this();
    addAll(c);
}

入队

调用的xfer。

xfer的签名E xfer(E e, boolean haveData, int how, long nanos)

  • e:表示入队的元素
  • haveData:为true表示当前是插入数据
  • how:NOW, ASYNC, SYNC, or TIMED之一,表示放取元素的方式
  • nanos:超时时间
//阻塞模式
public void put(E e) {
    xfer(e, true, ASYNC, 0);
}
public boolean offer(E e, long timeout, TimeUnit unit) {
    xfer(e, true, ASYNC, 0);
    return true;
}
public boolean offer(E e) {
    xfer(e, true, ASYNC, 0);
    return true;
}
public boolean add(E e) {
    xfer(e, true, ASYNC, 0);
    return true;
}

出队

出队也是使用的xfer。这里传入的how就有集中了:TIMED、NOW、SYNC。

public E take() throws InterruptedException {
    // 同步模式,会阻塞直到取到元素
    E e = xfer(null, false, SYNC, 0);
    if (e != null)
        return e;
    Thread.interrupted();
    throw new InterruptedException();
}

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    E e = xfer(null, false, TIMED, unit.toNanos(timeout));
    if (e != null || !Thread.interrupted())
        return e;
    throw new InterruptedException();
}

public E poll() {
    return xfer(null, false, NOW, 0);
}

移交元素

这三个方法也是放元素的方法。

public boolean tryTransfer(E e) {
// 立即返回
    return xfer(e, true, NOW, 0) == null;
}

// 移交元素
public void transfer(E e) throws InterruptedException {
// 同步模式
    if (xfer(e, true, SYNC, 0) != null) {
        Thread.interrupted(); // failure possible only due to interrupt
        throw new InterruptedException();
    }
}

// 尝试移交元素(有超时时间)
public boolean tryTransfer(E e, long timeout, TimeUnit unit)
    throws InterruptedException {
// 有超时时间
    if (xfer(e, true, TIMED, unit.toNanos(timeout)) == null)
        return true;
    if (!Thread.interrupted())
        return false;
    throw new InterruptedException();
}

xfer

我们来看看所有函数都用到的xfer到底是怎么样的:

里面用到的函数:

  • tryAppend:从tail开始遍历,把节点放到链表尾部
  • awaitMatch:等待被匹配

大致逻辑:

  1. 从头开始遍历,如果元素与头结点不一致,则尝试CAS Item匹配
    • 成功的话后移head节点,唤醒等待的线程,并返回匹配到的元素;
    • 不成功则往后移动继续匹配,直到到达队尾都匹配不上,则跳到2
  2. 如果元素与头结点一致,则退出遍历的循环,尝试入队
    • 从tail开始遍历,把s放到链表尾端。入队过程中可能尾结点变了,则后移尾结点,直到CAS成功
  3. 入队失败则从1开始重试
  4. 入队成功,如果是异步节点,直接返回。如果是非异步节点,即同步或者有超时,则等待被匹配后返回
    • 不断自旋,过程中检查item是否改变了,改变了则是被匹配了,返回被匹配的元素
    • 如果item没改变,如果前面有节点且正在匹配中则自旋,随机让出CPU,自旋到一定次数还没被匹配则阻塞
private E xfer(E e, boolean haveData, int how, long nanos) {
// 不允许放入空元素
    if (haveData && (e == null))
        throw new NullPointerException();
    Node s = null;                        // the node to append, if needed
    // 外层循环,自旋,失败就重试
    retry:
    for (;;) {                            // restart on append race
        // 下面这个for循环用于控制匹配的过程
        // 同一时刻队列中只会存储一种类型的节点
        // 从头节点开始尝试匹配,如果头节点被其它线程先一步匹配了
        // 就再尝试其下一个,直到匹配到为止,或者到队列中没有元素为止
        for (Node h = head, p = h; p != null;) { // find & match first node
            // p节点的模式
            boolean isData = p.isData;
            // p节点的值
            Object item = p.item;
            // p没有被匹配到
            if (item != p && (item != null) == isData) { // unmatched
                // 如果两者模式一样,则不能匹配,跳出循环后尝试入队
                if (isData == haveData)   // can't match
                    break;
                // 如果两者模式不一样,则尝试匹配
                // 把p的值设置为e(如果是取元素则e是null,如果是放元素则e是元素值)
                if (p.casItem(item, e)) { // match
                    // 匹配成功
                    // for里面的逻辑比较复杂,用于控制多线程同时放取元素时出现竞争的情况的
                    // 处理head,后移
                    for (Node q = p; q != h;) {
                        // 进入到这里可能是头节点已经被匹配,然后p会变成h的下一个节点
                        Node n = q.next;  // update by 2 unless singleton
                        // 如果head还没变,就把它更新成新的节点
                        // 并把它删除(forgetNext()会把它的next设为自己,也就是从单链表中删除了)
                        // 这时为什么要把head设为n呢?因为到这里了,肯定head本身已经被匹配掉了
                        // 而上面的p.casItem()又成功了,说明p也被当前这个元素给匹配掉了
                        // 所以需要把它们俩都出队列,让其它线程可以从真正的头开始,不用重复检查了
                        if (head == h && casHead(h, n == null ? q : n)) {
                            h.forgetNext();
                            break;
                        }                 // advance and retry
                        // 如果新的头节点为空,或者其next为空,或者其next未匹配,就break
                        // 只有当新的头结点不为空,且其next不为空,且其next被匹配了,则重试
                        if ((h = head)   == null ||
                                (q = h.next) == null || !q.isMatched())
                            break;        // unless slack < 2
                    }
                    // 唤醒p中等待的线程
                    LockSupport.unpark(p.waiter);
                    // 并返回匹配到的元素
                    return LinkedTransferQueue.<E>cast(item);
                }
            }
            // p已经被匹配了或者尝试匹配的时候失败了
            // 也就是其它线程先一步匹配了p
            // 这时候又分两种情况,p的next还没来得及修改,p的next指向了自己
            // 如果p的next已经指向了自己,就重新取head重试,否则就取其next重试
            //注:如果一个节点的next指向了自己,则这个节点等待被删除
            Node n = p.next;
            p = (p != n) ? n : (h = head); // Use head if p offlist
        }

        // 到这里肯定是队列中存储的节点类型和自己一样
        // 或者队列中没有元素了
        // 就入队(不管放元素还是取元素都得入队)
        // 入队又分成四种情况:
        // NOW,立即返回,没有匹配到立即返回,不做入队操作
        // ASYNC,异步,元素入队但当前线程不会阻塞(相当于无界LinkedBlockingQueue的元素入队)
        // SYNC,同步,元素入队后当前线程阻塞,等待被匹配到
        // TIMED,有超时,元素入队后等待一段时间被匹配,时间到了还没匹配到就返回元素本身

        // 如果不是立即返回
        if (how != NOW) {                 // No matches available
            // 新建s节点
            if (s == null)
                s = new Node(e, haveData);
            // 尝试入队
            Node pred = tryAppend(s, haveData);
            // 入队失败,重试
            if (pred == null)
                continue retry;           // lost race vs opposite mode
            // 如果不是异步(同步或者有超时)
            // 就等待被匹配
            if (how != ASYNC)
                return awaitMatch(s, pred, e, (how == TIMED), nanos);
        }
        return e; // not waiting
    }
}

private Node tryAppend(Node s, boolean haveData) {
    // 从tail开始遍历,把s放到链表尾端
    for (Node t = tail, p = t;;) {        // move p to last node and append
        Node n, u;                        // temps for reads of next & tail
        // 如果首尾都是null,说明链表中还没有元素
        if (p == null && (p = head) == null) {
            // 就让首节点指向s
            // 注意,这里插入第一个元素的时候tail指针并没有指向s
            if (casHead(null, s))
                return s;                 // initialize
        }
        else if (p.cannotPrecede(haveData))
            // 如果p无法处理,则返回null
            // 这里无法处理的意思是,p和s节点的类型不一样,不允许s入队
            // 比如,其它线程先入队了一个数据节点,这时候要入队一个非数据节点,就不允许,
            // 队列中所有的元素都要保证是同一种类型的节点
            // 返回null后外面的方法会重新尝试匹配重新入队等
            return null;                  // lost race vs opposite mode
        else if ((n = p.next) != null)    // not last; keep traversing
            // 如果p的next不为空,说明不是最后一个节点
            // 则让p重新指向最后一个节点
            p = p != t && t != (u = tail) ? (t = u) : // stale tail
                    (p != n) ? n : null;      // restart if off list
        else if (!p.casNext(null, s))
            // 如果CAS更新s为p的next失败
            // 则说明有其它线程先一步更新到p的next了
            // 就让p指向p的next,重新尝试让s入队
            p = p.next;                   // re-read on CAS failure
        else {
            // 到这里说明s成功入队了
            // 如果p不等于t,就更新tail指针
            // 还记得上面插入第一个元素时tail指针并没有指向新元素吗?
            // 这里就是用来更新tail指针的
            if (p != t) {                 // update if slack now >= 2
                while ((tail != t || !casTail(t, s)) &&
                        (t = tail)   != null &&
                        (s = t.next) != null && // advance and retry
                        (s = s.next) != null && s != t);
            }
            // 返回p,即s的前一个元素
            return p;
        }
    }
}

private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) {
    // 如果是有超时的,计算其超时时间
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    // 当前线程
    Thread w = Thread.currentThread();
    // 自旋次数
    int spins = -1; // initialized after first item and cancel checks
    // 随机数,随机让一些自旋的线程让出CPU
    ThreadLocalRandom randomYields = null; // bound if needed

    for (;;) {
        Object item = s.item;
        // 如果s元素的值不等于e,说明它被匹配到了
        if (item != e) {                  // matched
            // assert item != s;
            // 把s的item更新为s本身
            // 并把s中的waiter置为空
            s.forgetContents();           // avoid garbage
            // 返回匹配到的元素
            return LinkedTransferQueue.<E>cast(item);
        }
        // 如果当前线程中断了,或者有超时的到期了
        // 就更新s的元素值指向s本身
        if ((w.isInterrupted() || (timed && nanos <= 0)) &&
                s.casItem(e, s)) {        // cancel
            // 尝试解除s与其前一个节点的关系
            // 也就是删除s节点
            unsplice(pred, s);
            // 返回元素的值本身,说明没匹配到
            return e;
        }

        // 如果自旋次数小于0,就计算自旋次数
        if (spins < 0) {                  // establish spins at/near front
            // spinsFor()计算自旋次数
            // 如果前面有节点未被匹配就返回0
            // 如果前面有节点且正在匹配中就返回一定的次数,等待
            if ((spins = spinsFor(pred, s.isData)) > 0)
                // 初始化随机数
                randomYields = ThreadLocalRandom.current();
        }
        else if (spins > 0) {             // spin
            // 还有自旋次数就减1
            --spins;
            // 并随机让出CPU
            if (randomYields.nextInt(CHAINED_SPINS) == 0)
                Thread.yield();           // occasionally yield
        }
        else if (s.waiter == null) {
            // 更新s的waiter为当前线程
            s.waiter = w;                 // request unpark then recheck
        }
        else if (timed) {
            // 如果有超时,计算超时时间,并阻塞一定时间
            nanos = deadline - System.nanoTime();
            if (nanos > 0L)
                LockSupport.parkNanos(this, nanos);
        }
        else {
            // 不是超时的,直接阻塞,等待被唤醒
            // 唤醒后进入下一次循环,走第一个if的逻辑就返回匹配的元素了
            LockSupport.park(this);
        }
    }
}

浅谈Disruptor

Disruptor是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题。目前已经很稳定的甚少更新,在Apache Storm、Camel、Log4j2等项目中都有应用到。

我们先来回顾一下之前所学的队列:

队列 有界性 数据结构
ArrayBlockingQueue bounded 加锁 arraylist
LinkedBlockingQueue optionally-bounded 加锁 linkedlist
ConcurrentLinkedQueue unbounded 无锁 linkedlist
LinkedTransferQueue unbounded 无锁 linkedlist
PriorityBlockingQueue unbounded 加锁 heap
DelayQueue unbounded 加锁 heap

实际应用中,通常使用的数组或链表,只有在需要有优先级的情况下才使用堆。我们还需要考虑两个条件:

  1. 需要是有界的,可以防止内存溢出;
  2. 为了减少垃圾回收,需要尽量使用arraylist或者heap

综上所述,我们只有ArrayBlockingQueue可以选。

ArrayBlockingQueue的问题

ArrayBlockingQueue在实际应用中,因为加锁和伪共享,会造成一定的性能问题:

加锁

Disruptor论文里面的数据,显示了加锁导致的性能问题:
这个测试程序调用了一个函数,该函数会对一个64位的计数器循环自增5亿次。
机器环境:2.4G 6核
运算: 64位的计数器累加5亿次
实验数据:

Method Time (ms)
Single thread 300
Single thread with CAS 5,700
Single thread with lock 10,000
Single thread with volatile write 4,700
Two threads with CAS 30,000
Two threads with lock 224,000

笔者在自己的笔记本上做的实验如下:

代码:
单线程

public class DisruptorExperimentSingle {

    long cnt = 500_000_000;
    long num = 0;
    private static final long numOffset;
    private static final Unsafe unsafe;

    volatile long vNum = 0;

    static {
        try {
            unsafe = reflectGetUnsafe();
            numOffset = unsafe.objectFieldOffset(DisruptorExperimentSingle.class.getDeclaredField("num"));
        } catch (Exception ex) { throw new Error(ex); }
    }

    private static Unsafe reflectGetUnsafe() {
        try {
            Field field = Unsafe.class.getDeclaredField("theUnsafe");
            field.setAccessible(true);
            return (Unsafe) field.get(null);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    void singleUnlock(){
        num = 0;
        long start = System.currentTimeMillis();
        for(long i = 0;i < cnt;++i){
            ++num;
        }
        assert num == cnt;
        System.out.println("singleUnlock consume " + String.valueOf(System.currentTimeMillis() - start));
    }

    void singleCAS(){
        num = 0;
        long start = System.currentTimeMillis();
        for(long i = 0;i < cnt;++i) {
            unsafe.getAndAddLong(this, numOffset, 1);
        }
        assert num == cnt;
        System.out.println("singleCAS consume " + String.valueOf(System.currentTimeMillis() - start));
    }

    void singleLock(){
        num = 0;
        long start = System.currentTimeMillis();
        Lock lock = new ReentrantLock();
        for(long i = 0;i < cnt;++i){
            lock.lock();
            ++num;
            lock.unlock();
        }
        assert num == cnt;
        System.out.println("singleLock consume " + String.valueOf(System.currentTimeMillis() - start));
    }

    void singleSynchronize(){
        num = 0;
        long start = System.currentTimeMillis();
        Lock lock = new ReentrantLock();
        for(long i = 0;i < cnt;++i){
            synchronized (this){
                ++num;
            }
        }
        assert num == cnt;
        System.out.println("singleSynchronize consume " + String.valueOf(System.currentTimeMillis() - start));
    }

    void singleVolatile(){
        vNum = 0;
        long start = System.currentTimeMillis();
        for(long i = 1;i <= cnt;++i){
            vNum = i;
        }
        assert vNum == cnt;
        System.out.println("singleVolatile consume " + String.valueOf(System.currentTimeMillis() - start));
    }

    public static void main(String[] args) {
        DisruptorExperimentSingle disruptorExperiment = new DisruptorExperimentSingle();
        disruptorExperiment.singleUnlock();
        disruptorExperiment.singleCAS();
        disruptorExperiment.singleLock();
        disruptorExperiment.singleSynchronize();
        disruptorExperiment.singleVolatile();
    }
}

多线程

public class DisruptorExperimentMulti implements Runnable{
    public static int NUM_THREADS = 4;

    long cnt = 500_000_000;
    long num = 0;
    private static final long numOffset;
    private static final Unsafe unsafe;

    private static int type;

    static {
        try {
            unsafe = reflectGetUnsafe();
            numOffset = unsafe.objectFieldOffset(DisruptorExperimentMulti.class.getDeclaredField("num"));
        } catch (Exception ex) { throw new Error(ex); }
    }

    private static Unsafe reflectGetUnsafe() {
        try {
            Field field = Unsafe.class.getDeclaredField("theUnsafe");
            field.setAccessible(true);
            return (Unsafe) field.get(null);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    void multiCAS() throws InterruptedException {
        num = 0;
        long start = System.currentTimeMillis();
        Thread[] threads = new Thread[NUM_THREADS];
        for(int i = 0;i < NUM_THREADS;++i){
            threads[i] = new Thread(new DisruptorExperimentMulti());
        }
        type = 1;
        for (Thread t : threads) {
            t.start();
        }
        for (Thread t : threads) {
            t.join();
        }
        assert num == cnt;
        System.out.println("multiCAS consume " + String.valueOf(System.currentTimeMillis() - start));
    }

    void multiLock() throws InterruptedException {
        long start = System.currentTimeMillis();
        num = 0;
        Thread[] threads = new Thread[NUM_THREADS];
        for(int i = 0;i < NUM_THREADS;++i){
            threads[i] = new Thread(new DisruptorExperimentMulti());
        }
        type = 0;
        for (Thread t : threads) {
            t.start();
        }
        for (Thread t : threads) {
            t.join();
        }
        assert num == cnt;
        System.out.println("multiLock consume " + String.valueOf(System.currentTimeMillis() - start));
    }

    @Override
    public void run() {
        if(type == 0){//lock
            for(int i = 0;i < cnt/NUM_THREADS;++i) {
                synchronized (DisruptorExperimentMulti.class) {
                    ++num;
                }
            }
        } else {//cas
            for(int i = 0;i < cnt/NUM_THREADS;++i) {
                unsafe.getAndAddLong(this, numOffset, 1);
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        DisruptorExperimentMulti disruptorExperimentMulti = new DisruptorExperimentMulti();
        disruptorExperimentMulti.multiCAS();
        disruptorExperimentMulti.multiLock();
    }
}

实验结果:其中jvm对volatile long的读写是原子的,这里
指出来:

Writes and reads of volatile long and double values are always atomic.

时间(ms) 无锁 lock加锁 synchronize加锁 CAS volatile write
单线程 270 7059 8353 2363 2242
多线程 N/A N/A 28106 1447 N/A

可以看出来其性能:

  • 单线程:不加锁 > CAS > 加锁
  • 多线程:CAS 大约是20倍加锁性能

综上所述,加锁的性能是最差的,比CAS差了很多个数量级,以致于使用CAS来替换加锁所得到的收益非常大。

伪共享

伪共享的性能,在前面讨论过。
这里详细说一下ArrayBlockingQueue为什么有伪共享的问题。

ArrayBlockingQueue----

ArrayBlockingQueue有三个成员变量:

  • takeIndex:取数的下标
  • putIndex:放数的下标
  • count:队列中元素个数

其类型是三个int,他们很可能被放到同一个CacheLine里面去,所以会导致同时put和take的时候,会造成伪共享的问题。

Disruptor的设计方案

那么我们来看看Disruptor是怎么设计来规避上面这些问题的。

  • RingBuffer环形数组:
    • 数组对处理器的缓存友好
    • 相对于使用链表,数组可以避免频繁垃圾回收
  • 无锁设计
    • 每个生产者/消费者线程先申请一段空间作为自己可以操作的元素。申请到之后,在这片区域写入/读取元素

操作示例

单线程写

首先是申请可写的序号,申请到之后就可以在当前cursor到可写序号之内写入数据。

Disruptor1

多线程写

多线程写,需要给每个线程分配一段独享空间。写入后需要设置availableBuffer的相应位置为available。availableBuffer是一个与RingBuffer一样大小的的buffer,写入的时候需要设置相应的位置为已写入。如图,Writer1被分配了下标3到下表5的空间,Writer2被分配了下标6到下标9的空间。

Disruptor2

多线程写的情况下读数据

首先申请读取到序号n,然后需要遍历availableBuffer中是否当前位置已经写入,才可以消费数据。
--2020-10-20---12.50.49

性能

性能测试报告:

  • Async Appender: ArrayBlockingQueue
  • Loggers all async: Disruptor

单线程情况下,loggers all async与Async Appender吞吐量相差不大,但是在64个线程的时候,loggers all async的吞吐量比Async Appender增加了12倍,是Sync模式的68倍。

Disruptor--

comments powered by Disqus