目录

CompletableFuture

CompletableFuturejava.util.concurrent 库在java 8中新增的主要工具,同传统的 Future 相比,其支持流式计算、函数式编程、完成通知、自定义异常处理等很多新的特性。由于函数式编程在java中越来越多的被使用到,熟练掌握 CompletableFuture ,对于更好的使用java 8后的主要新特性很重要。

简单起见,本文主要涉及 Java 8 的 CompletableFuture

什么是 CompletableFuture ?

1
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {}

在Java 8中, 新增加了一个包含50个方法左右的类: CompletableFuture,提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以通过回调的方式处理计算结果,并且提供了转换和组合CompletableFuture的方法。

在Java中 CompletableFuture 用于异步编程,异步编程是编写非阻塞的代码,运行的任务在一个单独的线程,与主线程隔离,并且会通知主线程它的进度,成功或者失败。

在这种方式中,主线程不会被阻塞,不需要一直等到子线程完成。主线程可以并行的执行其他任务。

使用这种并行方式,可以极大的提高程序的性能。

CompletableFuture字面翻译过来,就是“可完成的Future”。同传统的Future相比较,CompletableFuture能够主动设置计算的结果值(主动终结计算过程,即completable),从而在某些场景下主动结束阻塞等待。而Future由于不能主动设置计算结果值,一旦调用get()进行阻塞等待,要么当计算结果产生,要么超时,才会返回。

下面的示例,比较简单的说明了,CompletableFuture是如何被主动完成的。由于调用了complete方法,所以最终的打印结果是“manual test”,而不是"test"。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
    CompletableFuture<String> future = CompletableFuture.supplyAsync(()->{
        try{
            Thread.sleep(1000L);
            return "test";
        } catch (Exception e){
            return "failed test";
        }
    });
    future.complete("manual test");
    System.out.println(future.join());

Utilities

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class FutureUtil {

  private FutureUtil() {}

  public static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> futures) {
    CompletableFuture<Void> allDoneFuture =
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
    return allDoneFuture.thenApply(
        v -> futures.stream().map(CompletableFuture::join).filter(Objects::nonNull).toList());
  }

  public static <T> CompletableFuture<Stream<T>> sequence(Stream<CompletableFuture<T>> futures) {
    return CompletableFuture.completedFuture(
        futures.map(CompletableFuture::join).filter(Objects::nonNull));
  }

  public static <T> CompletableFuture<T> toCompletable(Future<T> future, Executor executor) {
    // Future 转 CompletableFuture
    return CompletableFuture.supplyAsync(
        () -> {
          try {
            return future.get();
          } catch (InterruptedException | ExecutionException e) {
            Thread.currentThread().interrupt();
            throw new CompletionException(e);
          }
        },
        executor);
  }
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
CompletableFuture<String> future1 =
        CompletableFuture.supplyAsync(
            () -> {
              try {
                TimeUnit.SECONDS.sleep(1);
              } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
              }
              return "Hello";
            });
CompletableFuture<String> future2 =
        CompletableFuture.supplyAsync(
            () -> {
              try {
                TimeUnit.SECONDS.sleep(2);
              } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
              }
              return "Beautiful";
            });
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> "World");

List<String> join = FutureUtil.sequence(List.of(future1, future2, future3)).join();
System.out.println(join);

Stream<String> join = FutureUtil.sequence(Stream.of(future1, future2, future3)).join();
System.out.println(join.collect(Collectors.joining(" ")));
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
public class ThreadPoolUtil {
  private ThreadPoolUtil() {}

  private static ExecutorService getExecutorService() {
    ThreadFactory namedThreadFactory =
        new ThreadFactoryBuilder().setNameFormat("demo-pool-%d").build();
    // Common Thread Pool
    return new ThreadPoolExecutor(
        5,
        200,
        0L,
        TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<>(1024),
        namedThreadFactory,
        new ThreadPoolExecutor.AbortPolicy());
  }
}

Future vs CompletableFuture

