摘要:本文学习了如何使用Future相关接口实现异步调用。
环境
Windows 10 企业版 LTSC 21H2
Java 1.8
1 Callable
之前学习多线程的时候,可以通过继承Thread类和实现Runnable接口创建线程。
由于单继承的原因建议使用Runnable接口,需要重写run()
方法,因为返回值为void类型,所以在执行完任务之后无法返回任何结果。
Callable接口是JDK1.5新增的函数式接口,位于java.util.concurrent
包下:
java1 2 3 4
| @FunctionalInterface public interface Callable<V> { V call() throws Exception; }
|
相比较使用Runnable接口,使用Callable接口能获取返回值,并且还可以处理异常。
如果要使用Callable接口,还需要配合支持异步调用的Future相关类。
2 Future
Future接口是JDK1.5新增的函数式接口,位于java.util.concurrent
包下,用于表示异步计算的结果:
java1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| public interface Future<V> { boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
|
3 FutureTask
FutureTask类间接实现了Runnable接口和Future接口,所以既可以作为Runnable被线程执行,又可以作为Future获取线程执行的结果。
多个线程创建时,如果使用了相同的FutureTask类,那么只会有一个线程执行call()
方法。
构造方法:
java1 2
| public FutureTask(Callable<V> callable); public FutureTask(Runnable runnable, V result);
|
通过实现Callable接口创建:
java1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| public class Demo { public static void main(String[] args) { DemoCallable callable = new DemoCallable(); FutureTask<Integer> f1 = new FutureTask<>(callable); FutureTask<Integer> f2 = new FutureTask<>(callable); Thread t1 = new Thread(f1, "线程一"); Thread t2 = new Thread(f2, "线程二"); t1.start(); t2.start(); try { System.out.println(f1.get() + f2.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } }
class DemoCallable implements Callable<Integer> { @Override public Integer call() throws Exception { for (int i = 0; i < 1000; i++) { System.out.println(Thread.currentThread().getName() + " >>> " + i); } return 1000; } }
|
4 CompletableFuture
4.1 简介
由于Future接口在获取结果时只能通过阻塞或者轮询的方式,阻塞的方式和异步编程的初衷相违背,轮询的方式会耗费无谓的CPU资源并且不能及时得到结果,因此迫切需要新的方式。
在JDK1.8中,提供了更加强大的CompletableFuture类,简化了异步编程的复杂性,提供了函数式编程的能力,还可以通过回调的方式处理结果。
CompletableFuture类实现了Future接口和CompletionStage接口。Future接口代表异步获取结果,CompletionStage接口代表异步计算过程中的某一个阶段,一个阶段完成以后可能会触发另外一个阶段。一个阶段的执行可能是被单个阶段的完成触发,也可能是由多个阶段一起触发。
使用CompletableFuture支持使用自定义的线程池,默认使用ForkJoinPool类的commonPool()
方法创建的线程池。
4.2 使用
4.2.1 创建任务
4.2.1.1 runAsync
使用runAsync()
方法创建无返回值的异步任务,默认使用ForkJoinPool线程池里的线程。
示例:
java1 2 3 4 5 6 7 8 9 10
| public static void main(String[] args) { CompletableFuture<Void> future = CompletableFuture.runAsync(() -> { System.out.println(Thread.currentThread().getName() + "-runAsync"); }); try { System.out.println(Thread.currentThread().getName() + "-result-" + future.get()); } catch (Exception e) { e.printStackTrace(); } }
|
结果:
log1 2
| ForkJoinPool.commonPool-worker-9-runAsync main-result-null
|
4.2.1.2 supplyAsync
使用supplyAsync()
方法创建带返回值的异步任务,默认使用ForkJoinPool线程池里的线程。
示例:
java1 2 3 4 5 6 7 8 9 10 11
| public static void main(String[] args) { CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName() + "-supplyAsync"); return "success"; }); try { System.out.println(Thread.currentThread().getName() + "-result-" + future.get()); } catch (Exception e) { e.printStackTrace(); } }
|
结果:
log1 2
| ForkJoinPool.commonPool-worker-9-supplyAsync main-result-success
|
4.2.1.3 比较
比较:
维度 |
runAsync |
supplyAsync |
是否有返回值 |
无 |
有 |
4.2.2 立即返回
4.2.2.1 complete
使用complete()
方法将入参设置为任务执行结果,将任务标记为成功完成。
示例:
java1 2 3 4 5 6 7 8 9 10 11 12 13
| public static void main(String[] args) { CompletableFuture<String> future = new CompletableFuture<>(); new Thread(() -> { System.out.println(Thread.currentThread().getName() + "-start"); future.complete("success"); System.out.println(Thread.currentThread().getName() + "-end"); }).start(); try { System.out.println(Thread.currentThread().getName() + "-return-" + future.get()); } catch (Exception e) { e.printStackTrace(); } }
|
结果:
log1 2 3
| Thread-0-start Thread-0-end main-return-success
|
4.2.2.2 completeExceptionally
使用completeExceptionally()
方法将入参设置为任务抛出异常,将任务标记为异常完成。
示例:
java1 2 3 4 5 6 7 8 9 10 11 12 13
| public static void main(String[] args) { CompletableFuture<String> future = new CompletableFuture<>(); new Thread(() -> { System.out.println(Thread.currentThread().getName() + "-start"); future.completeExceptionally(new NullPointerException("exception")); System.out.println(Thread.currentThread().getName() + "-end"); }).start(); try { System.out.println(Thread.currentThread().getName() + "-return-" + future.get()); } catch (Exception e) { e.printStackTrace(); } }
|
结果:
log1 2 3 4 5 6
| Thread-0-start Thread-0-end java.util.concurrent.ExecutionException: java.lang.NullPointerException: exception at ... Caused by: java.lang.NullPointerException: exception at ...
|
4.2.2.3 比较
比较:
维度 |
complete |
completeExceptionally |
后续状态 |
isDone() == true
isCompletedExceptionally() == false
|
isDone() == true
isCompletedExceptionally() == true
|
对回调链的影响 |
触发thenApply和thenAccept等正常回调,跳过exceptionally和handle等异常回调。 |
触发exceptionally和handle等异常回调,跳过thenApply和thenAccept等正常回调。 |
返回值 |
首次调用返回true,重复调用返回false,且不会覆盖已有结果。 |
首次调用返回true,重复调用返回false,且不会覆盖已有结果。 |
4.2.3 回调处理
4.2.3.1 thenRun
使用thenRun()
方法在上一个任务结束后,继续执行下一个任务。
示例:
java1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| public static void main(String[] args) { CompletableFuture<Void> future = CompletableFuture.runAsync(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { throw new RuntimeException(e); } System.out.println(Thread.currentThread().getName() + "-runAsync"); }).thenRun(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { throw new RuntimeException(e); } System.out.println(Thread.currentThread().getName() + "-thenRun"); }); try { System.out.println(Thread.currentThread().getName() + "-return-" + future.get()); } catch (Exception e) { e.printStackTrace(); } }
|
结果:
log1 2 3 4 5
| // 等待1s ForkJoinPool.commonPool-worker-9-runAsync // 等待1s ForkJoinPool.commonPool-worker-9-thenRun main-return-null
|
4.2.3.2 thenAccept
使用thenApply()
方法在上一个任务结束后,接收上一个任务的结果,继续执行下一个任务,不返回结果。
示例:
java1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| public static void main(String[] args) { CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { throw new RuntimeException(e); } System.out.println(Thread.currentThread().getName() + "-supplyAsync"); return "success-first"; }).thenAccept((result) -> { try { Thread.sleep(1000); } catch (InterruptedException e) { throw new RuntimeException(e); } System.out.println(Thread.currentThread().getName() + "-thenAccept"); System.out.println(Thread.currentThread().getName() + "-result-" + result); }); try { System.out.println(Thread.currentThread().getName() + "-return-" + future.get()); } catch (Exception e) { e.printStackTrace(); } }
|
结果:
log1 2 3 4 5 6
| // 等待1s ForkJoinPool.commonPool-worker-9-supplyAsync // 等待1s ForkJoinPool.commonPool-worker-9-thenAccept ForkJoinPool.commonPool-worker-9-result-success-first main-return-null
|
4.2.3.3 thenApply
使用thenApply()
方法在上一个任务结束后,接收上一个任务的结果,继续执行下一个任务,返回下一个任务的结果。
示例:
java1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| public static void main(String[] args) { CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { throw new RuntimeException(e); } System.out.println(Thread.currentThread().getName() + "-supplyAsync"); return "success-first"; }).thenApply((result) -> { try { Thread.sleep(1000); } catch (InterruptedException e) { throw new RuntimeException(e); } System.out.println(Thread.currentThread().getName() + "-thenApply"); System.out.println(Thread.currentThread().getName() + "-param-" + result); return "success-second"; }); try { System.out.println(Thread.currentThread().getName() + "-return-" + future.get()); } catch (Exception e) { e.printStackTrace(); } }
|
结果:
log1 2 3 4 5 6
| // 等待1s ForkJoinPool.commonPool-worker-9-supplyAsync // 等待1s ForkJoinPool.commonPool-worker-9-thenApply ForkJoinPool.commonPool-worker-9-param-success-first main-return-success-second
|
4.2.3.4 whenComplete
使用whenComplete()
方法在上一个任务结束后,接收上一个任务的结果和异常,继续执行下一个任务,返回上一个任务的结果。
示例:
java1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| public static void main(String[] args) { CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { throw new RuntimeException(e); } System.out.println(Thread.currentThread().getName() + "-supplyAsync"); return "success-first"; }).whenComplete((result, exception) -> { try { Thread.sleep(1000); } catch (InterruptedException e) { throw new RuntimeException(e); } System.out.println(Thread.currentThread().getName() + "-whenComplete"); System.out.println(Thread.currentThread().getName() + "-result-" + result); System.out.println(Thread.currentThread().getName() + "-exception-" + exception); }); try { System.out.println(Thread.currentThread().getName() + "-return-" + future.get()); } catch (Exception e) { e.printStackTrace(); } }
|
结果:
log1 2 3 4 5 6 7
| // 等待1s ForkJoinPool.commonPool-worker-9-supplyAsync // 等待1s ForkJoinPool.commonPool-worker-9-whenComplete ForkJoinPool.commonPool-worker-9-result-success-first ForkJoinPool.commonPool-worker-9-exception-null main-return-success-first
|
4.2.3.5 handle
使用handle()
方法在上一个任务结束后,接收上一个任务的结果和异常,继续执行下一个任务,返回下一个任务的结果。
示例:
java1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| public static void main(String[] args) { CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { throw new RuntimeException(e); } System.out.println(Thread.currentThread().getName() + "-supplyAsync"); return "success-first"; }).handle((result, exception) -> { try { Thread.sleep(1000); } catch (InterruptedException e) { throw new RuntimeException(e); } System.out.println(Thread.currentThread().getName() + "-handle"); System.out.println(Thread.currentThread().getName() + "-result-" + result); System.out.println(Thread.currentThread().getName() + "-exception-" + exception); return "success-second"; }); try { System.out.println(Thread.currentThread().getName() + "-return-" + future.get()); } catch (Exception e) { e.printStackTrace(); } }
|
结果:
log1 2 3 4 5 6 7
| // 等待1s ForkJoinPool.commonPool-worker-9-supplyAsync // 等待1s ForkJoinPool.commonPool-worker-9-handle ForkJoinPool.commonPool-worker-9-result-success-first ForkJoinPool.commonPool-worker-9-exception-null main-return-success-second
|
4.2.3.6 exceptionally
使用exceptionally()
方法在上一个任务结束后,接收上一个任务的异常,继续执行下一个任务,返回下一个任务的结果。
示例:
java1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| public static void main(String[] args) { CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { throw new RuntimeException(e); } System.out.println(Thread.currentThread().getName() + "-supplyAsync"); return "success-first" + 10 / 0; }).exceptionally((exception) -> { try { Thread.sleep(1000); } catch (InterruptedException e) { throw new RuntimeException(e); } System.out.println(Thread.currentThread().getName() + "-exceptionally"); System.out.println(Thread.currentThread().getName() + "-exception-" + exception); return "success-second"; }); try { System.out.println(Thread.currentThread().getName() + "-return-" + future.get()); } catch (Exception e) { e.printStackTrace(); } }
|
结果:
log1 2 3 4 5 6
| // 等待1s ForkJoinPool.commonPool-worker-9-supplyAsync // 等待1s ForkJoinPool.commonPool-worker-9-exceptionally ForkJoinPool.commonPool-worker-9-exception-java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero main-return-success-second
|
4.2.3.7 比较
比较:
维度 |
thenRun |
thenAccept |
thenApply |
whenComplete |
handle |
exceptionally |
接收 |
无 |
仅结果 |
仅结果 |
结果和异常 |
结果和异常 |
仅异常 |
返回 |
null |
null |
回调函数 |
前置结果 |
回调函数 |
回调函数 |
前置 |
仅正常 |
仅正常 |
仅正常 |
正常和异常 |
正常和异常 |
仅异常 |
用途 |
不接收结果,不返回结果。 |
接收结果,不返回结果。 |
接收结果,返回新结果。 |
接收结果和异常,返回原结果。 |
接收结果和异常,返回新结果。 |
接收异常,返回新结果。 |
条