12 CompletableFuture

12.1 CompletableFuture简介

CompletableFuture在Java里面被用于异步编程,异步通常意味着非阻塞,可以使得我们的任务单独运行在与主线程分离的其他线程中,并且通过回调可以在主线程中得到异步任务的执行状态,是否完成,和是否异常等信息。

CompletableFuture实现了Future,CompletionStage接口,实现了Future接口就可以兼容现在有线程池框架,而CompletionStage接口才是异步编程的接口抽象,里面定义多种异步方法,通过这两者集合,从而打造出了强大的CompletableFuture类。

12.2 Future和CompletableFuture

Future在Java里面,通常用来表示一个异步任务的引用,比如我们将任务提交到线程池里面,然后我们会得到一个Future,在Future里面有isDone方法来判断任务是否处理结束,还有get方法可以一直阻塞知道任务结束然后获取结果,但整体来说这种方式,还是同步的,因为需要客户端不断阻塞等待或者不断轮询才能知道任务是否完成。

Future的主要缺点如下:

(1)不支持手动完成

我提交了一个任务,但是执行太慢了,我通过其他路径已经获取到了任务结果,现在没法把这个任务结果通知到正在执行的线程,所以必须主动取消或者一直等待它执行完成

(2)不支持进一步的阻塞调用

通过Future的个体方法会一直阻塞到任务完成,但是想在获取任务之后执行额外的任务,因为Future不支持回调函数,所以无法实现这个功能

(3)不支持链式调用

对于Future的执行结果,我们想继续传到下一个Future处理使用,从而形成一个链式的pipeline调用,这在Future中是没法实现的。

(4)不支持多个Future合并

比如我们有10个Future并行执行,我们想在所有的Future运行完毕之后,执行某些函数,是没法通过Future实现的。

(5)不支持异常处理

Future的API没有任何的异常处理的api,所以在异步运行时,如果出了问题是不好定位 的。

12.3 CompletableFuture入门

12.3.1 使用CompletableFuture

场景:主线程里面创建了一个CompletableFuture,然后主线程调用get方法会阻塞,最后我们在一个子线程中使其终止。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class CompletableFutureDemo {
/**
* 主线程里面创建一个 CompletableFuture,然后主线程调用 get 方法会阻塞,最后我们
* 在一个子线程中使其终止
* @param args
*/
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> future = new CompletableFuture<>();

new Thread(()->{
try {
System.out.println(Thread.currentThread().getName()+"子线程开始干活");
// 子线程睡眠5秒
Thread.sleep(5000);
// 在子线程中完成主线程
future.complete("success");
} catch (InterruptedException e) {
e.printStackTrace();
}

},"A").start();
// 主线程调用get方法阻塞
System.out.println("主线程调用get方法获取的结果为:"+future.get());
System.out.println("主线程完成,阻塞结束!");
}
}

12.3.2 没有返回值的异步任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
* 没有返回值的异步任务
* @param args
*/
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("主线程开始");
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
try {
System.out.println("子线程开始干活");
// 子线程休眠5秒
Thread.sleep(5000);
System.out.println("子线程完成");
} catch (InterruptedException e) {
e.printStackTrace();
}

});
// 主线程阻塞
future.get();
System.out.println("主线程完成,阻塞结束");
}

12.3.3 有返回值的异步任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
* 有返回值的异步任务
* @param args
*/
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("主线程开始");
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
System.out.println("子线程开始干活");
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "子线程完成任务";
});
// 主线程阻塞
String value = future.get();
System.out.println("主线程完成,获取的结果为:"+value);
}

12.3.4 线程依赖

当一个线程依赖另一个线程时,可以使用thenApply方法来把着两个线程串行化。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
* 现对一个数加10,然后取平方
*/
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("主线程开始");
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(()->{
try {
System.out.println("子线程开始");
num += 10;
} catch (Exception e) {
e.printStackTrace();
}
return num;
}).thenApply(Integer->{
return num * num;
});
// 阻塞主线程
Integer value = future.get();
System.out.println("主线成阻塞结束,获取的结果为:"+value);
}

12.3.5 消费处理结果

thenAccept消费处理结果,接受任务的处理结果,并消费处理,无返回结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/**
* 消费处理结果
*/

private static Integer num = 10;

public static void main(String[] args) {
System.out.println("主线程开始...");
CompletableFuture.supplyAsync(()->{
try {
System.out.println("加10子线程开始");
Thread.sleep(5000);
num += 10;
} catch (InterruptedException e) {
e.printStackTrace();
}
return num;
}).thenApply(Integer->{
return num * num;
}).thenAccept(new Consumer<Integer>() {
@Override
public void accept(Integer integer) {
System.out.println("子线程全部完成,最后调用accept,结果为:"+integer);
}
});
System.out.println("主线程结束");
}

12.3.6 异常处理

exceptionally异常处理,出现异常时触发

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
* exceptionally 异常处理
*/

