pyspark代码语法
时间: 2025-03-21 17:11:12 浏览: 33
### PySpark代码语法示例及说明
以下是PySpark的一些基本代码语法及其解释:
#### 初始化SparkSession
在PySpark中,`SparkSession` 是入口点,用于创建DataFrame并执行SQL查询。
```python
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("Example") \
.getOrCreate()
```
上述代码通过 `SparkSession.builder` 创建了一个名为 "Example" 的会话对象[^1]。此对象可以用来加载数据、运行查询以及保存结果。
#### 加载CSV文件到DataFrame
可以通过读取本地或HDFS上的CSV文件来创建DataFrame。
```python
df = spark.read.csv("path/to/file.csv", header=True, inferSchema=True)
```
这段代码将指定路径下的 CSV 文件加载为 DataFrame,并自动推断列名和数据类型[^2]。
#### 显示DataFrame的内容
显示前几行数据以便快速查看内容。
```python
df.show(5) # 显示前五行
```
该命令能够帮助开发者验证数据是否被正确加载[^3]。
#### 使用SQL语句查询数据
可以在注册临时表之后使用标准SQL进行复杂的数据处理。
```python
df.createOrReplaceTempView("table_name")
result_df = spark.sql("SELECT * FROM table_name WHERE column_name > value")
```
这里展示了如何利用 SQL 查询过滤特定条件下的记录。
#### 自定义函数 (UDF)
当内置函数无法满足需求时,可定义自己的逻辑作为用户自定义函数(User Defined Functions)。
```python
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
def custom_function(input_value):
return f"Processed {input_value}"
udf_custom_function = udf(custom_function, StringType())
df_with_udf = df.withColumn("processed_column", udf_custom_function(df["column_name"]))
```
上面的例子演示了怎样创建一个简单的字符串加工型 UDF 并将其应用至现有的一列上。
#### 转换RDD为DataFrame
如果已经有一个 RDD,则可通过结构化方式转换成 DataFrames 来享受更高效的计算性能。
```python
rdd = sc.parallelize([(1, "Alice"), (2, "Bob")])
columns = ["id", "name"]
df_from_rdd = rdd.toDF(columns)
```
这是把原始的键值对形式转化为带有命名字段的新表格表示法的一个例子。
---
阅读全文
相关推荐
















