Java 并发编程之美(二):线程池 ThreadPoolExecutor 原理探究

前言

在面向对象编程中,创建和销毁对象是很费时间的,因为创建一个对象要获取内存资源或者其它更多资源。在 Java 中更是如此,虚拟机将试图跟踪每一个对象,以便能够在对象销毁后进行垃圾回收。所以提高服务程序效率的一个手段就是尽可能减少创建和销毁对象的次数,特别是一些很耗资源的对象创建和销毁,这就是 “池化资源” 技术产生的原因。线程池顾名思义就是事先创建若干个可执行的线程放入一个池(容器)中,需要的时候从池中获取线程,不用自行创建;使用完毕不需要销毁线程而是放回池中,从而减少创建和销毁线程对象的开销。但是要做到合理的利用线程池,必须对其原理了如指掌。

线程池的好处以及使用场景

使用线程池的好处?

合理利用线程池能够带来三个好处。第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。第二:提高响应速度。当任务到达时,任务可以不需要的等到线程创建就能立即执行。第三:提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

什么时候使用线程池?

假设一个服务器完成一项任务所需时间为:T1 创建线程时间,T2 在线程中执行任务的时间,T3 销毁线程时间。如果:T1+T3 远大于 T2,则可以采用线程池,以提高服务器性能。线程池技术正是关注如何缩短或调整 T1,T3 时间的技术,从而提高服务器程序性能的。它把 T1,T3 分别安排在服务器程序的启动和结束的时间段或者一些空闲的时间段,这样在服务器程序处理客户请求时,不会有 T1,T3 的开销了。

即,第一:单个任务处理时间比较短;第二:需要处理的任务数量很大。

Java 中的 ThreadPoolExecutor 类

线程池的创建可以通过创建 ThreadPoolExecutor 对象或者调用 Executors 的工厂方法来创建线程池。ThreadPoolExecutor 是 Executors 类的底层实现。但是在阿里巴巴的 Java 开发手册中提到:

【强制】线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
说明:Executors 返回的线程池对象的弊端如下:
1)FixedThreadPool 和 SingleThreadPool:
允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。
2)CachedThreadPool 和 ScheduledThreadPool:
允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM。

线程池的创建

在 ThreadPoolExecutor 类中提供了四个构造方法:

1
2
3
4
5
6
7
8
9
public class ThreadPoolExecutor extends AbstractExecutorService {
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue){}

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory){}

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler){}

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler){}
}

从上面的代码可以得知,ThreadPoolExecutor 继承了 AbstractExecutorService 类,并提供了四个构造器,事实上,通过观察每个构造器的源码具体实现,发现前面三个构造器都是调用的第四个构造器进行的初始化工作。

