spark连接mysql
时间: 2025-02-10 16:08:00 浏览: 34
### 如何使用 Spark 连接 MySQL 数据库
#### 创建 SparkSession 并配置连接属性
为了使 Spark 应用程序能够与 MySQL 数据库通信,需要创建 `SparkSession` 实例并配置必要的参数:
```scala
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
.appName("Spark MySQL Example")
.config("spark.master", "local")
.config("spark.sql.warehouse.dir", "/user/hive/warehouse")
.config("spark.sql.catalogImplementation", "hive")
.getOrCreate()
// 设置 MySQL 驱动加载
Class.forName("com.mysql.jdbc.Driver").newInstance()[^3]
// 定义连接属性
val connectionProperties = new java.util.Properties()
connectionProperties.put("user", "your_username")
connectionProperties.put("password", "your_password")[^2]
```
#### 构建 JDBC URL 和读取数据
构建用于描述目标数据库位置的 JDBC URL 字符串,并利用它来执行 SQL 查询从而获取所需的数据集。
```scala
val jdbcUrl = "jdbc:mysql://localhost:3306/database_name"
// 使用 jdbc 方法从 MySQL 中读取 'bank' 表的数据至 DataFrame
val dfBank = spark.read.jdbc(jdbcUrl, "bank", connectionProperties)
dfBank.show() // 展示部分记录
```
#### 处理和分析数据
一旦获得了来自 MySQL 的数据帧对象之后就可以对其进行各种转换操作了。比如下面这段代码实现了基于职业字段(`job`)对员工数量进行计数的功能:
```scala
val jobCountsDF = dfBank.groupBy("job").count().orderBy($"count".desc)
jobCountsDF.show(10) // 只展示前十项结果
```
#### 关闭 SparkSession
当所有的处理工作完成后应当记得终止会话以释放占用着的各种资源。
```scala
spark.stop()
```
阅读全文
相关推荐

















