flink 1.17 mysql cdc
时间: 2025-07-16 12:07:08 浏览: 7
<think>我们正在使用Flink 1.17版本,并且需要配置MySQL CDC连接器来实现实时数据同步。根据引用[1]的信息,我们知道在Flink 1.17.1中,需要两个关键jar包:mysql-connector-java(JDBC驱动)和flink-sql-connector-mysql-cdc(CDC连接器)。同时,引用[1]也提到了flink-sql-connector-mysql-cdc是一个fat jar,包含了所有依赖,方便在SQL客户端使用。
步骤:
1. 下载所需jar包:
- mysql-connector-java-8.0.27.jar(根据引用[1],版本为8.0.27)
- flink-sql-connector-mysql-cdc-2.4.2.jar(注意:2.4.2版本对应Flink 1.17,但需确认兼容性。根据Flink CDC官方文档,Flink CDC版本与Flink版本有对应关系,这里我们使用2.4.x版本支持Flink 1.17)
2. 将这两个jar包放入Flink安装目录的lib文件夹下。
3. 重启Flink集群:
```bash
./bin/stop-cluster.sh
./bin/start-cluster.sh
```
4. 使用Flink SQL客户端或者编写程序来使用MySQL CDC。
下面分别介绍两种方式:SQL客户端方式和DataStream API方式(使用Flink SQL的Table API)。
### 方式一:使用Flink SQL客户端
启动SQL客户端:
```bash
./bin/sql-client.sh
```
然后,在SQL客户端中执行以下DDL语句来创建MySQL CDC源表:
```sql
CREATE TABLE mysql_cdc_source (
id INT,
name STRING,
email STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'root',
'password' = 'yourpassword',
'database-name' = 'testdb',
'table-name' = 'user_table'
);
```
这样就可以像查询普通表一样查询这个CDC表,它会实时捕获MySQL中`user_table`表的变更(插入、更新、删除)。
### 方式二:使用DataStream API(Java示例)
如果需要编写程序,可以使用Flink的DataStream API。但注意,引用[2]中提到了一个过时的包(com.alibaba.ververica),这是老版本的Flink CDC(由阿里开源,现在已进入Apache项目)。在Flink 1.17中,我们应该使用官方的`flink-connector-cdc`包。
首先,在项目的pom.xml中添加依赖(如果使用Maven):
```xml
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>2.4.2</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.27</version>
</dependency>
```
然后,编写Java程序:
```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.mysql.MySqlSource;
public class MySqlCDCExample {
public static void main(String[] args) throws Exception {
SourceFunction<String> sourceFunction = MySqlSource.<String>builder()
.hostname("localhost")
.port(3306)
.databaseList("testdb") // 设置数据库
.tableList("testdb.user_table") // 设置表
.username("root")
.password("yourpassword")
.deserializer(new JsonDebeziumDeserializationSchema()) // 将变更事件转换为JSON字符串
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(sourceFunction).print().setParallelism(1); // 设置并行度为1,方便输出
env.execute("MySQL CDC Example");
}
}
```
注意:这里使用了`JsonDebeziumDeserializationSchema`,它将变更事件转换为JSON字符串,方便我们查看。
### 注意事项
- 确保MySQL的binlog已启用。在MySQL配置文件中(如my.cnf)添加:
```
server-id = 1
log_bin = /var/log/mysql/mysql-bin.log
binlog_format = ROW
binlog_row_image = FULL
expire_logs_days = 10
```
然后重启MySQL服务。
- 确保MySQL用户有复制权限(REPLICATION SLAVE, REPLICATION CLIENT)。
### 实时数据同步示例
假设我们要将MySQL中的表实时同步到另一个数据库(如ClickHouse)或消息队列(如Kafka),可以在Flink作业中定义源表(MySQL CDC)和目标表,然后通过INSERT INTO语句将数据实时写入目标表。
例如,同步到Kafka:
```sql
CREATE TABLE kafka_sink (
id INT,
name STRING,
email STRING
) WITH (
'connector' = 'kafka',
'topic' = 'user_topic',
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'json'
);
INSERT INTO kafka_sink SELECT * FROM mysql_cdc_source;
```
这样,任何对MySQL源表的变更都会实时写入Kafka。
### 总结
通过以上步骤,我们可以在Flink 1.17中配置并使用MySQL CDC连接器实现实时数据同步。关键点包括:下载正确的jar包、启用MySQL的binlog、创建CDC源表、将数据同步到目标系统。
[^1]: Flink1.17.1本地单节点部署,并支持连接mysql-cdc。根据Flink官方文档,要使用mysql-cdc连接器,需要配置的JDBC Driver的版本为8.0.27。从maven仓库中下载mysql-connector-java-8.0.27.jar,上传到Flink的lib目录。下载flink-sql-connector-mysql-cdc-2.4.2.jar,上传到lib目录。重启flink集群。
[^2]: mysql cdc_Flink之Mysql数据CDC。介绍了使用MySQLSource构建DataStream源。
§§相关问题§§
1. 如何验证Flink MySQL CDC连接器是否正常工作?
2. 在Flink CDC中如何处理MySQL表结构变更(如添加列)?
3. Flink CDC支持哪些类型的数据库?
4. Flink CDC在同步数据时如何保证Exactly-Once语义?</think>### Flink 1.17 MySQL CDC 配置与实时数据同步指南
#### **1. 环境准备**
- **下载依赖包**(需与Flink 1.17兼容):
- MySQL JDBC驱动:[mysql-connector-java-8.0.27.jar](https://mvnrepository.com/artifact/mysql/mysql-connector-java/8.0.27)
- MySQL CDC连接器:[flink-sql-connector-mysql-cdc-2.4.2.jar](https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-mysql-cdc/2.4.2)
- **部署到Flink**:
```bash
# 将JAR包放入Flink的lib目录
cp mysql-connector-java-8.0.27.jar flink-sql-connector-mysql-cdc-2.4.2.jar /path/to/flink-1.17.1/lib/
# 重启集群
./bin/stop-cluster.sh
./bin/start-cluster.sh
```
#### **2. MySQL 配置**
- **启用Binlog**(在MySQL配置文件 `my.cnf` 中):
```ini
server-id = 1
log_bin = /var/log/mysql/mysql-bin.log
binlog_format = ROW
binlog_row_image = FULL
```
- **创建MySQL用户并授权**:
```sql
CREATE USER 'flink_user'@'%' IDENTIFIED BY 'Password123!';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'flink_user'@'%';
FLUSH PRIVILEGES;
```
#### **3. Flink SQL 实时同步示例**
```sql
-- 创建MySQL CDC源表(捕获变更数据)
CREATE TABLE mysql_source (
id INT PRIMARY KEY,
name STRING,
email STRING
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'flink_user',
'password' = 'Password123!',
'database-name' = 'test_db',
'table-name' = 'users'
);
-- 创建Kafka目标表(存储变更数据)
CREATE TABLE kafka_sink (
id INT,
name STRING,
email STRING
) WITH (
'connector' = 'kafka',
'topic' = 'user_updates',
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'json'
);
-- 启动实时同步任务
INSERT INTO kafka_sink SELECT * FROM mysql_source;
```
#### **4. 关键参数说明**
| 参数 | 说明 |
|------|------|
| `connector` | 固定值 `mysql-cdc` |
| `scan.startup.mode` | 初始快照模式(`initial`/`latest-offset`) |
| `server-time-zone` | MySQL时区(例如 `Asia/Shanghai`) |
| `debezium.snapshot.mode` | 快照模式(默认 `initial`) |
#### **5. 验证数据同步**
- **向MySQL插入数据**:
```sql
INSERT INTO test_db.users(id, name, email) VALUES (1, 'Alice', '[email protected]');
```
- **检查Kafka输出**:
```json
{"id":1,"name":"Alice","email":"[email protected]"}
```
#### **6. 高级场景**
- **处理表结构变更**:
CDC自动同步ALTER TABLE操作,但需确保目标存储支持动态Schema(如Kafka+Schema Registry)。
- **Exactly-Once保证**:
启用Flink Checkpoint(间隔建议10s~30s):
```sql
SET 'execution.checkpointing.interval' = '10s';
```
> **注意**:
> - 使用 `flink-sql-connector-mysql-cdc`(而非 `flink-connector-mysql-cdc`)简化依赖管理[^1]
> - CDC支持 **INSERT/UPDATE/DELETE** 事件捕获,通过`op`字段标识操作类型(`c`=插入, `u`=更新, `d`=删除)[^2]
[^1]: Flink官网说明:`flink-sql-connector-mysql-cdc`是Flink SQL专用的Fat JAR,包含所有依赖。
[^2]: Debezium数据格式规范:变更事件包含`before`(旧值)和`after`(新值)字段。
阅读全文
相关推荐










