from functools import partial from pyspark.sql import SparkSession from pyspark.sql.functions import * ################ Begin ################ # 创建SparkSession # 读取实时数据 # 定义一个偏应用函数,从固定的 pattern 获取日志内匹配的字段 # 统计 CRON 这个进程每小时生成的日志数,并以时间顺序排列,水印设置为 1 分钟 # 开始运行查询并在控制台输出 # 设置最长等待时长 ################ End ################ 编程要求 打开右侧代码文件窗口,在 Begin 至 End 区域补充代码,完成任务。 监听本地端口 8888,其中保存了系统相关的日志信息,每行的数据如下所示: Nov 24 13:17:01 spark CRON[18455]: (root) CMD ( cd / && run-parts --report /etc/cron.hourly) 最前面的为时间(年份为 2019),接着是主机名,进程名,可选的进程 ID,冒号后是日志内容。 统计 CRON 这个进程每小时生成的日志数,将结果输出到控制台,并设置不截断输出内容。水印设置为 1 分钟,将程序最长等待时间设置为 70 秒。 输出示例如下: +------------------------------------------+-----+ |window |count| +------------------------------------------+-----+ |[2019-11-24 13:00:00, 2019-11-24 14:00:00]|111 | +------------------------------------------+-----+
时间: 2025-06-13 07:55:38 浏览: 14
### 使用 PySpark 统计 CRON 进程每小时日志数量并输出到控制台
以下是一个完整的解决方案,用于统计从监听端口 8888 的实时日志数据中 CRON 进程每小时的日志数量,并将结果输出到控制台。代码中设置了水印和最长等待时间。
#### 创建 SparkSession
首先需要创建一个 SparkSession 实例以支持 Structured Streaming 功能。
```python
from pyspark.sql import SparkSession
# 创建 SparkSession
spark = SparkSession.builder \
.appName("CronLogProcessor") \
.getOrCreate()
# 设置日志级别为 ERROR 以减少冗余输出
spark.sparkContext.setLogLevel("ERROR")
```
#### 定义读取流
使用 `socket` 格式从端口 8888 读取实时日志数据,并定义数据模式。
```python
# 定义数据模式
schema = "value STRING"
# 读取实时数据流
lines = spark.readStream \
.format("socket") \
.option("host", "localhost") \
.option("port", 8888) \
.load()
```
#### 提取字段并过滤 CRON 日志
通过偏函数提取日志中的时间戳和进程名,并过滤出 CRON 相关的日志。
```python
from pyspark.sql.functions import col, from_unixtime, unix_timestamp, window
# 假设日志格式为:时间戳 进程名 其他信息
logs = lines.select(
from_unixtime(unix_timestamp(col("value"), "yyyy-MM-dd HH:mm:ss")).alias("timestamp"),
col("value").substr(20, 5).alias("process")
)
# 过滤出 CRON 进程相关的日志
cron_logs = logs.filter(col("process") == "CRON")
```
#### 按小时统计日志数量
使用窗口函数按小时统计 CRON 日志的数量,并设置水印和最长等待时间。
```python
# 按小时统计日志数量
hourly_counts = cron_logs.groupBy(
window(col("timestamp"), "1 hour", "1 hour").alias("hour"),
col("process")
).count()
# 设置水印和最长等待时间
watermarked_counts = hourly_counts.withWatermark("hour", "1 minute")
```
#### 输出到控制台
将结果输出到控制台,并确保内容不被截断。
```python
# 输出到控制台
query = watermarked_counts.writeStream \
.outputMode("update") \
.format("console") \
.option("truncate", "false") \
.start()
# 等待流处理完成
query.awaitTermination()
```
#### 关键点说明
- 水印设置为 1 分钟,表示系统可以容忍最多 1 分钟的数据延迟[^1]。
- 最长等待时间为 70 秒,确保在处理延迟数据时不会无限等待。
- 使用 `window` 函数按小时对数据进行分组统计[^1]。
---
阅读全文
相关推荐










