目录

Java 线程池

Java Thread Pool

线程池提供了一种限制和管理资源(包括执行一个任务)。 每个线程池还维护一些基本统计信息,例如已完成任务的数量。

这里借用《Java 并发编程的艺术》提到的来说一下使用线程池的好处:

  • 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  • 提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
  • 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

为什么使用线程池

  • 统一管理
  • 复用线程
  • 控制并发数量

统一管理不难理解,线程池其实算是一个线程的调度系统。线程池里面有一个调度线程,这个调度线程用于管理布控整个线程池里的各种任务和事务,例如创建线程、销毁线程、任务队列管理、线程队列管理等等。

复用线程是线程池最大的优势。因为创建和销毁线程开销比较大,如果为每个任务都创建一个新的线程,那其实是不划算的。线程池实现了线程的复用,使得一个线程可以执行多个任务,这在需要大量线程的场景下(比如HTTP请求等),可以很大程度地节约机器资源。

控制并发数量指的是使用线程池可以控制同时运行的线程数量。多线程的优势在于利用计算机的多核心处理能力,但计算机的核心数量是有限的,比如4核、8核等,如果线程数量太多,切换线程有上下文的开销,反而会让整个机器的吞吐量下降。

吞吐量指的是单位时间内能够处理的任务数量。

创建线程,JVM 在后台做了哪些事:

  • 为线程栈分配内存,保存每个线程方法调用的栈帧。
  • 每个栈帧包括本地变量数组、返回值、操作栈和常量池
  • 一些 JVM 支持本地方法,也将分配本地方法栈
  • 每个线程获得一个程序计数器,标识处理器正在执行哪条指令
  • 系统创建本地线程,与 Java 线程对应
  • 和线程相关的描述符被添加到 JVM 内部数据结构
  • 线程共享堆和方法区

当然,这些步骤的具体细节取决于 JVM 和操作系统。

由此可见,创建并开启一个线程开销很大。

线程池原理

在 HotSpot VM 的线程模型中,Java 线程被一对一映射为内核线程。Java 在使用线程执行程序时,需要创建一个内核线程;当该 Java 线程被终止时,这个内核线程也会被回收。因此 Java 线程的创建与销毁将会消耗一定的计算机资源,从而增加系统的性能开销。

除此之外,大量创建线程同样会给系统带来性能问题,因为内存和 CPU 资源都将被线程抢占,如果处理不当,就会发生内存溢出、CPU 使用率超负荷等问题。

为了解决上述两类问题,Java 提供了线程池概念,对于频繁创建线程的业务场景,线程池可以创建固定的线程数量,并且在操作系统底层,轻量级进程将会把这些线程映射到内核。

线程池可以提高线程复用,又可以固定最大线程使用量,防止无限制地创建线程。当程序提交一个任务需要一个线程时,会去线程池中查找是否有空闲的线程,若有,则直接使用线程池中的线程工作,若没有,会去判断当前已创建的线程数量是否超过最大线程数量,如未超过,则创建新线程,如已超过,则进行排队等待或者直接抛出异常。

/images/java/thread-pool-001.png

「核心线程」:线程池中有两类线程,核心线程和非核心线程。核心线程默认情况下创建后,就会一直存在于线程池中,即使这个核心线程什么都不干(铁饭碗),而非核心线程如果长时间的闲置,就会被销毁(临时工)。

「任务队列」:等待队列,维护着等待执行的Runnable任务对象,是一个线程安全的阻塞队列。

「线程池满」:指的是核心线程+非核心线程的总线程数量达到线程池设定的阈值。

「拒绝策略」:线程池满以后,表示当前线程池没有能力处理更多的任务了,那如果来了新的任务该怎么办呢?所以在创建线程池的时候,可以指定这个拒绝策略。

/images/java/thread-pool-002.png

JDK 提供创建线程池 API

ThreadPoolExecutor:是线程池的实现类,也是Executor框架中最核心的类。

Executors:类则扮演着线程池工厂的角色,里面提供了好多静态方法,通过Executors可以取得一个拥有特定功能的线程池。

Executors 类里的静态方法可以创建很多类型的线程池:

JDK 1.5 之后推出了相关的 api