下面解释下一下构造器中各个参数的含义:

  • corePoolSize:核心线程池大小,这个参数跟后面讲述的线程池的实现原理有非常大的关系。在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了 prestartAllCoreThreads()或者 prestartCoreThread()方法,从这 2 个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建 corePoolSize 个线程或者一个线程。默认情况下,在创建了线程池后,线程池中的线程数为 0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到 corePoolSize 后,就会把到达的任务放到缓存队列当中。当有新任务在 execute()方法提交时,会执行以下判断:
    • 如果运行的线程少于 corePoolSize,则创建新线程来处理任务,即使线程池中的其他线程是空闲的;
    • 如果线程池中的线程数量大于等于 corePoolSize 且小于 maximumPoolSize,则只有当 workQueue 满时才创建新的线程去处理任务;
    • 如果设置的 corePoolSize 和 maximumPoolSize 相同,则创建的线程池的大小是固定的,这时如果有新任务提交,若 workQueue 未满,则将请求放入 workQueue 中,等待有空闲的线程去 workQueue 中取任务并处理;
    • 如果运行的线程数量大于等于 maximumPoolSize,这时如果 workQueue 已经满了,则通过 handler 所指定的策略来处理任务;
    • 所以,任务提交时,判断的顺序为 corePoolSize –> workQueue –> maximumPoolSize。
  • maximumPoolSize:最大线程池大小,线程池允许创建的最大线程数。如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。值得注意的是如果使用了无界的任务队列这个参数就没什么效果。
  • keepAliveTime:线程池中超过 corePoolSize 数目的空闲线程最大存活时间;默认情况下,只有当线程池中的线程数大于 corePoolSize 时,keepAliveTime 才会起作用,直到线程池中的线程数不大于 corePoolSize,即当线程池中的线程数大于 corePoolSize 时,如果一个线程空闲的时间达到 keepAliveTime,则会终止,直到线程池中的线程数不超过 corePoolSize。但是如果调用了 allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于 corePoolSize 时,keepAliveTime 参数也会起作用,直到线程池中的线程数为 0。
  • unit:参数 keepAliveTime 的时间单位,有 7 种取值,在 TimeUnit 类中有 7 种静态属性。
  • workQueue:阻塞任务队列,用来存储等待执行的任务,这个参数的选择也很重要,会对线程池的运行过程产生重大影响。保存等待执行的任务的阻塞队列,当提交一个新的任务到线程池以后,线程池会根据当前线程池中正在运行着的线程的数量来决定对该任务的处理方式,主要有以下几种处理方式:
    • 直接切换:这种方式常用的队列是 SynchronousQueue。
    • 使用无界队列:一般使用基于链表的阻塞队列 LinkedBlockingQueue。如果使用这种方式,那么线程池中能够创建的最大线程数就是 corePoolSize,而 maximumPoolSize 就不会起作用了。当线程池中所有的核心线程都是 RUNNING 状态时,这时一个新的任务提交就会放入等待队列中。
    • 使用有界队列:一般使用 ArrayBlockingQueue。使用该方式可以将线程池的最大线程数量限制为 maximumPoolSize,这样能够降低资源的消耗,但同时这种方式也使得线程池对线程的调度变得更困难,因为线程池和队列的容量都是有限的值,所以要想使线程池处理任务的吞吐率达到一个相对合理的范围,又想使线程调度相对简单,并且还要尽可能的降低线程池对资源的消耗,就需要合理的设置这两个数量:
      • 如果要想降低系统资源的消耗(包括 CPU 的使用率,操作系统资源的消耗,上下文环境切换的开销等),可以设置较大的队列容量和较小的线程池容量,但这样也会降低线程处理任务的吞吐量。
      • 如果提交的任务经常发生阻塞,那么可以考虑通过调用 setMaximumPoolSize() 方法来重新设定线程池的容量。
      • 如果队列的容量设置的较小,通常需要将线程池的容量设置大一点,这样 CPU 的使用率会相对的高一些。但如果线程池的容量设置的过大,则在提交的任务数量太多的情况下,并发量会增加,那么线程之间的调度就是一个要考虑的问题,因为这样反而有可能降低处理任务的吞吐量。
  • threadFactory:它是 ThreadFactory 类型的变量,用来创建新线程。默认使用 Executors.defaultThreadFactory()来创建线程。使用默认的 ThreadFactory 来创建线程时,会使新创建的线程具有相同的 NORM_PRIORITY 优先级并且是非守护线程,同时也设置了线程的名称。
  • handler:饱和策略,当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。这个策略默认情况下是 AbortPolicy,表示无法处理新任务时抛出异常。以下是 JDK 1.5 提供的四种策略:
    • AbortPolicy:直接抛出异常,这是默认策略;
    • CallerRunsPolicy:用调用者所在的线程来执行任务;显然这样不会真的丢弃任务,但是,调用者线程性能可能急剧下降;
    • DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,也就是丢弃一个即将被执行的任务,并尝试再次提交当前任务,重复此过程;
    • DiscardPolicy:直接丢弃任务;
    • 当然也可以根据应用场景需要来实现 RejectedExecutionHandler 接口自定义策略,如记录日志或持久化不能处理的任务;

向线程池提交任务

1112095-20170411160744969-1849744379

