JDK并发集合9:其他(同步工具)

CyclicBarrier

CyclicBarrier是一个类似于CountDownLatch功能的类,其用于“循环栅栏”,它会阻塞一些线程直到所有线程同时到达了await再继续往下执行。和CountDownLatch区别是:

  1. 不用countdown
  2. 可复用

属性

其内部实现是使用了ReentrantLock和条件锁:

  • 未到齐,trip.await()
  • 到齐了,trip.signalAll()
/** The lock for guarding barrier entry */
// 重入锁
private final ReentrantLock lock = new ReentrantLock();
/** Condition to wait on until tripped */
// 条件锁,名称为trip,绊倒的意思,可能是指线程来了先绊倒,等达到一定数量了再唤醒
private final Condition trip = lock.newCondition();//线程之间相互唤醒
/** The number of parties */
// 需要等待的线程数量
private final int parties;//总线程数

await方法

public int await() throws InterruptedException, BrokenBarrierException {
    try {
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen
    }
}

private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
           TimeoutException {
    final ReentrantLock lock = this.lock;
// 加锁
    lock.lock();
    try {
// 当前代
        final Generation g = generation;
// 检查
        if (g.broken)
            throw new BrokenBarrierException();
// 中断检查
        if (Thread.interrupted()) {//响应中断
            breakBarrier();//唤醒所有线程
            throw new InterruptedException();
        }
// count的值减1
        int index = --count;//每个线程调用一次await(),count--
// 如果数量减到0了,走这段逻辑(最后一个线程走这里)
        if (index == 0) {  // tripped 当count减到0时,此线程唤醒其他所有线程
            boolean ranAction = false;
            try {
// 如果初始化的时候传了命令,这里执行
                final Runnable command = barrierCommand;
                if (command != null)//一起唤醒后做一个回调
                    command.run();
                ranAction = true;
// 调用下一代方法
                nextGeneration();//唤醒其他所有线程,同时把count值复原,用于下一次的CyclicBarrier(重复使用)
                return 0;
            } finally {
                if (!ranAction)
                    breakBarrier();
            }
        }

// 这个循环只有非最后一个线程可以走到
        // loop until tripped, broken, interrupted, or timed out
        for (;;) {//人还没到齐,阻塞自己
            try {//
                if (!timed)
// 调用condition的await()方法
                    trip.await();//await在阻塞自己的时候,会把锁释放掉
                else if (nanos > 0L)
// 超时等待方法
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                if (g == generation && ! g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                    // We're about to finish waiting even if we had not
                    // been interrupted, so this interrupt is deemed to
                    // "belong" to subsequent execution.
                    Thread.currentThread().interrupt();
                }
            }
// 检查
            if (g.broken)
                throw new BrokenBarrierException();
// 正常来说这里肯定不相等
// 因为上面打破栅栏的时候调用nextGeneration()方法时generation的引用已经变化了
            if (g != generation)//从阻塞中唤醒,返回
                return index;

            // 超时检查
            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        lock.unlock();
    }
}

private void nextGeneration() {
    // signal completion of last generation
// 调用condition的signalAll()将其队列中的等待者全部转移到AQS的队列中
    trip.signalAll();
    // set up next generation
// 重置count
    count = parties;
// 进入下一代
    generation = new Generation();
}
  1. 最后一个线程走上面的逻辑,当count减为0的时候,打破栅栏,它调用nextGeneration()方法通知条件队列中的等待线程转移到AQS的队列中等待被唤醒,并进入下一代。
  2. 非最后一个线程走下面的for循环逻辑,这些线程会阻塞在condition的await()方法处,它们会加入到条件队列中,等待被通知,当它们唤醒的时候已经更新换“代”了,这时候返回。

图解

其实是一个一个顺序被唤醒的。假如初始时 count=parties=3,当第一个线程到达栅栏处,count减1,然后把它加入到Condition的队列中,第二个线程到达栅栏处也是如此,第三个线程到达栅栏处,count减为0,调用Condition的signalAll()通知另外两个线程,然后把它们加入到AQS的队列中,等待当前线程运行完毕,调用lock.unlock()的时候依次从AQS的队列中唤醒一个线程继续运行,也就是说实际上三个线程先依次(排队)到达栅栏处,再依次往下运行。

CyclicBarrier

Phaser

Phaser适用于一个大任务分为多个阶段,每个阶段的任务可以多线程执行,但是必须上一个阶段的任务都完成了才可以执行下一个阶段的任务的场景。它的工作CyclicBarrier和CountDownLatch也可以完成,但是它们两者不能动态调整线程个数,而Phaser可以。

属性

  1. State:状态变量。高32位存储当前阶段phase,中间16位存储参与者的数量,低16位存储未完成参与者的数量。
    --2020-10-13---11.07.17-1

  2. eventQ、oddQ:为了减少并发冲突,这里定义了2个链表,也就是2个Treiber Stack。当phase为奇数轮的时候,阻塞线程放在oddQ里面;当phase为偶数轮的时候,阻塞线程放在evenQ里面。当最后一个参与者完成任务后唤醒队列中的参与者继续执行下一个阶段的任务,或者结束任务。

  3. 已完成的参与者存储的队列,当最后一个参与者完成任务后唤醒队列中的参与者继续执行下一个阶段的任务,或者结束任务。

等待其他线程完成

当前线程当前阶段执行完毕,等待其它线程完成当前阶段,Phaser使用arriveAndAwaitAdvance方法来处理。如果当前线程是该阶段最后一个到达的,则当前线程会执行onAdvance()方法,并唤醒其它线程进入下一个阶段。

public int arriveAndAwaitAdvance() {
    // Specialization of doArrive+awaitAdvance eliminating some reads/paths
    final Phaser root = this.root;
    for (;;) {
// state的值
        long s = (root == this) ? state : reconcileState();
        int phase = (int)(s >>> PHASE_SHIFT);
        if (phase < 0)
            return phase;
// parties和unarrived的值
        int counts = (int)s;
// unarrived的值(state的低16位)
        int unarrived = (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);
        if (unarrived <= 0)
            throw new IllegalStateException(badArrive(s));
// 修改state的值
        if (UNSAFE.compareAndSwapLong(this, stateOffset, s,
                                      s -= ONE_ARRIVAL)) {
// 如果不是最后一个到达的,则调用internalAwaitAdvance()方法自旋或进入队列等待
            if (unarrived > 1)
// 这里是直接返回了,internalAwaitAdvance()方法的源码见register()方法解析
                return root.internalAwaitAdvance(phase, null);
// 到这里说明是最后一个到达的参与者
            if (root != this)
                return parent.arriveAndAwaitAdvance();
// n只保留了state中parties的部分,也就是中16位
            long n = s & PARTIES_MASK;  // base of next state
// parties的值,即下一次需要到达的参与者数量
            int nextUnarrived = (int)n >>> PARTIES_SHIFT;
// 执行onAdvance()方法,返回true表示下一阶段参与者数量为0了,也就是结束了
            if (onAdvance(phase, nextUnarrived))
                n |= TERMINATION_BIT;
            else if (nextUnarrived == 0)
                n |= EMPTY;
            else
// n 加上unarrived的值
                n |= nextUnarrived;
// 下一个阶段等待当前阶段加1
            int nextPhase = (phase + 1) & MAX_PHASE;
// n 加上下一阶段的值
            n |= (long)nextPhase << PHASE_SHIFT;
// 修改state的值为n
            if (!UNSAFE.compareAndSwapLong(this, stateOffset, s, n))
                return (int)(state >>> PHASE_SHIFT); // terminated
// 唤醒其它参与者并进入下一个阶段
            releaseWaiters(phase);
// 返回下一阶段的值
            return nextPhase;
        }
    }
}

private void releaseWaiters(int phase) {
    QNode q;   // first element of queue
    Thread t;  // its thread
    AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
    while ((q = head.get()) != null &&
           q.phase != (int)(root.state >>> PHASE_SHIFT)) {
        if (head.compareAndSet(q, q.next) &&
            (t = q.thread) != null) {
            q.thread = null;
            LockSupport.unpark(t);
        }
    }
}
  1. 修改state中unarrived部分的值减1;
  2. 如果不是最后一个到达的,则调用internalAwaitAdvance()方法自旋或排队等待;
  3. 如果是最后一个到达的,则调用onAdvance()方法,然后修改state的值为下一阶段对应的值,并唤醒其它等待的线程;
  4. 返回下一阶段的值;
comments powered by Disqus