Java 并发编程之美(三):异步执行框架 Eexecutor

前言

在 Java5 之后,并发编程引入了一堆新的启动、调度和管理线程的 API。Executor 框架便是 Java5 中引入的,其内部使用了线程池机制,它在 java.util.cocurrent 包下,通过该框架来控制线程的启动、执行和关闭,可以简化并发编程的操作。Eexecutor 作为灵活且强大的异步执行框架,其支持多种不同类型的任务执行策略,提供了一种标准的方法将任务的提交过程和执行过程解耦开发,基于生产者 - 消费者模式,其提交任务的线程相当于生产者,执行任务的线程相当于消费者,并用 Runnable 来表示任务,Executor 的实现还提供了对生命周期的支持,以及统计信息收集,应用程序管理机制和性能监视等机制。

Executor 包括:ThreadPoolExecutor,Executor,Executors,ExecutorService,CompletionService,Future,Callable 等。

Executor 简介

Executor 的 UML 图:(常用的几个接口和子类)

776259-20160426201537486-1323529733

  • Executor:一个接口,其定义了一个接收 Runnable 对象的方法 executor,其方法签名为 executor(Runnable command)。
  • ExecutorService:是一个比 Executor 使用更广泛的子类接口,其提供了生命周期管理的方法,以及可跟踪一个或多个异步任务执行状况返回 Future 的方法;ExecutorService 接口继承自 Executor 接口,它提供了更丰富的实现多线程的方法,比如:ExecutorService 提供了关闭自己的方法,以及可为跟踪一个或多个异步任务执行状况而生成 Future 的方法。可以调用 ExecutorService 的 shutdown()方法来平滑地关闭 ExecutorService,调用该方法后,将导致 ExecutorService 停止接受任何新的任务且等待已经提交的任务执行完成(已经提交的任务会分两类:一类是已经在执行的,另一类是还没有开始执行的),当所有已经提交的任务执行完毕后将会关闭 ExecutorService。因此我们一般用该接口来实现和管理多线程。
  • AbstractExecutorService:ExecutorService 执行方法的默认实现。
  • ScheduledExecutorService:一个可定时调度任务的接口。
  • ScheduledThreadPoolExecutor:ScheduledExecutorService 的实现,一个可定时调度任务的线程池。
  • ThreadPoolExecutor:线程池,是线程池的核心实现类,用来执行被提交的任务。可以通过调用 Executors 以下静态工厂方法来创建线程池并返回一个 ExecutorService 对象。
  • Executors:提供了一系列静态工厂方法用于创建各种线程池。

Executors 简介

Executors:提供了一系列静态工厂方法用于创建各种线程池。

Executors 提供的线程池配置方案

newFixedThreadPool

构造一个固定线程数目的线程池,配置的 corePoolSize 与 maximumPoolSize 大小相同,同时使用了一个无界 LinkedBlockingQueue 存放阻塞任务,因此多余的任务将存放在阻塞队列,不会由 RejectedExecutionHandler 处理。每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。

方法签名:

1
2
3
4
5
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

newCachedThreadPool

构造一个缓冲功能的线程池,配置 corePoolSize=0,maximumPoolSize=Integer.MAX_VALUE,keepAliveTime=60s,以及一个无容量的阻塞队列 SynchronousQueue,因此任务提交之后,将会创建新的线程执行;线程空闲超过 60s 将会销毁。如果线程池的大小超过了处理任务所需要的线程,那么就会回收部分空闲(60 秒不执行任务)的线程,当任务数增加时,此线程池又可以智能的添加新线程来处理任务。

此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统(或者说 JVM)能够创建的最大线程大小,极端情况下会因为创建过多线程而耗尽系统资源。这里虽然指定 maximumPool 为 Integer.MAX_VALUE,但没什么意义,如果不能满足任务执行需求,CachedThreadPool 还会继续创建新的线程。

方法签名:

1
2
3
4
5
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

newSingleThreadExecutor

构造一个只支持一个线程的线程池,配置 corePoolSize=maximumPoolSize=1,无界阻塞队列 LinkedBlockingQueue;保证任务由一个线程串行执行;如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。此线程池保证所有任务的执行顺序按照任务的提交顺序执行。

方法签名:

1
2
3
4
5
6
public static ExecutorService newSingleThreadExecutor() {
return new Executors.FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

使用案例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);

/**
* 1.schedule: 初始化延迟 3s 开始执行.
*/
scheduledThreadPool.schedule(() -> System.out.println("delay 3 seconds"), 3, TimeUnit.SECONDS);

/**
* 2.scheduleAtFixedRate: 按指定频率周期执行某个任务;
* 初始化延迟 3s 开始执行, 每隔 3s 重新执行一次任务[以第一个任务开始计时].
* 以第一个任务开始的时间计时, 3 秒过去后, 检测上一个任务是否执行完毕, 如果上一个任务执行完毕, 则当前任务立即执行, 如果上一个任务没有执行完毕, 则需要等上一个任务执行完毕后立即执行.
*/
scheduledThreadPool.scheduleAtFixedRate(() -> System.out.println("delay 3 seconds, and scheduleAtFixedRate every 3 seconds"), 3, 3, TimeUnit.SECONDS);

/**
* 3.scheduleAtFixedRate: 按指定频率间隔执行某个任务;
* 初始化时延时 3s 开始执行, 本次执行结束后延迟 3s 开始下次执行.
*/
scheduledThreadPool.scheduleWithFixedDelay(() -> System.out.println("delay 3 seconds, and scheduleWithFixedDelay every 3 seconds"), 3, 3, TimeUnit.SECONDS);

newScheduledThreadPool

构造一个有定时功能的线程池,配置 corePoolSize,无界延迟阻塞队列 DelayedWorkQueue;有意思的是:maximumPoolSize=Integer.MAX_VALUE,由于 DelayedWorkQueue 是无界队列,所以这个值是没有意义的。

方法签名:

1
2
3
4
5
6
7
8
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}

public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}

newWorkStealingPool

newWorkStealingPool 是 Jdk1.8 新增一个线程池,会根据所需的并行层次来动态创建和关闭线程,通过使用多个队列减少竞争,底层用的 ForkJoinPool 来实现的。ForkJoinPool 的优势在于,可以充分利用多 CPU,多核 CPU 的优势,把一个任务拆分成多个 “小任务”,把多个“小任务” 放到多个处理器核心上并行执行;当多个 “小任务” 执行完成之后,再将这些执行结果合并起来即可。

方法签名:

1
2
3
4
5
6
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}

五种线程池的使用场景

  • newSingleThreadExecutor:一个单线程的线程池,可以用于需要保证顺序执行的场景,并且只有一个线程在执行。
  • newFixedThreadPool:一个固定大小的线程池,可以用于已知并发压力的情况下,对线程数做限制。FixedThreadPool 满足了资源管理的需求,可以限制当前线程数量。适用于负载较重的服务器环境。
  • newCachedThreadPool:一个可以无限扩大的线程池,比较适合处理执行时间比较小的任务。CachedThreadPool 适用于执行很多短期异步任务的小程序,适用于负载较轻的服务器。
  • newScheduledThreadPool:一个有定时功能的线程池,适用于需要多个后台线程执行周期任务的场景,并且为了满足资源管理需求而限制后台线程数量的场景。
  • newWorkStealingPool:一个拥有多个任务队列的线程池,可以减少连接数,创建当前可用 CPU 数量的线程来并行执行。

参考博文

[1]. java 并发编程 –Executor 框架


Java 并发编程之美系列


谢谢你长得那么好看,还打赏我!😘
0%