提交任务有 execute()和 submit()两个方法,下面看看他俩的区别:
1、接收参数不同:execute()的参数是 Runnable,submit()参数可以是 Runnable,也可以是 Callable。
2、返回值不同:execute()没有返回值,submit()有返回值 Future。通过 Future 可以获取各个线程的完成情况,是否有异常,还能试图取消任务的执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public void execute(Runnable command) {
// 1. 判断提交的任务是否为 null, 是则抛出异常
if (command == null)
throw new NullPointerException();

/* 2. 获取线程池控制状态: clt 记录着 runState 和 workerCount
* ctl 是一个 AtomicInteger 变量
* jdk 8 中通过一个 int 值的前 28 位表示工作线程数量 workerCount, 剩余高位来表示 线程池状态
* 计算 workerCount 和 runState 时通过掩码计算。 CAPACITY = (1 << 29) - 1
*/
int c = ctl.get();

// 3.workerCountOf 方法取出低 29 位的值, 表示当前活动的线程数; 如果当前活动线程数小于 corePoolSize, 则新建一个线程放入线程池中; 并把任务添加到该线程中.
if (workerCountOf(c) < corePoolSize) {
/* 3.1 addWorker 中的第二个参数表示限制添加线程的数量是根据 corePoolSize 来判断还是 maximumPoolSize 来判断;
* 如果为 true, 根据 corePoolSize 来判断; 如果为 false, 则根据 maximumPoolSize 来判断.
*/
if (addWorker(command, true))
return;
// 3.2 如果添加失败, 则重新获取 ctl 值.
c = ctl.get();
}

// 4.(worker 线程数量大于核心线程池容量时)如果线程池处于 RUNNING 状态,将命令加入 workQueue 队列
if (isRunning(c) && workQueue.offer(command)) {
// 4.1 重新获取 ctl 值, 再次检查防止状态突变
int recheck = ctl.get();
// 4.2 再次判断线程池的运行状态, 如果不是运行状态, 由于之前已经把 command 添加到 workQueue 中了, 这时需要移除该 command, 执行过后通过 handler 使用拒绝策略对该任务进行处理, 整个方法返回
if (!isRunning(recheck) && remove(command))
// 4.2.1 使用拒绝策略对该任务进行处理, 整个方法返回
reject(command);
else if (workerCountOf(recheck) == 0)
/* 4.3 获取线程池中的有效线程数, 如果数量是 0(即核心线程数为 0), 则执行 addWorker 方法. 如果判断 workerCount 大于 0, 则直接返回, 在 workQueue 中新增的 command 会在将来的某个时刻被执行.
* 这里传入的参数表示: 1. 第一个参数为 null, 表示在线程池中创建一个线程, 但不去启动; 2. 第二个参数为 false, 将线程池的有限线程数量的上限设置为 maximumPoolSize, 添加线程时根据 maximumPoolSize 来判断;
*/
addWorker(null, false);
}
/* 5. 如果执行到这里, 有两种情况: 1. 线程池已经不是 RUNNING 状态; 2. 线程池是 RUNNING 状态, 但 workerCount >= corePoolSize 并且 workQueue 已满.
* 这时, 再次调用 addWorker 方法, 但第二个参数传入为 false, 将线程池的有限线程数量的上限设置为 maximumPoolSize;
*/
else if (!addWorker(command, false))
// 5.1 如果失败则拒绝该任务.
reject(command);
}
  • 如果运行的线程小于 corePoolSize,则尝试使用用户定义的 Runnalbe 对象创建一个新的线程。调用 addWorker()函数会原子性的检查 runState 和 workCount,通过返回 false 来防止在不应该添加线程时添加了线程
  • 如果一个任务能够成功入队列,在添加一个线程时仍需要进行双重检查(可能因为在前一次检查后该线程死亡了),或者当进入到此方法时,线程池已经 shutdown 了,所以需要再次检查状态。若线程此时的状态不是 RUNNING,则需要回滚入队列操作;或者当线程池没有工作线程时,需要创建一个新的工作线程。
  • 如果无法入队列,那么需要增加一个新工作线程,如果此操作失败,那么就意味着线程池已经 SHUTDOWN 或者已经饱和了,所以拒绝任务

线程池的关闭

shutdown()

shutdown()方法要将线程池切换到 SHUTDOWN 状态,并调用 interruptIdleWorkers()方法请求中断所有空闲的 worker,最后调用 tryTerminate()尝试结束线程池。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 安全策略判断
checkShutdownAccess();
// 切换状态为 SHUTDOWN
advanceRunState(SHUTDOWN);
// 中断空闲线程
interruptIdleWorkers();
// hook for ScheduledThreadPoolExecutor
onShutdown();
} finally {
mainLock.unlock();
}
// 尝试结束线程池
tryTerminate();
}

