Java CompletableFuture 使用方法与最佳实践 – wiki基地


Java CompletableFuture 深度解析:从入门到精通

在现代软件开发中,尤其是在构建高并发、响应式的应用程序时,异步编程模型变得至关重要。Java 通过 java.util.concurrent.Future 接口为异步计算提供了一种基础机制,但 Future 本身存在一些局限性,例如无法方便地组合多个任务、缺乏非阻塞的回调机制以及异常处理相对繁琐。为了克服这些限制,Java 8 引入了 java.util.concurrent.CompletableFuture,它不仅实现了 FutureCompletionStage 接口,还提供了一套强大而灵活的工具,用于构建复杂的异步工作流。

本文将深入探讨 CompletableFuture 的核心概念、使用方法、组合技巧、异常处理机制以及最佳实践,旨在帮助开发者充分利用其潜力,编写出更高效、更健壮、更易于维护的异步 Java 代码。

1. CompletableFuture 概述:为何选择它?

CompletableFuture<T> 代表一个可能尚未完成的异步计算的结果。与传统的 Future 不同,CompletableFuture 具备以下关键优势:

  • 非阻塞性: CompletableFuture 鼓励使用回调函数(注册在 CompletionStage 接口中的方法),而不是依赖阻塞的 get() 方法来等待结果。这使得线程可以被释放去处理其他任务,从而提高系统吞吐量和响应性。
  • 可组合性 (Composability): 它提供了丰富的 API 来链接、组合和编排多个异步任务。你可以轻松地定义任务之间的依赖关系(顺序执行、并行执行、等待多个或任意一个完成等),构建复杂的异步处理流水线。
  • 函数式编程风格: 大量使用了 Lambda 表达式和函数式接口(如 Function, Consumer, Supplier, BiFunction 等),使得代码更简洁、更具表达力。
  • 显式完成: 你可以手动创建一个 CompletableFuture 并在稍后某个时间点通过 complete()completeExceptionally() 方法显式地完成它,这对于集成基于事件或回调的 API 非常有用。
  • 改进的异常处理: 提供了专门的方法(如 exceptionally, handle, whenComplete)来优雅地处理异步计算过程中可能出现的异常。

2. 创建 CompletableFuture 实例

