目录

Thread Pool 实践

J.U.C 提供的线程池:ThreadPoolExecutor类,帮助开发人员管理线程并方便地执行并行任务。了解并合理使用线程池,是一个开发人员必修的基本功。

使用线程池的潜在风险:

  • 用的线程池过大或过小:如果线程池包含太多线程,会明显的影响应用的性能;另一方面,线程池太小并不能带来所期待的性能提升。
  • 正如其他多线程情形一样,死锁也会发生。举个例子,一个任务可能等待另一个任务完成,而后者并没有可用线程处理执行。所以说避免任务之间的依赖是个好习惯。
  • 等待执行时间很长的任务:为了避免长时间阻塞线程,你可以指定最大等待时间,并决定过期任务是拒绝处理还是重新加入队列。

为了降低风险,你必须根据要处理的任务,来谨慎选择线程池的类型和参数。对你的系统进行压力测试也是值得的,它可以帮你获取真实环境下的系统行为数据。

《阿里巴巴 Java 手册》中关于线程池的部分:

  • 【强制】创建线程或线程池请指定有意义的线程名称,方便出错回溯。
  • 【强制】线程资源必须通过线程池提供,不允许在应用中自行显示创建线程。
  • 【强制】线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
    • FixedThreadPool 和 SingleThreadPool: 允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM
    • CachedThreadPool: 允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM。

推荐手动创建线程池

线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。 说明:Executors返回的线程池对象的弊端如下: 1)FixedThreadPool和SingleThreadPool:   允许的请求队列长度为Integer.MAX_VALUE,可能会堆积大量的请求,从而导致OOM。 2)CachedThreadPool:   允许的创建线程数量为Integer.MAX_VALUE,可能会创建大量的线程,从而导致OOM。

Positive example 1:

1
2
3
//org.apache.commons.lang3.concurrent.BasicThreadFactory
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1,
new BasicThreadFactory.Builder().namingPattern("example-schedule-pool-%d").daemon(true).build());

Positive example 2:

1
2
3
4
5
6
7
8
9
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
        .setNameFormat("demo-pool-%d").build();
//Common Thread Pool
ExecutorService pool = new ThreadPoolExecutor(5, 200,
    0L, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<Runnable>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());

pool.execute(()-> System.out.println(Thread.currentThread().getName()));
pool.shutdown();//gracefully shutdown

ThreadFactory 工作线程的创建

新的线程是通过 ThreadFactory 来创建的,如果没有指定,默认的 Executors#defaultThreadFactory 将被使用,这个时候创建的线程将都属于同一个线程组,拥有同样的优先级和 daemon 状态。扩展配置 ThreadFactory,我们可以配置线程的名字、线程组合 daemon 状态。如果调用 ThreadFactory#createThread 的时候失败,将返回null,executor将不会执行任何任务。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
public class UserThreadFactory implements ThreadFactory{
  private final String namePrefix
    private final AtomicInterger nextId = new AtomicInteger(1);
  UserThreadFactory(String whatFeaturOfGroup){
    namePrefix = "From UserThreadFactory's" + whatFeaturOfGroup + "-Worker-";
  }
  @Override
  public Thread newThread(Runnable task){
    String name =  namePrefix+ nextId.getAndIncrement();
    Thread thread = new Thread(null,task,name,0,false);
    System.out.println(thread.getName());
    return thread;
  }
}

使用 guava 提供的 ThreadFactoryBuilder 来创建线程池。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
private static ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
    .setNameFormat("demo-pool-%d").build();
 