ThreadPoolExecutor:

  • newSingleThreadExecutor():包含单个线程和无界队列的线程池,同一时间只能执行一个任务
  • newFixedThreadPool():包含固定数量线程并共享无界队列的线程池;当所有线程处于工作状态,有新任务提交时,任务在队列中等待,直到一个线程变为可用状态
  • newCachedThreadPool():一个可缓存的线程池,如果线程池的当前规模超过了处理需求时,那么将回收空闲的线程,当需求增加时,则可以添加新的线程,线程池的规模不存在任何的限制。
1
2
3
4
5
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

ScheduledThreadPoolExecutor:

一个固定大小的线程池,并且以延迟或者定时的方式来执行任务,类似于Timer。

1
2
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
public static ScheduledExecutorService newSingleThreadScheduledExecutor()

上面的方法都有一个带自定义的 ThreadFactory 方法构造。

JDK 1.8 增加了基于工作窃取(work-stealing)算法的线程池

ForkJoinPool:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }
public static ExecutorService newWorkStealingPool(int parallelism) {
        return new ForkJoinPool
            (parallelism,
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }

实际上还是利用 ThreadPoolExecutor 类实现的

Executor 框架

位于Java 并发工具包 java.util.concurrent

/images/java/thread-pool-ThreadPoolExecutor.png

Executor 接口只有一个 execute() 方法,执行提交的 Runnable 任务,一般执行不需要返回值的任务使用。

1
2
3
public interface Executor {
    void execute(Runnable command);
}

ExecutorService 接口主要实现:ThreadPoolExecutorScheduledThreadPoolExecutorForkJoinPool

1
2
3
public interface ExecutorService extends Executor {
    <T> Future<T> submit(Callable<T> task);
}

继承 Executor 除了 execute() 方法,接口也定义了相似的 submit() 方法,这个方法可以返回一个 Future 对象表示等待完成任务的结果。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
ExecutorService executor = Executors.newFixedThreadPool(10);
Callable<Double> callableTask = () -> {
    return 100d;
};
Future<Double> future = executor.submit(callableTask);
// execute other operations
try {
    if (future.isDone()) {
        double result = future.get();
    }
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}
executor.shutdown();

当没有任务等待执行时,ExecutorService 并不会自动销毁,所以你可以使用 shutdown() 或 shutdownNow() 来显式关闭它。

ScheduledExecutorServiceExecutorService 的一个子接口 ,增加了执行任务的调度方法,延迟周期性执行任务。

  • Executors#newScheduledThreadPool(int)
  • Executors#newSingleThreadScheduledExecutor()
1
2
3
4
5
6
7
8
9
ScheduledExecutorService executor = Executors.newScheduledThreadPool(10);
// 延时 2 毫秒执行任务
Future<Double> future = executor.schedule(callableTask, 2, TimeUnit.MILLISECONDS);
// 延时 2 毫秒执行任务,然后每 2 秒重复一次 不管上一次有没有执行完成
executor.scheduleAtFixedRate(
  () -> System.out.println("Fixed Rate Scheduled"), 2, 2000, TimeUnit.MILLISECONDS);
// 延时 2 毫秒后执行第一次,然后在上一次执行完成 2 秒后再次重复执行。
executor.scheduleWithFixedDelay(
  () -> System.out.println("Fixed Delay Scheduled"), 2, 2000, TimeUnit.MILLISECONDS);

ThreadPoolExecutor

1
2
3
4
5
6
7
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)
  • corePoolSize 为线程池的基本大小。
  • maximumPoolSize 为线程池最大线程大小。
  • keepAliveTimeunit 则是线程空闲后的存活时间。
  • workQueue 用于存放任务的阻塞队列。
  • handler 当队列和最大线程池都满了之后的饱和策略。

7个参数说明

corePoolSize 线程池核心线程数大小

最小保持存活(keep alive)的工作线程数量

ThreadPoolExecutor.allowCoreThreadTimeOut(boolean)

如果设置 allowCoreThreadTimeOut,那么该值为 0

在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法,从这2个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建corePoolSize个线程或者一个线程。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中

maximumPoolSize 线程池最大线程数量。

最大的线程数量,受限于CAPACITY。

keepAliveTime 线程池中非核心线程空闲的存活时间大小

闲置线程等待工作的超时时间,精确到纳秒。时间单位由 TimeUnit 指定。

当线程池中的线程数大于 corePoolSize 时 或者设置 allowCoreThreadTimeOut 的时候生效,否则永远不会超时。

unit 线程空闲存活时间单位

keepAliveTime 的时间单位 TimeUnit