创建 CompletableFuture 主要有以下几种方式:

  • 使用 runAsync() – 执行无返回值的任务:
    如果你需要异步执行一个不返回任何结果的操作(实现了 Runnable 接口),可以使用 runAsync()

    “`java
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    public class CompletableFutureRunAsync {
    public static void main(String[] args) throws InterruptedException {
    System.out.println(“Main thread: ” + Thread.currentThread().getName());

        // 使用默认的 ForkJoinPool.commonPool()
        CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> {
            System.out.println("Task 1 running in thread: " + Thread.currentThread().getName());
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            System.out.println("Task 1 finished.");
        });
    
        // 使用自定义的 Executor
        ExecutorService customExecutor = Executors.newFixedThreadPool(2);
        CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> {
            System.out.println("Task 2 running in thread: " + Thread.currentThread().getName());
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            System.out.println("Task 2 finished.");
        }, customExecutor);
    
        // 等待任务完成 (仅为演示,实际应用中应避免阻塞)
        future1.join();
        future2.join();
    
        System.out.println("Main thread finished.");
        customExecutor.shutdown(); // 关闭自定义线程池
    }
    

    }
    ``runAsync有两个重载版本:一个使用默认的ForkJoinPool.commonPool()作为执行器,另一个允许你指定自定义的Executor`。

  • 使用 supplyAsync() – 执行有返回值的任务:
    如果你的异步任务需要返回一个结果(实现了 Supplier<U> 接口),应该使用 supplyAsync()

    “`java
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    public class CompletableFutureSupplyAsync {
    public static void main(String[] args) throws Exception {
    System.out.println(“Main thread: ” + Thread.currentThread().getName());

        // 使用默认的 ForkJoinPool.commonPool()
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("Task 1 (supply) running in thread: " + Thread.currentThread().getName());
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return "Task 1 Interrupted";
            }
            return "Result from Task 1";
        });
    
        // 使用自定义的 Executor
        ExecutorService customExecutor = Executors.newFixedThreadPool(2);
        CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("Task 2 (supply) running in thread: " + Thread.currentThread().getName());
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return -1;
            }
            return 123;
        }, customExecutor);
    
        // 获取结果 (join() 会阻塞)
        String result1 = future1.join();
        Integer result2 = future2.join();
    
        System.out.println("Result 1: " + result1);
        System.out.println("Result 2: " + result2);
    
        System.out.println("Main thread finished.");
        customExecutor.shutdown();
    }
    

    }
    ``
    runAsync类似,supplyAsync` 也有使用默认线程池和自定义线程池的两个版本。

  • 使用 CompletableFuture.completedFuture() – 创建已完成的 Future:
    有时你需要一个立即返回预定结果的 CompletableFuture,这在测试或某些需要 Future 作为起点的链式调用中很有用。

    java
    CompletableFuture<String> alreadyCompleted = CompletableFuture.completedFuture("Immediate Result");
    System.out.println("Is done? " + alreadyCompleted.isDone()); // true
    System.out.println("Result: " + alreadyCompleted.get()); // "Immediate Result" (不会阻塞)

  • 手动创建和完成:
    对于需要集成非 Future 基础的异步 API 的情况,可以先创建一个空的 CompletableFuture,然后在回调中手动完成它。

    “`java
    CompletableFuture manualFuture = new CompletableFuture<>();

    // 模拟一个异步操作完成后的回调
    new Thread(() -> {
    try {
    TimeUnit.SECONDS.sleep(2);
    // 模拟成功
    manualFuture.complete(“Result from manual operation”);
    // 或者模拟失败
    // manualFuture.completeExceptionally(new RuntimeException(“Operation failed”));
    } catch (InterruptedException e) {
    manualFuture.completeExceptionally(e);
    }
    }).start();

    // 后续可以链式调用或获取结果
    manualFuture.thenAccept(result -> System.out.println(“Manual future completed with: ” + result));

    // 阻塞等待 (仅为演示)
    System.out.println(“Waiting for manual future…”);
    System.out.println(manualFuture.join());
    “`

3. 处理 CompletableFuture 的结果 (回调函数)

CompletableFuture 的强大之处在于其丰富的回调方法,允许你在 Future 完成时(成功或失败)执行相应的操作,而无需阻塞当前线程。主要的回调方法分为三类:thenApply, thenAccept, 和 thenRun

  • thenApply(Function<? super T,? extends U> fn):
    CompletableFuture 成功完成时,将其结果作为输入传递给指定的 Function,并将该 Function 的返回值包装成一个新的 CompletableFuture<U> 返回。这用于转换结果。

    “`java
    CompletableFuture originalFuture = CompletableFuture.supplyAsync(() -> “Hello”);

    CompletableFuture transformedFuture = originalFuture.thenApply(result -> {
    System.out.println(“Applying transformation in thread: ” + Thread.currentThread().getName());
    return result.length(); // 转换 “Hello” 为 5
    });

    System.out.println(“Transformed result: ” + transformedFuture.join()); // 5
    “`

  • thenAccept(Consumer<? super T> action):
    CompletableFuture 成功完成时,将其结果传递给指定的 Consumer 进行消费(执行某个操作,无返回值)。返回 CompletableFuture<Void>

    “`java
    CompletableFuture greetingFuture = CompletableFuture.supplyAsync(() -> “World”);

    CompletableFuture acceptedFuture = greetingFuture.thenAccept(result -> {
    System.out.println(“Accepting result in thread: ” + Thread.currentThread().getName());
    System.out.println(“Greeting: ” + result); // 消费结果 “World”
    });

    acceptedFuture.join(); // 等待消费操作完成
    “`

  • thenRun(Runnable action):
    CompletableFuture 成功完成时,执行指定的 Runnable 任务,该任务不接收结果,也不返回任何值。返回 CompletableFuture<Void>

    “`java
    CompletableFuture taskFuture = CompletableFuture.supplyAsync(() -> {
    // some operation
    return “Done”;
    });

    CompletableFuture runFuture = taskFuture.thenRun(() -> {
    System.out.println(“Running action after completion in thread: ” + Thread.currentThread().getName());
    System.out.println(“Previous task completed successfully.”);
    });

    runFuture.join(); // 等待后续操作完成
    “`