private static ExecutorService pool = new ThreadPoolExecutor(5, 200,
    0L, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<Runnable>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
 
public static void main(String[] args) {
    for (int i = 0; i < Integer.MAX_VALUE; i++) {
        pool.execute(new SubThread());
    }
}

钩子方法(Hook methods)

ThreadPoolExecutor 提供了 protected 类型可以被覆盖的钩子方法,beforeExecuteafterExecute 允许用户在任务执行前后调用。比如重新初始化 ThreadLocal、收集统计信息、如记录日志等操作。terminated() 可以用来在任务被执行完的时候让用户插入逻辑。如果 hook 方法执行失败,则内部的工作线程的执行将会失败或被中断。

1
2
3
protected void beforeExecute(Thread t, Runnable r) { }
protected void afterExecute(Runnable r, Throwable t) { }
protected void terminated() { }

队列维护

getQueue() 方法可以用来访问工作以进行一些统计或者debug工作,我们不建议用作其他用途。同时 remove() 方法和 purge 方法可以用来将任务从队列中移除。

优雅的关闭线程池

当线程池不在被引用并且工作线程数为0的时候,线程池将被终止。我们也可以调用shutdown来手动终止线程池。如果我们忘记调用shutdown,为了让线程资源被释放,我们还可以使用 keepAliveTime 和 allowCoreThreadTimeOut 来达到目的。

有运行任务自然也有关闭任务,从上文提到的 5 个状态就能看出如何来关闭线程池。

其实无非就是两个方法 shutdown()/shutdownNow()

但他们有着重要的区别:

  • shutdown() 执行后停止接受新任务,会把队列的任务执行完毕。
  • shutdownNow() 也是停止接受新任务,但会中断所有的任务,将线程池状态变为 stop。

两个方法都会中断线程,用户可自行判断是否需要响应中断。

shutdownNow() 要更简单粗暴,可以根据实际场景选择不同的方法。

通常是按照以下方式关闭线程池的:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
long start = System.currentTimeMillis();
for (int i = 0; i <= 5; i++) {
    pool.execute(new Job());
}

pool.shutdown();

while (!pool.awaitTermination(1, TimeUnit.SECONDS)) {
    LOGGER.info("线程还在执行。。。");
}
long end = System.currentTimeMillis();
LOGGER.info("一共处理了【{}】", (end - start));

pool.awaitTermination(1, TimeUnit.SECONDS) 会每隔一秒钟检查一次是否执行完毕(状态为 TERMINATED),当从 while 循环退出时就表明线程池已经完全终止了。

Spring Boot 使用线程池

创建了一个线程池的 bean,在使用时直接从 Spring 中取出即可。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
@Configuration
public class TreadPoolConfig {
    /**
     * 消费队列线程
     * @return
     */
    @Bean(value = "consumerQueueThreadPool")
    public ExecutorService buildConsumerQueueThreadPool(){
        ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
                .setNameFormat("consumer-queue-thread-%d").build();
        ExecutorService pool = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(5),namedThreadFactory,new ThreadPoolExecutor.AbortPolicy());
        return pool ;
    }
}

使用

1
2
3
4
5
6
7
8
9
@Resource(name = "consumerQueueThreadPool")
private ExecutorService consumerQueueThreadPool;
@Override
public void execute() {
    //消费队列
    for (int i = 0; i < 5; i++) {
        consumerQueueThreadPool.execute(new ConsumerQueueThread());
    }
}

ThreadPoolTaskExecutor :这个是springboot基于ThreadPoolExecutor实现的一个线程池执行类。如果没有配置线程池的话,springboot会自动配置一个ThreadPoolTaskExecutor 线程池到bean当中,springboot的线程池自动装配类:TaskExecutionAutoConfiguration

  • 通过@Async注解调用

在Application启动类上面加上@EnableAsync

在需要异步执行的方法上加上@Async注解

  • 直接注入ThreadPoolTaskExecutor

监控线程池

谈到了 SpringBoot,也可利用它 actuator 组件来做线程池的监控。

线程怎么说都是稀缺资源,对线程池的监控可以知道自己任务执行的状况、效率等。

其实 ThreadPoolExecutor 本身已经提供了不少 api 可以获取线程状态:

/images/java/thread-pool-007.png

只需要将这些信息暴露到 SpringBoot 的监控端点中,我们就可以在可视化页面查看当前的线程池状态了。

甚至我们可以继承线程池扩展其中的几个函数来自定义监控逻辑:

1
2
3
protected void beforeExecute(Thread t, Runnable r) { }  
protected void afterExecute(Runnable r, Throwable t) { }
protected void terminated() { }

可以在线程执行前、后、终止状态执行自定义逻辑。

线程池隔离

线程池看似很美好,但也会带来一些问题。