1、停止接收新的 submit 的任务;
2、已经提交的任务(包括正在跑的和队列中等待的),会继续执行完成;
3、等到第 2 步完成后,才真正停止;

shutdownNow()

shutdown() 方法要将线程池切换到 STOP 状态,并调用 interruptIdleWorkers() 方法请求中断所有工作线程,无论是否是空闲的,然后取出阻塞队列中没有被执行的任务并返回,最后调用 tryTerminate() 尝试结束线程池。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 安全策略判断
checkShutdownAccess();
// 切换状态为 STOP
advanceRunState(STOP);
// 中断所有工作线程,无论是否空闲
interruptWorkers();
// 取出队列中没有被执行的任务
tasks = drainQueue();
} finally {
mainLock.unlock();
}
// 尝试结束线程池
tryTerminate();
// 返回没有被执行的任务
return tasks;
}

1、跟 shutdown()一样,先停止接收新 submit 的任务;
2、忽略队列里等待的任务;
3、尝试将正在执行的任务 interrupt 中断;
4、返回未执行的任务列表;

说明:它试图终止线程的方法是通过调用 Thread.interrupt()方法来实现的,这种方法的作用有限,如果线程中没有 sleep、wait、Condition、定时锁等应用,interrupt()方法是无法中断当前的线程的。所以,shutdownNow()并不代表线程池就一定立即就能退出,它也可能必须要等待所有正在执行的任务都执行完成了才能退出。但是大多数时候是能立即退出的。

深入剖析线程池实现原理

线程池状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
// 即高 3 位为 111, 该状态的线程池会接收新任务, 并处理阻塞队列中的任务;
private static final int RUNNING = -1 << COUNT_BITS;
// 即高 3 位为 000, 该状态的线程池不会接收新任务, 但会处理阻塞队列中的任务;
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 即高 3 位为 001, 该状态的线程不会接收新任务, 也不会处理阻塞队列中的任务, 而且会中断正在运行的任务;
private static final int STOP = 1 << COUNT_BITS;
// 即高 3 位为 010;
private static final int TIDYING = 2 << COUNT_BITS;
// 即高 3 位为 011;
private static final int TERMINATED = 3 << COUNT_BITS;

ctl 是对线程池的运行状态和线程池中有效线程的数量进行控制的一个字段,它包含两部分的信息:线程池的运行状态 (runState) 和线程池内有效线程的数量(workerCount),这里可以看到,使用了 Integer 类型来保存,高 3 位保存 runState,低 29 位保存 workerCount。COUNT_BITS 就是 29,CAPACITY 就是 1 左移 29 位减 1(29 个 1),这个常量表示 workerCount 的上限值,大约是 5 亿。

ctl 相关方法:

1
2
3
4
5
6
// 获取运行状态
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 获取活动线程数
private static int workerCountOf(int c) { return c & CAPACITY; }
// 获取运行状态和活动线程数的值
private static int ctlOf(int rs, int wc) { return rs | wc; }

下面再介绍下线程池的运行状态。 线程池一共有五种状态,分别是:

  • RUNNING(运行状态):能接受新提交的任务,并且也能处理阻塞队列中已保存的任务;
  • SHUTDOWN(关闭状态):不能接受新提交的任务,但却可以处理阻塞队列中已保存的任务。在线程池处于 RUNNING 状态时,调用 shutdown()方法会使线程池进入到该状态。(finalize()方法在执行过程中也会调用 shutdown()方法进入该状态);
  • STOP(停止状态):不能接受新提交的任务,也不能处理阻塞队列中已保存的任务,并且会中断正在处理任务的线程。在线程池处于 RUNNING 或 SHUTDOWN 状态时,调用 shutdownNow()方法会使线程池进入到该状态;
  • TIDYING(整理状态):如果所有的任务都已终止了,workerCount(有效线程数)为 0,线程池进入该状态后会调用 terminated()方法进入 TERMINATED 状态;
  • TERMINATED(终止状态):在 terminated()方法执行完后进入该状态,默认 terminated()方法中什么也没有做;

threadpool-status

源码分析

