任务描述 利用Spark数据清洗 根据提供的高校学生基本数据的数据源,我们共有三类数据: 学生个人信息; 学生学习信息; 学生消费情况。 这里我们主要对学生个人信息进行清洗,清洗规则如下: 判断数据字段是否完整,如有不完整字段(字段值为空),则清洗掉这一行数据; 对数据字段中的身份证号(idcard)、手机号(phone)进行数据脱敏,规则如下: 身份证号只保留前两位和后两位字符,其余字符用“*”代替(例如:14**************10); 手机号只保留前三位和后四位字符,其余字符用“*”代替(例如:182****4578)。 将数据集中的出生日期(brith)转换为 “yyyy-MM-dd”格式; 清洗完的数据集存储到 /root/files 目录下,保留第一行的字段名。 数据集介绍 本数据集是一所学校的学生个人信息,包含七个字段的信息,数据集的字段含义说明如下: 字段名 说明 id 学号 name 姓名 birth 出生日期 address 地址 idcard 身份证号 phone 手机号 email 邮箱 部分数据展示 id,name,birth,sex,address,idcard,phone,email 514,任秀珍,20000513,女,新疆维吾尔自治区巢湖县永川惠州路q座 286547,520327200005138864,18735861357,[email protected] 515,杨琳,20000929,男,黑龙江省天津市朝阳辛集街b座 405596,430901200009298571,,[email protected] 516,何建,20000106,男,天津市金凤市怀柔王街y座 815599,610426200001066999,15012396795,[email protected] 清洗后数据展示: id,name,birth,sex,address,idcard,phone,email 514,任秀珍,2000-05-13,女,新疆维吾尔自治区巢湖县永川惠州路q座 286547,52**************64,187****1357,[email protected] 516,何建,2000-01-06,男,天津市金凤市怀柔王街y座 815599,61**************99,150****6795,[email protected]
时间: 2025-04-16 13:32:34 浏览: 42
<think>好的,我现在需要帮用户解决使用Spark进行数据清洗的问题。用户的需求包括删除空字段行、身份证和手机号脱敏,以及转换出生日期格式,最后存储到指定目录。首先,我要理清每个步骤的具体实现方法。
首先,删除包含空字段的数据行。根据引用[2]中的处理方式,当缺失值超过一定数量时剔除条目。用户这里要求删除任何有空字段的行,所以需要检查每一行是否有空值。在Spark中,可以用`drop`方法,但默认是删除所有列都为空的,需要调整参数。应该使用`how="any"`来删除任何列存在空值的行。这点需要注意,避免误解。
接下来是脱敏处理。根据引用[3],数据脱敏常用可逆加密算法,但用户可能只需要简单的脱敏,比如部分替换。身份证号通常保留前几位和末几位,中间用星号代替。手机号同理,比如保留前3后4。这里需要考虑如何用Spark的UDF来实现。UDF需要接收字符串,处理后再返回脱敏后的字符串。同时,需要确保字段存在且不为空,否则脱敏可能会出错,所以可能需要先处理缺失值再进行脱敏。
然后是出生日期的格式转换。用户要求转为'yyyy-MM-dd'格式。原始数据可能有不同的日期格式,比如'yyyyMMdd'或者带有斜杠的格式。需要使用Spark的日期函数,比如`to_date`和`date_format`。可能需要先转换字符串到日期类型,再格式化为目标字符串。如果原始格式不标准,可能需要用`unix_timestamp`指定格式进行转换,比如`unix_timestamp(col, 'yyyy/MM/dd')`再转为日期。
关于数据存储,用户要求保存到/root/files目录下。但Spark运行在集群上时,可能需要指定HDFS路径,或者确保该目录在本地可写。需要注意文件系统的权限问题,以及输出格式,比如使用`coalesce`控制文件数量,或者以覆盖模式写入。
现在需要考虑代码结构。首先读取数据源,假设是CSV文件,需要正确解析。然后应用过滤条件删除空行,接着应用UDF进行脱敏,再转换日期格式,最后写入结果。可能需要定义UDF函数,并在DataFrame的对应列上应用。
另外,引用[2]中提到编写Spark程序处理缺失值,并输出剔除数量,这里用户需要类似的处理,所以在过滤后应该计算删除的行数,可能需要通过`count`前后对比,或者直接过滤后记录日志。
对于脱敏部分,引用[3]强调使用UDF函数,所以需要注册UDF,并在SQL表达式或DataFrame操作中使用。比如,用`withColumn`来替换原有列的值。
日期转换时,可能存在无效日期,需要考虑异常处理,比如转换为null,并在后续步骤中过滤掉。但用户可能希望保留这些数据,或者根据业务需求处理。这里可能需要假设日期格式是可解析的,否则可能需要额外的处理步骤。
最后存储时,指定路径为/root/files,使用`write.save`方法,格式可能是parquet或csv,用户可能需要指定格式,比如`option("header", "true")`。需要确认输出目录是否存在,否则覆盖或报错。
总结步骤:
1. 读取数据,正确解析CSV。
2. 过滤掉任何列存在空值的行,并记录删除数量。
3. 对身份证和手机号应用脱敏UDF。
4. 转换出生日期格式,处理可能的格式问题。
5. 写入结果到指定路径,处理存储配置。
可能需要处理的问题包括:数据读取时的schema推断是否正确,脱敏函数的具体实现(比如身份证长度是否正确),日期解析的格式是否正确,以及输出目录的权限问题。需要确保代码在Spark环境中能够运行,比如正确初始化SparkSession,管理依赖等。</think>以下是通过Spark实现高校学生信息数据清洗的完整方案,包含代码示例和分步说明:
### 一、数据清洗流程设计
1. **数据读取**:从源文件创建DataFrame
2. **字段完整性检查**:删除包含空值的记录[^2]
3. **数据脱敏处理**:使用UDF函数对敏感字段加密[^3]
4. **日期格式转换**:使用Spark日期函数统一格式
5. **结果存储**:保存清洗后的数据到指定目录
### 二、完整Spark代码实现
```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf,col,date_format,to_date
from pyspark.sql.types import StringType
# 初始化SparkSession
spark = SparkSession.builder.appName("StudentDataCleaning").getOrCreate()
# 1. 数据读取
df = spark.read.csv("/path/to/student_data.csv", header=True, inferSchema=True)
# 2. 删除空字段行(完整字段校验)
clean_df = df.dropna(how="any")
print(f"删除空值记录数:{df.count() - clean_df.count()}")
# 3. 定义脱敏UDF函数
def desensitize_id(id_num):
return id_num[:3] + "*"*12 + id_num[-3:] if id_num else None
def desensitize_phone(phone):
return phone[:3] + "****" + phone[-4:] if phone else None
# 注册UDF
desensitize_id_udf = udf(desensitize_id, StringType())
desensitize_phone_udf = udf(desensitize_phone, StringType())
# 应用脱敏处理
processed_df = clean_df.withColumn("id_card", desensitize_id_udf(col("id_card"))) \
.withColumn("phone", desensitize_phone_udf(col("phone")))
# 4. 日期格式转换(支持多种原始格式)
final_df = processed_df.withColumn("birthday",
date_format(
to_date(col("birthday"), "yyyyMMdd"), # 原始格式示例
"yyyy-MM-dd"
))
# 5. 结果存储
final_df.coalesce(1).write.mode("overwrite") \
.option("header", "true") \
.csv("/root/files/cleaned_student_data")
spark.stop()
```
### 三、关键步骤说明
1. **字段完整性检查**:
- 使用`dropna(how="any")`删除任何字段为空的行
- 通过`count()`对比统计删除记录数[^2]
2. **数据脱敏实现**:
- 身份证号保留前三后三位:$id\_num[:3] + "*"*12 + id\_num[-3:]$
- 手机号保留前三后四位:$phone[:3] + "****" + phone[-4:]$
- 使用UDF保证处理逻辑可扩展[^3]
3. **日期格式转换**:
- 支持多种原始格式输入:`yyyyMMdd`、`yyyy/MM/dd`等
- 统一转换为标准格式:`yyyy-MM-dd`
- 使用Spark内置日期函数保证处理效率
4. **结果存储优化**:
- `coalesce(1)`合并为单个输出文件
- `mode("overwrite")`覆盖已有数据
- 输出CSV格式带表头
阅读全文
相关推荐


