Future 是Java 5添加的类,用来描述一个异步计算的结果。你可以使用isDone方法检查计算是否完成,或者使用get阻塞住调用线程,直到计算完成返回结果,你也可以使用cancel方法停止任务的执行。

1
2
3
4
5
6
ExecutorService pool = ThreadPoolUtil.getExecutorService();

Future<String> future = pool.submit(() -> "task result");
System.out.println(future.get());

pool.shutdown(); // gracefully shutdown

上面也可以用 FutreTask 替代

1
2
3
4
5
FutureTask<String> futureTask = new FutureTask<>(() -> "task result");

ExecutorService pool = ThreadPoolUtil.getExecutorService();
pool.execute(futureTask);
pool.shutdown(); // gracefully shutdown

Future 被用于作为一个异步计算结果的引用。提供一个 isDone() 方法来检查计算任务是否完成。当任务完成时,get() 方法用来接收计算任务的结果。

Future API 是非常好的 Java 异步编程进阶,但是它缺乏一些非常重要和有用的特性。

虽然Future以及相关使用方法提供了异步执行任务的能力,但是对于结果的获取却是很不方便,只能通过阻塞或者轮询的方式得到任务的结果。阻塞的方式显然和我们的异步编程的初衷相违背,轮询的方式又会耗费无谓的CPU资源,而且也不能及时地得到计算结果,为什么不能用观察者设计模式当计算结果完成及时通知监听者呢?

Future 的局限性

  1. 不能手动完成 当你写了一个函数,用于通过一个远程API获取一个电子商务产品最新价格。因为这个 API 太耗时,你把它允许在一个独立的线程中,并且从你的函数中返回一个 Future。现在假设这个API服务宕机了,这时你想通过该产品的最新缓存价格手工完成这个Future 。你会发现无法这样做。
  2. Future 的结果在非阻塞的情况下,不能执行更进一步的操作 Future 不会通知你它已经完成了,它提供了一个阻塞的 get() 方法通知你结果。你无法给 Future 植入一个回调函数,当 Future 结果可用的时候,用该回调函数自动的调用 Future 的结果。
  3. 多个 Future 不能串联在一起组成链式调用 有时候你需要执行一个长时间运行的计算任务,并且当计算任务完成的时候,你需要把它的计算结果发送给另外一个长时间运行的计算任务等等。你会发现你无法使用 Future 创建这样的一个工作流。
  4. 不能组合多个 Future 的结果 假设你有10个不同的Future,你想并行的运行,然后在它们运行未完成后运行一些函数。你会发现你也无法使用 Future 这样做。
  5. 没有异常处理 Future API 没有任务的异常处理结构居然有如此多的限制,幸好我们有 CompletableFuture ,你可以使用 CompletableFuture 达到以上所有目的。

CompletableFuture 实现了 FutureCompletionStage接口,并且提供了许多关于创建,链式调用和组合多个 Future 的便利方法集,而且有广泛的异常处理支持。

Netty,自己扩展了Java的 Future接口,提供了addListener等多个扩展方法:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));
      future.addListener(new ChannelFutureListener()
      {
              @Override
              public void operationComplete(ChannelFuture future) throws Exception
              {
                  if (future.isSuccess()) {
                      // SUCCESS
                  }
                  else {
                      // FAILURE
                  }
              }
      });

Google guava也提供了通用的扩展Future:ListenableFuture、SettableFuture 以及辅助类Futures等,方便异步编程。

join or get 等待完成计算

1
2
3
4
5
public T get()
public T get(long timeout, TimeUnit unit)
// 前面两个 get 是 继承 Future 的
public T getNow(T valueIfAbsent)
public T join()

join 完成时返回结果值,如果异常完成则抛出(未经检查的)异常。 为了更好地符合通用函数形式的使用,如果完成此 CompletableFuture 所涉及的计算引发异常,则此方法将引发(未经检查的)CompletionException,并将底层异常作为其原因。

  • CancellationException – if the computation was cancelled
  • CompletionException – if this future completed exceptionally or a completion computation threw an exception