关于 *Async 变体:

上述每个回调方法 (thenApply, thenAccept, thenRun) 都有一个对应的 *Async 版本(例如 thenApplyAsync)。它们的区别在于回调任务执行的线程:

  • 不带 Async 后缀的方法: 回调可能在完成前一个阶段任务的线程中执行,也可能在调用回调注册方法(如 thenApply)的线程中执行。具体取决于前一个阶段是否已经完成。
  • Async 后缀的方法 (无 Executor 参数): 回调任务会被提交到默认的 ForkJoinPool.commonPool() 中异步执行。
  • Async 后缀的方法 (带 Executor 参数): 回调任务会被提交到你指定的 Executor 中异步执行。

何时使用 Async 版本?

  • 当回调任务是 CPU 密集型或阻塞型操作时,应使用 Async 版本,并最好指定一个独立的 Executor,以避免阻塞 ForkJoinPool.commonPool() 或执行前序任务的线程。
  • 当回调任务非常轻量且快速时,可以直接使用非 Async 版本,以减少线程切换的开销。

4. 组合 CompletableFuture

组合是 CompletableFuture 最强大的特性之一,允许你构建复杂的异步工作流。

  • thenCompose(Function<? super T, ? extends CompletionStage<U>> fn) – 链式依赖:
    当你需要在一个 CompletableFuture 完成后,使用其结果启动另一个异步操作(返回 CompletionStage,通常是另一个 CompletableFuture)时,使用 thenCompose。它类似于 flatMap 操作,避免了 CompletableFuture<CompletableFuture<U>> 这样的嵌套结构。

    “`java
    CompletableFuture getUserFuture = CompletableFuture.supplyAsync(() -> “UserId-123”);

    CompletableFuture getOrderFuture = getUserFuture.thenCompose(userId ->
    // 根据 userId 异步获取订单信息
    CompletableFuture.supplyAsync(() -> {
    System.out.println(“Fetching order for user: ” + userId);
    // 模拟网络调用
    try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) {}
    return “Order details for ” + userId;
    })
    );

    System.out.println(getOrderFuture.join()); // “Order details for UserId-123”
    ``
    注意
    thenCompose的参数函数返回的是一个CompletionStage`。

  • thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn) – 合并两个独立结果:
    当你有两个独立的 CompletableFuture,并且想在它们都完成后,将它们的结果合并(使用 BiFunction)产生一个新的结果时,使用 thenCombine

    “`java
    CompletableFuture weightFuture = CompletableFuture.supplyAsync(() -> 70.5); // kg
    CompletableFuture heightFuture = CompletableFuture.supplyAsync(() -> 1.75); // meters

    CompletableFuture bmiFuture = weightFuture.thenCombine(heightFuture, (weight, height) -> {
    System.out.println(“Calculating BMI in thread: ” + Thread.currentThread().getName());
    return weight / (height * height);
    });

    System.out.println(“BMI: ” + bmiFuture.join());
    “`

  • thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action) – 消费两个独立结果:
    类似于 thenCombine,但在两个 CompletableFuture 都完成后,使用它们的结执行一个 BiConsumer 操作(无返回值)。

    “`java
    CompletableFuture userFuture = CompletableFuture.supplyAsync(() -> “Alice”);
    CompletableFuture productFuture = CompletableFuture.supplyAsync(() -> “Laptop”);

    CompletableFuture combinedAction = userFuture.thenAcceptBoth(productFuture, (user, product) -> {
    System.out.println(user + ” purchased a ” + product);
    });

    combinedAction.join(); // “Alice purchased a Laptop”
    “`

  • runAfterBoth(CompletionStage<?> other, Runnable action) – 两个都完成后执行动作:
    在两个 CompletableFuture 都完成后,执行一个 Runnable 任务,不关心它们的结果。

  • allOf(CompletableFuture<?>... cfs) – 等待所有 Future 完成:
    接收一个可变参数的 CompletableFuture 数组,返回一个新的 CompletableFuture<Void>,它在所有输入的 Future 都完成后才完成。注意: allOf 返回的 CompletableFuture<Void> 本身不携带任何结果。你需要再次链式调用或手动遍历原始 Future 列表来获取它们各自的结果。

    “`java
    CompletableFuture f1 = CompletableFuture.supplyAsync(() -> “Result 1”);
    CompletableFuture f2 = CompletableFuture.supplyAsync(() -> 2);
    CompletableFuture f3 = CompletableFuture.supplyAsync(() -> true);

    CompletableFuture allFutures = CompletableFuture.allOf(f1, f2, f3);

    // 等待 allFutures 完成
    allFutures.join(); // 阻塞直到 f1, f2, f3 都完成

    // allOf 完成后,可以安全地获取各个 Future 的结果 (它们保证已完成)
    System.out.println(“f1 result: ” + f1.join()); // “Result 1”
    System.out.println(“f2 result: ” + f2.join()); // 2
    System.out.println(“f3 result: ” + f3.join()); // true

    // 或者使用流处理获取所有结果 (更优雅的方式)
    CompletableFuture.allOf(f1, f2, f3).thenRun(() -> {
    try {
    String r1 = f1.get(); // 非阻塞,因为 allOf 保证了完成
    Integer r2 = f2.get();
    Boolean r3 = f3.get();
    System.out.println(“All results via thenRun: ” + r1 + “, ” + r2 + “, ” + r3);
    } catch (Exception e) { // InterruptedException, ExecutionException
    e.printStackTrace();
    }
    }).join();
    “`

  • anyOf(CompletableFuture<?>... cfs) – 等待任意一个 Future 完成:
    接收一个可变参数的 CompletableFuture 数组,返回一个新的 CompletableFuture<Object>,它在输入的 Future 中任意一个完成后就立即完成,其结果是第一个完成的 Future 的结果。

    “`java
    CompletableFuture slowTask = CompletableFuture.supplyAsync(() -> {
    try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) {}
    return “Slow Result”;
    });
    CompletableFuture fastTask = CompletableFuture.supplyAsync(() -> {
    try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) {}
    return “Fast Result”;
    });

    CompletableFuture firstCompleted = CompletableFuture.anyOf(slowTask, fastTask);

    System.out.println(“First completed result: ” + firstCompleted.join()); // “Fast Result”
    “`

    5. 异常处理

    异步编程中的异常处理是一个关键点。CompletableFuture 提供了专门的方法来处理链式调用中可能出现的异常。

    • exceptionally(Function<Throwable, ? extends T> fn):
      注册一个回调函数,当前面的阶段(或原始 Future)异常完成时,这个函数会被调用。它接收 Throwable 作为参数,并返回一个替代结果 T,从而允许从异常中恢复。如果前面的阶段正常完成,则此回调不会执行。

      “`java
      CompletableFuture errorFuture = CompletableFuture.supplyAsync(() -> {
      if (true) { // 模拟错误条件
      throw new RuntimeException(“Something went wrong!”);
      }
      return “Success”;
      }).exceptionally(ex -> {
      System.err.println(“Caught exception: ” + ex.getMessage());
      return “Default Value on Error”; // 返回一个备用值
      });

      System.out.println(“Result (potentially recovered): ” + errorFuture.join()); // “Default Value on Error”
      “`

    • handle(BiFunction<? super T, Throwable, ? extends U> fn):
      这是一个更通用的处理方法。无论前面的阶段是正常完成还是异常完成,注册的 BiFunction 都会被调用。它接收两个参数:正常结果 T(如果异常则为 null)和异常 Throwable(如果正常则为 null)。你可以根据这两个参数来决定返回什么结果 U 或抛出新的异常。

      “`java
      CompletableFuture futureWithHandle = CompletableFuture.supplyAsync(() -> {
      // 模拟可能成功或失败的操作
      if (Math.random() > 0.5) {
      return 100;
      } else {
      throw new IllegalStateException(“Failed randomly”);
      }
      }).handle((result, ex) -> {
      if (ex != null) {
      System.err.println(“Handling exception: ” + ex.getMessage());
      return -1; // 返回错误代码
      } else {
      System.out.println(“Handling success: ” + result);
      return result * 2; // 处理正常结果
      }
      });

      System.out.println(“Final result after handle: ” + futureWithHandle.join()); // 可能是 200 或 -1
      “`

    • whenComplete(BiConsumer<? super T, ? super Throwable> action):
      handle 类似,whenComplete 的回调 BiConsumer 也会在 Future 完成时(无论成功或失败)执行。但它主要用于执行副作用(如日志记录),它不改变 Future 的结果或异常状态。如果 whenComplete 的 action 自身抛出异常,这个异常会覆盖原始 Future 的结果/异常。

      java
      CompletableFuture<String> futureWithLogging = CompletableFuture.supplyAsync(() -> "Data")
      .whenComplete((result, ex) -> {
      if (ex != null) {
      System.out.println("Operation failed: " + ex.getMessage());
      } else {
      System.out.println("Operation succeeded with result: " + result);
      }
      });
      // futureWithLogging 的结果仍然是 "Data" 或者原始的异常

    6. 控制执行:Executor 的选择与使用

    CompletableFuture*Async 方法默认使用 ForkJoinPool.commonPool()。这是一个共享的、大小通常等于 CPU 核心数的线程池。虽然方便,但在某些场景下可能不是最佳选择:

    • I/O 密集型任务: 如果你的异步任务主要是等待网络或磁盘 I/O,它们大部分时间处于阻塞状态。在 commonPool 中运行大量此类任务可能会耗尽所有线程,导致 CPU 密集型任务或其他需要 commonPool 的任务(如并行流 parallelStream())饿死。
    • 需要隔离的任务: 你可能希望某些关键任务或不同类型的任务使用独立的线程池,以避免相互干扰。
    • 需要精细控制线程池参数: 你可能需要自定义线程池的大小、队列类型、拒绝策略等。

    最佳实践:

    • 对于 I/O 密集型任务,创建并使用一个独立的、大小可配置的 ThreadPoolExecutor 线程池的大小通常可以设置得比 CPU 核心数大很多(例如,根据预期的并发 I/O 请求数)。

      “`java
      // 创建一个适合 I/O 任务的线程池
      int ioPoolSize = Math.max(Runtime.getRuntime().availableProcessors() * 2, 50); // 示例大小,需调优
      ExecutorService ioExecutor = Executors.newFixedThreadPool(ioPoolSize,
      runnable -> {
      Thread t = new Thread(runnable);
      t.setName(“io-worker-” + t.getId());
      t.setDaemon(true); // 设为守护线程,以便 JVM 退出
      return t;
      });

      CompletableFuture ioTask = CompletableFuture.supplyAsync(() -> {
      // 模拟网络调用
      System.out.println(“Performing I/O task in thread: ” + Thread.currentThread().getName());
      try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) {}
      return “Data from network”;
      }, ioExecutor); // 指定使用 ioExecutor

      ioTask.thenAcceptAsync(data -> {
      // 后续处理也可以指定 Executor
      System.out.println(“Processing I/O result in thread: ” + Thread.currentThread().getName());
      // …
      }, ioExecutor).join();

      // 记得在应用程序关闭时优雅地关闭自定义 Executor
      // ioExecutor.shutdown();
      “`

    • 对于 CPU 密集型任务,可以使用 commonPool 或创建一个大小接近 CPU 核心数的 ThreadPoolExecutor

    • 谨慎使用 commonPool: 了解它是共享资源,避免在其中执行长时间阻塞的操作。

    7. 超时处理与取消

    • 超时 (Java 9+):
      Java 9 引入了更便捷的超时处理方法:

      • orTimeout(long timeout, TimeUnit unit): 如果 Future 在指定时间内未完成,则使其异常完成,抛出 TimeoutException
      • completeOnTimeout(T value, long timeout, TimeUnit unit): 如果 Future 在指定时间内未完成,则使用提供的默认值 value 完成它。

      “`java
      // Java 9+
      CompletableFuture taskWithTimeout = CompletableFuture.supplyAsync(() -> {
      try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) {}
      return “Late Result”;
      })
      .orTimeout(2, TimeUnit.SECONDS); // 设置 2 秒超时

      try {
      System.out.println(taskWithTimeout.join());
      } catch (Exception e) { // CompletionException wrapping TimeoutException
      System.err.println(“Task timed out: ” + e.getCause());
      }

      CompletableFuture taskWithDefault = CompletableFuture.supplyAsync(() -> {
      try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) {}
      return “Late Result”;
      })
      .completeOnTimeout(“Default on Timeout”, 2, TimeUnit.SECONDS); // 超时则返回默认值

      System.out.println(“Result (with default on timeout): ” + taskWithDefault.join()); // “Default on Timeout”
      “`

    • 手动实现超时 (Java 8):
      在 Java 8 中,可以通过结合 anyOf 和一个延迟执行的 Future 来模拟超时:

      “`java
      public static CompletableFuture failAfter(Duration duration) {
      final CompletableFuture promise = new CompletableFuture<>();
      // 使用 ScheduledExecutorService 实现延迟
      ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
      scheduler.schedule(() -> {
      final TimeoutException ex = new TimeoutException(“Timeout after ” + duration);
      return promise.completeExceptionally(ex);
      }, duration.toMillis(), TimeUnit.MILLISECONDS);
      scheduler.shutdown(); // 确保调度器关闭
      return promise;
      }

      CompletableFuture originalTask = CompletableFuture.supplyAsync(() -> {
      try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) {}
      return “Data”;
      });

      CompletableFuture taskOrTimeout = CompletableFuture.anyOf(originalTask, failAfter(Duration.ofSeconds(2)));

      try {
      Object result = taskOrTimeout.join();
      if (result instanceof String) {
      System.out.println(“Task completed: ” + result);
      }
      // 注意:如果超时发生,join() 会抛出 CompletionException(TimeoutException)
      } catch (CompletionException ce) {
      if (ce.getCause() instanceof TimeoutException) {
      System.err.println(“Task timed out!”);
      } else {
      // Handle other potential exceptions from originalTask
      ce.printStackTrace();
      }
      }
      “`

    • 取消:
      CompletableFuture 实现了 Futurecancel(boolean mayInterruptIfRunning) 方法。

      • 调用 cancel(true) 会尝试取消 Future。如果 Future 尚未开始,它将永远不会运行。如果正在运行,mayInterruptIfRunning = true 会尝试中断执行线程(如果任务代码响应中断)。
      • 取消成功后,Future 会以 CancellationException 异常完成。后续的 get()join() 调用会抛出此异常。
      • 局限性: 取消操作可能不会成功,特别是如果任务已经完成、已经取消,或者任务代码不响应中断。取消只是一个请求,不保证强制停止。

      “`java
      CompletableFuture cancellableTask = CompletableFuture.runAsync(() -> {
      try {
      for (int i = 0; i < 5; i++) {
      if (Thread.currentThread().isInterrupted()) {
      System.out.println(“Task interrupted, exiting.”);
      throw new InterruptedException(); // 响应中断
      }
      System.out.println(“Working… ” + i);
      TimeUnit.SECONDS.sleep(1);
      }
      } catch (InterruptedException e) {
      System.out.println(“Task caught interruption.”);
      Thread.currentThread().interrupt(); // Restore interrupt status
      }
      });

      // 在 2 秒后尝试取消任务
      Thread.sleep(2500);
      boolean cancelled = cancellableTask.cancel(true); // 尝试中断
      System.out.println(“Cancellation attempted. Success? ” + cancelled);

      try {
      cancellableTask.join(); // 会抛出 CancellationException
      } catch (CancellationException e) {
      System.out.println(“Task was cancelled.”);
      } catch (CompletionException e) {
      System.out.println(“Task completed with exception: ” + e.getCause());
      }
      “`

    • 8. 最佳实践总结

      1. 避免阻塞: 尽量使用回调方法 (thenApply, thenAccept, thenCompose 等)和组合方法,而不是调用 get()join() 阻塞等待结果,尤其是在需要高吞吐量的服务器端应用中。join() 可以在主线程末尾或单元测试中适度使用。
      2. 明智地选择 Executor: 不要盲目依赖 ForkJoinPool.commonPool()。为 I/O 密集型任务创建和使用独立的、大小合适的线程池。CPU 密集型任务可以使用 commonPool 或核心数大小的线程池。在 *Async 方法中明确指定 Executor。
      3. 使用 *Async 处理耗时回调: 如果回调任务本身是阻塞的或计算密集型的,使用带 Async 后缀的方法,并考虑指定合适的 Executor,以防止阻塞调用链中其他任务的执行线程。
      4. 优先使用 thenCompose 处理依赖: 当一个异步任务依赖于另一个异步任务的结果时,使用 thenCompose 而不是嵌套的 thenApply,以保持代码扁平化和可读性。
      5. 显式处理异常: 使用 exceptionally 提供恢复机制或默认值,使用 handle 处理成功和失败两种情况,使用 whenComplete 进行日志记录或资源清理等副作用操作。不要让异常在链中悄无声息地传播。
      6. 保持回调简洁: 回调 Lambda 表达式应尽可能简短且非阻塞。如果回调逻辑复杂或需要阻塞,考虑将其封装到另一个 supplyAsyncrunAsync 调用中。
      7. 资源管理: 确保在异步操作链中使用的资源(如数据库连接、文件句柄)得到妥善管理和释放,可以使用 whenCompletehandle 来执行清理逻辑。
      8. 理解 allOfanyOf: allOf 用于等待所有任务完成(常用于汇聚结果),anyOf 用于获取第一个完成的任务结果(常用于竞争或超时场景)。
      9. 测试: 异步代码的测试可能更复杂。使用 join()get()(带有超时)在测试中断言最终结果,或使用 Awaitility 等库来处理异步断言。

      9. 潜在陷阱

      • 线程池耗尽: 如前所述,不当使用 commonPool 或线程池大小配置不合理可能导致性能问题。
      • 回调地狱 (Callback Hell): 虽然 CompletableFuture 的组合方法旨在避免这个问题,但如果滥用嵌套的非组合回调,仍然可能导致代码难以理解和维护。优先使用 thenCompose, thenCombine 等组合方法。
      • 忘记异常处理: 未在链中处理异常可能导致异常被“吞没”(仅在最终调用 join/get 时抛出),使得调试困难。
      • 阻塞操作混入:CompletableFuture 的回调或任务中执行了未预期的长时间阻塞操作,违背了异步设计的初衷。
      • 过度设计: 对于非常简单的异步场景,CompletableFuture 可能引入不必要的复杂性。权衡其带来的好处和复杂性成本。

      10. 结论

      CompletableFuture 是 Java 并发库中的一颗明珠,它极大地简化了异步编程的复杂性,使得开发者能够以声明式、可组合的方式构建高效、响应迅速的应用程序。通过理解其核心概念、熟练运用其丰富的 API(特别是回调、组合和异常处理方法),并遵循最佳实践(尤其是关于 Executor 的选择和避免阻塞),你可以充分发挥 CompletableFuture 的威力,编写出优雅且高性能的现代 Java 代码。虽然掌握它需要一定的学习曲线,但其带来的在并发处理、资源利用率和代码可维护性方面的提升是显著的,是现代 Java 开发者必备的核心技能之一。


      发表评论

      您的邮箱地址不会被公开。 必填项已用 * 标注

      滚动至顶部