flink 写入 mysql
时间: 2025-01-08 22:17:42 浏览: 41
### 使用Flink将数据写入MySQL的方法
为了实现从Apache Flink到MySQL的数据写入操作,通常会采用JDBC连接器来完成这一过程。下面提供了一个具体的例子说明如何配置并执行此任务。
#### 配置依赖项
首先,在构建工具(Maven或Gradle)中的项目文件里加入必要的依赖库:
对于Maven而言,需添加如下内容至`pom.xml`:
```xml
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.12</artifactId>
<version>{FLINK_VERSION}</version>
</dependency>
<!-- MySQL JDBC Driver -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
```
其中 `{FLINK_VERSION}` 应替换为所使用的具体版本号。
#### 编写Java程序
接下来展示一段简单的Java代码片段用于演示通过Flink向MySQL数据库表内插入记录的过程[^4]:
```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.types.Row;
public class WriteToMySqlExample {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 定义要发送给MySQL的数据流
var inputDataStream = ...; // 这里应该是一个DataStream<Row>, Row对象包含了待插入字段
JdbcSink.sink(
"INSERT INTO my_table (column1, column2) VALUES (?, ?)",
(statement, row) -> {
statement.setString(1, row.getField(0).toString());
statement.setInt(2, Integer.parseInt(row.getField(1).toString()));
},
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl("jdbc:mysql://localhost:3306/my_database")
.withDriverName("com.mysql.cj.jdbc.Driver")
.withUsername("root")
.withPassword("password")
.build());
System.out.println("Starting job...");
env.execute("Write to MySQL Example");
}
}
```
上述代码展示了怎样利用 `JdbcSink` 来定义一个SQL语句模板以及参数设置逻辑,并指定了目标表格名称和列名;同时提供了建立与MySQL服务器之间网络链接所需的URL地址、驱动类名、用户名及密码等信息。
请注意实际应用时应根据实际情况调整这些细节部分,比如更改主机IP地址、端口号、数据库名字、认证凭证等。
阅读全文
相关推荐


















