抱歉,您的浏览器无法访问本站
本页面需要浏览器支持(启用)JavaScript
了解详情 >

摘要:本文学习了如何使用Future相关接口实现异步调用。

环境

Windows 10 企业版 LTSC 21H2
Java 1.8

1 Callable

之前学习多线程的时候,可以通过继承Thread类和实现Runnable接口创建线程。

由于单继承的原因建议使用Runnable接口,需要重写run()方法,因为返回值为void类型,所以在执行完任务之后无法返回任何结果。

Callable接口是JDK1.5新增的函数式接口,位于java.util.concurrent包下:

java
1
2
3
4
@FunctionalInterface
public interface Callable<V> {
V call() throws Exception;
}

相比较使用Runnable接口,使用Callable接口能获取返回值,并且还可以处理异常。

如果要使用Callable接口,还需要配合支持异步调用的Future相关类。

2 Future

Future接口是JDK1.5新增的函数式接口,位于java.util.concurrent包下,用于表示异步计算的结果:

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public interface Future<V> {
// 用来取消任务,如果取消任务成功则返回true,如果取消任务失败则返回false
// 参数mayInterruptIfRunning表示是否允许取消正在执行的任务,如果设置true,则表示可以取消正在执行过程中的任务
// 如果任务已经完成,返回false
// 如果任务正在执行,若mayInterruptIfRunning设置为true,则返回true,若mayInterruptIfRunning设置为false,则返回false
// 如果任务还没有执行,返回true
boolean cancel(boolean mayInterruptIfRunning);

// 表示正在执行的任务是否被取消成功,如果在完成前被取消成功,返回true
boolean isCancelled();

// 表示任务是否已经完成,若任务完成,则返回true
boolean isDone();

// 用来获取执行结果,这个方法会产生阻塞,会一直等到任务执行完毕才返回
V get() throws InterruptedException, ExecutionException;

// 用来获取执行结果,如果在指定时间内,还没获取到结果,就抛出TimeoutException异常
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}

3 FutureTask

FutureTask类间接实现了Runnable接口和Future接口,所以既可以作为Runnable被线程执行,又可以作为Future获取线程执行的结果。

多个线程创建时,如果使用了相同的FutureTask类,那么只会有一个线程执行call()方法。

构造方法:

java
1
2
public FutureTask(Callable<V> callable);
public FutureTask(Runnable runnable, V result);

通过实现Callable接口创建:

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class Demo {
public static void main(String[] args) {
DemoCallable callable = new DemoCallable();
FutureTask<Integer> f1 = new FutureTask<>(callable);
FutureTask<Integer> f2 = new FutureTask<>(callable);
Thread t1 = new Thread(f1, "线程一");
Thread t2 = new Thread(f2, "线程二");
t1.start();
t2.start();
try {
System.out.println(f1.get() + f2.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}

class DemoCallable implements Callable<Integer> {
@Override
public Integer call() throws Exception {
for (int i = 0; i < 1000; i++) {
System.out.println(Thread.currentThread().getName() + " >>> " + i);
}
return 1000;
}
}

4 CompletableFuture

4.1 简介

由于Future接口在获取结果时只能通过阻塞或者轮询的方式,阻塞的方式和异步编程的初衷相违背,轮询的方式会耗费无谓的CPU资源并且不能及时得到结果,因此迫切需要新的方式。

在JDK1.8中,提供了更加强大的CompletableFuture类,简化了异步编程的复杂性,提供了函数式编程的能力,还可以通过回调的方式处理结果。

CompletableFuture类实现了Future接口和CompletionStage接口。Future接口代表异步获取结果,CompletionStage接口代表异步计算过程中的某一个阶段,一个阶段完成以后可能会触发另外一个阶段。一个阶段的执行可能是被单个阶段的完成触发,也可能是由多个阶段一起触发。

使用CompletableFuture支持使用自定义的线程池,默认使用ForkJoinPool类的commonPool()方法创建的线程池。

4.2 使用

4.2.1 创建任务

4.2.1.1 runAsync

使用runAsync()方法创建无返回值的异步任务,默认使用ForkJoinPool线程池里的线程。

示例:

java
1
2
3
4
5
6
7
8
9
10
public static void main(String[] args) {
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
System.out.println(Thread.currentThread().getName() + "-runAsync");
});
try {
System.out.println(Thread.currentThread().getName() + "-result-" + future.get());
} catch (Exception e) {
e.printStackTrace();
}
}

结果:

log
1
2
ForkJoinPool.commonPool-worker-9-runAsync
main-result-null
4.2.1.2 supplyAsync

使用supplyAsync()方法创建带返回值的异步任务,默认使用ForkJoinPool线程池里的线程。

示例:

java
1
2
3
4
5
6
7
8
9
10
11
public static void main(String[] args) {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "-supplyAsync");
return "success";
});
try {
System.out.println(Thread.currentThread().getName() + "-result-" + future.get());
} catch (Exception e) {
e.printStackTrace();
}
}