private static Integer num = 10;

public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("主线程开始");
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(()->{
int i = 1 / 0;
System.out.println("子线程➕10开始");
num += 10;
return num;
}).exceptionally(ex -> {
System.out.println(ex.getMessage());
return -1;
});
System.out.println("主线程完成");
Integer integer = future.get();
System.out.println("主线程完成,获取的结果为:"+integer);
}

handle类似于thenAccept/thenRun方法,是最后一步的处理调用,但是同时可以处理异常

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private static Integer num = 10;

public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("主线程开始");
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(()->{
System.out.println("子线程➕10开始");
num += 10;
return num;
}).handle((i,ex) -> {
if (ex != null){
System.out.println("发生了异常,异常消息为:"+ex.getMessage());
return -1;
}else {
return i;
}

});
System.out.println("主线程完成");
Integer integer = future.get();
System.out.println("主线程完成,获取的结果为:"+integer);
}

12.3.7 结果合并

thenCompose合并两个没有依赖关系的CompletableFutures的执行结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
* 结果合并
*/
private static Integer num = 10;

public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("主线程开始");
// 第一步加10
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(()->{
System.out.println("操作加10");
num += 10;
return num;
});

// 合并结果
CompletableFuture<Integer> future1 = future.thenCompose(integer ->
CompletableFuture.supplyAsync(() -> {
return integer + 1;
}));

System.out.println(future.get());
System.out.println(future1.get());
}

thenCombine合并两个没有依赖关系的Completablefuturer任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/**
* 结果合并
*/
private static Integer num = 10;

public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("主线程开始");
// 第一步加10
CompletableFuture<Integer> job1 = CompletableFuture.supplyAsync(() -> {
System.out.println("操作加10");
num += 10;
return num;
});

CompletableFuture<Integer> job2 = CompletableFuture.supplyAsync(() -> {
System.out.println("操作乘10");
num =num * 10;
return num;
});

// 合并两个结果
CompletableFuture<List<Integer>> future = job1.thenCombine(job2, new BiFunction<Integer, Integer, List<Integer>>() {
@Override
public List<Integer> apply(Integer integer, Integer integer2) {
List<Integer> list = new ArrayList<>();
list.add(integer);
list.add(integer2);
return list;
}
});

// 主线程任务结束
System.out.println("获取的结果为:"+future.get());
}

合并多个任务的结果allOf与anyOf

allOf:一系列独立的future任务,等其所有的任务执行完后做一些事情

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/**
* 现对一个数加10,然后乘10,然后减10,然后除10
*/
private static Integer num = 10;

public static void main(String[] args) {
System.out.println("主线程开始");
List<CompletableFuture> list = new ArrayList<>();

CompletableFuture<Integer> job1 = CompletableFuture.supplyAsync(() -> {
num += 10;
return num;
});
list.add(job1);

CompletableFuture<Integer> job2 = CompletableFuture.supplyAsync(() -> {
num = num * 10;
return num;
});
list.add(job2);

CompletableFuture<Integer> job3 = CompletableFuture.supplyAsync(() -> {
num -= 10;
return num;
});
list.add(job3);

CompletableFuture<Integer> job4 = CompletableFuture.supplyAsync(() -> {
num = num / 10;
return num;
});
list.add(job4);
// 多任务合并
List<Integer> collect = list.stream().map(CompletableFuture<Integer>::join).collect(Collectors.toList());

System.out.println(collect);
}

anyOf:只要多个future里面有一个返回,整个任务就可以结束,而不需要等到每一个future结束

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
/**
* 现对一个数加10,然后乘10,然后减10,然后除10
*/
private static Integer num = 10;

public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("主线程开始");
CompletableFuture<Integer>[] futures = new CompletableFuture[4];

CompletableFuture<Integer> job1 = CompletableFuture.supplyAsync(() -> {
try {
num += 10;
Thread.sleep(5000);
return num;
} catch (InterruptedException e) {
e.printStackTrace();
return 0;
}
});
futures[0] = job1;

CompletableFuture<Integer> job2 = CompletableFuture.supplyAsync(() -> {
try {
num = num * 10;
Thread.sleep(2000);
return num;
} catch (Exception e) {
e.printStackTrace();
return 1;
}
});
futures[1] = job2;
CompletableFuture<Integer> job3 = CompletableFuture.supplyAsync(() -> {
try {
num -= 10;
Thread.sleep(3000);
return num;
} catch (InterruptedException e) {
e.printStackTrace();
return 2;
}
});
futures[2] = job3;

CompletableFuture<Integer> job4 = CompletableFuture.supplyAsync(() -> {
try {
num = num / 10;
Thread.sleep(4000);
return num;
} catch (Exception e) {
e.printStackTrace();
return 3;
}
});
futures[3] = job4;

CompletableFuture<Object> future = CompletableFuture.anyOf(futures);
System.out.println(future.get());
}