3ed00002b9590094a8ed

addWorker

addWorker 方法的主要工作是在线程池中创建一个新的线程并执行,firstTask 参数用于指定新增的线程执行的第一个任务,core 参数为 true 表示在新增线程时会判断当前活动线程数是否少于 corePoolSize,false 表示新增线程前需要判断当前活动线程数是否少于 maximumPoolSize。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (; ; ) {
// 1. 获取线程池控制状态 clt
int c = ctl.get();

// 2. 获取线程池运行状态
int rs = runStateOf(c);

/* 3. 检查线程池状况, 确保此时可以添加新的线程.
* 如果是 RUNNING, 表示线程池处于运行状态, 那么跳过 if.
* 如果 rs>=SHUTDOWN, 同时不等于 SHUTDOWN, 即为 SHUTDOWN 以上的状态, 那么不接受新线程.
* 如果 rs>=SHUTDOWN, 同时等于 SHUTDOWN, 接着判断以下 3 个条件, 只要有 1 个不满足, 则返回 false;
* 1. rs == SHUTDOWN, 这时表示关闭状态, 不再接受新提交的任务, 但却可以继续处理阻塞队列中已保存的任务;
* 2. firsTask 为空; rs == SHUTDOWN, 不再接受新提交的任务, 所以在 firstTask 不为空的时候会返回 false;
* 3. 阻塞队列不为空; 因为队列中已经没有任务了, 不需要再添加线程了;
*/
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
!(rs == SHUTDOWN &&
firstTask == null &&
!workQueue.isEmpty()))
return false;

for (; ; ) {
// 4. 获取线程池线程数
int wc = workerCountOf(c);

/*
* 5. 判断线程池是否已满, 如果线程池已满返回 false
* 如果线程数大于等于最大容量 CAPACITY 直接返回 false
* core 是一个 boolean 参数, 表明调用者想把此线程添加到哪个线程池, 根据 core 的值判断要添加的线程池是否已满
*/
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;

// 6.CAS 操作增加工作线程数, 尝试增加 workerCount, 如果成功, 则跳出第一个 for 循环
if (compareAndIncrementWorkerCount(c))
break retry;

// 7. 如果增加 workerCount 失败, 则重新获取 ctl 的值
c = ctl.get(); // Re-read ctl

// 8. 如果当前的运行状态不等于 rs, 说明状态已被改变, 返回第一个 for 循环继续执行
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}

boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 9. 根据 firstTask 来创建 Worker 对象, 每一个 Worker 对象都会创建一个线程
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
// 10. 获取到锁
mainLock.lock();
try {
// 11. 再次检查状态, 因为状态可能在获取锁之前改变
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());

/* 12. 判断线程池状态
* rs < SHUTDOWN 表示是 RUNNING 状态; 如果 rs 是 RUNNING 状态或者 rs 是 SHUTDOWN 状态并且 firstTask 为 null, 向线程池中添加线程.
* 因为在 SHUTDOWN 时不会在添加新的任务, 但还是会执行 workQueue 中的任务
*/
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// workers 是一个 HashSet
workers.add(w);
int s = workers.size();
// largestPoolSize 记录着线程池中出现过的最大线程数量
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 13. 线程添加成功后就可以启动线程准备执行了
t.start();
workerStarted = true;
}
}
} finally {
// 14. 若线程启动失败, 则进入线程添加失败方法
if (!workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}

runWorker

线程池创建线程时,会将线程封装成工作线程 Worker,ThreadPool 维护的其实就是一组 Worker 对象,Worker 在执行完任务后,还会无限循环获取工作队列里的任务来执行。我们可以从 Worker 的 runWorker 方法里看到:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
final void runWorker(Worker w) {
// 1. 获取当前线程
Thread wt = Thread.currentThread();
// 2. 获取 w 的 firstTask
Runnable task = w.firstTask;
// 3. 设置 w 的 firstTask 为 null
w.firstTask = null;
// 4. 释放锁(设置 state 为 0, 允许中断)
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 5. 如果 task 为空, 则通过 getTask 来获取任务
while (task != null || (task = getTask()) != null) {
// 6. 获取锁
w.lock();

// 7. 这里的检查主要是确保线程池此时还能接收新的任务去执行, 如果不在接收新的任务, 则应该中断当前线程
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 8. 在执行之前调用钩子函数
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 9. 运行给定的任务
task.run();
} catch (RuntimeException x) {
thrown = x;
throw x;
} catch (Error x) {
thrown = x;
throw x;
} catch (Throwable x) {
thrown = x;
throw new Error(x);
} finally {
// 10. 执行完后调用钩子函数
afterExecute(task, thrown);
}
} finally {
task = null;
// 11. 增加给 worker 完成的任务数量
w.completedTasks++;
// 12. 释放锁
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 13. 处理完成后, 调用钩子函数
processWorkerExit(w, completedAbruptly);
}
}

