newCachedThreadPool为什么使用SynchronousQueue

问题

Executors包装的newCachedThreadPool使用的是无容量的SynchronousQueue,为什么一定要用SynchronousQueue呢?

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

SynchronousQueue

核心方法

SynchronousQueue缓存未匹配(put匹配take,take匹配put)的元素,并阻塞相关的生产者、消费者线程,核心代码如下:

//TransferQueue 的 transfer方法。transfer谁在调用。每个节点的线程调用,共享同一个队列。
//transfer里面有加入队列,阻塞awaitFulfill,阻塞后唤醒,取节点内容,唤醒节点,取到节点返回,被唤醒返回 代码。
@SuppressWarnings("unchecked")
E transfer(E e, boolean timed, long nanos) {
    /* Basic algorithm is to loop trying to take either of
     * two actions:
     *
     * 1. If queue apparently empty or holding same-mode nodes,
     *    try to add node to queue of waiters, wait to be
     *    fulfilled (or cancelled) and return matching item.
     *
     * 2. If queue apparently contains waiting items, and this
     *    call is of complementary mode, try to fulfill by CAS'ing
     *    item field of waiting node and dequeuing it, and then
     *    returning matching item.
     *
     * In each case, along the way, check for and try to help
     * advance head and tail on behalf of other stalled/slow
     * threads.
     *
     * The loop starts off with a null check guarding against
     * seeing uninitialized head or tail values. This never
     * happens in current SynchronousQueue, but could if
     * callers held non-volatile/final ref to the
     * transferer. The check is here anyway because it places
     * null checks at top of loop, which is usually faster
     * than having them implicitly interspersed.
     */

    QNode s = null; // constructed/reused as needed
    boolean isData = (e != null);//生产者isData=true,消费者isData=false

    for (;;) {
        //多线程操作queue的成员变量tail,head,是会跟着改变的。t,h就不会变。t,h还是之前的那个tail和head。
        QNode t = tail;
        QNode h = head;
        if (t == null || h == null)         // saw uninitialized value
            continue;                       // spin

        //队列为空或者当前线程和队列中元素为同一种模式,都是put或者get
        //1 加入队列排队阻塞(即可能是生产者队列也可能是消费者队列),然后等待唤醒。
        if (h == t || t.isData == isData) { // empty or same-mode//第一个节点或者进来的节点跟尾节点生产消费类型一样,就加入排队,否则就是来取节点的。
            QNode tn = t.next;
            if (t != tail)                  // inconsistent read//  其他线程改变了尾节点,再次来
                continue;
            if (tn != null) {               // lagging tail//其他线程为尾节点添加了next,但是还没有将tail指向新的节点,再次来
                advanceTail(t, tn);//帮助将tail指向新的节点
                continue;
            }
            if (timed && nanos <= 0)        // can't wait
                return null;
            if (s == null)
                s = new QNode(e, isData);//新建一个节点
            //加入尾部
            //加入队列,成功则T一定是是S的前一个节点
            // 尾节点的next从null变为新节点,每一处都有可能被停住。失败了说明有其他线程将尾节点的next从null变为其他新节点
            if (!t.casNext(null, s))        // failed to link in
                continue;

            //后移tail指针
            advanceTail(t, s);              // swing tail and wait// 改变尾节点,失败了,说明别的线程已经做了,自己什么都不用做。
            //进入阻塞状态
            //当前线程再这个函数的这里阻塞,唤醒時候,从这里执行,函數参数还是原来的。
            //2 加入队列后排队阻塞等待
            Object x = awaitFulfill(s, e, timed, nanos);//生产者或者消费者阻塞,等待唤醒。即可能是生产者被唤醒也可能是消费者被唤醒。
            if (x == s) {                   // wait was cancelled//线程中断超时返回的x是节点s,否则就是正常取出返回。
                clean(t, s);//节点的删除是节点上的线程自己来删除的,失效节点不影响正常的入队和出队,跳过即可。
                return null;
            }

            //不是中断的返回。x是item,只不过被改变了。
            //从阻塞中唤醒,确定已经处于队列中的第一个元素
            if (!s.isOffList()) {           // not already unlinked// 若s节点还没有从队列删除
                advanceHead(t, s);          // unlink if head// 改变头节点
                if (x != null)              // and forget fields// item != null,说明是消费者被生产者唤醒了,
                    s.item = s;//item变为自己
                s.waiter = null;//节点等待的线程变为null。
            }
            return (x != null) ? (E)x : e;//生产者被唤醒x=null,返回原来的e,消费者被唤醒x!=null,返回x。

            //当前线程可以和队列中的第一个元素进行配对
            //取节点,取到节点后,取唤醒别人。
        } else {                            // complementary-mode//如果队列里面之前是生产者,现在消费者过来了。
            //取队列中的第一个元素// 获取头节点的下一个节点,,每次都是从头结点下一个节点开始获取数据
            QNode m = h.next;               // node to fulfill
            //不一致读,重新执行for循环
            if (t != tail || m == null || h != head)
                continue;                   // inconsistent read// 其他线程改变了头尾节点,再次来

            Object x = m.item;
            //已经配对过
            if (isData == (x != null) ||    // m already fulfilled
                x == m ||                   // m cancelled// m节点的item=节点自己,这个节点m的线程被中断了或者超时了,就跳过这个节点,不使用,
                    // 消费者过来,就把节点m的内容从节点内容变为null,生产者过来,就把节点内容从null变为生产者的内容。此时节点的内容不再等于节点的内容。
                !m.casItem(x, e)) {         // lost CAS 尝试配对
                advanceHead(h, m);          // dequeue and retry 已经配对过,直接出队列 // 将m设置为头结点,h出列,然后重试
                continue;
            }

            // 成功匹配了,m设置为头结点h出列,向前推进,移除匹配到的节点
            //配对成功,出队列
            advanceHead(h, m);              // successfully fulfilled
            LockSupport.unpark(m.waiter);
            return (x != null) ? (E)x : e;
        }
    }
}