get 如有必要,等待此 Future 完成,然后返回其结果。

  • CancellationException – if this future was cancelled
  • ExecutionException(继承 Exception) – if this future completed exceptionally
  • InterruptedException(继承 Exception) – if the current thread was interrupted while waiting

创建 CompletableFuture 对象

构造函数

由于新创建的 CompletableFuture 还没有任何计算结果,这时调用join,当前线程会一直阻塞在这里。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
CompletableFuture<String> future = new CompletableFuture();
    // 新创建的CompletableFuture 还没有任何计算结果,这时调用join,当前线程会一直阻塞在这里。
    CompletableFuture.runAsync(
        () -> {
          String result = future.join();
          System.out.println(result);
        });

    // 如果在另外一个线程中,主动设置该 CompletableFuture 的值,则上面线程中的结果就能返回。
    CompletableFuture.runAsync(() -> future.complete("task result"));

    // 等待2个任务都执行到才退出
    TimeUnit.SECONDS.sleep(1);

completed CompletableFuture

1
2
3
// 返回一个已经计算好的 CompletableFuture
public static <U> CompletableFuture<U> completedFuture(U value)
public static <U> CompletableFuture<U> failedFuture(Throwable ex) // java 9 引入

supplyAsync & runAsync

使用 supplyAsync() 运行一个异步任务并且返回结果

通过该 supplyAsync 函数创建的 CompletableFuture 实例会异步执行当前传入的计算任务。在调用端,则可以通过get或join获取最终计算结果。

supplyAsync有两种签名:

1
2
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

第一种只需传入一个Supplier实例(一般使用lamda表达式),此时框架会默认使用 ForkJoinPool.commonPool() 的线程池来执行被提交的任务。

第二种可以指定自定义的线程池,然后将任务提交给该线程池执行。

下面为使用supplyAsync创建CompletableFuture的示例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
// Run a task specified by Using Lambda Expression
    CompletableFuture<String> future =
        CompletableFuture.supplyAsync(
            () -> {
              try {
                TimeUnit.SECONDS.sleep(1);
              } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
              }
              return "Result of the asynchronous computation";
            });

    // Block and wait for the future to complete
    System.out.println("get result: " + future.join());

在示例中,异步任务中会打印出“compute test”,并返回"test"作为最终计算结果。所以,最终的打印信息为“get result: test”。

使用 runAsync() 运行异步计算

与supplyAsync()不同的是,runAsync()传入的任务要求是Runnable类型的,所以没有返回值。因此,runAsync适合创建不需要返回值的计算任务。同supplyAsync()类似,runAsync()也有两种签名:

1
2
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
// Run a task specified by a Runnable Object asynchronously.
    CompletableFuture<Void> future =
        CompletableFuture.runAsync(
            () -> {
              // Simulate a long-running Job
              try {
                TimeUnit.SECONDS.sleep(1L);
                System.out.println("compute test");
              } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
              }
              System.out.println("I'll run in a separate thread than the main thread.");
            });

    // Block and wait for the future to complete
    System.out.println("get result: " + future.join());

CompletionStage

是异步计算的一个阶段,它也可能在另一个 CompletionStage 完成时执行一个操作或计算一个值。 一个阶段在其计算终止时完成,但这可能反过来触发其他相关阶段。 此接口中定义的功能仅采用几种基本形式,可扩展为更大的方法集以捕获一系列使用风格: 阶段执行的计算可以表示为 Function、Consumer 或 Runnable(分别使用名称包括 apply、accept 或 run 的方法),具体取决于它是否需要参数和/或产生结果。 例如:

1
2
3
 stage.thenApply(x -> square(x))
      .thenAccept(x -> System.out.print(x))
      .thenRun(() -> System.out.println());

方法表单 handle 是创建延续阶段的最通用方法,无条件地执行计算,该计算同时给出触发 CompletionStage 的结果和异常(如果有),并计算任意结果。 方法 whenComplete 类似,但保留触发阶段的结果而不是计算新的结果。 因为一个阶段的正常结果可能为空,所以这两种方法都应该具有这样的计算结构:

