Java并发编程--JUC并发工具类之CompletableFuture

摘要

  • 本文介绍CompletableFuture相关技术

  • 本文基于jdk1.8

CompletableFuture介绍

  • CompletableFuture是Java 8引入的一个强大的工具,用于处理异步编程和并发操作。它提供了一种简洁而灵活的方式来编写并发代码,使得处理异步任务和组合多个任务变得更加直观和易于管理。

  • CompletableFuture类实现了Java的Future接口,并提供了许多附加的方法,使得处理异步操作更加方便。它可以用于执行异步计算、等待多个异步任务完成、处理异常情况以及将多个异步任务组合成一个更大的任务等。

  • CompletableFuture的关键概念和用法:

    • 异步执行:CompletableFuture可以在后台线程中异步地执行任务,而不会阻塞主线程。你可以使用supplyAsync()方法来执行一个Supplier类型的任务,或者使用runAsync()方法来执行一个没有返回值的Runnable任务。
    • 链式操作:你可以使用一系列的方法来对CompletableFuture进行链式操作,每个方法都会在前一个操作完成后触发。这些方法包括thenApply()、thenAccept()和thenRun()等,它们可以对前一个操作的结果进行处理、消费或执行额外的操作。
    • 异常处理:CompletableFuture提供了异常处理的机制,使得你可以在任务完成时处理异常情况。你可以使用exceptionally()方法来处理任务的异常结果,或者使用handle()方法来处理正常和异常的结果。
    • 组合任务:你可以将多个CompletableFuture组合在一起,以创建更复杂的任务流水线。例如,你可以使用thenCompose()方法将两个CompletableFuture串联起来,或者使用allOf()和anyOf()方法来等待多个CompletableFuture的完成。

常用API

方法签名 描述
static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) 当所有的CompletableFuture都完成时返回一个新的CompletableFuture,它的结果是一个空值。
static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) 当任意一个CompletableFuture完成时返回一个新的CompletableFuture,它的结果是最先完成的CompletableFuture的结果。
CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn) 当CompletableFuture异常完成时执行一个Function,并返回一个新的CompletableFuture,它的结果是Function的返回值。
CompletableFuture<T> thenApply(Function<? super T, ? extends U> fn) 当CompletableFuture正常完成时执行一个Function,并返回一个新的CompletableFuture,它的结果是Function的返回值。
CompletableFuture<Void> thenAccept(Consumer<? super T> action) 当CompletableFuture正常完成时执行一个Consumer,不返回任何结果的CompletableFuture。
CompletableFuture<Void> thenRun(Runnable action) 当CompletableFuture正常完成时执行一个Runnable,不接受任何参数和返回值的CompletableFuture。
CompletableFuture<T> thenCompose(Function<? super T, ? extends CompletableFuture<U>> fn) 当CompletableFuture正常完成时执行一个Function,返回一个新的CompletableFuture,它的结果是Function返回的CompletableFuture的结果。
CompletableFuture<T> exceptionallyCompose(Function<Throwable, ? extends CompletableFuture<T>> fn) 当CompletableFuture异常完成时执行一个Function,返回一个新的CompletableFuture,它的结果是Function返回的CompletableFuture的结果。
CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action) 当CompletableFuture完成时执行一个BiConsumer,接受结果值和异常作为参数。
CompletableFuture<T> handle(BiFunction<? super T, Throwable, ? extends U> fn) 当CompletableFuture完成时执行一个BiFunction,并返回一个新的CompletableFuture,它的结果是BiFunction的返回值。
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) 创建一个新的CompletableFuture,它会异步地执行给定的Supplier并返回结果。
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor) 创建一个新的CompletableFuture,它会异步地在给定的Executor中执行给定的Supplier并返回结果。
CompletableFuture<T> thenApplyAsync(Function<? super T, ? extends U> fn) 当CompletableFuture正常完成时异步地执行一个Function,并返回一个新的CompletableFuture,它的结果是Function的返回值。
CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) 当CompletableFuture正常完成时异步地执行一个Consumer,不返回任何结果的CompletableFuture。
CompletableFuture<Void> thenRunAsync(Runnable action) 当CompletableFuture正常完成时异步地执行一个Runnable,不接受任何参数和返回值的CompletableFuture。
CompletableFuture<T> thenComposeAsync(Function<? super T, ? extends CompletableFuture<U>> fn) 当CompletableFuture正常完成时异步地执行一个Function,并返回一个新的CompletableFuture,它的结果是Function返回的CompletableFuture的结果。
CompletableFuture<T> exceptionallyComposeAsync(Function<Throwable, ? extends CompletableFuture<T>> fn) 当CompletableFuture异常完成时异步地执行一个Function,并返回一个新的CompletableFuture,它的结果是Function返回的CompletableFuture的结果。
CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action) 当CompletableFuture完成时异步地执行一个BiConsumer,接受结果值和异常作为参数。
CompletableFuture<T> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn) 当CompletableFuture完成时异步地执行一个BiFunction,并返回一个新的CompletableFuture,它的结果是BiFunction的返回值。
CompletableFuture<T> completeExceptionally(Throwable ex) 手动地将CompletableFuture标记为异常完成,并提供一个异常作为结果。
boolean complete(T value) 手动地将CompletableFuture标记为正常完成,并提供一个值作为结果。
T get() 等待CompletableFuture完成并返回结果值。
T get(long timeout, TimeUnit unit) 在给定的时间内等待CompletableFuture完成并返回结果值。
boolean isDone() 检查CompletableFuture是否已经完成。
boolean isCancelled() 检查CompletableFuture是否已经被取消。
boolean cancel(boolean mayInterruptIfRunning) 取消CompletableFuture的执行。