如果我们很多业务都依赖于同一个线程池,当其中一个业务因为各种不可控的原因消耗了所有的线程,导致线程池全部占满。

这样其他的业务也就不能正常运转了,这对系统的打击是巨大的。

比如我们 Tomcat 接受请求的线程池,假设其中一些响应特别慢,线程资源得不到回收释放;线程池慢慢被占满,最坏的情况就是整个应用都不能提供服务。

所以我们需要将线程池进行隔离

通常的做法是按照业务进行划分:

比如下单的任务用一个线程池,获取数据的任务用另一个线程池。这样即使其中一个出现问题把线程池耗尽,那也不会影响其他的任务运行。

hystrix 隔离

Hystrix 是一款开源的容错插件,具有依赖隔离、系统容错降级等功能。

首先需要定义两个线程池,分别用于执行订单、处理用户。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
public class CommandOrder extends HystrixCommand<String> {

    private final static Logger LOGGER = LoggerFactory.getLogger(CommandOrder.class);

    private String orderName;

    public CommandOrder(String orderName) {


        super(Setter.withGroupKey(
                //服务分组
                HystrixCommandGroupKey.Factory.asKey("OrderGroup"))
                //线程分组
                .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("OrderPool"))

                //线程池配置
                .andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter()
                        .withCoreSize(10)
                        .withKeepAliveTimeMinutes(5)
                        .withMaxQueueSize(10)
                        .withQueueSizeRejectionThreshold(10000))

                .andCommandPropertiesDefaults(
                        HystrixCommandProperties.Setter()
                                .withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.THREAD))
        )
        ;
        this.orderName = orderName;
    }


    @Override
    public String run() throws Exception {

        LOGGER.info("orderName=[{}]", orderName);

        TimeUnit.MILLISECONDS.sleep(100);
        return "OrderName=" + orderName;
    }
}

public class CommandUser extends HystrixCommand<String> {

    private final static Logger LOGGER = LoggerFactory.getLogger(CommandUser.class);

    private String userName;

    public CommandUser(String userName) {


        super(Setter.withGroupKey(
                //服务分组
                HystrixCommandGroupKey.Factory.asKey("UserGroup"))
                //线程分组
                .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("UserPool"))

                //线程池配置
                .andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter()
                        .withCoreSize(10)
                        .withKeepAliveTimeMinutes(5)
                        .withMaxQueueSize(10)
                        .withQueueSizeRejectionThreshold(10000))

                //线程池隔离
                .andCommandPropertiesDefaults(
                        HystrixCommandProperties.Setter()
                                .withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.THREAD))
        );
        this.userName = userName;
    }

    @Override
    public String run() throws Exception {

        LOGGER.info("userName=[{}]", userName);

        TimeUnit.MILLISECONDS.sleep(100);
        return "userName=" + userName;
    }
}

模拟运行:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
public static void main(String[] args) throws Exception {
    CommandOrder commandPhone = new CommandOrder("手机");
    CommandOrder command = new CommandOrder("电视");


    //阻塞方式执行
    String execute = commandPhone.execute();
    LOGGER.info("execute=[{}]", execute);

    //异步非阻塞方式
    Future<String> queue = command.queue();
    String value = queue.get(200, TimeUnit.MILLISECONDS);
    LOGGER.info("value=[{}]", value);


    CommandUser commandUser = new CommandUser("张三");
    String name = commandUser.execute();
    LOGGER.info("name=[{}]", name);
}

运行结果:

/images/java/thread-pool-006.jpg

可以看到两个任务分成了两个线程池运行,他们之间互不干扰。

获取任务任务结果支持同步阻塞和异步非阻塞方式,可自行选择。

它的实现原理其实容易猜到:

利用一个 Map 来存放不同业务对应的线程池。

计算线程数量

环境具有多变性,设置一个绝对精准的线程数其实是不大可能的,但我们可以通过一些实际操作因素来计算出一个合理的线程数,避免由于线程池设置不合理而导致的性能问题。

  • IO 密集型任务:由于线程并不是一直在运行,所以可以尽可能的多配置线程,比如 CPU 个数 * 2

  • CPU 密集型任务

    这种任务消耗的主要是 CPU 资源,可以将线程数设置为 N(CPU 核心数)+1,比 CPU 核心数多出来的一个线程是为了防止线程偶发的缺页中断,或者其它原因导致的任务暂停而带来的影响。一旦任务暂停,CPU 就会处于空闲状态,而在这种情况下多出来的一个线程就可以充分利用 CPU 的空闲时间。