1
2
3
4
5
6
7
(result, exception) -> {
   if (exception == null) {
     // triggering stage completed normally
   } else {
     // triggering stage completed exceptionally
   }
 }

CompletionStage 中常用的流式连接函数包括:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
thenApply
thenApplyAsync

thenAccept
thenAcceptAsync

thenRun
thenRunAsync

thenCombine
thenCombineAsync

thenCompose
thenComposeAsync

whenComplete
whenCompleteAsync

handle
handleAsync

其中,带 Async 后缀的函数表示需要连接的后置任务会被单独提交到线程池中,从而相对前置任务来说是异步运行的。除此之外,两者没有其他区别。

CompletionStage 提供的所有回调方法都有两个变体:

1
2
3
public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);
public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn);
public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor);

这些异步回调变体通过在独立的线程中执行回调任务帮助你进一步执行并行计算。

如果你使用thenApplyAsync()回调,将从ForkJoinPool.commonPool()获取不同的线程执行。

1
2
3
4
5
6
CompletableFuture.supplyAsync(() -> {
    return "Some Result"
}).thenApplyAsync(result -> {
    // Executed in a different thread from ForkJoinPool.commonPool()
    return "Processed Result"
})

手工创建线程池

1
2
3
4
5
6
ExecutorService pool = ThreadPoolUtil.getExecutorService();
    CompletableFuture<String> completableFuture =
        CompletableFuture.supplyAsync(() -> "Some Result")
            .thenApplyAsync(result -> result + " Processed Result", pool);
System.out.println(completableFuture.join());
pool.shutdown();

为了快速理解,在接下来主要介绍不带Async的版本。 可以根据方法的参数的类型来加速你的记忆。 Runnable 类型的参数会忽略计算的结果,Consumer 是纯消费计算结果,BiConsumer 会组合另外一个CompletionStage 纯消费,Function 会对计算结果做转换,BiFunction 会组合另外一个 CompletionStage 的计算结果做转换。

thenApply / thenAccept / thenRun

这里将thenApply / thenAccept / thenRun放在一起讲,因为这几个连接函数之间的唯一区别是提交的任务类型不一样。thenApply提交的任务类型需遵从Function签名,也就是有入参和返回值,其中入参为前置任务的结果。thenAccept提交的任务类型需遵从Consumer签名,也就是有入参但是没有返回值,其中入参为前置任务的结果。thenRun提交的任务类型需遵从Runnable签名,即没有入参也没有返回值。

thenApply() 可以使用 thenApply() 处理和改变 CompletableFuture 的结果。持有一个Function<R,T>作为参数。Function<R,T>是一个简单的函数式接口,接受一个T类型的参数,产出一个R类型的结果。

1
2
3
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
1
2
3
4
5
6
7
8
9
    CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(()->{
        System.out.println("compute 1");
        return 1;
    });
    CompletableFuture<Integer> future2 = future1.thenApply((p)->{
        System.out.println("compute 2");
        return p+10;
    });
    System.out.println("result: " + future2.join());

在上面的示例中,future1通过调用thenApply将后置任务连接起来,并形成future2。该示例的最终打印结果为11,可见程序在运行中,future1的结果计算出来后,会传递给通过thenApply连接的任务,从而产生future2的最终结果为1+10=11。当然,在实际使用中,我们理论上可以无限连接后续计算任务,从而实现链条更长的流式计算。

需要注意的是,通过thenApply / thenAccept / thenRun连接的任务,当且仅当前置任务计算完成时,才会开始后置任务的计算。因此,这组函数主要用于连接前后有依赖的任务链。