Demo

  • 创建:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
//子线程代码逻辑
return "Hello, world!"; //返回值
});

// CompletableFuture 使用的线程池是 ForkJoinPool ,默认全局共用一个ForkJoinPool,建议创建CompletableFuture时,为其指定一个新的线程池
//构建一个forkjoin线程池
ForkJoinPool pool = new ForkJoinPool();

//创建一个异步任务,并将其提交到ForkJoinPool中执行
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
return "Hello, world!";
}, pool);

try {
// 等待任务完成,并获取结果
String result = future.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
} finally {
//关闭ForkJoinPool,释放资源
pool.shutdown();
}


  • 工厂方法:

1
2
3
4
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor)
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor)

Asynsc 表示异步,而 supplyAsync 与 runAsync 不同在于,supplyAsync 异步返回一个结果,runAsync 是 void。第二个函数第二个参数表示是用我们自己创建的线程池,否则采用默认的ForkJoinPool.commonPool()作为它的线程池

  • 获取返回值: getjoin的区别

    future.get(); 需要处理异常:InterruptedException,ExecutionException
    future.join(); 不需要处理异常

  • allOfanyOf

1
2
3
4
5
6
7
CompletableFuture.allOf(future1,future2,future3).thenRun(()->{
//所有线程全部返回
System.out.println("All done!");
});

//接收返回最快的那个值
CompletableFuture<Object> f = CompletableFuture.anyOf(future1,future2,future3);
  • 处理返回结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
//thenAccept :消费,没有返回值
CompletableFuture.supplyAsync(() -> "hello").thenAccept(s -> System.out.println(s+" world"));

//thenApply :变更返回结果(值或类型)
CompletableFuture.supplyAsync(() -> "hello").thenApply(s -> s + " world").join();

//exceptionally :捕获异常并返回结果
CompletableFuture.supplyAsync(() -> "hello").exceptionally(e -> {
System.out.println(e.getMessage());
return "hello world";
}).join();

//thenCompose :第一个future的结果作为第二个future的参数
CompletableFuture.supplyAsync(() -> 10).thenCompose(i -> CompletableFuture.supplyAsync(() -> i+1)).join();

//whenComplete:不修改返回值
//whenComplete()方法接收一个BiConsumer参数,当异步任务完成时,无论是否发生异常,都会执行该BiConsumer。如果任务发生异常,异常信息会被传递给BiConsumer,但whenComplete()方法无法修改异常或处理异常。
CompletableFuture.supplyAsync(() -> "hello").whenComplete((s, t) -> {
System.out.println(s); //s是返回值
System.out.println(t.getMessage()); //t是异常对象
}).join();

//handle :会修改返回值
//handle()方法接收一个BiFunction参数,当异步任务完成时,可以对任务的结果进行处理。如果任务发生异常,异常信息会被传递给BiFunction,并允许你根据异常进行处理并返回一个结果。如果没有异常发生,BiFunction将使用任务的结果进行处理。
CompletableFuture.supplyAsync(() -> "hello").handle((s, t) -> {
System.out.println(s); //s是返回值
System.out.println(t.getMessage()); //t是异常对象return s;
}).join();

CompletableFuture的优势和不足

  • 优势:

    • 1.强大的异步编程支持:CompletableFuture提供了丰富的方法和操作符,使得异步编程变得直观和易于管理。你可以使用链式操作来处理异步任务,进行任务组合和转换,以及处理异常情况。
    • 2.可组合性和灵活性:CompletableFuture可以非常容易地组合多个任务,构建复杂的任务流水线。你可以将多个CompletableFuture串联起来,以处理依赖关系,并发起并行任务。
    • 3.错误处理机制:CompletableFuture提供了异常处理的机制,使得你可以在任务完成时处理异常情况。你可以对任务的正常结果和异常结果进行不同的处理,保证代码的健壮性。
    • 4.非阻塞的并发操作:CompletableFuture允许你以非阻塞的方式执行异步任务,并在任务完成时执行后续操作。这样可以充分利用计算资源,提高系统的并发能力和响应性能。
    • 5.可扩展性:CompletableFuture可以与Java 8引入的Stream API和Lambda表达式结合使用,以实现更简洁、可读性更高的代码。它还可以与其他并发工具和库进行集成,如Executor框架和并发集合类。
  • 不足:

    • 1.学习曲线较陡峭:使用CompletableFuture需要一定的学习和理解成本,特别是对于初学者来说。理解它的概念和方法的正确用法可能需要一些时间和实践。
    • 2.缺乏对非JDK库的支持:CompletableFuture主要是用于Java的标准库,因此在与其他非JDK库和框架集成时可能会有一些限制。某些库可能没有提供与CompletableFuture的无缝集成。
    • 3.异常处理略显繁琐:虽然CompletableFuture提供了异常处理的机制,但在处理复杂的异常场景时,可能需要编写较多的代码来处理各种异常情况和错误处理策略。
    • 4.可能存在性能开销:在某些情况下,使用CompletableFuture可能会引入额外的性能开销。例如,创建和管理多个CompletableFuture实例可能会导致一些额外的开销。
    • 5.复杂度增加:当涉及到更复杂的并发场景和任务组合时,CompletableFuture的代码可能变得复杂和难以理解。需要谨慎设计和组织任务的依赖关系和并发控制逻辑。

虽然CompletableFuture具有一些不足之处,但它仍然是Java中强大且灵活的异步编程工具。