CompletableFuture是 java1.8 提供的一个新类,是对Future的增强,吸收了guava异步线程的特点,可以实现一系列的异步线程操作。CompletableFuture可以简化异步编程的复杂性,并且提供了函数式编程的能力,可以通过回调的方式处理计算结果,也提供了转换和组合 CompletableFuture 的方法。
CompletableFuture合适每个操作很复杂需要花费很长时间的的场景。
范例一:
/** * supplyAsync: 异步开始一个任务,并返回结果 * thenApply: 处理上一步的执行结果 * * allOf: 所有任务完成后才会返回 * anyOf: 任一任务完成就返回 */ private static void test2() throws Exception { List<String> list = new ArrayList<>(); ExecutorService executorService = Executors.newFixedThreadPool(10); List<Integer> taskList = Arrays.asList(2, 1, 3, 4, 5, 6, 7, 8, 9, 10); Long start = System.currentTimeMillis(); CompletableFuture[] arr = taskList.stream() .map(x -> CompletableFuture.supplyAsync(() -> x, executorService) //对每个元素异步做处理,并返回一个CompletableFuture对象 .thenApply(y -> Integer.toString(y)) //处理结果进一步处理 .whenComplete((r, e) -> { //异步处理完成后,获取结果 list.add(r); }) ) .toArray(CompletableFuture[]::new); //所有CompletableFuture对象转成一个数组 boolean all = false; if(all){ CompletableFuture.allOf(arr).join(); //等待所有任务完成后才会继续往下执行 }else{ Object result = CompletableFuture.anyOf(arr).get(); //只要有一个任务完成就继续往下执行 System.out.println("result=" + result); } System.out.println("list="+list+", 耗时="+(System.currentTimeMillis()-start)); executorService.shutdown(); }
范例二:
/**
* thenCompose:对两个任务进行串行执行,第一个完成后,将其结果作为参数传递给第二个
*/
public static void test3() throws Exception {
CompletableFuture<String> completableFuture1 = CompletableFuture.supplyAsync(() -> {
try {
System.out.println("task1 doing...");
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "result1";
});
CompletableFuture<String> completableFuture2 = completableFuture1.thenCompose(result -> CompletableFuture.supplyAsync(() -> {
try {
System.out.println("prvo result=" + result);
System.out.println("task2 doing...");
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "result2";
}));
System.out.println(completableFuture2.get());
}
范例三:
/** * thenCombine:组合两个执行结果。两个任务是并行执行的 * thenAccept:链末消费:接收上一阶段的输出作为本阶段的输入,执行任务 */ public static void test4() throws Exception { CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> { try { System.out.println("task1 doing start"); Thread.sleep(1000); System.out.println("task1 doing end"); } catch (InterruptedException e) { e.printStackTrace(); } return 100; }); completableFuture1.thenAccept(result -> System.out.println("result1=" + result)); CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> { try { System.out.println("task2 doing start"); Thread.sleep(3000); System.out.println("task2 doing end"); } catch (InterruptedException e) { e.printStackTrace(); } return 300; }); completableFuture2.thenAccept(result -> System.out.println("result2=" + result)); CompletableFuture<Integer> completableFuture3 = completableFuture2.thenCombine(completableFuture1, //合并函数 (result1, result2) -> result1 + result2 ); System.out.println(completableFuture3.get()); }
范例四:
/** * 递归做 thenCombine 处理 */ public static void test5() throws Exception { long s = System.currentTimeMillis(); CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> { try { System.out.print("开始两两合并处理:0 + "); Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } return 0; }); for(int i=1; i<=7; i++){ completableFuture1 = completableFuture1.thenCombine( CompletableFuture.supplyAsync(() -> { int x = 0; try { Thread.sleep(3000); x = count.incrementAndGet(); System.out.print(x + " + "); } catch (InterruptedException e) { e.printStackTrace(); } return x; }) , (r1, r2) -> r1 + r2 ); } System.out.println("0 = " + completableFuture1.get()); System.out.println(System.currentTimeMillis() - s); }
范例五:
/** * runAsync: 异步执行,无返回 * supplyAsync: 异步执行,有返回 */ public static void test6(){ System.out.println("start doing..."); CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> { try { Thread.sleep(3000); int result = 100/0; } catch (Exception ex) { throw new RuntimeException(ex); } }); try { future1.get(); } catch (Exception ex) { System.out.println("future1 error: " + ex.toString()); } CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(3000); int result = 100/0; return result; } catch (Exception ex) { throw new RuntimeException(ex); } }); try { System.out.println(future2.get()); } catch (Exception ex) { System.out.println("future2 error: " + ex.toString()); } }
范例六:
/** * applyToEither: 应用两者中的任一,哪个先返回就用哪个 * runAfterBoth: 两个任务都完成后执行函数 * runAfterEither: 其中一个任务完成后执行函数,另一个任务忽略 */ public static void test7(){ String original = "Message"; CompletableFuture<String> f1 = CompletableFuture.completedFuture(original).thenApplyAsync(s -> { try { int sleep = ThreadLocalRandom.current().nextInt(5000); System.out.println("f1: " + sleep); TimeUnit.MILLISECONDS.sleep(sleep); System.out.println("f1 ok"); } catch (Exception e) { e.printStackTrace(); } return s.toUpperCase(); }); CompletableFuture<String> f2 = CompletableFuture.completedFuture(original).thenApplyAsync(s -> { try { int sleep = ThreadLocalRandom.current().nextInt(5000); System.out.println("f2: " + sleep); TimeUnit.MILLISECONDS.sleep(sleep); System.out.println("f2 ok"); } catch (Exception e) { e.printStackTrace(); } int i = 1/0; return s.toLowerCase(); }); // CompletableFuture<String> f3 = f1.applyToEither(f2, s -> s + " from applyToEither"); // System.out.println(f3.join()); // CompletableFuture<Void> f3 = f1.runAfterBoth(f2, () -> { // System.out.println("all ok"); // }); // f3.join(); // CompletableFuture<Void> f3 = f1.runAfterEither(f2, () -> { // System.out.println("one ok"); // }); // f3.join(); }
范例七:
/** * handle: 任务完成后或者抛出异常时,调用handle方法处理结果 * exceptionally: 捕获异常 */ public static void test8(){ CompletableFuture<String> f = CompletableFuture.completedFuture("Massges").thenApplyAsync(s -> { try { int sleep = ThreadLocalRandom.current().nextInt(3000); System.out.println("f: " + sleep); TimeUnit.MILLISECONDS.sleep(sleep); System.out.println("f ok"); } catch (Exception ex) { ex.printStackTrace(); } int i = 1/0; return s.toLowerCase(); }); CompletableFuture<String> f3 = f.handle((r, ex) -> { System.out.println(r + ", " + ex); int i = 1/0; return r; }).exceptionally(s -> { System.out.println("error: " + s); return null; }); System.out.println(f3.join()); }
相关推荐
主要给大家介绍了关于在Spring Boot2中使用CompletableFuture的相关资料,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面来一起看看吧
springboot使用的小案例CompletableFuture 并发处理任务,并且汇总结果后统一处理数据
强化学习.您将收获: - 为什么会选择 CompletableFuture - 如何创建CompletableFuture异步任务 - CompletableFuture异步任务回调使用 - CompletableFuture异步任务编排 - CompletableFuture的异常处理
CompletableFuture实现的工具类 ,使用泛型实现各类多线程操作,满足大部分要求
CompletableFuture的定义 CompletableFuture是Java 8中引入的一种新的...CompletableFuture的使用场景 CompletableFuture常用于需要处理异步任务的场景,如并发请求、数据处理等,它可以提高程序的运行效率和响应速度。
关于Java8 inAction所有的Demo 非常的实用,全部来自本人亲人编写 放心使用
详细的demo
假设你使用 Future 运行子线程调用远程 API 来获取某款产品的最新价格,服务器由于洪灾宕机了,此时如果你想手动结束计算,而是想返回上次缓存中的价格,这是 Future 做不到的 调用 get() 方法会阻塞程序 Future ...
使用Java的进行异步编程的示例
Java8PromisesWithCompletableFuture 演示将CompletableFuture回调API与JUnit测试结合使用的测试用例
客户端使用 AsynchronousSocketChannel 和 CompletableFuture 的简单 TCP 客户端
JAVA使用线程池查询大批量数据
未来助手FutureHelper是一个实用程序库,其中包含处理Java 8的CompletableFuture并将其与Vert.x AsyncResult类型处理程序和Java 8的流集成的有用方法。 FutureHelper的附加功能是使用Java 8的Timer类简化的计时器...
###如何在我的项目中使用该库? 并使用maven生成.jar文件: 注意:您将需要下载并创建快照。 git clone https://github.com/JeffreyFalgout/completable-futurescd completable-futures/mvn package从页面下载一个....
android-retrofuture android-retrofuture是Java 8 CompletableFuture API的后向版本,已升级为希望使用Android Studio 3.x D8 / desugar工具链的Android开发人员的当前Java 9(JEP 266)增强功能。 此代码中没有...
包括:线程创建、Synchronized和Reentrantlock锁的使用、线程安全问题演示、Condition的应用、CountDownLatch的应用、Cyclicbarrier的应用、Semaphore的应用、线程池的应用、Completablefuture的应用、手写阻塞队列...
如果希望将其用于Java的早期版本(最早允许使用Java 9),则将pom.xml中的maven-compiler-plugin的源和目标从11更改为9。 。 然后从您的IDE运行mvn clean verify 。 您应该会看到类似以下内容的内容: ...
异步泽西灰熊实验JAX-RS 2 Async (Jersey) 和 Java 8 CompletableFuture 的实验基于来自: : 教程这是一个 Maven 项目。 需要 JDK 8 才能使用 CompletableFuture 和其他 Java 8 功能。
13.4 异步编排CompletableFuture 254 13.5 异步Web服务实现 257 13.6 请求缓存 259 13.7 请求合并 261 14 如何扩容 266 14.1 单体应用垂直扩容 267 14.2 单体应用水平扩容 267 14.3 应用拆分 268 14.4 数据库拆分 ...
WAMP 层使用 Java 8 CompletableFuture 进行 WAMP 操作(调用、注册、发布和订阅),并使用观察者模式进行 WAMP 会话、订阅和注册生命周期事件。 该库获得 MIT 许可,由 Crossbar.io 项目维护,使用 ...