completablefuture批量处理
时间: 2025-05-09 15:12:39 浏览: 19
### 使用 CompletableFuture 进行批量处理
在 Java 中,`CompletableFuture` 是一种强大的工具,能够帮助开发者轻松实现异步任务的执行和结果收集。当涉及批量处理时,可以利用 `CompletableFuture.allOf()` 或者 `Stream` API 来简化多个异步任务的结果聚合过程。
以下是基于 `CompletableFuture` 的批量处理实现方式及其代码示例:
#### 方法一:使用 `CompletableFuture[]` 和 `allOf()`
通过创建一组 `CompletableFuture` 并将其传递给 `CompletableFuture.allOf()` 方法,可以在所有任务完成后统一获取结果。
```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class BatchProcessingExample {
public static void main(String[] args) throws InterruptedException, ExecutionException {
// 创建一批 CompletableFutures
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> processTask(1));
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> processTask(2));
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> processTask(3));
// 将所有的 futures 放入数组中
CompletableFuture<Void> allOfFuture = CompletableFuture.allOf(future1, future2, future3);
// 等待所有任务完成
allOfFuture.join();
// 获取每个任务的具体结果
System.out.println("Result of Task 1: " + future1.get());
System.out.println("Result of Task 2: " + future2.get());
System.out.println("Result of Task 3: " + future3.get());
}
private static String processTask(int taskId) {
try {
Thread.sleep(1000); // 模拟耗时任务
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "Task-" + taskId + "-Completed";
}
}
```
这种方法适用于需要等待所有任务都完成后再继续后续逻辑的情况[^1]。
---
#### 方法二:结合 `Stream` API 处理大批量任务
对于更复杂的场景,尤其是需要动态生成大量任务或者对结果进行流式处理时,可以借助 `Stream` API 提高灵活性。
```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Collectors;
public class StreamBatchProcessingExample {
public static void main(String[] args) throws Exception {
List<Integer> taskIds = List.of(1, 2, 3, 4, 5);
// 转换为 CompletableFuture 列表
List<CompletableFuture<String>> futures = taskIds.stream()
.map(id -> CompletableFuture.supplyAsync(() -> processTask(id)))
.collect(Collectors.toList());
// 阻塞直到所有任务完成并提取结果
List<String> results = futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
// 输出最终结果
results.forEach(System.out::println);
}
private static String processTask(int taskId) {
try {
Thread.sleep(1000); // 模拟耗时任务
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "Processed Task ID: " + taskId;
}
}
```
此方法不仅提高了可读性和维护性,还允许我们灵活地调整输入数据源以及结果处理逻辑[^3]。
---
#### 方法三:集成自定义线程池提升性能
为了进一步优化资源利用率,还可以引入自定义线程池配合 `CompletableFuture` 完成批量任务调度。
```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;
public class CustomThreadPoolExample {
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newFixedThreadPool(4);
List<CompletableFuture<String>> futures = Arrays.asList(
CompletableFuture.supplyAsync(() -> processTask(1), executor),
CompletableFuture.supplyAsync(() -> processTask(2), executor),
CompletableFuture.supplyAsync(() -> processTask(3), executor)
);
CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
combinedFuture.join(); // 等待所有任务结束
// 关闭线程池
executor.shutdown();
// 打印结果
for (CompletableFuture<String> future : futures) {
System.out.println(future.get());
}
}
private static String processTask(int id) {
try {
Thread.sleep(1000); // 模拟延迟
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "Custom Pool Result For Task:" + id;
}
}
```
上述代码展示了如何通过指定线程池参数来控制并发度,从而达到更好的性能表现[^5]。
---
### 总结
以上三种方案分别针对不同需求提供了相应的解决方案。无论是简单的固定数量的任务还是复杂的大规模动态任务集合,都可以找到合适的策略加以应对。值得注意的是,在实际开发过程中还需要考虑异常捕获机制以保障系统的健壮性[^2]。
阅读全文
相关推荐


