12 CompletableFuture
12.1 CompletableFuture简介
CompletableFuture在Java里面被用于异步编程,异步通常意味着非阻塞,可以使得我们的任务单独运行在与主线程分离的其他线程中,并且通过回调可以在主线程中得到异步任务的执行状态,是否完成,和是否异常等信息。
CompletableFuture实现了Future,CompletionStage接口,实现了Future接口就可以兼容现在有线程池框架,而CompletionStage接口才是异步编程的接口抽象,里面定义多种异步方法,通过这两者集合,从而打造出了强大的CompletableFuture类。
12.2 Future和CompletableFuture
Future在Java里面,通常用来表示一个异步任务的引用,比如我们将任务提交到线程池里面,然后我们会得到一个Future,在Future里面有isDone方法来判断任务是否处理结束,还有get方法可以一直阻塞知道任务结束然后获取结果,但整体来说这种方式,还是同步的,因为需要客户端不断阻塞等待或者不断轮询才能知道任务是否完成。
Future的主要缺点如下:
(1)不支持手动完成
我提交了一个任务,但是执行太慢了,我通过其他路径已经获取到了任务结果,现在没法把这个任务结果通知到正在执行的线程,所以必须主动取消或者一直等待它执行完成
(2)不支持进一步的阻塞调用
通过Future的个体方法会一直阻塞到任务完成,但是想在获取任务之后执行额外的任务,因为Future不支持回调函数,所以无法实现这个功能
(3)不支持链式调用
对于Future的执行结果,我们想继续传到下一个Future处理使用,从而形成一个链式的pipeline调用,这在Future中是没法实现的。
(4)不支持多个Future合并
比如我们有10个Future并行执行,我们想在所有的Future运行完毕之后,执行某些函数,是没法通过Future实现的。
(5)不支持异常处理
Future的API没有任何的异常处理的api,所以在异步运行时,如果出了问题是不好定位 的。
12.3 CompletableFuture入门
12.3.1 使用CompletableFuture
场景:主线程里面创建了一个CompletableFuture,然后主线程调用get方法会阻塞,最后我们在一个子线程中使其终止。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| public class CompletableFutureDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<String> future = new CompletableFuture<>();
new Thread(()->{ try { System.out.println(Thread.currentThread().getName()+"子线程开始干活"); Thread.sleep(5000); future.complete("success"); } catch (InterruptedException e) { e.printStackTrace(); }
},"A").start(); System.out.println("主线程调用get方法获取的结果为:"+future.get()); System.out.println("主线程完成,阻塞结束!"); } }
|
12.3.2 没有返回值的异步任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
|
public static void main(String[] args) throws ExecutionException, InterruptedException { System.out.println("主线程开始"); CompletableFuture<Void> future = CompletableFuture.runAsync(() -> { try { System.out.println("子线程开始干活"); Thread.sleep(5000); System.out.println("子线程完成"); } catch (InterruptedException e) { e.printStackTrace(); }
}); future.get(); System.out.println("主线程完成,阻塞结束"); }
|
12.3.3 有返回值的异步任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
|
public static void main(String[] args) throws ExecutionException, InterruptedException { System.out.println("主线程开始"); CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { try { System.out.println("子线程开始干活"); Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } return "子线程完成任务"; }); String value = future.get(); System.out.println("主线程完成,获取的结果为:"+value); }
|
12.3.4 线程依赖
当一个线程依赖另一个线程时,可以使用thenApply方法来把着两个线程串行化。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
|
public static void main(String[] args) throws ExecutionException, InterruptedException { System.out.println("主线程开始"); CompletableFuture<Integer> future = CompletableFuture.supplyAsync(()->{ try { System.out.println("子线程开始"); num += 10; } catch (Exception e) { e.printStackTrace(); } return num; }).thenApply(Integer->{ return num * num; }); Integer value = future.get(); System.out.println("主线成阻塞结束,获取的结果为:"+value); }
|
12.3.5 消费处理结果
thenAccept消费处理结果,接受任务的处理结果,并消费处理,无返回结果。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
|
private static Integer num = 10;
public static void main(String[] args) { System.out.println("主线程开始..."); CompletableFuture.supplyAsync(()->{ try { System.out.println("加10子线程开始"); Thread.sleep(5000); num += 10; } catch (InterruptedException e) { e.printStackTrace(); } return num; }).thenApply(Integer->{ return num * num; }).thenAccept(new Consumer<Integer>() { @Override public void accept(Integer integer) { System.out.println("子线程全部完成,最后调用accept,结果为:"+integer); } }); System.out.println("主线程结束"); }
|
12.3.6 异常处理
exceptionally异常处理,出现异常时触发
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
|
private static Integer num = 10;
public static void main(String[] args) throws ExecutionException, InterruptedException { System.out.println("主线程开始"); CompletableFuture<Integer> future = CompletableFuture.supplyAsync(()->{ int i = 1 / 0; System.out.println("子线程➕10开始"); num += 10; return num; }).exceptionally(ex -> { System.out.println(ex.getMessage()); return -1; }); System.out.println("主线程完成"); Integer integer = future.get(); System.out.println("主线程完成,获取的结果为:"+integer); }
|
handle类似于thenAccept/thenRun方法,是最后一步的处理调用,但是同时可以处理异常
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| private static Integer num = 10;
public static void main(String[] args) throws ExecutionException, InterruptedException { System.out.println("主线程开始"); CompletableFuture<Integer> future = CompletableFuture.supplyAsync(()->{ System.out.println("子线程➕10开始"); num += 10; return num; }).handle((i,ex) -> { if (ex != null){ System.out.println("发生了异常,异常消息为:"+ex.getMessage()); return -1; }else { return i; }
}); System.out.println("主线程完成"); Integer integer = future.get(); System.out.println("主线程完成,获取的结果为:"+integer); }
|
12.3.7 结果合并
thenCompose合并两个没有依赖关系的CompletableFutures的执行结果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
|
private static Integer num = 10;
public static void main(String[] args) throws ExecutionException, InterruptedException { System.out.println("主线程开始"); CompletableFuture<Integer> future = CompletableFuture.supplyAsync(()->{ System.out.println("操作加10"); num += 10; return num; });
CompletableFuture<Integer> future1 = future.thenCompose(integer -> CompletableFuture.supplyAsync(() -> { return integer + 1; }));
System.out.println(future.get()); System.out.println(future1.get()); }
|
thenCombine合并两个没有依赖关系的Completablefuturer任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
|
private static Integer num = 10;
public static void main(String[] args) throws ExecutionException, InterruptedException { System.out.println("主线程开始"); CompletableFuture<Integer> job1 = CompletableFuture.supplyAsync(() -> { System.out.println("操作加10"); num += 10; return num; });
CompletableFuture<Integer> job2 = CompletableFuture.supplyAsync(() -> { System.out.println("操作乘10"); num =num * 10; return num; });
CompletableFuture<List<Integer>> future = job1.thenCombine(job2, new BiFunction<Integer, Integer, List<Integer>>() { @Override public List<Integer> apply(Integer integer, Integer integer2) { List<Integer> list = new ArrayList<>(); list.add(integer); list.add(integer2); return list; } });
System.out.println("获取的结果为:"+future.get()); }
|
合并多个任务的结果allOf与anyOf
allOf:一系列独立的future任务,等其所有的任务执行完后做一些事情
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
|
private static Integer num = 10;
public static void main(String[] args) { System.out.println("主线程开始"); List<CompletableFuture> list = new ArrayList<>();
CompletableFuture<Integer> job1 = CompletableFuture.supplyAsync(() -> { num += 10; return num; }); list.add(job1);
CompletableFuture<Integer> job2 = CompletableFuture.supplyAsync(() -> { num = num * 10; return num; }); list.add(job2);
CompletableFuture<Integer> job3 = CompletableFuture.supplyAsync(() -> { num -= 10; return num; }); list.add(job3);
CompletableFuture<Integer> job4 = CompletableFuture.supplyAsync(() -> { num = num / 10; return num; }); list.add(job4);
List<Integer> collect = list.stream().map(CompletableFuture<Integer>::join).collect(Collectors.toList());
System.out.println(collect); }
|
anyOf:只要多个future里面有一个返回,整个任务就可以结束,而不需要等到每一个future结束
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
|
private static Integer num = 10;
public static void main(String[] args) throws ExecutionException, InterruptedException { System.out.println("主线程开始"); CompletableFuture<Integer>[] futures = new CompletableFuture[4];
CompletableFuture<Integer> job1 = CompletableFuture.supplyAsync(() -> { try { num += 10; Thread.sleep(5000); return num; } catch (InterruptedException e) { e.printStackTrace(); return 0; } }); futures[0] = job1;
CompletableFuture<Integer> job2 = CompletableFuture.supplyAsync(() -> { try { num = num * 10; Thread.sleep(2000); return num; } catch (Exception e) { e.printStackTrace(); return 1; } }); futures[1] = job2; CompletableFuture<Integer> job3 = CompletableFuture.supplyAsync(() -> { try { num -= 10; Thread.sleep(3000); return num; } catch (InterruptedException e) { e.printStackTrace(); return 2; } }); futures[2] = job3;
CompletableFuture<Integer> job4 = CompletableFuture.supplyAsync(() -> { try { num = num / 10; Thread.sleep(4000); return num; } catch (Exception e) { e.printStackTrace(); return 3; } }); futures[3] = job4;
CompletableFuture<Object> future = CompletableFuture.anyOf(futures); System.out.println(future.get()); }
|