你也可以通过附加一系列的thenApply()在回调方法 在 CompletableFuture 写一个连续的转换。这样的话,结果中的一个 thenApply方法就会传递给该系列的另外一个 thenApply方法。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
CompletableFuture<String> welcomeText =  CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
       throw new IllegalStateException(e);
    }
    return "Rajeev";
}).thenApply(name -> {
    return "Hello " + name;
}).thenApply(greeting -> {
    return greeting + ", Welcome to the CalliCoder Blog";
});

System.out.println(welcomeText.get());
// Prints - Hello Rajeev, Welcome to the CalliCoder Blog

thenAccept() 和 thenRun() 如果你不想从你的回调函数中返回任何东西,仅仅想在Future完成后运行一些代码片段,你可以使用thenAccept()thenRun()方法,这些方法经常在调用链的最末端的最后一个回调函数中使用。 CompletableFuture.thenAccept() 持有一个Consumer<T>,返回一个 CompletableFuture<Void>。它可以访问 CompletableFuture 的结果:

1
2
3
4
5
6
7
public CompletableFuture<Void> 	thenAccept(Consumer<? super T> action)
public CompletableFuture<Void> 	thenAcceptAsync(Consumer<? super T> action)
public CompletableFuture<Void> 	thenAcceptAsync(Consumer<? super T> action, Executor executor)
    
public CompletableFuture<Void> 	thenRun(Runnable action)
public CompletableFuture<Void> 	thenRunAsync(Runnable action)
public CompletableFuture<Void> 	thenRunAsync(Runnable action, Executor executor)
1
2
3
4
5
6
// thenAccept() example
 CompletableFuture.supplyAsync(() -> {
    return ProductService.getProductDetail(productId);
}).thenAccept(product -> {
    System.out.println("Got product detail from remote service " + product.getName())
});

虽然thenAccept()可以访问 CompletableFuture 的结果,但thenRun()不能访Future的结果,它持有一个Runnable返回 CompletableFuture

1
2
3
4
5
6
// thenRun() example
CompletableFuture.supplyAsync(() -> {
    // Run some computation  
}).thenRun(() -> {
    // Computation Finished.
});

thenCombine

同前面一组连接函数相比,thenCombine最大的不同是连接任务可以是一个独立的CompletableFuture(或者是任意实现了CompletionStage的类型),从而允许前后连接的两个任务可以并行执行(后置任务不需要等待前置任务执行完成),最后当两个任务均完成时,再将其结果同时传递给下游处理任务,从而得到最终结果。

1
2
3
public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
System.out.println("Retrieving weight.");
    CompletableFuture<Double> weightInKgFuture =
        CompletableFuture.supplyAsync(
            () -> {
              try {
                TimeUnit.SECONDS.sleep(1);
              } catch (InterruptedException e) {
                throw new IllegalStateException(e);
              }
              return 65.0;
            });

    System.out.println("Retrieving height.");
    CompletableFuture<Double> heightInCmFuture =
        CompletableFuture.supplyAsync(
            () -> {
              try {
                TimeUnit.SECONDS.sleep(1);
              } catch (InterruptedException e) {
                throw new IllegalStateException(e);
              }
              return 177.8;
            });

    System.out.println("Calculating BMI.");
    CompletableFuture<Double> combinedFuture =
        weightInKgFuture.thenCombine(
            heightInCmFuture,
            (weightInKg, heightInCm) -> {
              Double heightInMeter = heightInCm / 100;
              return weightInKg / (heightInMeter * heightInMeter);
            });

    System.out.println("Your BMI is - " + combinedFuture.get());
  }

上面示例代码中,weightInKgFuture和heightInCmFuture为独立的CompletableFuture任务,他们分别会在各自的线程中并行执行,然后weightInKgFuture通过thenCombine与heightInCmFuture连接,并且以lamda表达式传入处理结果的表达式,该表达式代表的任务会将future1与future2的结果作为入参并计算他们的和。

一般,在连接任务之间互相不依赖的情况下,可以使用thenCombine来连接任务,从而提升任务之间的并发度。

注意,thenAcceptBoth、thenAcceptBothAsync、runAfterBoth、runAfterBothAsync的作用与thenConbime类似,唯一不同的地方是任务类型不同,分别是BiConumser、Runnable。

