flink读取kafka写入mysql
时间: 2025-05-16 07:53:47 浏览: 34
### 使用 Flink 从 Kafka 消费消息并存储至 MySQL
为了实现使用 Apache Flink 从 Kafka 中读取消息并将这些消息持久化到 MySQL 数据库的功能,通常会遵循如下设计模式。此过程不仅涉及到了解如何配置 Flink 来作为消费者订阅 Kafka 主题,还需要掌握怎样设置合适的 Sink 函数来确保数据能够被可靠地写入目标数据库。
#### 配置环境依赖项
首先,在构建项目之前,需确认已引入必要的 Maven 或 Gradle 依赖以支持 Flink-Kafka 连接器以及 JDBC Connector:
对于 Maven 用户来说,`pom.xml` 文件应包含以下条目[^2]:
```xml
<dependencies>
<!-- Flink dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Kafka connector dependency -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- MySQL JDBC driver -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
<!-- Other necessary dependencies... -->
</dependencies>
```
#### 创建 Flink 应用程序逻辑
接下来展示一段 Python 实现的伪代码用于说明整个流程;请注意实际开发中应当采用 Java 或 Scala 编程语言编写生产级别的应用程序。
```python
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings
import json
env = StreamExecutionEnvironment.get_execution_environment()
settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
t_env = StreamTableEnvironment.create(env, environment_settings=settings)
# Define source from Kafka topic using DDL statement or Table API
t_env.execute_sql("""
CREATE TABLE kafka_source (
id BIGINT,
message STRING
) WITH (
'connector' = 'kafka',
'topic' = 'your_topic_name',
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'json'
)
""")
# Transformations can be applied here as needed...
# Define sink to MySQL table via JdbcCatalog or SQL INSERT INTO syntax
t_env.execute_sql(f"""
INSERT INTO mysql_sink (id, content)
SELECT id, message FROM kafka_source;
""")
```
上述脚本展示了创建一个简单的管道,该管道可以从指定的主题名称处获取 JSON 格式的记录,并将其插入名为 `mysql_sink` 的关系型表内。这里假设已经预先建立了对应的物理表结构[^3]。
值得注意的是,当涉及到高可用性和一致性保障时,官方文档提到过利用 TwoPhaseCommitSinkFunction 类可帮助达成跨系统的 Exactly-once 语义[^1]。然而具体实施细节取决于业务需求和技术栈的选择。
阅读全文
相关推荐


