[workQueue](#排队策略 BlockingQueue) 存放任务的阻塞队列

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
/**
 * The queue used for holding tasks and handing off to worker
 * threads.  We do not require that workQueue.poll() returning
 * null necessarily means that workQueue.isEmpty(), so rely
 * solely on isEmpty to see if the queue is empty (which we must
 * do for example when deciding whether to transition from
 * SHUTDOWN to TIDYING).  This accommodates special-purpose
 * queues such as DelayQueues for which poll() is allowed to
 * return null even if it may later return non-null when delays
 * expire.
 */
 private final BlockingQueue<Runnable> workQueue;

工作队列,一个阻塞队列,用来存储等待执行的任务,这个参数的选择也很重要,会对线程池的运行过程产生重大影响,阻塞队列有以下几种选择:

  • ArrayBlockingQueue
  • PriorityBlockingQueue
  • LinkedBlockingQueue
  • Synchronous

[threadFactory](#ThreadFactory 工作线程的创建) 用于设置创建线程的工厂

创建线程的工厂 ,用于批量创建线程,统一在创建线程时设置一些参数,如线程名称、是否守护线程、线程的优先级等。ThreadFactory也是一个接口。如果不指定,会使用DefaultThreadFactory新建一个默认的线程工厂。

很多时候我们会自己实现一个ThreadFactory,在里面指定线程的名称前缀,这样在排查问题的时候就能一眼看到这个线程是在这个线程池里面创建的。

可以给创建的线程设置有意义的名字,可方便排查问题,也可以设置线程执行出现异常的处理策略

handler 线程池的饱和策略

「拒绝处理策略」,线程数量大于最大线程数就会采用拒绝处理策略,

排队策略 BlockingQueue

SynchronousQueue 直接提交

最多只有一个元素的同步队列,一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直是阻塞状态。

newCachedThreadPool采用的便是这种策略。

LinkedBlockingQueue 无界队列

使用无界队列(典型的便是采用预定义容量的 LinkedBlockingQueue,理论上是该缓冲队列可以对无限多的任务排队)将导致在所有 corePoolSize 线程都工作的情况下将新任务加入到缓冲队列中。这样,创建的线程就不会超过 corePoolSize,也因此,maximumPoolSize 的值也就无效了。当每个任务完全独立于其他任务,即任务执行互不影响时,适合于使用无界队列。newFixedThreadPool采用的便是这种策略。

ArrayBlockingQueue 有界队列

当使用有限的 maximumPoolSizes 时,有界队列(一般缓冲队列使用ArrayBlockingQueue,并制定队列的最大长度)有助于防止资源耗尽,但是可能较难调整和控制,队列大小和最大池大小需要相互折衷,需要设定合理的参数。

拒绝处理策略

「拒绝处理策略」,线程数量大于最大线程数就会采用拒绝处理策略,四种RejectedExecutionHandler 接口实现的拒绝处理策略为:

  • 「ThreadPoolExecutor.AbortPolicy」「默认拒绝处理策略」,丢弃任务并抛出RejectedExecutionException异常。
  • 「ThreadPoolExecutor.DiscardPolicy」:丢弃新来的任务,但是不抛出异常
  • 「ThreadPoolExecutor.DiscardOldestPolicy」:丢弃队列头部(最旧的)的任务,然后重新尝试执行程序(如果再次失败,重复此过程)。不抛异常
  • 「ThreadPoolExecutor.CallerRunsPolicy」:由调用线程处理该任务。不抛异常,executor 如果关闭,任务会丢弃。
  • 可以通过实现 RejectedExecutionHandler 来实现自己的策略。
1
private volatile RejectedExecutionHandler handler;

线程池状态及生命周期

以下内容都是来自ThreadPoolExecutor代码注释。 线程池内的线程状态都是有一个AtomicInteger ctl保持的,是一个原子整数,包装了两个领域含义。

workerCount 有效的线程数 ,线程总数2 ^ 29 -1 ,线程启动数量不包括线程停止的数量,而该值可能是 与活动线程的实际数量暂时不同。例如当ThreadFactory创建线程失败时,线程正在执行退出,统计线程数量依然包括退出的线程。

runState线程状态

  • RUNNING正在接受新的任务并且处理队列中的任务
  • SHUTDOWN 不接受新的任务,但是能处理任务
  • STOP 不能接受新的任务,不能处理队列中的任务,但是可以中断正在执行的任务。
  • TIDYING 所有的任务终止,workerCount为0 ,线程全部过渡到TIDYING状态,即将运行terminated() 钩子方法
  • TERMINATEDterminated() 钩子方法执行完成

/images/java/thread-pool-ThreadPoolExecutor-runstate.png

/images/java/thread-pool-003.png

状态转换顺序

RUNNING -> SHUTDOWN 执行shutdown()

(RUNNING or SHUTDOWN) -> STOP 执行shutdownNow()

SHUTDOWN -> TIDYING 当任务队列和线程池都是空

STOP -> TIDYING 线程池都是空

TIDYING -> TERMINATED 当 terminated()钩子方法执行完 这些状态具体代码实现

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); 
    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

    // Packing and unpacking ctl
    //获取线程池的状态
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    //获取线程的数量
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    //组装状态和数量,成为ctl
    private static int ctlOf(int rs, int wc) { return rs | wc; }

原子类 AtomicInteger ctl 来表示状态位,它的高3位表示线程池的状态,低29位表示线程池中现有的线程数,这也是Doug Lea一个天才的设计,用最少的变量来减少锁竞争,提高并发效率。

执行任务 execute

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }
  1. 获取当前线程池的状态。
  2. 当前线程数量小于 corePoolSize 时创建一个新的线程运行。
  3. 如果当前线程处于运行状态,并且写入阻塞队列成功。
  4. 双重检查,再次获取线程状态;如果线程状态变了(非运行状态)就需要从阻塞队列移除任务,并尝试判断线程是否全部执行完毕。同时执行拒绝策略。
  5. 如果当前线程池为空就新创建一个线程并执行。
  6. 如果在第三步的判断为非运行状态,尝试新建线程,如果失败则执行拒绝策略。