offer方法

offer方法调用transfer(e, true, 0),所以ThreadPoolExecutor调用的时候会直接返回null。

public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    return transferer.transfer(e, true, 0) != null;
}

take方法

public E take() throws InterruptedException {
// 直接调用传输器的transfer()方法
// 三个参数分别是:null,是否需要超时,超时的时间
// 第一个参数为null表示是消费者,要取元素
    E e = transferer.transfer(null, false, 0);
// 如果取到了元素就返回
    if (e != null)
        return e;
// 否则让线程中断并抛出中断异常
    Thread.interrupted();
    throw new InterruptedException();
}

ThreadPoolExecutor

提交任务:

workQueue.offer(command)这里会返回null,所以会走下面的逻辑“入队失败,开新线程”。

// 提交任务,任务并非立即执行
public void  execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
// 控制变量(高3位存储状态,低29位存储工作线程的数量)
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        //当前线程数小于core,开新线程
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    //当前线程数大于等于core,调用workQueue.offer入队
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
// 再次检查线程池状态,如果不是运行状态,就移除任务并执行拒绝策略
        if (! isRunning(recheck) && remove(command))
            reject(command);
// 容错检查工作线程数量是否为0,如果为0就创建一个
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    //入队失败,开新线程
    else if (!addWorker(command, false))
        reject(command);//线程数大于maxPoolSize,调用拒绝策略
}

addWorker方法:

//新开一个线程。如果core为true,则用corePoolSize为上界。如果为false,则用maxPoolSize为上界
private boolean addWorker(Runnable firstTask, boolean core) {
// 判断有没有资格创建新的工作线程
// 主要是一些状态/数量的检查等等
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        //只要状态大于或者等于shutdown,说明线程池进入了关闭的进程
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))//大于上界,返回false
                return false;
            if (compareAndIncrementWorkerCount(c))//workCount成功加一,跳出整个for循环
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)//runState在这个过程中发生了变化,重新开始for循环
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

// 如果上面的条件满足,则会把工作线程数量加1,然后执行下面创建线程的动作
    //workCount成功+1,开始添加线程操作
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
// 创建工作线程
        w = new Worker(firstTask);//创建一个线程
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
// 再次检查线程池的状态
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
// 添加到工作线程队列
                    workers.add(w);//把线程加入线程集合
// 还在池子中的线程数量(只能在mainLock中使用)
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
// 标记线程添加成功
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
// 线程添加成功之后启动线程
                t.start();//成功加入,则启动该线程
                workerStarted = true;
            }
        }
    } finally {
// 线程启动失败,执行失败方法(线程数量减1,执行tryTerminate()方法等)
        if (! workerStarted)//加入失败,调用下面的函数
            addWorkerFailed(w);//函数内部把workCount减一
    }
    return workerStarted;
}

其中Worker是存储工作线程的HashSet:

//工作线程集合,Worker继承了AQS接口和Runnable接口,是具体处理任务的线程对象
//Worker实现AQS,并自己实现了简单不可重入独占锁,其中state=0表示当前锁未被获取状态,state=1表示锁被获取,
//state=-1表示Work创建时候的默认状态,创建时候设置state=-1是为了防止runWorker方法运行前被中断
private final HashSet<Worker> workers = new HashSet<Worker>();//线程集合

注意addWorker的时候w = new Worker(firstTask)这里是:把Worker自身作为Runnable传给线程。

Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
// 使用线程工厂生成一个线程
// 注意,这里把Worker本身作为Runnable传给线程
    this.thread = getThreadFactory().newThread(this);
}