4 核 intel i5 CPU

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public class CPUTypeTest implements Runnable {
  // 整体执行时间,包括在队列中等待的时间
  List<Long> wholeTimeList;
  // 真正执行时间
  List<Long> runTimeList;
  private long initStartTime = 0;

  public CPUTypeTest(List<Long> runTimeList, List<Long> wholeTimeList) {
    initStartTime = System.currentTimeMillis();
    this.runTimeList = runTimeList;
    this.wholeTimeList = wholeTimeList;
  }

  public boolean isPrime(final int number) {
    if (number <= 1){
      return false;
    }
		for (int i = 2; i <= Math.sqrt(number); i++) {
			if (number % i == 0)
				return false;
		}
		return true;
	}

	public int countPrimes(final int lower, final int upper) {
		int total = 0;
		for (int i = lower; i <= upper; i++) {
			if (isPrime(i))
				total++;
		}
		return total;
	}

	public void run() {
		long start = System.currentTimeMillis();
		countPrimes(1, 1000000);
		long end = System.currentTimeMillis();

		long wholeTime = end - initStartTime;
		long runTime = end - start;
		wholeTimeList.add(wholeTime);
		runTimeList.add(runTime);
		System.out.println(" 单个线程花费时间:" + (end - start));
	}
}

/images/java/thread-pool-008.png

I/O 密集型任务

这种任务应用起来,系统会用大部分的时间来处理 I/O 交互,而线程在处理 I/O 的时间段内不会占用 CPU 来处理,这时就可以将 CPU 交出给其它线程使用。因此在 I/O 密集型任务的应用中,我们可以多配置一些线程,具体的计算方法是 2N。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public class IOTypeTest implements Runnable {

	// 整体执行时间,包括在队列中等待的时间
	Vector<Long> wholeTimeList;
	// 真正执行时间
	Vector<Long> runTimeList;

	private long initStartTime = 0;

	public IOTypeTest(Vector<Long> runTimeList, Vector<Long> wholeTimeList) {
		initStartTime = System.currentTimeMillis();
		this.runTimeList = runTimeList;
		this.wholeTimeList = wholeTimeList;
	}

	public void readAndWrite() throws IOException {
		File sourceFile = new File("D:/test.txt");
// 创建输入流
BufferedReader input = new BufferedReader(new FileReader(sourceFile));
// 读取源文件, 写入到新的文件
String line = null;
while((line = input.readLine()) != null){
//System.out.println(line);
}
// 关闭输入输出流
input.close();
	}


	public void run() {
		long start = System.currentTimeMillis();
		try {
			readAndWrite();
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		long end = System.currentTimeMillis();

		long wholeTime = end - initStartTime;
		long runTime = end - start;
		wholeTimeList.add(wholeTime);
		runTimeList.add(runTime);
		System.out.println(" 单个线程花费时间:" + (end - start));
	}
}

备注:由于测试代码读取 2MB 大小的文件,涉及到大内存,所以在运行之前,我们需要调整 JVM 的堆内存空间:-Xms4g -Xmx4g,避免发生频繁的 FullGC,影响测试结果。

/images/java/thread-pool-005.png

平常的应用场景中,遇不到这两种极端情况,可参考以下公式计算线程数

线程数= N * (1+WT / ST)

  • N CPU 核数
  • WT 线程等待时间
  • ST 线程时间运行时间

以通过 JDK 自带的工具 VisualVM 来查看 WT/ST 比例

当然这些都是经验值,最好的方式还是根据实际情况测试得出最佳配置。

综合来看,我们可以根据自己的业务场景,从“N+1”和“2N”两个公式中选出一个适合的,计算出一个大概的线程数量,之后通过实际压测,逐渐往“增大线程数量”和“减小线程数量”这两个方向调整,然后观察整体的处理时间变化,最终确定一个具体的线程数量。