1
2
3
4
public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action)
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action)
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action, Executor executor)
public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,  Runnable action)

thenCompose

前面讲了thenCombine主要用于没有前后依赖关系之间的任务进行连接。那么,如果两个任务之间有前后依赖关系,但是连接任务又是独立的CompletableFuture,该怎么实现呢?

1
2
3
public <U> CompletableFuture<U> thenCompose(Function<? super T,? extends CompletionStage<U>> fn)
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn)
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn, Executor executor)

先来看一下直接使用thenApply来实现:

1
2
3
4
5
6
7
    CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(()->{
        System.out.println("compute 1");
        return 1;
    });
    CompletableFuture<CompletableFuture<Integer>> future2 =
            future1.thenApply((r)->CompletableFuture.supplyAsync(()->r+10));
    System.out.println(future2.join().join());

可以发现,上面示例代码中,future2的类型变成了CompletableFuture嵌套,而且在获取结果的时候,也需要嵌套调用join或者get。这样,当连接的任务越多时,代码会变得越来越复杂,嵌套获取层级也越来越深。因此,需要一种方式,能将这种嵌套模式展开,使其没有那么多层级。thenCompose的主要目的就是解决这个问题(这里也可以将thenCompose的作用类比于stream接口中的flatMap,因为他们都可以将类型嵌套展开)。

看一下通过thenCompose如何实现上面的代码:

1
2
3
4
5
6
    CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(()->{
        System.out.println("compute 1");
        return 1;
    });
    CompletableFuture<Integer> future2 = future1.thenCompose((r)->CompletableFuture.supplyAsync(()->r+10));
    System.out.println(future2.join());

通过示例代码可以看出来,很明显,在使用了thenCompose后,future2不再存在CompletableFuture类型嵌套了,从而比较简洁的达到了我们的目的。

whenComplete

whenComplete主要用于注入任务完成时的回调通知逻辑。这个解决了传统future在任务完成时,无法主动发起通知的问题。前置任务会将计算结果或者抛出的异常作为入参传递给回调通知函数。

1
2
3
public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)

以下为示例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
    CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(()->{
        System.out.println("compute 1");
        return 1;
    });
    CompletableFuture future2 = future1.whenComplete((r, e)->{
        if(e != null){
            System.out.println("compute failed!");
        } else {
            System.out.println("received result is " + r);
        }
    });
    System.out.println("result: " + future2.join());

需要注意的是,future2获得的结果是前置任务的结果,whenComplete中的逻辑不会影响计算结果。

handle

handle与whenComplete的作用有些类似,但是handle接收的处理函数有返回值,而且返回值会影响最终获取的计算结果。

1
2
3
public <U> CompletableFuture<U> handle(BiFunction<? super T,Throwable,? extends U> fn)
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T,Throwable,? extends U> fn)
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T,Throwable,? extends U> fn, Executor executor)

以下为示例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
    CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(()->{
        System.out.println("compute 1");
        return 1;
    });
    CompletableFuture<Integer> future2 = future1.handle((r, e)->{
        if(e != null){
            System.out.println("compute failed!");
            return r;
        } else {
            System.out.println("received result is " + r);
            return r + 10;
        }
    });
    System.out.println("result: " + future2.join());

在以上示例中,打印出的最终结果为11。说明经过handle计算后产生了新的结果。

Either

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
// 当任意一个CompletionStage完成的时候,action 这个消费者就会被执行
public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action)
public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action)
public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor)
// 当任意一个CompletionStage完成的时候, Function 消费 完成的结果,返回的结果包装成 CompletableFuture.newIncompleteFuture().completeValue(fn.apply(t))
public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T,U> fn)
public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn)
public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn, Executor executor)
public CompletionStage<Void> runAfterEither(CompletionStage<?> other, Runnable action);
public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other, Runnable action);
public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other, Runnable action, Executor executor);

allOf 和 anyOf 组合多个 CompletableFuture

