import org.apache.spark.sql.SparkSession import org.apache.spark.sql.functions._ object DataAnalysis { def main(args: Array[String]): Unit = { val spark = SparkSession.builder() .appName("DataAnalysis") .master("local[*]") .getOrCreate() // 假设数据存储在一个文本文件中,每行数据格式为:id, user_id, url, some_value, session_id, access_time val data = spark.read .option("header", false) .option("inferSchema", true) .csv("your_data_file.csv") // 列名重命名 val renamedData = data.toDF("id", "user_id", "url", "some_value", "session_id", "access_time") // (1)过滤出访问次数在50次以上的用户记录并持久化到内存 val filteredData = renamedData.groupBy("user_id").count() .filter($"count" > 50) .join(renamedData, "user_id") .cache() // (2)统计访问50次以上的用户主要访问的前5类网页 val top5Webpages = filteredData.groupBy("url").count() .orderBy(desc("count")) .limit(5) // (3)合并部分网页URL后面带有_1、_2字样的翻页网址,统一为一个网址 val mergedData = filteredData.withColumn("url", regexp_replace($"url", "_[12]", "")) // (4)根据访问时间加入对应时段,统计所有用户各时段访问情况 val dataWithPeriod = mergedData.withColumn("period", when( hour($"access_time") >= 6 && hour($"access_time") < 11, "上午") .when(hour($"access_time") >= 11 && hour($"access_time") < 14, "中午") .when(hour($"access_time") >= 14 && hour($"access_time") < 17, "下午") .when(hour($"access_time") >= 17 && hour($"access_time") < 19, "傍晚") .when(hour($"access_time") >= 19 && hour($"access_time") < 23, "晚上") .otherwise("深夜")) val periodAccessCount = dataWithPeriod.groupBy("user_id", "period").count() // 打印结果 println("过滤出访问次数在50次以上的用户记录:") filteredData.show() println("访问50次以上的用户主要访问的前5类网页:") top5Webpages.show() println("合并翻页网址后的数据:") mergedData.show() println("所有用户各时段访问情况:") periodAccessCount.show() spark.stop() } },请给出结果图片
时间: 2025-06-26 11:15:21 浏览: 13
很抱歉,作为文字型AI模型,我无法直接生成或提供代码运行的结果图片。但我可以详细为您解析上述Spark程序的功能,并帮助您理解每个步骤的作用以及预期的输出内容。
以下是代码功能的具体解释:
---
### **代码功能分析**
#### 1. 创建`SparkSession`
```scala
val spark = SparkSession.builder()
.appName("DataAnalysis")
.master("local[*]")
.getOrCreate()
```
- 这段代码创建了一个名为“DataAnalysis”的`SparkSession`实例。
- 使用本地模式(`local[*]`)启动Spark应用程序,适用于单机测试环境。
---
#### 2. 加载CSV数据并重命名列名
```scala
val data = spark.read
.option("header", false) // 没有标题行,默认为false
.option("inferSchema", true) // 自动推断数据类型
.csv("your_data_file.csv")
val renamedData = data.toDF("id", "user_id", "url", "some_value", "session_id", "access_time")
```
- `spark.read.option(...).csv()`读取指定路径下的CSV文件。
- 将默认列名称替换为更具描述性的名称:`id`, `user_id`, `url`, `some_value`, `session_id`, 和 `access_time`。
---
#### 3. 筛选出访问次数超过50次的用户记录
```scala
val filteredData = renamedData.groupBy("user_id").count()
.filter($"count" > 50)
.join(renamedData, "user_id")
.cache()
```
- 首先对`user_id`分组计算其总访问次数 (`groupBy("user_id").count()`).
- 再筛选出那些访问次数大于50的用户记录.
- 最后将筛选结果与原始DataFrame通过`user_id`连接在一起,并缓存到内存中以便后续操作。
**注意**: 此处可能会返回一个较大的数据集,取决于实际输入数据规模.
---
#### 4. 统计这些高频率用户的前五大访问网站类别
```scala
val top5Webpages = filteredData.groupBy("url").count()
.orderBy(desc("count"))
.limit(5)
```
- 对`filteredData`按`url`字段进行聚合统计访问量。
- 根据访问量降序排序 (`.orderBy(desc("count"))`) 并限制只展示前五个最受欢迎的页面地址。
---
#### 5. 合并带有 `_1/_2` 的翻页网址为统一形式
```scala
val mergedData = filteredData.withColumn("url", regexp_replace($"url", "_[12]", ""))
```
- 如果某些URL包含类似`example.com_page_1` 或者 `example.com_page_2`这样的尾缀,则将其规范化去掉后缀成为通用形式如`example.com_page`.
---
#### 6. 添加时间段分类及统计各时段访问分布情况
```scala
val dataWithPeriod = mergedData.withColumn("period", when(
hour($"access_time") >= 6 && hour($"access_time") < 11, "上午")
.when(hour($"access_time") >= 11 && hour($"access_time") < 14, "中午")
.when(hour($"access_time") >= 14 && hour($"access_time") < 17, "下午")
.when(hour($"access_time") >= 17 && hour($"access_time") < 19, "傍晚")
.when(hour($"access_time") >= 19 && hour($"access_time") < 23, "晚上")
.otherwise("深夜"))
val periodAccessCount = dataWithPeriod.groupBy("user_id", "period").count()
```
- 新增一列为“period”,表示一天的不同阶段划分标准分别为:“上午”、“中午”、“下午”等共六个区间。
- 计算每位用户各个时间段内的具体访问频数汇总表。
---
### **最终打印**
四个关键处理环节的数据均显示出来供查看验证:
```scala
println("过滤出访问次数在50次以上的用户记录:")
filteredData.show()
println("访问50次以上的用户主要访问的前5类网页:")
top5Webpages.show()
println("合并翻页网址后的数据:")
mergedData.show()
println("所有用户各时段访问情况:")
periodAccessCount.show()
```
---
由于本平台暂时不能上传截图,请自行运行此脚本来获取可视化效果!您可以复制粘贴至支持Scala语言编写的IDE工具内完成调试过程观察成果!
---
###
阅读全文
相关推荐


















