第1关:Python数据处理—使用 PySpark 处理数据框 600 学习内容 参考答案 记录 评论 任务描述 相关知识 数据框简介 PySpark 数据框处理 创建数据框 筛选数据 合并数据框 join 操作 重命名数据框 编程要求 测试说明 任务描述 本关任务:有两个班的成绩单分别保存在student1.csv和student2.csv中,请根据所给提示完成相应的操作。 相关知识 为了完成本关任务,你需要掌握如何用 Spark 处理数据框。 数据框简介 数据框是一个分布式二维数据集,在概念和关系数据库表或 R 语言中的 Data Frame 类似,但是数据框提供很多优化措施。可以由大量的方式创建,例如结构化的数据文件、Hive 表、外部数据库和现有的本地 R 数据框等。数据框通常除了数据本身还包括定义数据的元数据,比如列和行的名字。数据框可以用来处理大批量的结构化或半结构化的数据。数据框的应用编程接口支持对数据的各种处理,包括通过名字或位置查询行、列和单元格、过滤行等等。数据框支持各种各样的数据格式和数据源,它为不同的程序语言提供 API 支持,比如 Python 、 R 、Scala 等等。我们可以说数据框不是别的,就只是一种类似于 SQL 表或电子表格的二维数据结构。 数据框 PySpark 数据框处理 创建数据框 创建数据框时,可以有多种不同方式进行创建,以下介绍两种不同数据源的读取创建说明: 1. 从 csv 文件创建新的数据框 从一个 csv 文件中加载数据可以用 spark.read.csv 方法来将数据加载到一个 DataFrame 对象中。 df = spark.read.csv(path,header,inferSchema) #path为路径 #header(默认是false) 表示是否将csv文件中的第一行作为schema(读写参数) #inferSchema 表示是否支持从数据中推导出schema(只读参数) 例如,有一个名为test.csv的数据集包含以下内容: column1,column2 1,2 2,4 3,6 4,8 5,10 我们将它读入并创建成数据框可用以下代码: df1 = spark.read.csv('project/src/step1/test1.csv', header=True, inferSchema=True) df1.show() 运行结果如下: +-------+-------+ |column1|column2| +-------+-------+ | 1| 2| | 2| 4| | 3| 6| | 4| 8| | 5| 10| +-------+-------+ 2. 从 pandas_df 创建数据框 例如创建一个 4*4 的数值为随机数的数据框可以用如下语句: df = pd.DataFrame(np.random.random((4, 4))) df = spark.createDataFrame(df) 输出如下: +-------------------+------------------+--------------------+--------------------+ | 0| 1| 2| 3| +-------------------+------------------+--------------------+--------------------+ | 0.2668751247790484|0.7842122714071319| 0.8940958868923979| 0.395379865632305| | 0.9935407483893016|0.7785592206069294| 0.9466907846722169|0.050751792943087404| |0.39561337674840424|0.5613734971939374| 0.14812750520869256| 0.554849314768592| |0.14944494714704348|0.5782490430063237|0.026532625021582934| 0.9034052593020386| +-------------------+------------------+--------------------+--------------------+ 接下来介
时间: 2025-05-23 10:04:02 浏览: 38
### PySpark 数据框处理教程
以下是关于如何使用 PySpark 处理数据框的详细说明,涵盖了从 CSV 创建数据框、筛选数据、执行 join 操作以及重命名列等功能。
#### 1. 创建 SparkSession
在 PySpark 中,`SparkSession` 是入口点。它用于初始化 Spark 应用程序并与集群通信。
```python
from pyspark.sql import SparkSession
# 初始化 SparkSession
spark = SparkSession.builder \
.appName("PySpark DataFrame Tutorial") \
.getOrCreate()
```
[^1]
---
#### 2. 从 CSV 文件创建 DataFrames
可以使用 `read.csv()` 方法加载 CSV 文件到 DataFrame 中。设置参数 `header=True` 表示第一行为列名,`inferSchema=True` 自动推断数据类型。
```python
data = spark.read.option("header", "true").option("inferSchema", "true").csv("path/to/your/data.csv")
data.show()
```
[^3]
---
#### 3. 筛选数据 (Filtering)
可以通过条件表达式来过滤数据。例如,选择某一列满足特定条件的数据:
```python
filtered_data = data.filter(data["age"] > 30) # 过滤年龄大于 30 的记录
filtered_data.show()
```
或者使用 SQL 风格的字符串表示法:
```python
filtered_data = data.filter("age > 30")
filtered_data.show()
```
[^4]
---
#### 4. Join 操作
假设有两个 DataFrame:一个是用户表 (`users`),另一个是交易表 (`transactions`)。可以通过共同键(如 `user_id`)将它们连接起来。
```python
# 假设 users 和 transactions 已经定义好
joined_data = users.join(transactions, on="user_id", how="inner") # 内部连接
joined_data.show()
# 其他类型的连接
left_joined_data = users.join(transactions, on="user_id", how="left") # 左外连接
right_joined_data = users.join(transactions, on="user_id", how="right") # 右外连接
full_joined_data = users.join(transactions, on="user_id", how="outer") # 完全外部连接
```
[^4]
---
#### 5. 重命名列 (Renaming Columns)
如果需要更改某些列的名字,可以使用 `withColumnRenamed()` 方法。
```python
renamed_data = joined_data.withColumnRenamed("old_column_name", "new_column_name")
renamed_data.show()
```
[^1]
---
#### 6. 将 Pandas DataFrame 转换为 Spark DataFrame
当有现成的 Pandas DataFrame 并希望将其转换为 Spark DataFrame 时,可以直接调用 `createDataFrame()` 方法。
```python
import pandas as pd
pd_df = pd.DataFrame({
'name': ['Alice', 'Bob'],
'age': [25, 30]
})
spark_df = spark.createDataFrame(pd_df)
spark_df.show()
```
[^2]
---
#### 7. 将 Spark DataFrame 转换回 Pandas DataFrame
同样支持反向操作,即将 Spark DataFrame 转换为 Pandas DataFrame。
```python
pd_converted_df = spark_df.toPandas()
print(pd_converted_df)
```
[^2]
---
#### 8. 将 DataFrame 输出至 FTP 服务器
如果需要将结果保存到远程 FTP 服务器上,可利用第三方库完成此功能。
```python
output_df.write.format("com.springml.spark.sftp") \
.option("host", "sftpserver.com") \
.option("username", "myusername") \
.option("password", "mypassword") \
.option("fileType", "csv") \
.save("/remote/path/output.csv")
```
[^5]
---
### 总结
以上介绍了如何使用 PySpark 对数据框进行基本的操作,包括但不限于创建、筛选、合并和重命名列等内容。这些技能对于日常的大规模数据分析任务非常实用。
阅读全文
相关推荐