结果:

log
1
2
ForkJoinPool.commonPool-worker-9-supplyAsync
main-result-success
4.2.1.3 比较

比较:

维度 runAsync supplyAsync
是否有返回值

4.2.2 立即返回

4.2.2.1 complete

使用complete()方法将入参设置为任务执行结果,将任务标记为成功完成。

示例:

java
1
2
3
4
5
6
7
8
9
10
11
12
13
public static void main(String[] args) {
CompletableFuture<String> future = new CompletableFuture<>();
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "-start");
future.complete("success");
System.out.println(Thread.currentThread().getName() + "-end");
}).start();
try {
System.out.println(Thread.currentThread().getName() + "-return-" + future.get());
} catch (Exception e) {
e.printStackTrace();
}
}

结果:

log
1
2
3
Thread-0-start
Thread-0-end
main-return-success
4.2.2.2 completeExceptionally

使用completeExceptionally()方法将入参设置为任务抛出异常,将任务标记为异常完成。

示例:

java
1
2
3
4
5
6
7
8
9
10
11
12
13
public static void main(String[] args) {
CompletableFuture<String> future = new CompletableFuture<>();
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "-start");
future.completeExceptionally(new NullPointerException("exception"));
System.out.println(Thread.currentThread().getName() + "-end");
}).start();
try {
System.out.println(Thread.currentThread().getName() + "-return-" + future.get());
} catch (Exception e) {
e.printStackTrace();
}
}

结果:

log
1
2
3
4
5
6
Thread-0-start
Thread-0-end
java.util.concurrent.ExecutionException: java.lang.NullPointerException: exception
at ...
Caused by: java.lang.NullPointerException: exception
at ...
4.2.2.3 比较

比较:

维度 complete completeExceptionally
后续状态 isDone() == true
isCompletedExceptionally() == false
isDone() == true
isCompletedExceptionally() == true
对回调链的影响 触发thenApply和thenAccept等正常回调,跳过exceptionally和handle等异常回调。 触发exceptionally和handle等异常回调,跳过thenApply和thenAccept等正常回调。
返回值 首次调用返回true,重复调用返回false,且不会覆盖已有结果。 首次调用返回true,重复调用返回false,且不会覆盖已有结果。

4.2.3 回调处理

4.2.3.1 thenRun

使用thenRun()方法在上一个任务结束后,继续执行下一个任务。

示例:

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public static void main(String[] args) {
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(Thread.currentThread().getName() + "-runAsync");
}).thenRun(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(Thread.currentThread().getName() + "-thenRun");
});
try {
System.out.println(Thread.currentThread().getName() + "-return-" + future.get());
} catch (Exception e) {
e.printStackTrace();
}
}

结果:

log
1
2
3
4
5
// 等待1s
ForkJoinPool.commonPool-worker-9-runAsync
// 等待1s
ForkJoinPool.commonPool-worker-9-thenRun
main-return-null
4.2.3.2 thenAccept

使用thenApply()方法在上一个任务结束后,接收上一个任务的结果,继续执行下一个任务,不返回结果。

示例:

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public static void main(String[] args) {
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(Thread.currentThread().getName() + "-supplyAsync");
return "success-first";
}).thenAccept((result) -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(Thread.currentThread().getName() + "-thenAccept");
System.out.println(Thread.currentThread().getName() + "-result-" + result);
});
try {
System.out.println(Thread.currentThread().getName() + "-return-" + future.get());
} catch (Exception e) {
e.printStackTrace();
}
}

结果:

log
1
2
3
4
5
6
// 等待1s
ForkJoinPool.commonPool-worker-9-supplyAsync
// 等待1s
ForkJoinPool.commonPool-worker-9-thenAccept
ForkJoinPool.commonPool-worker-9-result-success-first
main-return-null
4.2.3.3 thenApply

