目录

Spring Boot 异步调用

在日常开发中,我们的逻辑都是同步调用,顺序执行。在一些场景下,我们会希望异步调用,将和主线程关联度低的逻辑异步调用,以实现让主线程更快的执行完成,提升性能。例如说:记录用户访问日志到数据库,记录管理员操作日志到数据库中。

  • 同步调用:指程序按照 定义顺序 依次执行,每一行程序都必须等待上一行程序执行完成之后才能执行;
  • 异步调用:指程序在顺序执行时,不等待异步调用的语句返回结果,就执行后面的程序。

考虑到异步调用的可靠性,我们一般会考虑引入分布式消息队列,例如说 RabbitMQ、RocketMQ、Kafka 等等。但是在一些时候,我们并不需要这么高的可靠性,可以使用进程内的队列或者线程池。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),new BasicThreadFactory.Builder()
              .namingPattern("schedule-pool-%d")
              .daemon(true)
              .build());
    }
public class Demo {

    public static void main(String[] args) {
        ExecutorService executor = newFixedThreadPool(10);

        // 提交任务到线程池中执行。
        executor.submit(new Runnable() {

            @Override
            public void run() {
                System.out.println("听说我被异步调用了");
            }

        });
    }

}

分布式消息队列,异步调用会以一个消息的形式,存储在消息队列的服务器上,所以即使 JVM 进程被异常关闭,消息依然在消息队列的服务器上。

所以,使用进程内的队列或者线程池来实现异步调用的话,一定要尽可能的保证 JVM 进程的优雅关闭,保证它们在关闭前被执行完成。

@Async 快速入门

在 Spring Framework 的 Spring Task 模块,提供了 @Async 注解,可以添加在方法上,自动实现该方法的异步调用。该模块,还提供了定时任务的功能。

实现原理上,也是基于 Spring AOP 拦截,实现异步提交该操作到线程池中,达到异步调用的目的。(具体实现是通过后置处理器 AsyncAnnotationBeanPostProcessor)

引入相关依赖

1
2
3
4
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter</artifactId>
</dependency>

开启异步支持

1
2
3
@Configuration
@EnableAsync
public class SpringAsyncConfig {   }
  • annotation 默认情况下, @EnableAsync检测 Spring 的@Async注释和 EJB 3.1 javax.ejb.Asynchronous。我们也可以使用此选项来检测其他用户定义的注释类型。
  • mode 指示应使用的建议类型——基于 JDK 代理或 AspectJ 编织。
  • proxyTargetClass 指示应使用的代理类型 - CGLIB 或 JDK。仅当模式设置为AdviceMode.PROXY时,此属性才有效。
  • order 设置应用 AsyncAnnotationBeanPostProcessor 的顺序。默认情况下,它最后运行,以便它可以考虑所有现有的代理。

在 application.yml 中,添加 Spring Task 定时任务的配置,如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
spring:
  task:
    # Spring 执行器配置,对应 TaskExecutionProperties 配置类。对于 Spring 异步任务,会使用该执行器。
    execution:
      thread-name-prefix: task- # 线程池的线程名的前缀。默认为 task- ,建议根据自己应用来设置
      pool: # 线程池相关
        core-size: 8 # 核心线程数,线程池创建时候初始化的线程数。默认为 8 。
        max-size: 20 # 最大线程数,线程池最大的线程数,只有在缓冲队列满了之后,才会申请超过核心线程数的线程。默认为 Integer.MAX_VALUE
        keep-alive: 60s # 允许线程的空闲时间,当超过了核心线程之外的线程,在空闲时间到达之后会被销毁。默认为 60 秒
        queue-capacity: 200 # 缓冲队列大小,用来缓冲执行任务的队列的大小。默认为 Integer.MAX_VALUE 。
        allow-core-thread-timeout: true # 是否允许核心线程超时,即开启线程池的动态增长和缩小。默认为 true 。
      shutdown:
        await-termination: true # 应用关闭时,是否等待定时任务执行完成。默认为 false ,建议设置为 true
        await-termination-period: 60 # 等待任务完成的最大时长,单位为秒。默认为 0 ,根据自己应用来设置
  • 在 spring.task.execution 配置项,Spring Task 调度任务的配置,对应 TaskExecutionProperties 配置类。
  • Spring Boot TaskExecutionAutoConfiguration 自动化配置类,实现 Spring Task 的自动配置,创建 ThreadPoolTaskExecutor 基于线程池的任务执行器。本质上,ThreadPoolTaskExecutor 是基于 ThreadPoolExecutor 的封装,主要增加提交任务,返回 ListenableFuture 对象的功能。