总结一下 runWorker 方法的执行过程:

  • while 循环不断地通过 getTask()方法获取任务;
  • getTask()方法从阻塞队列中取任务;
  • 如果线程池正在停止,那么要保证当前线程是中断状态,否则要保证当前线程不是中断状态;
  • 调用 task.run()执行任务;
  • 如果 task 为 null 则跳出循环,执行 processWorkerExit()方法;
  • runWorker 方法执行完毕,也代表着 Worker 中的 run 方法执行完毕,销毁线程。

getTask

getTask 方法的主要工作是从阻塞队列中获取任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
private Runnable getTask() {
// 1.timeOut 变量的值表示上次从阻塞队列中取任务时是否超时
boolean timedOut = false; // Did the last poll() time out?

for (; ; ) {
int c = ctl.get();
int rs = runStateOf(c);

/*
* 2. 检查线程池状态
* 如果线程池状态 rs >= SHUTDOWN, 也就是非 RUNNING 状态, 再进行以下判断:
* 1. rs >= STOP, 线程池是否正在 stop;
* 2. 阻塞队列是否为空.
* 如果以上条件满足, 则将 workerCount 减 1 并返回 null.
* 因为如果当前线程池状态的值是 SHUTDOWN 或以上时, 不允许再向阻塞队列中添加任务.
*/
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}

int wc = workerCountOf(c);

/*
* 3.timed 变量用于判断是否需要进行超时控制.
* allowCoreThreadTimeOut 默认是 false, 也就是核心线程不允许进行超时;
* wc > corePoolSize, 表示当前线程池中的线程数量大于核心线程数量;
* 对于超过核心线程数量的这些线程, 需要进行超时控制
*/
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

/*
* 4. 对线程池进行超时控制.
* wc > maximumPoolSize 的情况是因为可能在此方法执行阶段同时执行了 setMaximumPoolSize 方法;
* timed && timedOut 如果为 true, 表示当前操作需要进行超时控制, 并且上次从阻塞队列中获取任务发生了超时
* 接下来判断, 如果有效线程数量大于 1, 或者阻塞队列是空的, 那么尝试将 workerCount 减 1;
* 如果减 1 失败, 则返回重试.
* 如果 wc == 1 时, 也就说明当前线程是线程池中唯一的一个线程了.
*/
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}

try {
/*
* 5. 根据 timed 来判断.
* 如果为 true, 则通过阻塞队列的 poll 方法进行超时控制, 如果在 keepAliveTime 时间内没有获取到任务, 则返回 null;
* 否则通过 take 方法, 如果这时队列为空, 则 take 方法会阻塞直到队列不为空.
*/
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
// 6. 如果 r == null, 说明已经超时, timedOut 设置为 true
timedOut = true;
} catch (InterruptedException retry) {
// 7. 如果获取任务时当前线程发生了中断, 则设置 timedOut 为 false 并返回循环重试
timedOut = false;
}
}
}

processWorkerExit

getTask 方法返回 null 时,在 runWorker 方法中会跳出 while 循环,然后会执行 processWorkerExit 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 1. 如果 completedAbruptly 值为 true, 则说明线程执行时出现了异常, 需要将 workerCount 减 1; 如果线程执行时没有出现异常, 说明在 getTask()方法中已经已经对 workerCount 进行了减 1 操作, 这里就不必再减了.
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();

final ReentrantLock mainLock = this.mainLock;
mainLock.lock();,
try {
// 2. 统计完成的任务数
completedTaskCount += w.completedTasks;
// 3. 从 workers 中移除, 也就表示着从线程池中移除了一个工作线程
workers.remove(w);
} finally {
mainLock.unlock();
}

