Java并发编程:线程管理与协调
立即解锁
发布时间: 2025-08-19 02:34:03 阅读量: 5 订阅数: 34 


JVM并发编程的艺术与实践
# Java并发编程:线程管理与协调
## 1. 使用 ExecutorService 管理线程
在计算目录大小时,我们可以将操作拆分为多个任务,每个任务负责探索一个子目录。但直接使用`Thread`实例存在问题,因为线程不可复用,且调度多个任务并不容易,创建过多线程也不可扩展。`java.util.concurrent` API 正是为管理线程池而引入的。
### 1.1 ExecutorService 概述
- `ExecutorService`代表一个线程池,它将线程的创建和执行的任务分离。
- 可以配置线程池的类型,如单线程、缓存、基于优先级、定时/周期性或固定大小的线程池,还能设置任务等待队列的大小。
- 可轻松调度任意数量的任务,对于无返回结果的任务,可将其包装在`Runnable`中;对于有返回结果的任务,可将其包装在`Callable`中。
### 1.2 任务调度方法
| 方法 | 描述 |
| ---- | ---- |
| `execute()` | 用于调度任意任务 |
| `submit()` | 用于调度任意任务,可返回`Future`对象获取任务结果 |
| `invokeAll()` | 用于调度一组任务 |
| `invokeAny()` | 用于只关心其中一个任务完成的情况 |
这些方法都有带超时参数的重载版本,用于设置等待结果的时间。
### 1.3 线程池的生命周期管理
- 创建`ExecutorService`后,线程池即准备好并处于活动状态。若没有任务,线程会空闲,缓存池中的线程会在延迟后死亡。
- 不再需要线程池时,调用`shutdown()`方法,它会在完成所有已调度任务后关闭线程池,但不能再调度新任务。
- 调用`shutdownNow()`方法会尝试强制取消当前执行的任务,但不保证成功,因为它依赖任务对`interrupt()`调用的响应。
### 1.4 示例代码
```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadManagementExample {
public static void main(String[] args) {
ExecutorService service = Executors.newFixedThreadPool(10);
// 调度任务
service.submit(() -> System.out.println("Task executed"));
// 关闭线程池
service.shutdown();
}
}
```
### 1.5 避免依赖线程终止检查
`ExecutorService`提供了检查服务是否终止或关闭的方法,但最好不要依赖它们。我们应关注任务的完成,而非线程或服务的死亡。
## 2. 线程协调
将问题拆分为多个部分后,我们可以在线程池中调度多个并发任务,并等待它们的结果。
### 2.1 顺序计算文件大小示例
```java
import java.io.File;
public class TotalFileSizeSequential {
private long getTotalSizeOfFilesInDir(final File file) {
if (file.isFile()) return file.length();
final File[] children = file.listFiles();
long total = 0;
if (children != null)
for (final File child : children)
total += getTotalSizeOfFilesInDir(child);
return total;
}
public static void main(final String[] args) {
final long start = System.nanoTime();
final long total = new TotalFileSizeSequential()
.getTotalSizeOfFilesInDir(new File(args[0]));
final long end = System.nanoTime();
System.out.println("Total Size: " + total);
System.out.println("Time taken: " + (end - start) / 1.0e9);
}
}
```
### 2.2 简单并发计算文件大小示例
```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
public class NaivelyConcurrentTotalFileSize {
private long getTotalSizeOfFilesInDir(
final ExecutorService service, final File file)
throws InterruptedException, ExecutionException, TimeoutException {
if (file.isFile()) return file.length();
long total = 0;
final File[] children = file.listFiles();
if (children != null) {
final List<Future<Long>> partialTotalFutures =
new ArrayList<Future<Long>>();
for (final File child : children) {
partialTotalFutures.add(service.submit(new Callable<Long>() {
public Long call() throws InterruptedException,
ExecutionException, TimeoutException {
return getTotalSizeOfFilesInDir(service, child);
}
}));
}
for (final Future<Long> partialTotalFuture : partialTotalFutures)
total += partialTotalFuture.get(100, TimeUnit.SECONDS);
}
return total;
}
private long getTotalSizeOfFile(final String fileName)
throws InterruptedException, ExecutionException, TimeoutException {
final ExecutorService service = Executors.newFixedThreadPool(100);
try {
return getTotalSizeOfFilesInDir(service, new File(fileName));
} finally {
service.shutdown();
}
}
public static void main(final String[] args)
throws InterruptedException, ExecutionException, TimeoutException {
final long start = System.nanoTime();
final long total = new NaivelyConcurrentTotalFileSize()
.getTotalSizeOfFile(args[0]);
final long end = System.nanoTime();
System.out.println("Total Size: " + total);
System.out.println("Time taken: " + (end - start) / 1.0e9);
}
}
```
### 2.3 简单并发版本的问题
简单并发版本存在问题,如在计算`/etc`目录大小时,速度比顺序版本还慢,计算`/usr`目录大小时还出现超时问题。这是因为`getTotalSizeOfFilesInDir()`方法会阻塞线程池,导致潜在的“池诱导死锁”。
### 2.4 改进的并发计算文件大小示例
```java
import java.io.File;
import java.util.*;
import java.util.concurrent.*;
public class ConcurrentTotalFileSize {
class SubDirectoriesAndSize {
final public long size;
final public List<File> subDirectories;
public SubDirectoriesAndSize(
final long totalSize, final List<File> theSubDirs) {
size = totalSize;
subDirectories = Collections.unmodifiableList(theSubDirs);
}
}
private SubDirectoriesAndSize getTotalAndSubDirs(final File file) {
long total = 0;
final List<File> subDirectories = new ArrayList<File>();
if (file.isDirectory()) {
final File[] children = file.listFiles();
if (children != null)
for (final File child : children) {
if (child.isFile())
total += child.length();
else
subDirectories.add(child);
}
}
return new SubDirectoriesAndSize(total, subDirectories);
}
private long getTotalSizeOfFilesInDir(final File file)
throws InterruptedException, ExecutionException, TimeoutException {
final ExecutorService service = Executors.newFixedThreadPool(100);
try {
long total = 0;
final List<File> directories = new ArrayList<File>();
directories.add(file);
while (!directories.isEmpty()) {
final List<Future<SubDirectoriesAndSize>> partialResults =
new ArrayList<Future<SubDirectoriesAndSize>>();
for (final File directory : directories) {
partialResults.add(
service.submit(new Callable<SubDirectoriesAndSize>() {
public SubDirectoriesAndSize call() {
return getTotalAndSubDirs(directory);
}
}));
}
directories.clear();
for (final Future<SubDirectoriesAndSize> partialResultFuture :
partialResults) {
final SubDirectoriesAndSize subDirectoriesAndSize =
partialResultFuture.get(100, TimeUnit.SECONDS);
directories.addAll(subDirectoriesAndSize.subDirectories);
total += subDirectoriesAndSize.size;
}
}
return total;
} finally {
service.shutdown();
}
}
public static void main(final String[] args)
throws InterruptedException, ExecutionException, TimeoutException {
final long start = System.nanoTime();
final long total = new ConcurrentTotalFileSize()
.getTotalSizeOfFilesInDir(new File(args[0]));
final long end = System.nanoTime();
System.out.println("Total Size: " + total);
System.out.println("Time taken: " + (end - start) / 1.0e9);
}
}
```
### 2.5 改进版本的优势
改进的并发版本解决了简单并发版本的问题,计算`/usr`目录大小时,时间从顺序版本的 18 秒多缩短到约 8 秒。它通过让每个任务返回子目录列表,避免长时间持有线程,设计更合理。
### 2.6 线程协调流程
```mermaid
graph TD;
A[开始] --> B[拆分问题为多个任务];
B --> C[在线程池中调度任务];
C --> D[等待任务结果];
D --> E[汇总结果];
E --> F[结束];
```
## 3. 使用 CountDownLatch 进行线程协调
虽然`Future`在获取任务结果和协调线程方面有一定作用,但对于无返回结果的任务,它作为协调工具并不理想。而`CountDownLatch`可以在这种情况下发挥作用。
### 3.1 简单并发代码的问题与改进思路
`NaivelyConcurrentTotalFileSize`代码简单,但存在每个线程等待其调度任务完成的问题。为了保持代码简单并使其正常工作,我们可以让每个线程更新一个共享变量,同时使用`CountDownLatch`来确保主线程等待所有子目录被访问。
### 3.2 使用 CountDownLatch 的并发计算文件大小示例
```java
import java.io.File;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
public class ConcurrentTotalFileSizeWLatch {
private ExecutorService service;
final private AtomicLong pendingFileVisits = new AtomicLong();
final private AtomicLong totalSize = new AtomicLong();
final private CountDownLatch latch = new CountDownLatch(1);
private void updateTotalSizeOfFilesInDir(final File file) {
long fileSize = 0;
if (file.isFile())
fileSize = file.length();
else {
final File[] children = file.listFiles();
if (children != null) {
for (final File child : children) {
if (child.isFile())
fileSize += child.length();
else {
pendingFileVisits.incrementAndGet();
service.execute(new Runnable() {
public void run() {
updateTotalSizeOfFilesInDir(child);
}
});
}
}
}
}
totalSize.addAndGet(fileSize);
if (pendingFileVisits.decrementAndGet() == 0)
latch.countDown();
}
private long getTotalSizeOfFile(final String fileName)
throws InterruptedException {
service = Executors.newFixedThreadPool(100);
pendingFileVisits.incrementAndGet();
try {
updateTotalSizeOfFilesInDir(new File(fileName));
latch.await(100, TimeUnit.SECONDS);
return totalSize.longValue();
} finally {
service.shutdown();
}
}
public static void main(final String[] args)
throws InterruptedException {
final long start = System.nanoTime();
final long total = new ConcurrentTotalFileSizeWLatch()
.getTotalSizeOfFile(args[0]);
final long end = System.nanoTime();
System.out.println("Total Size: " + total);
System.out.println("Time taken: " + (end - start) / 1.0e9);
}
}
```
### 3.3 代码说明
- `pendingFileVisits`:用于记录仍需访问的文件数量。
- `totalSize`:用于记录文件的总大小,使用`AtomicLong`确保线程安全。
- `latch`:用于主线程等待所有子目录被访问。
### 3.4 执行流程
| 步骤 | 操作 |
| ---- | ---- |
| 1 | 创建线程池和`CountDownLatch`。 |
| 2 | 启动任务,更新`pendingFileVisits`和`totalSize`。 |
| 3 | 当`pendingFileVisits`为 0 时,调用`latch.countDown()`。 |
| 4 | 主线程等待`latch`释放。 |
| 5 | 关闭线程池。 |
### 3.5 使用 CountDownLatch 的线程协调流程
```mermaid
graph TD;
A[开始] --> B[初始化 CountDownLatch 和线程池];
B --> C[启动任务并更新共享变量];
C --> D{所有任务完成?};
D -- 否 --> C;
D -- 是 --> E[释放 CountDownLatch];
E --> F[主线程继续执行];
F --> G[关闭线程池];
G --> H[结束];
```
## 4. 总结
### 4.1 不同方法的对比
| 方法 | 优点 | 缺点 | 适用场景 |
| ---- | ---- | ---- | ---- |
| 顺序计算 | 代码简单 | 速度慢 | 目录结构简单,文件数量少 |
| 简单并发计算 | 尝试并发处理 | 可能导致线程池阻塞和超时问题 | 不适用复杂目录结构 |
| 改进的并发计算 | 避免线程长时间阻塞,性能提升 | 代码复杂度增加 | 复杂目录结构 |
| 使用 CountDownLatch 的并发计算 | 代码简单,能正常工作 | 引入共享变量 | 任务无返回结果,需要协调线程 |
### 4.2 关键要点回顾
- 使用`ExecutorService`管理线程池,可配置线程池类型和任务等待队列大小。
- 任务调度可使用`execute()`、`submit()`、`invokeAll()`和`invokeAny()`方法。
- 线程协调可使用`Future`和`CountDownLatch`。
- 设计时应关注任务完成,而非线程或服务的终止。
### 4.3 实践建议
在实际应用中,应根据具体场景选择合适的方法。对于简单问题,可使用顺序计算;对于复杂问题,可考虑使用改进的并发计算或使用`CountDownLatch`的并发计算。同时,要注意线程安全和资源管理,避免出现死锁等问题。
0
0
复制全文
相关推荐










