spark分析hadoop的CSV文件
时间: 2025-07-06 22:48:51 浏览: 8
### 使用 Spark 分析 Hadoop 中的 CSV 文件
为了利用 Spark 处理存储于 Hadoop 上的 CSV 数据,可以采用 DataFrame API 方式创建 SparkSession 并加载数据。具体实现如下所示:
```scala
// 创建 SparkSession 实例并配置运行环境参数
val spark = SparkSession.builder()
.master("yarn") // 配置 master URL 为 YARN 来连接到 Hadoop集群
.appName("Spark CSV Analysis on Hadoop")
.enableHiveSupport() // 如果需要 Hive 支持则启用此功能
.getOrCreate()
import org.apache.hadoop.fs.Path
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
// 获取 FileSystem 对象以便操作 HDFS 路径
val hdfsPath = new Path("/path/to/csvfile.csv")
val hadoopConfig = new Configuration()
hadoopConfig.set("fs.defaultFS", "hdfs://namenode:8020") // 设置 NameNode 地址
val fs = FileSystem.get(hadoopConfig)
if (fs.exists(hdfsPath)) {
println(s"${hdfsPath.toString} exists.")
// 加载 CSV 文件至 DataFrame
val df = spark.read.format("com.databricks.spark.csv")
.option("header", "true") // 设定首行为列名
.option("inferSchema", "true") // 自动推断字段类型
.load(hdfsPath.toString())
// 显示前几条记录用于验证读取成功与否
df.show(5)
}
else{
println(s"${hdfsPath.toString} does not exist!")
}
spark.stop()
```
上述代码片段展示了如何通过设置 `master` 参数指向 YARN 来启动 Spark 应用程序,并指定应用程序名称以及是否开启 Hive 功能[^1]。
接着定义了一个路径对象代表目标 CSV 文件的位置,在获取到了分布式文件系统的实例之后就可以判断该位置是否存在有效文件了;如果存在,则继续执行后续逻辑——即调用 `read()` 方法配合相应选项完成实际的数据导入工作[^3]。
最后展示部分样本数据作为测试结果的一部分,确保整个流程按预期正常运作完毕后关闭当前会话资源释放占用端口等清理动作。
阅读全文
相关推荐


