thenCompose()thenCombine()把两个 CompletableFuture 组合在一起。现在如果你想组合任意数量的 CompletableFuture ,应该怎么做?我们可以使用以下两个方法组合任意数量的 CompletableFuture

1
2
3
4
// 并发所有 CompletableFuture 任务
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
// 有一个 CompletableFuture 完成返回 注意结果为 Object
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)

并发运行多个 Future

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
CompletableFuture<String> future1  
  = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future2  
  = CompletableFuture.supplyAsync(() -> "Beautiful");
CompletableFuture<String> future3  
  = CompletableFuture.supplyAsync(() -> "World");

CompletableFuture<Void> combinedFuture 
  = CompletableFuture.allOf(future1, future2, future3);

// ...

combinedFuture.get();

assertTrue(future1.isDone());
assertTrue(future2.isDone());
assertTrue(future3.isDone());

注意 CompletableFuture.allOf() 的返回类型是 CompletableFuture<Void>。此方法的局限性在于它不会返回所有Futures的组合结果。相反,我们必须手动从Futures获取结果。幸运的是,*CompletableFuture.join()*方法和 Java 8 Streams API 使它变得简单:

1
2
3
4
5
String combined = Stream.of(future1, future2, future3)
  .map(CompletableFuture::join)
  .collect(Collectors.joining(" "));

assertEquals("Hello Beautiful World", combined);

CompletableFuture.allOf() 的使用场景是当你一个列表的独立future,并且你想在它们都完成后并行的做一些事情。

假设你想下载一个网站的100个不同的页面。你可以串行的做这个操作,但是这非常消耗时间。因此你想写一个函数,传入一个页面链接,返回一个 CompletableFuture ,异步的下载页面内容。

1
2
3
4
5
 CompletableFuture<String> downloadWebPage(String pageLink) {
    return  CompletableFuture.supplyAsync(() -> {
        // Code to download and return the web page's content
    });
} 

现在,当所有的页面已经下载完毕,你想计算包含关键字 CompletableFuture 页面的数量。可以使用 CompletableFuture.allOf() 达成目的。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
// A list of 100 web page links
List<String> webPageLinks = Arrays.asList(...);

// Download contents of all the web pages asynchronously
List<CompletableFuture<String>> pageContentFutures = webPageLinks.stream()
        .map(webPageLink -> downloadWebPage(webPageLink))
        .collect(Collectors.toList());

// Create a combined Future using allOf()
CompletableFuture<List<String>> allPageContentsFuture = FutureUtil.sequece(pageContentFutures);

花一些时间理解下以上代码片段。当所有future完成的时候,我们调用了future.join(),因此我们不会在任何地方阻塞。

join()方法和get()方法非常类似,这唯一不同的地方是如果最顶层的 CompletableFuture 完成的时候发生了异常,它会抛出一个未经检查的异常。

现在让我们计算包含关键字页面的数量。

1
2
3
4
5
6
7
8
// Count the number of web pages having the " `CompletableFuture` " keyword.
CompletableFuture<Long> countFuture = allPageContentsFuture.thenApply(pageContents -> {
    return pageContents.stream()
            .filter(pageContent -> pageContent.contains("CompletableFuture"))
            .count();
});

System.out.println("Number of Web Pages having  `CompletableFuture`  keyword - " + countFuture.get());

CompletableFuture.anyOf() 当任何一个 CompletableFuture 完成的时候【相同的结果类型】,返回一个新的 CompletableFuture 。以下示例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
 CompletableFuture<String> future1 =  CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(2);
    } catch (InterruptedException e) {
       throw new IllegalStateException(e);
    }
    return "Result of Future 1";
});

 CompletableFuture<String> future2 =  CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
       throw new IllegalStateException(e);
    }
    return "Result of Future 2";
});

 CompletableFuture<String> future3 =  CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(3);
    } catch (InterruptedException e) {
       throw new IllegalStateException(e);
    }
    return "Result of Future 3";
});