使用thenApply()方法在上一个任务结束后,接收上一个任务的结果,继续执行下一个任务,返回下一个任务的结果。

示例:

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public static void main(String[] args) {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(Thread.currentThread().getName() + "-supplyAsync");
return "success-first";
}).thenApply((result) -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(Thread.currentThread().getName() + "-thenApply");
System.out.println(Thread.currentThread().getName() + "-param-" + result);
return "success-second";
});
try {
System.out.println(Thread.currentThread().getName() + "-return-" + future.get());
} catch (Exception e) {
e.printStackTrace();
}
}

结果:

log
1
2
3
4
5
6
// 等待1s
ForkJoinPool.commonPool-worker-9-supplyAsync
// 等待1s
ForkJoinPool.commonPool-worker-9-thenApply
ForkJoinPool.commonPool-worker-9-param-success-first
main-return-success-second
4.2.3.4 whenComplete

使用whenComplete()方法在上一个任务结束后,接收上一个任务的结果和异常,继续执行下一个任务,返回上一个任务的结果。

示例:

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public static void main(String[] args) {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(Thread.currentThread().getName() + "-supplyAsync");
return "success-first";
}).whenComplete((result, exception) -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(Thread.currentThread().getName() + "-whenComplete");
System.out.println(Thread.currentThread().getName() + "-result-" + result);
System.out.println(Thread.currentThread().getName() + "-exception-" + exception);
});
try {
System.out.println(Thread.currentThread().getName() + "-return-" + future.get());
} catch (Exception e) {
e.printStackTrace();
}
}

结果:

log
1
2
3
4
5
6
7
// 等待1s
ForkJoinPool.commonPool-worker-9-supplyAsync
// 等待1s
ForkJoinPool.commonPool-worker-9-whenComplete
ForkJoinPool.commonPool-worker-9-result-success-first
ForkJoinPool.commonPool-worker-9-exception-null
main-return-success-first
4.2.3.5 handle

使用handle()方法在上一个任务结束后,接收上一个任务的结果和异常,继续执行下一个任务,返回下一个任务的结果。

示例:

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public static void main(String[] args) {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(Thread.currentThread().getName() + "-supplyAsync");
return "success-first";
}).handle((result, exception) -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(Thread.currentThread().getName() + "-handle");
System.out.println(Thread.currentThread().getName() + "-result-" + result);
System.out.println(Thread.currentThread().getName() + "-exception-" + exception);
return "success-second";
});
try {
System.out.println(Thread.currentThread().getName() + "-return-" + future.get());
} catch (Exception e) {
e.printStackTrace();
}
}

结果:

log
1
2
3
4
5
6
7
// 等待1s
ForkJoinPool.commonPool-worker-9-supplyAsync
// 等待1s
ForkJoinPool.commonPool-worker-9-handle
ForkJoinPool.commonPool-worker-9-result-success-first
ForkJoinPool.commonPool-worker-9-exception-null
main-return-success-second
4.2.3.6 exceptionally

使用exceptionally()方法在上一个任务结束后,接收上一个任务的异常,继续执行下一个任务,返回下一个任务的结果。

示例:

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public static void main(String[] args) {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(Thread.currentThread().getName() + "-supplyAsync");
return "success-first" + 10 / 0;
}).exceptionally((exception) -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(Thread.currentThread().getName() + "-exceptionally");
System.out.println(Thread.currentThread().getName() + "-exception-" + exception);
return "success-second";
});
try {
System.out.println(Thread.currentThread().getName() + "-return-" + future.get());
} catch (Exception e) {
e.printStackTrace();
}
}

结果:

log
1
2
3
4
5
6
// 等待1s
ForkJoinPool.commonPool-worker-9-supplyAsync
// 等待1s
ForkJoinPool.commonPool-worker-9-exceptionally
ForkJoinPool.commonPool-worker-9-exception-java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
main-return-success-second
4.2.3.7 比较

比较:

维度 thenRun thenAccept thenApply whenComplete handle exceptionally
接收 仅结果 仅结果 结果和异常 结果和异常 仅异常
返回 null null 回调函数 前置结果 回调函数 回调函数
前置 仅正常 仅正常 仅正常 正常和异常 正常和异常 仅异常
用途 不接收结果,不返回结果。 接收结果,不返回结果。 接收结果,返回新结果。 接收结果和异常,返回原结果。 接收结果和异常,返回新结果。 接收异常,返回新结果。

评论