注意,spring.task.execution.shutdown 配置项,是为了实现 Spring Task 异步任务的优雅关闭。我们想象一下,如果异步任务在执行的过程中,如果应用开始关闭,把异步任务需要使用到的 Spring Bean 进行销毁,例如说数据库连接池,那么此时异步任务还在执行中,一旦需要访问数据库,可能会导致报错。

  • 所以,通过配置 await-termination = true ,实现应用关闭时,等待异步任务执行完成。这样,应用在关闭的时,Spring 会优先等待 ThreadPoolTaskScheduler 执行完任务之后,再开始 Spring Bean 的销毁。
  • 同时,又考虑到我们不可能无限等待异步任务全部执行结束,因此可以配置 await-termination-period = 60 ,等待任务完成的最大时长,单位为秒。具体设置多少的等待时长,可以根据自己应用的需要。

@Async 注解

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
@Service
public class DemoService {

    private Logger logger = LoggerFactory.getLogger(getClass());

    public Integer execute01() {
        logger.info("[execute01]");
        sleep(10);
        return 1;
    }

    public Integer execute02() {
        logger.info("[execute02]");
        sleep(5);
        return 2;
    }

    @Async
public Integer execute01Async() {
    return this.execute01();
}

@Async
public Integer execute02Async() {
    return this.execute02();
}

@Async
public Future<Integer> execute01AsyncWithFuture() {
    return AsyncResult.forValue(this.execute01());
}

@Async
public Future<Integer> execute02AsyncWithFuture() {
    return AsyncResult.forValue(this.execute02());
}