线程池运行的四个阶段

  1. poolSize < corePoolSize 且队列为空,此时会新建线程来处理提交的任务
  2. poolSize == corePoolSize,此时提交的任务进入工作队列,工作线程从队列中获取任务执行,此时队列不为空且未满。
  3. poolSize == corePoolSize,并且队列已满,此时也会新建线程来处理提交的任务,但是poolSize < maxPoolSize
  4. poolSize == maxPoolSize,并且队列已满,此时会触发拒绝策略

线程池中的 Worker

1
private final HashSet<Worker> workers = new HashSet<Worker>();

Worker 继承了 AbstractQueuedSynchronizer和Runnable,前者给Worker提供锁的功能,后者执行工作线程的主要方法runWorker(Worker w)(从任务队列捞任务执行)。Worker 引用存在workers集合里面,用mainLock守护。

核心函数 runWorker

每个工作线程的run都执行下面的函数

  1. 从getTask()中获取任务
  2. 锁住 worker
  3. 执行beforeExecute(wt, task),这是ThreadPoolExecutor提供给子类的扩展方法
  4. 运行任务,如果该worker有配置了首次任务,则先执行首次任务且只执行一次。
  5. 执行afterExecute(task, thrown);
  6. 解锁 worker
  7. 如果获取到的任务为 null,关闭 worker

获取任务 getTask()

线程池内部的任务队列是一个阻塞队列,具体实现在构造时传入。

getTask()从任务队列中获取任务,支持阻塞和超时等待任务,四种情况会导致返回null,让worker关闭。

  1. 现有的线程数量超过最大线程数量
  2. 线程池处于STOP状态
  3. 线程池处于SHUTDOWN状态且工作队列为空
  4. 线程等待任务超时,且线程数量超过保留线程数量

核心逻辑:根据timed在阻塞队列上超时等待或者阻塞等待任务,等待任务超时会导致工作线程被关闭。

在以下两种情况下等待任务会超时:

  1. 允许核心线程等待超时,即allowCoreThreadTimeOut(true)
  2. 当前线程是普通线程,此时wc > corePoolSize

如何复用线程

ThreadPoolExecutor在创建线程时,会将线程封装成**「工作线程worker」,并放入「工作线程组」**中,然后这个worker反复从阻塞队列中拿任务去执行。这个Worker是一个内部类,它继承了AQS,实现了Runnable:

1
2
3
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
    // 省略
}
1
private final HashSet<Worker> workers = new HashSet<Worker>();

