Skip to main content
 首页 » 编程设计

Java CompletableFuture多线程详解

2022年07月18日45xiaohuochai

参考文章:https://colobu.com/2016/02/29/Java-CompletableFuture/

https://www.jdon.com/50027

https://www.jianshu.com/p/f2735065a13a


public class TestMain {
/**
* @desc : Future的用法
* 虽然Future以及相关使用方法提供了异步执行任务的能力,但是对于结果的获取却是很不方便,
* 只能通过阻塞或者轮询的方式得到任务的结果。阻塞的方式显然和我们的异步编程的初衷相违背,
* 轮询的方式又会耗费无谓的CPU资源,而且也不能及时地得到计算结果,
* 为什么不能用观察者设计模式当计算结果完成及时通知监听者呢
* @author : 毛会懂
* @create: 2021/11/7 13:09:00
**/
public static void main1(String[] args) throws InterruptedException, ExecutionException {
ExecutorService cachePool = Executors.newCachedThreadPool();
Future<String> future = cachePool.submit(() -> {
try {
Thread.sleep(6000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务完成了");
return "异步任务计算结果!";
});

System.out.println("做其他的事情");
Thread.sleep(100);
// 用法一:使用get堵塞主调用线程,知道计算完成返回结果
// String result = future.get();
// System.out.println("异步结果:"+ result);

// 用法二:使用isDone()方法检查计算是否完成
// long start = System.currentTimeMillis();
// while (true){
// // 使用isDone()方法检查计算是否完成
// if(future.isDone()){
// break;
// }
// }
// String result = future.get();
// System.out.println("轮训时间:" + (System.currentTimeMillis() - start));
// System.out.println("异步结果:"+ result);

// 用法三:使用cancel方法停止任务的执行,参数false不会立即终止任务,true会打断sleep,抛出异常
future.cancel(true);
System.out.println("停止任务执行了");
cachePool.shutdown();
}

/**
* @desc : CompletableFuture常用的用法(同步)
* 在Java 8中, 新增加了一个包含50个方法左右的类: CompletableFuture,
* 提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,
* 提供了函数式编程的能力,可以通过回调的方式处理计算结果,
* 并且提供了转换和组合CompletableFuture的方法。
* @author : 毛会懂
* @create: 2021/11/7 13:19:00
**/
public static void main2(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> completableFutureOne = new CompletableFuture<>();
ExecutorService cachePool = Executors.newCachedThreadPool();
cachePool.execute(() ->{
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("one=" +Thread.currentThread().getName());
completableFutureOne.complete("异步执行结果");
});

// WhenComplete 方法返回的 CompletableFuture 仍然是原来的 CompletableFuture 计算结果(类型都是String).
CompletableFuture<String> completableFutureTwo = completableFutureOne.whenComplete((s, t) -> {
System.out.println("two=" +Thread.currentThread().getName());
System.out.println("异步执行完毕后,打印异步任务的结果:" + s);
});

// ThenApply 方法返回的是一个新的 completeFuture(类型不再一致)
CompletableFuture<Integer> completableFutureThree = completableFutureTwo.thenApply(s -> {
System.out.println("three=" + Thread.currentThread().getName());
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return s.length();
});
System.out.println("堵塞获取结果:" + completableFutureThree.get());
cachePool.shutdown();
}

/**
* @desc : CompletableFuture常用的用法(异步)
* @author : 毛会懂
* @create: 2021/11/7 13:19:00
**/
public static void main3(String[] args) throws IOException, ExecutionException, InterruptedException {
List<Integer> list = new ArrayList<>();
list.add(1);
list.add(2);
list.add(3);
List<CompletableFuture<String>> newList = list.stream().map(id -> getNum(id)).collect(Collectors.toList());
System.out.println("以上3个completableFuture执行完毕");

// 使用allOf方法封装所有的并行任务
CompletableFuture<Void> allFutures = CompletableFuture.allOf(newList.toArray(new CompletableFuture[newList.size()]));
//获得所有子任务的处理结果
CompletableFuture<List<String>> listCompletableFuture = allFutures.thenApply(v -> newList.stream().map(future -> future.join()).collect(Collectors.toList()));

System.out.println("等待结果");
List<String> strings = listCompletableFuture.get();
System.out.println("结果" + strings);

// 让主线程暂停
System.in.read();
}

private static CompletableFuture<String> getNum(Integer id){
return CompletableFuture.supplyAsync(() -> {
System.out.println("start" + id);
try {
Thread.sleep(3000 * id);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("over" + id);
return "num" + id;
});
}


/**
* @desc : get 和 join抛异常的不同
* @author : 毛会懂
* @create: 2021/11/7 13:54:00
**/
public static void main4(String[] args) throws ExecutionException, InterruptedException, IOException {
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
int i = 1/ 0;
return 100;
});
//completableFuture.get();
completableFuture.join();
System.in.read();
}

/**
* @desc : 抛异常和执行正常的结果
* CompletableFuture.complete()、CompletableFuture.completeExceptionally只能被调用一次。
* @author : 毛会懂
* @create: 2021/11/7 14:07:00
**/
public static void main5(String[] args) throws InterruptedException, IOException {
final CompletableFuture<Integer> future = new CompletableFuture<>();
class Client extends Thread{
CompletableFuture<Integer> future;
Client(String threadName,CompletableFuture<Integer> future){
super(threadName);
this.future = future;
}
@Override
public void run(){
try {
System.out.println(this.getName() + ":" + future.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}

new Client("client1",future).start();
new Client("client2",future).start();
System.out.println("等待");
Thread.sleep(3000);
future.complete(100);
// future.completeExceptionally(new Exception());
System.in.read();
}

/**
* @desc : 异步执行长时间的任务
* @author : 毛会懂
* @create: 2021/11/7 14:30:00
*
* @return*/
public static void main6(String[] args) throws InterruptedException, IOException {

try {
System.out.println("开始执行我们的业务");
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("结束执行我们的业务");
CompletableFuture<Integer> future = new CompletableFuture<>();
// 方法一: 无关业务异步执行。 supplyAsync方法以Supplier<U>函数式接口类型为参数,CompletableFuture的计算结果类型为U。
// future.supplyAsync( () ->{
// System.out.println("开始执行其他与本请求的无关的业务");
// try {
// Thread.sleep(5000);
// } catch (InterruptedException ee ){
// ee.printStackTrace();
// }
// System.out.println("结束执行其他与本请求的无关的业务");
// return null;
// });

// 方法二:主业务成功执行完,执行future.complete(),可触发无关业务的异步执行。
// future.whenComplete((v,e) ->{
// System.out.println("开始执行其他与本请求的无关的业务");
// try {
// Thread.sleep(5000);
// } catch (InterruptedException ee ){
// ee.printStackTrace();
// }
// System.out.println("结束执行其他与本请求的无关的业务");
// });
// // 此行与方法二配套
// future.complete(1);

// 方法三: 无关业务异步执行 runAsync方法也好理解,它以Runnable函数式接口类型为参数,所以CompletableFuture的计算结果为空。
Map<String,Integer> map = new HashMap<>();
map.put("activityId",11);
String name = "毛会懂";
// CompletableFuture.runAsync(() ->{
// System.out.println("开始执行其他与本请求的无关的业务");
// try {
// System.out.println(map);
// Thread.sleep(5000);
// System.out.println("name=" + name);
// } catch (InterruptedException ee ){
// ee.printStackTrace();
// }
// System.out.println("结束执行其他与本请求的无关的业务");
// });

// 方法四:supplyAsync方法以Supplier<U>函数式接口类型为参数,CompletableFuture的计算结果类型为U。
CompletableFuture.supplyAsync(()->{
System.out.println("开始执行其他与本请求的无关的业务");
try {
System.out.println(map);
Thread.sleep(5000);
System.out.println("name=" + name);
} catch (InterruptedException ee ){
ee.printStackTrace();
}
System.out.println("结束执行其他与本请求的无关的业务");
return null;
});
System.out.println("ok");

System.in.read();
}


/**
* @desc : 当原先的CompletableFuture的值计算完成或者抛出异常的时候,
* 会触发这个CompletableFuture对象的计算,结果由BiFunction参数计算而得。
* 因此handle 和 handleAsync方法兼有whenComplete和转换的两个功能
* @author : 毛会懂
* @create: 2021/11/7 15:43:00
**/
public static void main7(String[] args) throws IOException, ExecutionException, InterruptedException {
CompletableFuture future = new CompletableFuture();
CompletableFuture handler = future.handle((s, v) -> {
System.out.println("handler");
System.out.println(Thread.currentThread().getName());
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return s;
});

// CompletableFuture handler = future.handleAsync((s, v) -> {
// System.out.println("handler");
// System.out.println(Thread.currentThread().getName());
// try {
// Thread.sleep(5000);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// return s;
// });
System.out.println(Thread.currentThread().getName());
future.complete("ok");
System.out.println(handler.get());
System.in.read();
}

/**
* @desc : thenApply 和thenApplyAsync:原来的CompletableFuture计算完后,
* 将结果传递给函数fn,将fn的结果作为新的CompletableFuture计算结果。
* 因此它的功能相当于将CompletableFuture<T>转换成CompletableFuture<U>
* 它们与handle方法的区别在于handle方法会处理正常计算值和异常,
* 因此它可以屏蔽异常,避免异常继续抛出。而thenApply方法只是用来处理正常值,因此一旦有异常就会抛出。
* @author : 毛会懂
* @create: 2021/11/7 15:46:00
**/
public static void main8(String[] args) throws ExecutionException, InterruptedException {
// future初始化
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(()->{
return 100;
});
CompletableFuture<String> completableFuture = future.thenApplyAsync(i -> i * 10).thenApply(i -> i.toString() + "ok");
System.out.println(completableFuture.get());
}

/**
* @desc : 没有返回值的用法
* @author : 毛会懂
* @create: 2021/11/7 15:52:00
**/
public static void main9(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
return 100;
});

// thenAccept 和 thenAcceptAsync没有返回值
CompletableFuture<Void> future1 = future.thenAccept(System.out::println);
// 没有返回值,打印出来为 null
System.out.println(future1.get());

//thenAcceptBoth以及相关方法提供了类似的功能,当两个CompletionStage都正常完成计算的时候,就会执行提供的action,它用来组合另外一个异步的结果。
CompletableFuture<Void> future2 = future.thenAcceptBoth(CompletableFuture.supplyAsync(() -> {
return 10;
}), (x, y) -> System.out.println(x * y));
System.out.println(future2.get());
//runAfterBoth是当两个CompletionStage都正常完成计算的时候,执行一个Runnable,这个Runnable并不使用计算的结果
CompletableFuture<Void> future3 = future.runAfterBoth(CompletableFuture.supplyAsync(() -> {
return 10;
}), () -> System.out.println("ok"));
System.out.println(future3.get());

// Runnable并不使用CompletableFuture计算的结果。
CompletableFuture<Void> future4 = future.thenRun(() -> System.out.println("ok"));
System.out.println(future4.get());
}

/**
* @desc : 组合
* @author : 毛会懂
* @create: 2021/11/7 16:10:00
**/
public static void main10(String[] args) throws ExecutionException, InterruptedException, IOException {
// CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
// return 100;
// });
// CompletableFuture<String> f = future.thenCompose( i -> {
// return CompletableFuture.supplyAsync(() -> {
// return (i * 10) + "";
// });
// });
// System.out.println(f.get()); //1000

CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
return 100;
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "abc";
});
CompletableFuture<String> f = future.thenCombine(future2, (x,y) -> {String str = (y + "-" + x); return str;});
System.out.println(f.get()); //abc-100

System.in.read();
}

/**
* @desc : Java Future转CompletableFuture
* @author : 毛会懂
* @create: 2021/11/7 16:29:00
**/
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(5);
Future<String> future = executorService.submit(() -> {
return "ok";
});
CompletableFuture<String> completableFuture = toCompletable(future, executorService);
String s = completableFuture.get();
System.out.println(s);
}

public static <T> CompletableFuture<T> toCompletable(Future<T> future, Executor executor) {
return CompletableFuture.supplyAsync(() -> {
try {
return future.get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}, executor);
}


public static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> futures) {
CompletableFuture<Void> allDoneFuture = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
return allDoneFuture.thenApply(v -> futures.stream().map(CompletableFuture::join).collect(Collectors.<T>toList()));
}
public static <T> CompletableFuture<List<T>> sequence(Stream<CompletableFuture<T>> futures) {
List<CompletableFuture<T>> futureList = futures.filter(f -> f != null).collect(Collectors.toList());
return sequence(futureList);
}
}

本文参考链接:https://www.cnblogs.com/maohuidong/p/15519489.html