CompletableFuture<Object> anyOfFuture =  CompletableFuture.anyOf(future1, future2, future3);

System.out.println(anyOfFuture.get()); // Result of Future 2

在以上示例中,当三个中的任何一个 CompletableFuture 完成, anyOfFuture 就会完成。因为 future2 的休眠时间最少,因此她最先完成,最终的结果将是future2 的结果。

如果你的 CompletableFuture 返回的结果是不同类型的,这时候你将会不知道你最终 CompletableFuture 是什么类型。

CompletableFuture 异常处理

我们探寻了怎样创建 CompletableFuture ,转换它们,并组合多个 CompletableFuture 。现在让我们弄明白当发生错误的时候我们应该怎么做。

首先让我们明白在一个回调链中错误是怎么传递的。思考下以下回调链:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
CompletableFuture.supplyAsync(
            () ->
                // Code which might throw an exception
                "Some result")
        .thenApply(result -> "processed result")
        .thenApply(result -> "result after further processing")
        .thenAccept(
            result -> {
              // do something with the final result
            });

如果在原始的supplyAsync()任务中发生一个错误,这时候没有任何thenApply会被调用并且future将以一个异常结束。如果在第一个thenApply发生错误,这时候第二个和第三个将不会被调用,同样的,future将以异常结束。

使用 exceptionally() 回调处理异常 exceptionally()回调给你一个从原始Future中生成的错误恢复的机会。你可以在这里记录这个异常并返回一个默认值。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
int age = -1;

    CompletableFuture<String> maturityFuture =
        CompletableFuture.supplyAsync(
                () -> {
                  if (age < 0) {
                    throw new IllegalArgumentException("Age can not be negative");
                  }
                  if (age > 18) {
                    return "Adult";
                  } else {
                    return "Child";
                  }
                })
            .exceptionally(
                ex -> {
                  System.out.println("Oops! We have an exception - " + ex.getMessage());
                  return "Unknown!";
                });

    System.out.println("Maturity : " + maturityFuture.get());

使用 handle() 方法处理异常 API提供了一个更通用的方法 - handle()从异常恢复,无论一个异常是否发生它都会被调用。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
int age = -1;

    CompletableFuture<String> maturityFuture =
        CompletableFuture.supplyAsync(
                () -> {
                  if (age < 0) {
                    throw new IllegalArgumentException("Age can not be negative");
                  }
                  if (age > 18) {
                    return "Adult";
                  } else {
                    return "Child";
                  }
                })
            .handle(
                (res, ex) -> {
                  if (ex != null) {
                    System.out.println("Oops! We have an exception - " + ex.getMessage());
                    return "Unknown!";
                  }
                  return res;
                });

    System.out.println("Maturity : " + maturityFuture.get());

如果异常发生,res参数将是 null,否则,ex将是 null。

JDK 9 CompletableFuture API

Java 9 enhances the CompletableFuture API with the following changes:

  • New factory methods added
  • Support for delays and timeouts
  • Improved support for subclassing

and new instance APIs:

1
2
3
4
5
6
7
8
public Executor defaultExecutor()
public CompletableFuture<U> newIncompleteFuture()
public CompletableFuture<T> copy()
public CompletionStage<T> minimalCompletionStage()
public CompletableFuture<T> completeAsync(Supplier<? extends T> supplier, Executor executor)
public CompletableFuture<T> completeAsync(Supplier<? extends T> supplier)
public CompletableFuture<T> orTimeout(long timeout, TimeUnit unit)
public CompletableFuture<T> completeOnTimeout(T value, long timeout, TimeUnit unit)

We also now have a few static utility methods:

1
2
3
4
5
public static Executor delayedExecutor(long delay, TimeUnit unit) 
public static Executor delayedExecutor(long delay, TimeUnit unit)
public static <U> CompletionStage<U> completedStage(U value)
public static <U> CompletionStage<U> failedStage(Throwable ex)
public static <U> CompletableFuture<U> failedFuture(Throwable ex)