Java并发类库提供的线程池有哪几种? 分别有什么特点? - 《Java核心》笔记

通常都是使用Executors提供的通用线程池创建方法去创建不同配置的线程池,主要区别在于不同的ExecutorService类型或者不同的初始参数,主要分为五类:

  • newCachedThreadPool() 用于处理大量短时间工作任务的线程池。会试图缓存线程并重用,当无缓存线程可用时,会创建新的工作线程。如果线程闲置时间超过60秒,则被终止并移出缓存。长时间闲置时,这种线程池不会消耗什么资源。其内部使用SynchronousQueue作为工作队列。
  • newFixedThreadpool(int nThreads) 重用固定数目的线程,背后使用的是无界的工作队列。如果任务数量超过了nThread,将在工作队列中等待空闲线程出现。如果有工作线程退出,将会有新的工作线程被创建,以补足指定数目nThreads。
  • newSingleThreadExecutor() 工作线程数目为1,有一个无界的工作队列。保证了所有任务都是按顺序执行,最多只会有一个任务处于活动状态。
  • newSingleThreadScheduledExecutor() 、newScheduledThreadPool(int corePoolSize)定时或周期性的工作调度,区别在于单一工作线程还是多个工作线程。
  • newWorkStealingPoll(int parallelism)内部用ForkJoinPool,利用Work-Stealing算法,并行地处理任务,不保证处理顺序。

Executor

Executor的组成

fc70c37867c7fbfb672fa3e37fe14b5b

  • Executor将任务提交和任务执行细节解耦:void execute(Runnable command);
  • ExecutorService不仅提供了service的管理功能,比如shutdown等,也提供了更加全面的提交任务机制,如返回Future而不是void的submit方法:<T> Future<T> submit(Callable<T> task);

18b64aee22c67f488171a73133e4d465

线程池:

public ThreadPoolExecutor(int corePoolSize,
                        int maximumPoolSize,
                        long keepAliveTime,
                        TimeUnit unit,
                        BlockingQueue<Runnable> workQueue,
                        ThreadFactory threadFactory,
                        RejectedExecutionHandler handler)
  • 工作队列负责存储用户提交的任务。比如private final BlockingQueue<Runnable> workQueue;
  • 内部的“线程池”是指保持工作线程的集合。比如private final HashSet<Worker> workers = new HashSet<>();。线程池的工作线程被抽象为静态内部类Worker,基于AQS实现。
  • ThreadFactory提供了创建线程逻辑;
  • 如果任务提交时被拒绝,比如线程池处于SHUTDOWN状态,需要为其提供处理逻辑。可以使用ThreadPoolExecutor.AbortPolicy等默认实现,也可以按照实际需求自定义。
  • corePoolSize,核心线程数,可以理解为长期驻留的线程数目。newFixedThreadPool会将其设置为nThreads,newCachedThreadPool是0。
  • maximunPoolSize,线程不够时能够创建的最大线程数。newFixedThreadPool是nThreads,而newCachedThreadPool是Integer.MAX_VALUE。
  • keepAliveTime、TimeUnit指定额外的线程能够闲置多久。
  • workQueue,工作队列,必须是BlockingQueue。

线程池的状态表征

ctl变量被赋予了双重角色,高位代表了线程池状态,低位代表了工作线程数目。这是一个典型的高效优化。

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 真正决定了工作线程数的理论上限 
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;
// 线程池状态,存储在数字的高位
private static final int RUNNING = -1 << COUNT_BITS;
…
// Packing and unpacking ctl
private static int runStateOf(int c)  { return c & ~COUNT_MASK; }
private static int workerCountOf(int c)  { return c & COUNT_MASK; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

线程的状态流转:
c50ce5f2ff4ae723c6267185699ccda1

注意,实际java代码中不存在idle状态。

execute方法

public void execute(Runnable command) {
…
  int c = ctl.get();
// 检查工作线程数目,低于 corePoolSize 则添加 Worker
  if (workerCountOf(c) < corePoolSize) {
      if (addWorker(command, true))
          return;
      c = ctl.get();
  }
// isRunning 就是检查线程池是否被 shutdown
// 工作队列可能是有界的,offer 是比较友好的入队方式
  if (isRunning(c) && workQueue.offer(command)) {
      int recheck = ctl.get();
// 再次进行防御性检查
      if (! isRunning(recheck) && remove(command))
          reject(command);
      else if (workerCountOf(recheck) == 0)
          addWorker(null, false);
  }
// 尝试添加一个 worker,如果失败意味着已经饱和或者被 shutdown 了
  else if (!addWorker(command, false))
      reject(command);
}

使用线程池要注意的点

  1. 避免任务堆积。newFixedThreadPool工作队列是无界的,如果工作线程数目太少,跟不上入队的速度,就可能造成工作队列积压,占用大量系统内存。诊断时可以使用jmap等工具,查看是否有大量任务对象入队。
  2. 避免过度扩展线程。我们在处理大量短时认为时,使用缓存的线程池,很难明确设定一个线程数目;
  3. 线程数目不断增长(使用jstack等工具检查),可能是因为线程泄露。
  4. 避免死锁。
  5. 尽量避免使用ThreadLocal。

线程池大小的选择

  • 计算密集型,建议按照CPU核数N或者N+1。
  • I/O密集型,建议Brain Goetz推荐的:线程数 = CPU 核数 × 目标 CPU 利用率 ×(1 + 平均等待时间 / 平均工作时间)
  • 其他系统资源考虑,比如说可用端口范围。
comments powered by Disqus