前言
在面向对象编程中,创建和销毁对象是很费时间的,因为创建一个对象要获取内存资源或者其它更多资源。在 Java 中更是如此,虚拟机将试图跟踪每一个对象,以便能够在对象销毁后进行垃圾回收。所以提高服务程序效率的一个手段就是尽可能减少创建和销毁对象的次数,特别是一些很耗资源的对象创建和销毁,这就是 “池化资源” 技术产生的原因。线程池顾名思义就是事先创建若干个可执行的线程放入一个池(容器)中,需要的时候从池中获取线程,不用自行创建;使用完毕不需要销毁线程而是放回池中,从而减少创建和销毁线程对象的开销。但是要做到合理的利用线程池,必须对其原理了如指掌。
线程池的好处以及使用场景
使用线程池的好处?
合理利用线程池能够带来三个好处。第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。第二:提高响应速度。当任务到达时,任务可以不需要的等到线程创建就能立即执行。第三:提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
什么时候使用线程池?
假设一个服务器完成一项任务所需时间为:T1 创建线程时间,T2 在线程中执行任务的时间,T3 销毁线程时间。如果:T1+T3 远大于 T2,则可以采用线程池,以提高服务器性能。线程池技术正是关注如何缩短或调整 T1,T3 时间的技术,从而提高服务器程序性能的。它把 T1,T3 分别安排在服务器程序的启动和结束的时间段或者一些空闲的时间段,这样在服务器程序处理客户请求时,不会有 T1,T3 的开销了。
即,第一:单个任务处理时间比较短;第二:需要处理的任务数量很大。
Java 中的 ThreadPoolExecutor 类
线程池的创建可以通过创建 ThreadPoolExecutor 对象或者调用 Executors 的工厂方法来创建线程池。ThreadPoolExecutor 是 Executors 类的底层实现。但是在阿里巴巴的 Java 开发手册中提到:
【强制】线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
说明:Executors 返回的线程池对象的弊端如下:
1)FixedThreadPool 和 SingleThreadPool:
允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。
2)CachedThreadPool 和 ScheduledThreadPool:
允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM。
线程池的创建
在 ThreadPoolExecutor 类中提供了四个构造方法:
1 | public class ThreadPoolExecutor extends AbstractExecutorService { |
从上面的代码可以得知,ThreadPoolExecutor 继承了 AbstractExecutorService 类,并提供了四个构造器,事实上,通过观察每个构造器的源码具体实现,发现前面三个构造器都是调用的第四个构造器进行的初始化工作。
下面解释下一下构造器中各个参数的含义:
- corePoolSize:核心线程池大小,这个参数跟后面讲述的线程池的实现原理有非常大的关系。在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了 prestartAllCoreThreads()或者 prestartCoreThread()方法,从这 2 个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建 corePoolSize 个线程或者一个线程。默认情况下,在创建了线程池后,线程池中的线程数为 0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到 corePoolSize 后,就会把到达的任务放到缓存队列当中。当有新任务在 execute()方法提交时,会执行以下判断:
- 如果运行的线程少于 corePoolSize,则创建新线程来处理任务,即使线程池中的其他线程是空闲的;
- 如果线程池中的线程数量大于等于 corePoolSize 且小于 maximumPoolSize,则只有当 workQueue 满时才创建新的线程去处理任务;
- 如果设置的 corePoolSize 和 maximumPoolSize 相同,则创建的线程池的大小是固定的,这时如果有新任务提交,若 workQueue 未满,则将请求放入 workQueue 中,等待有空闲的线程去 workQueue 中取任务并处理;
- 如果运行的线程数量大于等于 maximumPoolSize,这时如果 workQueue 已经满了,则通过 handler 所指定的策略来处理任务;
- 所以,任务提交时,判断的顺序为 corePoolSize –> workQueue –> maximumPoolSize。
- maximumPoolSize:最大线程池大小,线程池允许创建的最大线程数。如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。值得注意的是如果使用了无界的任务队列这个参数就没什么效果。
- keepAliveTime:线程池中超过 corePoolSize 数目的空闲线程最大存活时间;默认情况下,只有当线程池中的线程数大于 corePoolSize 时,keepAliveTime 才会起作用,直到线程池中的线程数不大于 corePoolSize,即当线程池中的线程数大于 corePoolSize 时,如果一个线程空闲的时间达到 keepAliveTime,则会终止,直到线程池中的线程数不超过 corePoolSize。但是如果调用了 allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于 corePoolSize 时,keepAliveTime 参数也会起作用,直到线程池中的线程数为 0。
- unit:参数 keepAliveTime 的时间单位,有 7 种取值,在 TimeUnit 类中有 7 种静态属性。
- workQueue:阻塞任务队列,用来存储等待执行的任务,这个参数的选择也很重要,会对线程池的运行过程产生重大影响。保存等待执行的任务的阻塞队列,当提交一个新的任务到线程池以后,线程池会根据当前线程池中正在运行着的线程的数量来决定对该任务的处理方式,主要有以下几种处理方式:
- 直接切换:这种方式常用的队列是 SynchronousQueue。
- 使用无界队列:一般使用基于链表的阻塞队列 LinkedBlockingQueue。如果使用这种方式,那么线程池中能够创建的最大线程数就是 corePoolSize,而 maximumPoolSize 就不会起作用了。当线程池中所有的核心线程都是 RUNNING 状态时,这时一个新的任务提交就会放入等待队列中。
- 使用有界队列:一般使用 ArrayBlockingQueue。使用该方式可以将线程池的最大线程数量限制为 maximumPoolSize,这样能够降低资源的消耗,但同时这种方式也使得线程池对线程的调度变得更困难,因为线程池和队列的容量都是有限的值,所以要想使线程池处理任务的吞吐率达到一个相对合理的范围,又想使线程调度相对简单,并且还要尽可能的降低线程池对资源的消耗,就需要合理的设置这两个数量:
- 如果要想降低系统资源的消耗(包括 CPU 的使用率,操作系统资源的消耗,上下文环境切换的开销等),可以设置较大的队列容量和较小的线程池容量,但这样也会降低线程处理任务的吞吐量。
- 如果提交的任务经常发生阻塞,那么可以考虑通过调用 setMaximumPoolSize() 方法来重新设定线程池的容量。
- 如果队列的容量设置的较小,通常需要将线程池的容量设置大一点,这样 CPU 的使用率会相对的高一些。但如果线程池的容量设置的过大,则在提交的任务数量太多的情况下,并发量会增加,那么线程之间的调度就是一个要考虑的问题,因为这样反而有可能降低处理任务的吞吐量。
- threadFactory:它是 ThreadFactory 类型的变量,用来创建新线程。默认使用 Executors.defaultThreadFactory()来创建线程。使用默认的 ThreadFactory 来创建线程时,会使新创建的线程具有相同的 NORM_PRIORITY 优先级并且是非守护线程,同时也设置了线程的名称。
- handler:饱和策略,当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。这个策略默认情况下是 AbortPolicy,表示无法处理新任务时抛出异常。以下是 JDK 1.5 提供的四种策略:
- AbortPolicy:直接抛出异常,这是默认策略;
- CallerRunsPolicy:用调用者所在的线程来执行任务;显然这样不会真的丢弃任务,但是,调用者线程性能可能急剧下降;
- DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,也就是丢弃一个即将被执行的任务,并尝试再次提交当前任务,重复此过程;
- DiscardPolicy:直接丢弃任务;
- 当然也可以根据应用场景需要来实现 RejectedExecutionHandler 接口自定义策略,如记录日志或持久化不能处理的任务;
向线程池提交任务
提交任务有 execute()和 submit()两个方法,下面看看他俩的区别:
1、接收参数不同:execute()的参数是 Runnable,submit()参数可以是 Runnable,也可以是 Callable。
2、返回值不同:execute()没有返回值,submit()有返回值 Future。通过 Future 可以获取各个线程的完成情况,是否有异常,还能试图取消任务的执行。
1 | public void execute(Runnable command) { |
- 如果运行的线程小于 corePoolSize,则尝试使用用户定义的 Runnalbe 对象创建一个新的线程。调用 addWorker()函数会原子性的检查 runState 和 workCount,通过返回 false 来防止在不应该添加线程时添加了线程
- 如果一个任务能够成功入队列,在添加一个线程时仍需要进行双重检查(可能因为在前一次检查后该线程死亡了),或者当进入到此方法时,线程池已经 shutdown 了,所以需要再次检查状态。若线程此时的状态不是 RUNNING,则需要回滚入队列操作;或者当线程池没有工作线程时,需要创建一个新的工作线程。
- 如果无法入队列,那么需要增加一个新工作线程,如果此操作失败,那么就意味着线程池已经 SHUTDOWN 或者已经饱和了,所以拒绝任务
线程池的关闭
shutdown()
shutdown()方法要将线程池切换到 SHUTDOWN 状态,并调用 interruptIdleWorkers()方法请求中断所有空闲的 worker,最后调用 tryTerminate()尝试结束线程池。
1 | public void shutdown() { |
1、停止接收新的 submit 的任务;
2、已经提交的任务(包括正在跑的和队列中等待的),会继续执行完成;
3、等到第 2 步完成后,才真正停止;
shutdownNow()
shutdown() 方法要将线程池切换到 STOP 状态,并调用 interruptIdleWorkers() 方法请求中断所有工作线程,无论是否是空闲的,然后取出阻塞队列中没有被执行的任务并返回,最后调用 tryTerminate() 尝试结束线程池。
1 | public List<Runnable> shutdownNow() { |
1、跟 shutdown()一样,先停止接收新 submit 的任务;
2、忽略队列里等待的任务;
3、尝试将正在执行的任务 interrupt 中断;
4、返回未执行的任务列表;
说明:它试图终止线程的方法是通过调用 Thread.interrupt()方法来实现的,这种方法的作用有限,如果线程中没有 sleep、wait、Condition、定时锁等应用,interrupt()方法是无法中断当前的线程的。所以,shutdownNow()并不代表线程池就一定立即就能退出,它也可能必须要等待所有正在执行的任务都执行完成了才能退出。但是大多数时候是能立即退出的。
深入剖析线程池实现原理
线程池状态
1 | private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); |
ctl 是对线程池的运行状态和线程池中有效线程的数量进行控制的一个字段,它包含两部分的信息:线程池的运行状态 (runState) 和线程池内有效线程的数量(workerCount),这里可以看到,使用了 Integer 类型来保存,高 3 位保存 runState,低 29 位保存 workerCount。COUNT_BITS 就是 29,CAPACITY 就是 1 左移 29 位减 1(29 个 1),这个常量表示 workerCount 的上限值,大约是 5 亿。
ctl 相关方法:
1 | // 获取运行状态 |
下面再介绍下线程池的运行状态。 线程池一共有五种状态,分别是:
- RUNNING(运行状态):能接受新提交的任务,并且也能处理阻塞队列中已保存的任务;
- SHUTDOWN(关闭状态):不能接受新提交的任务,但却可以处理阻塞队列中已保存的任务。在线程池处于 RUNNING 状态时,调用 shutdown()方法会使线程池进入到该状态。(finalize()方法在执行过程中也会调用 shutdown()方法进入该状态);
- STOP(停止状态):不能接受新提交的任务,也不能处理阻塞队列中已保存的任务,并且会中断正在处理任务的线程。在线程池处于 RUNNING 或 SHUTDOWN 状态时,调用 shutdownNow()方法会使线程池进入到该状态;
- TIDYING(整理状态):如果所有的任务都已终止了,workerCount(有效线程数)为 0,线程池进入该状态后会调用 terminated()方法进入 TERMINATED 状态;
- TERMINATED(终止状态):在 terminated()方法执行完后进入该状态,默认 terminated()方法中什么也没有做;
源码分析
addWorker
addWorker 方法的主要工作是在线程池中创建一个新的线程并执行,firstTask 参数用于指定新增的线程执行的第一个任务,core 参数为 true 表示在新增线程时会判断当前活动线程数是否少于 corePoolSize,false 表示新增线程前需要判断当前活动线程数是否少于 maximumPoolSize。
1 | private boolean addWorker(Runnable firstTask, boolean core) { |
runWorker
线程池创建线程时,会将线程封装成工作线程 Worker,ThreadPool 维护的其实就是一组 Worker 对象,Worker 在执行完任务后,还会无限循环获取工作队列里的任务来执行。我们可以从 Worker 的 runWorker 方法里看到:
1 | final void runWorker(Worker w) { |
总结一下 runWorker 方法的执行过程:
- while 循环不断地通过 getTask()方法获取任务;
- getTask()方法从阻塞队列中取任务;
- 如果线程池正在停止,那么要保证当前线程是中断状态,否则要保证当前线程不是中断状态;
- 调用 task.run()执行任务;
- 如果 task 为 null 则跳出循环,执行 processWorkerExit()方法;
- runWorker 方法执行完毕,也代表着 Worker 中的 run 方法执行完毕,销毁线程。
getTask
getTask 方法的主要工作是从阻塞队列中获取任务。
1 | private Runnable getTask() { |
processWorkerExit
getTask 方法返回 null 时,在 runWorker 方法中会跳出 while 循环,然后会执行 processWorkerExit 方法。
1 | private void processWorkerExit(Worker w, boolean completedAbruptly) { |
至此,processWorkerExit 执行完之后,工作线程被销毁,以上就是整个工作线程的生命周期,从 execute 方法开始,Worker 使用 ThreadFactory 创建新的工作线程,runWorker 通过 getTask 获取任务,然后执行任务,如果 getTask 返回 null,进入 processWorkerExit 方法,整个线程结束。
线程池的监控
通过线程池提供的参数进行监控。
- getTaskCount:线程池已经执行的和未执行的任务总数;
- getCompletedTaskCount:线程池已完成的任务数量,该值小于等于 taskCount;
- getLargestPoolSize:线程池曾经创建过的最大线程数量。通过这个数据可以知道线程池是否满过,也就是达到了 maximumPoolSize;
- getPoolSize:线程池当前的线程数量;
- getActiveCount:当前线程池中正在执行任务的线程数量。
通过这些方法,可以对线程池进行监控,在 ThreadPoolExecutor 类中提供了几个空方法,如 beforeExecute(线程执行之前调用)方法,afterExecute(线程执行之后调用)方法和 terminated(线程池退出时候调用)方法,可以扩展这些方法在执行前或执行后增加一些新的操作,例如统计线程池的执行任务的时间等,可以继承自 ThreadPoolExecutor 来进行扩展。
线程池容量的动态调整
ThreadPoolExecutor 提供了动态调整线程池容量大小的方法:setCorePoolSize()和 setMaximumPoolSize()。
- setCorePoolSize:设置核心池大小
- setMaximumPoolSize:设置线程池最大能创建的线程数目大小
当上述参数从小变大时,ThreadPoolExecutor 进行线程赋值,还可能立即创建新的线程来执行任务。
如何合理配置线程池的大小
一般需要根据任务的类型来配置线程池大小:
任务性质不同的任务可以用不同规模的线程池分开处理。CPU 密集型任务配置尽可能少的线程数量,如配置 Ncpu+1 个线程的线程池。IO 密集型任务则由于需要等待 IO 操作,线程并不是一直在执行任务,则配置尽可能多的线程,如 2*Ncpu。混合型的任务,如果可以拆分,则将其拆分成一个 CPU 密集型任务和一个 IO 密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐率要高于串行执行的吞吐率,如果这两个任务执行时间相差太大,则没必要进行分解。我们可以通过 Runtime.getRuntime().availableProcessors()方法获得当前设备的 CPU 个数。
《服务器性能 IO 优化》中发现一个估算公式:
最佳线程数目 =((线程等待时间 + 线程 CPU 时间)/ 线程 CPU 时间)*CPU 数目
比如平均每个线程 CPU 运行时间为 0.5s,而线程等待时间(非 CPU 运行时间,比如 IO)为 1.5s,CPU 核心数为 8,那么根据上面这个公式估算得到:((0.5+1.5)/0.5)*8=32。这个公式进一步转化为:
最佳线程数目 =(线程等待时间与线程 CPU 时间之比 + 1)*CPU 数目
可以得出一个结论:线程等待时间所占比例越高,需要越多线程。线程 CPU 时间所占比例越高,需要越少线程。当然,这只是一个参考值,具体的设置还需要根据实际情况进行调整,比如可以先将线程池大小设置为参考值,再观察任务运行情况和系统负载、资源利用率来进行适当调整。
参考博文
[1]. 深入理解 Java 线程池:ThreadPoolExecutor
[2]. java8 线程池
[3]. Java 并发编程:线程池的使用
[4]. 如何合理地估算线程池大小?
Java 并发编程之美系列
- Java 并发编程之美(一):并发队列 Queue 原理剖析
- Java 并发编程之美(二):线程池 ThreadPoolExecutor 原理探究
- Java 并发编程之美(三):异步执行框架 Eexecutor
- Java 并发编程之美(四):深入剖析 ThreadLocal
- Java 并发编程之美(五):揭开 InheritableThreadLocal 的面纱
- Java 并发编程之美(六):J.U.C 之线程同步辅助工具类
- Java 并发编程之美(七):透彻理解 Java 并发编程
- Java 并发编程之美(八):循序渐进学习 Java 锁机制