通过线程池的execute(Runnable command)方法,扔进线程池的线程,并没有像我们平时创建线程一样,新建一个Thread,然后调用start方法去启动,而是由一个个worker直接调用run()方法去执行的,这样就达到了复用线程的目的。

ForkJoinPool

实现了 ExecutorService 接口 ,Java 7 中 fork/join 框架的重要组件。

ForkJoinPool 适用于任务创建子任务的情况,或者外部客户端创建大量小任务到线程池。

这种线程池的工作流程如下:

  • 创建 ForkJoinTask 子类
  • 根据某种条件将任务切分成子任务
  • 调用执行任务
  • 将任务结果合并
  • 实例化对象并添加到池中

创建一个 ForkJoinTask,你可以选择 RecursiveActionRecursiveTask 这两个子类,后者有返回值。

计算阶乘,并把任务根据阈值划分成子任务。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class FactorialTask extends RecursiveTask<BigInteger> {
    private int start = 1;
    private int n;
    private static final int THRESHOLD = 20;
 
    // standard constructors
    @Override
    protected BigInteger compute() {
        if ((n - start) >= THRESHOLD) {
            return ForkJoinTask.invokeAll(createSubtasks())
              .stream()
              .map(ForkJoinTask::join)
              .reduce(BigInteger.ONE, BigInteger::multiply);
        } else {
            return calculate(start, n);
        }
    }
    
    private Collection<FactorialTask> createSubtasks() {
    	List<FactorialTask> dividedTasks = new ArrayList<>();
    	int mid = (start + n) / 2;
    	dividedTasks.add(new FactorialTask(start, mid));
    	dividedTasks.add(new FactorialTask(mid + 1, n));
    	return dividedTasks;
	}
    private BigInteger calculate(int start, int n) {
    	return IntStream.rangeClosed(start, n)
      			.mapToObj(BigInteger::valueOf)
      			.reduce(BigInteger.ONE, BigInteger::multiply);
	}
}
// 添加到线程池
ForkJoinPool pool = ForkJoinPool.commonPool();
BigInteger result = pool.invoke(new FactorialTask(100));

当选择线程池时,非常重要的一点是牢记创建、管理线程以及线程间切换执行会带来的开销。

ThreadPoolExecutor 可以控制线程数量和每个线程执行的任务。这很适合你需要在不同的线程上执行少量巨大的任务。

相比较而言,ForkJoinPool 基于线程从其他线程“窃取”任务。正因如此,当任务可以分割成小任务时可以提高效率。

为了实现工作窃取算法,fork/join 框架使用两种队列:

  • 包含所有任务的主要队列
  • 每个线程的任务队列

当线程执行完自己任务队列中的任务,它们试图从其他队列获取任务。为了使这一过程更加高效,线程任务队列使用双端队列(double ended queue)数据结构,一端与线程交互,另一端用于“窃取”任务。

/images/java/thread-pool-004.png

和这种模型相比,ThreadPoolExecutor 只使用一个主要队列。

最后要注意的一点 ForkJoinPool 只适用于任务可以创建子任务。否则它和 ThreadPoolExecutor没区别,甚至开销更大。

ScheduledThreadPoolExecutor

继承 ThreadPoolExecutor 实现接口 ScheduledExecutorService

1
2
3
4
public class ScheduledThreadPoolExecutor
        extends ThreadPoolExecutor
        implements ScheduledExecutorService {
}

总结

线程池的实现原理,Java 线程的创建和消耗会给系统带来性能开销,因此 Java 提供了线程池来复用线程,提高程序的并发效率。

Java 通过用户线程与内核线程结合的 1:1 线程模型来实现,Java 将线程的调度和管理设置在了用户态,提供了一套 Executor 框架来帮助开发人员提高效率。Executor 框架不仅包括了线程池的管理,还提供了线程工厂、队列以及拒绝策略等,可以说 Executor 框架为并发编程提供了一个完善的架构体系。

在不同的业务场景以及不同配置的部署机器中,线程池的线程数量设置是不一样的。

其设置不宜过大,也不宜过小,要根据具体情况,计算出一个大概的数值,再通过实际的性能测试,计算出一个合理的线程数量。

我们要提高线程池的处理能力,一定要先保证一个合理的线程数量,也就是保证 CPU 处理线程的最大化。在此前提下,我们再增大线程池队列,通过队列将来不及处理的线程缓存起来。在设置缓存队列时,我们要尽量使用一个有界队列,以防因队列过大而导致的内存溢出问题。