PySpark 算子简单案例头歌
时间: 2025-03-21 09:15:36 浏览: 68
### PySpark 算子的简单案例及教程
以下是关于 PySpark 中常用算子的一些基本示例及其功能说明:
#### RDD 基本操作
RDD 是 PySpark 的核心抽象之一,支持多种转换 (Transformation) 和动作 (Action) 操作。
#### 转换操作 (Transformations)
转换操作会返回一个新的 RDD,而不会立即执行计算。这些操作只有在遇到 Action 时才会触发实际计算。
- **map()**: 对每个元素应用函数并返回新的 RDD。
```python
rdd = sc.parallelize([1, 2, 3, 4])
mapped_rdd = rdd.map(lambda x: x * 2)
print(mapped_rdd.collect()) # 输出 [2, 4, 6, 8][^1]
```
- **filter()**: 过滤掉不符合条件的数据。
```python
filtered_rdd = rdd.filter(lambda x: x % 2 == 0)
print(filtered_rdd.collect()) # 输出 [2, 4]
```
- **flatMap()**: 类似于 map,但是可以返回多个结果。
```python
flat_mapped_rdd = rdd.flatMap(lambda x: range(1, x))
print(flat_mapped_rdd.collect()) # 输出 [1, 1, 2, 1, 2, 3]
```
#### 动作操作 (Actions)
动作操作会对 RDD 执行计算,并返回具体的结果或者将数据保存到外部存储中。
- **collect()**: 将整个 RDD 收集到驱动程序上作为列表。
```python
collected_data = rdd.collect()
print(collected_data) # 输出 [1, 2, 3, 4][^1]
```
- **take(n)**: 返回前 n 条记录。
```python
taken_data = rdd.take(2)
print(taken_data) # 输出 [1, 2][^1]
```
- **count()**: 统计 RDD 中的元素数量。
```python
counted_elements = rdd.count()
print(counted_elements) # 输出 4
```
#### DataFrame API 示例
除了传统的 RDD 接口外,PySpark 提供了更高级别的接口——DataFrame API,它提供了类似于 SQL 查询的功能。
- 创建 DataFrame 并查询列的最大值:
```python
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("example").getOrCreate()
data = [(1, "a"), (2, "b"), (3, "c")]
columns = ["id", "value"]
df = spark.createDataFrame(data, columns)
max_value = df.selectExpr("MAX(id)").first()[0]
print(max_value) # 输出 3
```
#### 关联资源
对于更多深入学习资料,可以参考 GitHub 上类似的项目实现逻辑[^2]。
---
阅读全文
相关推荐
















