将下列json数据复制到你的ubuntu系统/usr/local/spark下,并保存命名为employee.json { "id":1 , "name":" Ella" , "age":36 } { "id":2, "name":"Bob","age":29 } { "id":3 , "name":"Jack","age":29 } { "id":4 , "name":"Jim","age":28 } { "id":4 , "name":"Jim","age":28 } { "id":5 , "name":"Damon" } { "id":5 , "name":"Damon" } 首先读取employee.json创建DataFrame,并满足以下要求 进行数据分析。(要求:pycharm中进行编写程序) (1)查询所有数据,并去除重复的数据 (2)筛选age>20的记录 (3)将数据按age分组,统计每个年龄员工的数量 (4)查询所有记录的name列,并为其取别名为username (5)查询年龄age的平均值,最大值
时间: 2025-05-21 08:38:54 浏览: 19
### 创建 `employee.json` 文件
在 Ubuntu 的 `/usr/local/spark` 目录下创建 `employee.json` 文件,可以使用以下命令:
```bash
sudo nano /usr/local/spark/employee.json
```
将如下 JSON 数据写入文件中作为示例数据:
```json
[
{"id": 1, "name": "Ella", "age": 36},
{"id": 2, "name": "John", "age": 28},
{"id": 3, "name": "Anna", "age": 25},
{"id": 4, "name": "Mike", "age": 36},
{"id": 5, "name": "Lily", "age": 22},
{"id": 6, "name": "Ella", "age": 36}
]
```
保存并退出。
---
### PyCharm 中编写 Spark 程序
以下是完整的 Python 脚本代码,在 PyCharm 中运行该脚本来完成所需的任务:
#### 初始化 SparkSession 和加载 JSON 文件
```python
from pyspark.sql import SparkSession
# 初始化 SparkSession
spark = SparkSession.builder \
.appName("Employee Analysis") \
.getOrCreate()
# 加载 employee.json 文件为 DataFrame
df_employee = spark.read.json("/usr/local/spark/employee.json")
# 打印原始 DataFrame 结构
df_employee.printSchema()
df_employee.show(truncate=False)
```
#### 去重
去除重复数据可以通过 `dropDuplicates()` 方法实现:
```python
# 去除重复记录
df_no_duplicates = df_employee.dropDuplicates() # 默认基于所有列去重
print("After removing duplicates:")
df_no_duplicates.show(truncate=False)
```
#### 筛选 age 大于 20 的记录
筛选条件可通过 `filter()` 或 `where()` 方法指定:
```python
# 筛选出 age > 20 的记录
df_filtered = df_no_duplicates.filter(df_no_duplicates["age"] > 20)
print("Filtered records where age > 20:")
df_filtered.show(truncate=False)
```
#### 按 age 分组统计每个年龄的员工数量
分组统计通过 `groupBy()` 和聚合函数 `count()` 实现:
```python
# 统计每个年龄段的人数
df_grouped_by_age = df_filtered.groupBy("age").count()
print("Group by age and count employees:")
df_grouped_by_age.show(truncate=False)
```
#### 设置 name 列别名为 username
设置别名通过 `alias()` 方法或 SQL 表达式的 `AS` 关键字实现:
```python
# 查询 name 列并设置别名为 username
df_username_alias = df_filtered.select(df_filtered["name"].alias("username"))
print("Name column with alias 'username':")
df_username_alias.show(truncate=False)
```
#### 计算 age 列的平均值和最大值
计算平均值和最大值分别使用 `avg()` 和 `max()` 聚合函数:
```python
from pyspark.sql.functions import avg, max
# 计算 age 列的平均值和最大值
df_stats = df_filtered.agg(avg("age").alias("average_age"), max("age").alias("maximum_age"))
print("Statistics of the 'age' column:")
df_stats.show(truncate=False)
```
---
### 完整流程总结
以上代码实现了从读取 JSON 文件到执行各种分析操作的过程。每一步均提供了清晰的结果展示,便于验证中间状态是否正确[^1]。
---
###
阅读全文
相关推荐
