之后final Thread t = w.thread;取出线程,调用t.start()启动线程,这里实际调用的是Worker的run方法。

执行任务:run方法

public void run() {
    runWorker(this);
}

final void runWorker(Worker w) {
// 工作线程
    Thread wt = Thread.currentThread();
// 任务
   Runnable task = w.firstTask;
   w.firstTask = null;
// 强制释放锁(shutdown()里面有加锁)
// 这里相当于无视那边的中断标记
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
// 取任务,如果有第一个任务,这里先执行第一个任务
// 只要能取到任务,这就是个死循环
// 正常来说getTask()返回的任务是不可能为空的,因为前面execute()方法是有空判断的
// 那么,getTask()什么时候才会返回空任务呢?
        //不断从队列中取任务执行
        while (task != null || (task = getTask()) != null) {
            w.lock();//关键点,在任务执行之前要先加锁,此处就对应了前面讲shutdown()时的tryLock
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
// 检查线程池的状态
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();//拿到任务了,在执行之前重新检测线程池的状态,如果发现已经开始关闭,则自己给自己发中断信号
            try {
// 钩子方法,方便子类在任务执行前做一些处理
                beforeExecute(wt, task);//任务之前的钩子函数,目前是空实现
                Throwable thrown = null;
                try {
// 真正任务执行的地方
                   task.run();//执行任务代码
               } catch (RuntimeException x) {
                   thrown = x; throw x;
               } catch (Error x) {
                   thrown = x; throw x;
               } catch (Throwable x) {
                   thrown = x; throw new Error(x);
               } finally {
                   afterExecute(task, thrown);//任务之后的钩子函数,目前是空实现
               }
           } finally {
// task置为空,重新从队列中取
                task = null;
// 完成任务数加1
                w.completedTasks++;//成功完成,completedTasks加一
                w.unlock();
            }
        }
        completedAbruptly = false;//判断这个Worker是正常退出,还是受到中断退出,或者其他某种异常退出
    } finally {
// 到这里肯定是上面的while循环退出了
        processWorkerExit(w, completedAbruptly);//Worker退出
    }
}

其中getTask方法:

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        //关键点:
        // 1. 如果rs >= STOP,即调用了shutdownNow(),此处会返回null
        // 2. 如果rs >= SHUTDOWN,即调用了shutdown(),并且队列为空,此处也会返回null
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;//此处返回null,调用这个的worker就会退出while循环,死亡
        }

        int wc = workerCountOf(c);

// 是否允许超时,有两种情况:
// 1. 是允许核心线程数超时,这种就是说所有的线程都可能超时
// 2. 是工作线程数大于了核心数量,这种肯定是允许超时的
// 注意,非核心线程是一定允许超时的,这里的超时其实是指取任务超时
        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
// 真正取任务的地方
// 默认情况下,只有当工作线程数量大于核心线程数量时,才会调用poll()方法触发超时调用
            //关键点:
            // 1. 队列为空,就会阻塞此处的poll或者take,线程空闲。前置带超时,后置不带超时。
            // 2. 一旦收到中断信号,此处就会抛出中断异常
// 真正响应中断是在poll()方法或者take()方法中
            //这里取任务会根据工作线程的数量判断是使用BlockingQueue的poll(timeout, unit)方法还是take()方法。
            //poll(timeout, unit)方法会在超时时返回null,如果timeout<=0,队列为空时直接返回null。
            //take()方法会一直阻塞直到取到任务或抛出中断异常。
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

对于newCachedThreadPool的情况,超时时间是60s,则如果60s内没有其他线程把数据放到Synchronous里面,则会返回null,当前线程被kill掉。

即流程如下:

线程池调用execute—>创建Worker(设置属性thead、firstTask)—>worker.thread.start()—>实际上调用的是worker.run()—>线程池的runWorker(worker)—>worker.firstTask.run()(如果firstTask为null就从等待队列中拉取一个)。

总结

为什么用SynchronousQueue

newCachedThreadPool使用SynchronousQueue可以做到以下效果:当前所有线程都在运行的时候则创建新的线程,新的线程执行完当前任务则等到60s取任务,取不到则退出。

如果使用普通的ArrayBlockingQueue或者LinkedBlockingQueue,size设置为0则不能达到保留60s运行新任务的效果(size为0则永远取不到任务),如果size设置大于0则当前躺在queue里面的任务需要等待工作线程来取,达不到立即执行的效果。

应用场景

执行很多短小的任务的时候,并且需要极致的速度的时候。可以复用已经创建了的工作线程。

REFER

https://blog.csdn.net/qq_41634872/article/details/109393730
https://stackoverflow.com/questions/16142567/how-does-newcachedthreadpool-cache-thread

comments powered by Disqus