本关任务:本关主题是通过读取外部数据源文本文件生成DataFrame,并利用DataFrame对象的常用Transformation操作和Action操作实现功能。已知学生信息(student)、教师信息(teacher)、课程信息(course)和成绩信息(score)如下图所示,通过Spark SQL对这些信息进行查询,分别得到需要的结果。 学生信息student.txt如下所示。 108,ZhangSan,male,1995/9/1,95033 105,KangWeiWei,female,1996/6/1,95031 107,GuiGui,male,1992/5/5,95033 101,WangFeng,male,1993/8/8,95031 106,LiuBing,female,1996/5/20,95033 109,DuBingYan,male,1995/5/21,95031 教师信息teacher.txt如下所示。 825,LinYu,male,1958,Associate professor,department of computer 804,DuMei,female,1962,Assistant professor,computer science department 888,RenLi,male,1972,Lecturer,department of electronic engneering 852,GongMOMO,female,1986,Associate professor,computer science department 864,DuanMu,male,1985,Assistant professor,department of computer 课程信息course.txt如下所示。 3-105,Introduction to computer,825 3-245,The operating system,804 6-101,Spark SQL,888 6-102,Spark,852 9-106,Scala,864 成绩信息score.txt如下所示。 108,3-105,99 105,3-105,88 107,3-105,77 相关知识 (1)创建SparkSession对象 通过SparkSession.builder()创建一个基本的SparkSession对象,并为该Spark SQL应用配置一些初始化参数,例如设置应用的名称以及通过config方法配置相关运行参数。 import org.apache.spark.sql.SparkSession val spark = SparkSession .builder() .appName("Spark SQL basic example") .config("spark.some.config.option", "some-value") .getOrCreate() // 引入spark.implicits._,以便于RDDs和DataFrames之间的隐式转换 import spark.implicits._ (2)显性地将RDD转换为DataFrame 通过编程接口,构造一个 Schema ,然后将其应用到已存在的 RDD[Row] (将RDD[T]转化为Row对象组成的RDD),将RDD显式的转化为DataFrame。 //导入Spark SQL的data types包 import org.apache.spark.sql.types._ //导入Spark SQL的Row包 import org.apache.spark.sql.Row // 创建peopleRDD scala> val stuRDD = spark.sparkContext.textFile("读取文件路径") // schema字符串 scala> val schemaString = "name age country" //将schema字符串按空格分隔返回字符串数组,对字符串数组进行遍历,并对数组中的每一个元素进一步封装成StructField对象,进而构成了Array[StructField] scala> val fields = schemaString.split(" ").map(fieldName => StructField(fieldName,StringType,nullable = true)) //将fields强制转换为StructType对象,形成了可用于构建DataFrame对象的Schema scala> val schema = StructType(fields) //将peopleRDD(RDD[String])转化为RDD[Rows] scala> val rowRDD = stuRDD.map(_.split(",")).map(elements => Row(elements(0),elements(1).trim,elements(2))) //将schema应用到rowRDD上,完成DataFrame的转换 scala> val stuDF = spark.createDataFrame(rowRDD,schema) (3)sql接口的使用 SparkSession提供了直接执行sql语句的SparkSession.sql(sqlText:String)方法,sql语句可直接作为字符串传入sql()方法中,sql()查询所得到的结果依然是DataFrame对象。在Spark SQL模块上直接进行sql语句的查询需要首先将结构化数据源的DataFrame对象注册成临时表,进而在sql语句中对该临时表进行查询操作。 (4)select方法 select方法用于获取指定字段值,根据传入的String类型的字段名,获取指定字段的值,以DataFrame类型返回。 (5)filter方法 filter方法按参数指定的SQL表达式的条件过滤DataFrame。 (6)where方法 where按照指定条件对数据进行过滤筛选并返回新的DataFrame。 (7)distinct方法 distinct方法用来返回对DataFrame的数据记录去重后的DataFrame。 (8)groupBy方法 使用一个或者多个指定的列对DataFrame进行分组,以便对它们执行聚合操作。 (9)agg方法 agg是一种聚合操作,该方法输入的是对于聚合操作的表达,可同时对多个列进行聚合操作,agg为DataFrame提供数据列不需要经过分组就可以执行统计操作,也可以与groupBy法配合使用。 (10)orderBy方法 按照给定的表达式对指定的一列或者多列进行排序,返回一个新的DataFrame,输入参数为多个Column类。 编程要求 根据提示,在右侧编辑器补充代码,完成功能的实现。 测试说明 平台会对你编写的代码进行测试: 预期输出: |Sno| Sname| Ssex|Sbirthday|SClass| |101| WangFeng| male| 1993/8/8| 95031| |105|KangWeiWei|female| 1996/6/1| 95031| |106| LiuBing|female|1996/5/20| 95033| |107| GuiGui| male| 1992/5/5| 95033| |108| ZhangSan| male| 1995/9/1| 95033| |109| DuBingYan| male|1995/5/21| 95031| |tname |prof | |DuMei |Assistant professor| |DuanMu |Assistant professor| |GongMOMO|Associate professor| |LinYu |Associate professor| |RenLi |Lecturer | |Tno|Tname |Tsex |Tyear|Prof |Depart | |804|DuMei |female|1962 |Assistant professor|computer science department| |852|GongMOMO|female|1986 |Associate professor|computer science department| |Depart | |department of computer | |computer science department | |department of electronic engneering| |max(Degree)| | 100| | Cno| avg(Degree)| |3-105| 88.0| |3-245| 83.0| |6-101| 74.0| |6-102|87.66666666666667| |9-106| 85.0| 开始你的任务吧,祝你成功!
时间: 2025-06-02 17:55:45 浏览: 26
### 使用Spark SQL处理学生、教师、课程和成绩信息
为了完成此任务,可以通过以下方式实现:
#### 1. 创建SparkSession
首先需要创建一个 `SparkSession` 实例来初始化 Spark SQL 的环境。这是进入 DataFrame API 和 Spark SQL 功能的核心入口。
```scala
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
.appName("Student Teacher Course Score Analysis")
.config("spark.some.config.option", "some-value")
.getOrCreate()
// 引入 implicits 隐式转换支持
import spark.implicits._
```
此处引入了 `implicits._` 来启用 Scala 中的隐式转换功能,从而简化从 RDD 到 DataFrame 的转换过程[^4]。
---
#### 2. 加载文本文件为DataFrame
假设 `student.txt`, `teacher.txt`, `course.txt`, 和 `score.txt` 文件已经存在,并且每行数据是以逗号分隔的形式存储,则可以使用 `spark.read.csv()` 方法加载这些文件并指定其结构。
```scala
// 假设 student.txt 结构为 (id, name, age)
val studentDF = spark.read.format("csv").option("header", "true").load("path/to/student.txt")
// 假设 teacher.txt 结构为 (tid, tname, department)
val teacherDF = spark.read.format("csv").option("header", "true").load("path/to/teacher.txt")
// 假设 course.txt 结构为 (cid, cname, tid)
val courseDF = spark.read.format("csv").option("header", "true").load("path/to/course.txt")
// 假设 score.txt 结构为 (sid, cid, grade)
val scoreDF = spark.read.format("csv").option("header", "true").load("path/to/score.txt")
```
这里我们指定了 CSV 文件中的列名作为表头 (`header=true`) 并加载它们成为 DataFrames[^1]。
---
#### 3. 注册临时视图供SQL查询
为了让 Spark SQL 能够直接对 DataFrame 进行查询操作,需先将每个 DataFrame 注册成一张临时表。
```scala
studentDF.createOrReplaceTempView("students")
teacherDF.createOrReplaceTempView("teachers")
courseDF.createOrReplaceTempView("courses")
scoreDF.createOrReplaceTempView("scores")
```
这一步骤允许后续通过标准 SQL 查询语法访问这些表格的数据[^3]。
---
#### 4. 执行SQL查询
现在可以根据需求编写复杂的 SQL 查询语句来进行数据分析。例如,如果要获取每位学生的总分数以及平均分,可执行如下查询:
```sql
val resultDF = spark.sql("""
SELECT s.id AS sid,
s.name AS sname,
SUM(sc.grade) AS total_score,
AVG(sc.grade) AS avg_score
FROM students s
JOIN scores sc ON s.id = sc.sid
GROUP BY s.id, s.name
""")
resultDF.show()
```
上述代码片段展示了如何基于两个表(`students` 和 `scores`)进行连接(join),并对学生成绩求和与计算均值的操作。
---
#### 5. Transformation与Action操作
除了 SQL 方式外,还可以采用 DataFrame DSL(Domain Specific Language)形式完成同样的逻辑运算。比如筛选出所有成绩大于等于90的学生记录:
```scala
val highScoreStudents = scoreDF.filter($"grade" >= 90).join(studentDF, $"sid" === studentDF("id"))
highScoreStudents.show()
```
这里的 `.filter()` 是一种 transformation 操作;而最终调用 `.show()` 属于 action 类型的动作[^1]。
---
#### 总结
以上步骤涵盖了从加载原始数据至实施具体业务逻辑的过程,充分体现了 Spark SQL 在大规模分布式环境下高效处理复杂关系型数据的能力。
阅读全文
相关推荐


