    private static void sleep(int seconds) {
        try {
            Thread.sleep(seconds * 1000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

}

@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
public class DemoServiceTest {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @Autowired
    private DemoService demoService;

// 消耗 15 秒左右
    @Test
    public void task01() {
        long now = System.currentTimeMillis();
        logger.info("[task01][开始执行]");

        demoService.execute01();
        demoService.execute02();

        logger.info("[task01][结束执行,消耗时长 {} 毫秒]", System.currentTimeMillis() - now);
    }

// 注意,实际这两个方法,并没有执行完成。
    @Test
public void task02() {
    long now = System.currentTimeMillis();
    logger.info("[task02][开始执行]");

    demoService.execute01Async();
    demoService.execute02Async();

    logger.info("[task02][结束执行,消耗时长 {} 毫秒]", System.currentTimeMillis() - now);
}
// 消耗 10 秒左右
@Test
public void task03() throws ExecutionException, InterruptedException {
    long now = System.currentTimeMillis();
    logger.info("[task03][开始执行]");

    // <1> 执行任务
    Future<Integer> execute01Result = demoService.execute01AsyncWithFuture();
    Future<Integer> execute02Result = demoService.execute02AsyncWithFuture();
    // <2> 阻塞等待结果
    execute01Result.get();
    execute02Result.get();

    logger.info("[task03][结束执行,消耗时长 {} 毫秒]", System.currentTimeMillis() - now);
}

}

异步回调

AsyncResult

表示异步结果。返回结果分成两种情况

  • 执行成功时,调用 AsyncResult#forValue(V value) 静态方法,返回成功的 ListenableFuture 对象。
  • 执行异常时,调用 AsyncResult#forExecutionException(Throwable ex) 静态方法,返回异常的 ListenableFuture 对象。

同时,AsyncResult 实现了 ListenableFuture 接口,提供异步执行结果的回调处理。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public interface ListenableFuture<T> extends Future<T> {

    // 添加回调方法,统一处理成功和异常的情况。
    void addCallback(ListenableFutureCallback<? super T> callback);

    // 添加成功和失败的回调方法,分别处理成功和异常的情况。
    void addCallback(SuccessCallback<? super T> successCallback, FailureCallback failureCallback);


    //将 ListenableFuture 转换成 JDK8 提供的 CompletableFuture    
    default CompletableFuture<T> completable() {
        CompletableFuture<T> completable = new DelegatingCompletableFuture<>(this);
        addCallback(completable::complete, completable::completeExceptionally);
        return completable;
    }
}

public interface Future<V> {

    // 获取异步执行的结果,如果没有结果可用,此方法会阻塞直到异步计算完成。
    V get() throws InterruptedException, ExecutionException;

    // 获取异步执行结果,如果没有结果可用,此方法会阻塞,但是会有时间限制,如果阻塞时间超过设定的 timeout 时间,该方法将抛出异常。
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;

    // 如果任务执行结束,无论是正常结束或是中途取消还是发生异常,都返回 true 。
    boolean isDone();

    // 如果任务完成前被取消,则返回 true 。
    boolean isCancelled();

    // 如果任务还没开始,执行 cancel(...) 方法将返回 false;
    // 如果任务已经启动,执行 cancel(true) 方法将以中断执行此任务线程的方式来试图停止任务,如果停止成功,返回 true ;
    // 当任务已经启动,执行c ancel(false) 方法将不会对正在执行的任务线程产生影响(让线程正常执行到完成),此时返回 false ;
    // 当任务已经完成,执行 cancel(...) 方法将返回 false 。
    // mayInterruptRunning 参数表示是否中断执行中的线程。
    boolean cancel(boolean mayInterruptIfRunning);
}

ListenableFutureCallback 接口,同时继承 SuccessCallback 和 FailureCallback 接口。

ListenableFutureTask

调用使用 @Async 注解的方法时,如果方法返回的类型是 ListenableFuture 的情况下,实际方法返回的是 ListenableFutureTask 对象。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// ListenableFutureTask.java

@Override
protected void done() {
    Throwable cause;
    try {
       // <1> 获得执行结果
        T result = get();
        // <2.1> 执行成功,执行成功的回调
        this.callbacks.success(result);
        return;
    } catch (InterruptedException ex) { // 如果有中断异常 InterruptedException ,则打断当前线程,并直接返回
        Thread.currentThread().interrupt();
        return;
    } catch (ExecutionException ex) { // 如果有 ExecutionException 异常,获得其真实的异常,并设置到 cause 中
        cause = ex.getCause();
        if (cause == null) {
            cause = ex;
        }
    } catch (Throwable ex) { // 设置异常到 cause 中
        cause = ex;
    }
    // 执行异常,执行异常的回调
    this.callbacks.failure(cause);
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
// DemoService.java
@Async
public ListenableFuture<Integer> execute01AsyncWithListenableFuture() {
    try {
        return AsyncResult.forValue(this.execute02());
    } catch (Throwable ex) {
        return AsyncResult.forExecutionException(ex);
    }
}

// DemoServiceTest.java
@Test
public void task04() throws ExecutionException, InterruptedException {
    long now = System.currentTimeMillis();
    logger.info("[task04][开始执行]");

    // <1> 执行任务
    ListenableFuture<Integer> execute01Result = demoService.execute01AsyncWithListenableFuture();
    logger.info("[task04][execute01Result 的类型是:({})]",execute01Result.getClass().getSimpleName());
    execute01Result.addCallback(new SuccessCallback<Integer>() { // <2.1> 增加成功的回调

        @Override
        public void onSuccess(Integer result) {
            logger.info("[onSuccess][result: {}]", result);
        }

    }, new FailureCallback() { // <2.1> 增加失败的回调

        @Override
        public void onFailure(Throwable ex) {
            logger.info("[onFailure][发生异常]", ex);
        }

    });
    execute01Result.addCallback(new ListenableFutureCallback<Integer>() { // <2.2> 增加成功和失败的统一回调

        @Override
        public void onSuccess(Integer result) {
            logger.info("[onSuccess][result: {}]", result);
        }

        @Override
        public void onFailure(Throwable ex) {
            logger.info("[onFailure][发生异常]", ex);
        }

    });
    // <3> 阻塞等待结果
    execute01Result.get();

    logger.info("[task04][结束执行,消耗时长 {} 毫秒]", System.currentTimeMillis() - now);
}

Executor

默认 Spring 使用 SimpleAsyncTaskExecutor 执行异步方法

覆盖方法级别

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
@Configuration
@EnableAsync
public class SpringAsyncConfig {
    
    @Bean(name = "threadPoolTaskExecutor")
    public Executor threadPoolTaskExecutor() {
        return new ThreadPoolTaskExecutor();
    }
}

@Async("threadPoolTaskExecutor")
public void asyncMethodWithConfiguredExecutor() {
    System.out.println("Execute method with configured executor - "
      + Thread.currentThread().getName());
}

覆盖应用级别

1
2
3
4
5
6
7
8
9
@Configuration
@EnableAsync
public class SpringAsyncConfig implements AsyncConfigurer {
    
    @Override
    public Executor getAsyncExecutor() {
        return new ThreadPoolTaskExecutor();
    }  
}

异步异常处理器

通过实现 AsyncUncaughtExceptionHandler 接口,达到对异步调用的异常的统一处理。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class GlobalAsyncExceptionHandler implements AsyncUncaughtExceptionHandler {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @Override
    public void handleUncaughtException(Throwable ex, Method method, Object... params) {
        logger.error("[handleUncaughtException][method({}) params({}) 发生异常]",
                method, params, ex);
    }
}

@Configuration
@EnableAsync // 开启 @Async 的支持
public class AsyncConfig implements AsyncConfigurer {
    // TaskExecutionAutoConfiguration
    @Override
    public Executor getAsyncExecutor() {
        return null;
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return new GlobalAsyncExceptionHandler();
    }
}

注意,AsyncUncaughtExceptionHandler 只能拦截返回类型非 Future 的异步调用方法。通过看 AsyncExecutionAspectSupport#handleError(Throwable ex, Method method, Object… params) 的源码,可以很容易得到这个结论。 返回类型为 Future 的异步调用方法,需要通过 异步回调 来处理。

自定义

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
spring:
  task:
    # Spring 执行器配置,对应 TaskExecutionProperties 配置类。对于 Spring 异步任务,会使用该执行器。
    execution-one:
      thread-name-prefix: task-one- # 线程池的线程名的前缀。默认为 task- ,建议根据自己应用来设置
      pool: # 线程池相关
        core-size: 8 # 核心线程数,线程池创建时候初始化的线程数。默认为 8 。
        max-size: 20 # 最大线程数,线程池最大的线程数,只有在缓冲队列满了之后,才会申请超过核心线程数的线程。默认为 Integer.MAX_VALUE
        keep-alive: 60s # 允许线程的空闲时间,当超过了核心线程之外的线程,在空闲时间到达之后会被销毁。默认为 60 秒
        queue-capacity: 200 # 缓冲队列大小,用来缓冲执行任务的队列的大小。默认为 Integer.MAX_VALUE 。
        allow-core-thread-timeout: true # 是否允许核心线程超时,即开启线程池的动态增长和缩小。默认为 true 。
      shutdown:
        await-termination: true # 应用关闭时,是否等待定时任务执行完成。默认为 false ,建议设置为 true
        await-termination-period: 60 # 等待任务完成的最大时长,单位为秒。默认为 0 ,根据自己应用来设置
    # Spring 执行器配置,对应 TaskExecutionProperties 配置类。对于 Spring 异步任务,会使用该执行器。
    execution-two:
      thread-name-prefix: task-two- # 线程池的线程名的前缀。默认为 task- ,建议根据自己应用来设置
      pool: # 线程池相关
        core-size: 8 # 核心线程数,线程池创建时候初始化的线程数。默认为 8 。
        max-size: 20 # 最大线程数,线程池最大的线程数,只有在缓冲队列满了之后,才会申请超过核心线程数的线程。默认为 Integer.MAX_VALUE
        keep-alive: 60s # 允许线程的空闲时间,当超过了核心线程之外的线程,在空闲时间到达之后会被销毁。默认为 60 秒
        queue-capacity: 200 # 缓冲队列大小,用来缓冲执行任务的队列的大小。默认为 Integer.MAX_VALUE 。
        allow-core-thread-timeout: true # 是否允许核心线程超时,即开启线程池的动态增长和缩小。默认为 true 。
      shutdown:
        await-termination: true # 应用关闭时,是否等待定时任务执行完成。默认为 false ,建议设置为 true
        await-termination-period: 60 # 等待任务完成的最大时长,单位为秒。默认为 0 ,根据自己应用来设置
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
// AsyncConfig.java

@Configuration
@EnableAsync // 开启 @Async 的支持
public class AsyncConfig {

    public static final String EXECUTOR_ONE_BEAN_NAME = "executor-one";
    public static final String EXECUTOR_TWO_BEAN_NAME = "executor-two";

    @Configuration
    public static class ExecutorOneConfiguration {

        @Bean(name = EXECUTOR_ONE_BEAN_NAME + "-properties")
        @Primary
        @ConfigurationProperties(prefix = "spring.task.execution-one") // 读取 spring.task.execution-one 配置到 TaskExecutionProperties 对象
        public TaskExecutionProperties taskExecutionProperties() {
            return new TaskExecutionProperties();
        }

        @Bean(name = EXECUTOR_ONE_BEAN_NAME)
        public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
            // 创建 TaskExecutorBuilder 对象
            TaskExecutorBuilder builder = createTskExecutorBuilder(this.taskExecutionProperties());
            // 创建 ThreadPoolTaskExecutor 对象
            return builder.build();
        }

    }

    @Configuration
    public static class ExecutorTwoConfiguration {

        @Bean(name = EXECUTOR_TWO_BEAN_NAME + "-properties")
        @ConfigurationProperties(prefix = "spring.task.execution-two") // 读取 spring.task.execution-two 配置到 TaskExecutionProperties 对象
        public TaskExecutionProperties taskExecutionProperties() {
            return new TaskExecutionProperties();
        }

        @Bean(name = EXECUTOR_TWO_BEAN_NAME)
        public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
            // 创建 TaskExecutorBuilder 对象
            TaskExecutorBuilder builder = createTskExecutorBuilder(this.taskExecutionProperties());
            // 创建 ThreadPoolTaskExecutor 对象
            return builder.build();
        }

    }

    private static TaskExecutorBuilder createTskExecutorBuilder(TaskExecutionProperties properties) {
        // Pool 属性
        TaskExecutionProperties.Pool pool = properties.getPool();
        TaskExecutorBuilder builder = new TaskExecutorBuilder();
        builder = builder.queueCapacity(pool.getQueueCapacity());
        builder = builder.corePoolSize(pool.getCoreSize());
        builder = builder.maxPoolSize(pool.getMaxSize());
        builder = builder.allowCoreThreadTimeOut(pool.isAllowCoreThreadTimeout());
        builder = builder.keepAlive(pool.getKeepAlive());
        // Shutdown 属性
        TaskExecutionProperties.Shutdown shutdown = properties.getShutdown();
        builder = builder.awaitTermination(shutdown.isAwaitTermination());
        builder = builder.awaitTerminationPeriod(shutdown.getAwaitTerminationPeriod());
        // 其它基本属性
        builder = builder.threadNamePrefix(properties.getThreadNamePrefix());
//        builder = builder.customizers(taskExecutorCustomizers.orderedStream()::iterator);
//        builder = builder.taskDecorator(taskDecorator.getIfUnique());
        return builder;
    }

}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
@Service
public class DemoService {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @Async(AsyncConfig.EXECUTOR_ONE_BEAN_NAME)
    public Integer execute01() {
        logger.info("[execute01]");
        return 1;
    }

    @Async(AsyncConfig.EXECUTOR_TWO_BEAN_NAME)
    public Integer execute02() {
        logger.info("[execute02]");
        return 2;
    }
}

@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
public class DemoServiceTest {

    @Autowired
    private DemoService demoService;

    @Test
    public void testExecute() throws InterruptedException {
        demoService.execute01();
        demoService.execute02();

        // sleep 1 秒,保证异步调用的执行
        Thread.sleep(1000);
    }

}

注意

使用 Spring Task 的异步任务,一定要注意两个点:

  • JVM 应用的正常优雅关闭,保证异步任务都被执行完成。
  • 编写异步异常处理器 GlobalAsyncExceptionHandler ,记录异常日志,进行监控告警。

https://docs.spring.io/spring-boot/docs/current/reference/html/features.html#features.task-execution-and-scheduling