java监听pgsql数据库表变化
时间: 2025-06-23 09:26:59 浏览: 19
### Java 实现 PostgreSQL 数据库表变更监听
#### 使用 Apache Flink 和 Change Data Capture (CDC)
为了实现实时监听和处理 PostgreSQL 表的变化,可以采用 Apache Flink 结合其提供的 CDC 功能。这种方法能够高效地捕获数据库中的任何更改,并将其转换为流数据进行进一步处理。
```java
// 导入必要的包
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSource;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class PostgresCdcExample {
public static void main(String[] args) throws Exception {
// 创建执行环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置并行度
env.setParallelism(1);
// 获取 Table 环境
final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 定义 JDBC 连接选项
String url = "jdbc:postgresql://localhost:5432/mydb";
String username = "myuser";
String password = "mypassword";
// 构建源表 DDL 语句
StringBuilder sourceDDLBuilder = new StringBuilder()
.append("CREATE TABLE my_table_source (\n")
.append("id BIGINT,\n")
.append("name STRING\n")
.append(") WITH (\n")
.append("'connector' = 'postgres-cdc',\n")
.append("'hostname' = 'localhost',\n")
.append("'port' = '5432',\n")
.append("'username' = '" + username + "',\n")
.append("'password' = '" + password + "',\n")
.append("'database-name' = 'mydb',\n")
.append("'table-name' = 'my_table'\n")
.append(")");
// 执行创建源表的操作
tableEnv.executeSql(sourceDDLBuilder.toString());
// 查询并打印结果到控制台
TableResult result = tableEnv.sqlQuery(
"SELECT * FROM my_table_source").execute();
while(result.hasNext()){
System.out.println(result.next());
}
}
}
```
此代码片段展示了如何设置一个基于 Apache Flink 的应用来监控 PostgreSQL 数据库中的 `my_table` 变更情况[^2]。
#### 基于定时轮询的方式
另一种较为简单的方案是通过定期查询特定时间戳之后发生的更新记录来进行监听操作。这种方式虽然简单易懂,但在高并发场景下可能会带来性能瓶颈以及延迟较高的问题。
```java
// 导入所需类库
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.Date;
class PollingBasedChangeListener {
private Connection connect() {
try {
Class.forName("org.postgresql.Driver");
return DriverManager.getConnection(
"jdbc:postgresql://localhost:5432/test", "postgres", "");
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public void checkForChanges(Date lastCheckedTime) {
try (Connection conn = connect()) {
PreparedStatement pstmt = null;
ResultSet rs = null;
String sql = "SELECT id, name FROM my_table WHERE updated_at > ?";
pstmt = conn.prepareStatement(sql);
pstmt.setDate(1, new java.sql.Date(lastCheckedTime.getTime()));
rs = pstmt.executeQuery();
while(rs.next()){
int id = rs.getInt("id");
String name = rs.getString("name");
System.out.printf("Detected change on row with ID=%d and Name='%s'%n", id, name);
// 处理检测到的变化...
}
} catch(Exception ex){
ex.printStackTrace();
}
}
// 调用该函数以启动监听逻辑
public static void startListening(){
Date initialCheckPoint = new Date(); // 记录首次检查的时间点
Timer timer = new Timer(true);
timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run(){
new PollingBasedChangeListener().checkForChanges(initialCheckPoint);
}
}, 0L, TimeUnit.MINUTES.toMillis(1)); // 每分钟执行一次轮询
}
}
```
这段代码实现了基本的轮询机制,在每次调用 `startListening()` 方法后会每隔一分钟去检查自上次以来是否有新的变动发生[^3]。
阅读全文
相关推荐



