// 4. 根据线程池状态进行判断是否结束线程池
tryTerminate();

int c = ctl.get();
/*
* 5. 当线程池是 RUNNING 或 SHUTDOWN 状态时, 如果 worker 是异常结束, 那么会直接 addWorker;
* 如果 allowCoreThreadTimeOut=true, 并且等待队列有任务, 至少保留一个 worker;
* 如果 allowCoreThreadTimeOut=false,workerCount 不少于 corePoolSize.
*/
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && !workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}

至此,processWorkerExit 执行完之后,工作线程被销毁,以上就是整个工作线程的生命周期,从 execute 方法开始,Worker 使用 ThreadFactory 创建新的工作线程,runWorker 通过 getTask 获取任务,然后执行任务,如果 getTask 返回 null,进入 processWorkerExit 方法,整个线程结束。

线程池的监控

通过线程池提供的参数进行监控。

  • getTaskCount:线程池已经执行的和未执行的任务总数;
  • getCompletedTaskCount:线程池已完成的任务数量,该值小于等于 taskCount;
  • getLargestPoolSize:线程池曾经创建过的最大线程数量。通过这个数据可以知道线程池是否满过,也就是达到了 maximumPoolSize;
  • getPoolSize:线程池当前的线程数量;
  • getActiveCount:当前线程池中正在执行任务的线程数量。

通过这些方法,可以对线程池进行监控,在 ThreadPoolExecutor 类中提供了几个空方法,如 beforeExecute(线程执行之前调用)方法,afterExecute(线程执行之后调用)方法和 terminated(线程池退出时候调用)方法,可以扩展这些方法在执行前或执行后增加一些新的操作,例如统计线程池的执行任务的时间等,可以继承自 ThreadPoolExecutor 来进行扩展。

线程池容量的动态调整

ThreadPoolExecutor 提供了动态调整线程池容量大小的方法:setCorePoolSize()和 setMaximumPoolSize()。

  • setCorePoolSize:设置核心池大小
  • setMaximumPoolSize:设置线程池最大能创建的线程数目大小

当上述参数从小变大时,ThreadPoolExecutor 进行线程赋值,还可能立即创建新的线程来执行任务。

如何合理配置线程池的大小

一般需要根据任务的类型来配置线程池大小:

任务性质不同的任务可以用不同规模的线程池分开处理。CPU 密集型任务配置尽可能少的线程数量,如配置 Ncpu+1 个线程的线程池。IO 密集型任务则由于需要等待 IO 操作,线程并不是一直在执行任务,则配置尽可能多的线程,如 2*Ncpu。混合型的任务,如果可以拆分,则将其拆分成一个 CPU 密集型任务和一个 IO 密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐率要高于串行执行的吞吐率,如果这两个任务执行时间相差太大,则没必要进行分解。我们可以通过 Runtime.getRuntime().availableProcessors()方法获得当前设备的 CPU 个数。

《服务器性能 IO 优化》中发现一个估算公式:

最佳线程数目 =((线程等待时间 + 线程 CPU 时间)/ 线程 CPU 时间)*CPU 数目

比如平均每个线程 CPU 运行时间为 0.5s,而线程等待时间(非 CPU 运行时间,比如 IO)为 1.5s,CPU 核心数为 8,那么根据上面这个公式估算得到:((0.5+1.5)/0.5)*8=32。这个公式进一步转化为:

最佳线程数目 =(线程等待时间与线程 CPU 时间之比 + 1)*CPU 数目

可以得出一个结论:线程等待时间所占比例越高,需要越多线程。线程 CPU 时间所占比例越高,需要越少线程。当然,这只是一个参考值,具体的设置还需要根据实际情况进行调整,比如可以先将线程池大小设置为参考值,再观察任务运行情况和系统负载、资源利用率来进行适当调整。


参考博文

[1]. 深入理解 Java 线程池:ThreadPoolExecutor
[2]. java8 线程池
[3]. Java 并发编程:线程池的使用
[4]. 如何合理地估算线程池大小?


Java 并发编程之美系列


谢谢你长得那么好看,还打赏我!😘
0%