用python写一个类用flask_sqlalchemy扩展,读取.sql文件中的多行多个脚本,到openGauss数据库中去执行
时间: 2025-07-21 12:59:45 浏览: 4
<think>我们需要使用Flask和Flask-SQLAlchemy扩展来连接openGauss数据库,并执行一个.sql文件中的多个SQL脚本。
注意:openGauss兼容PostgreSQL,所以我们可以使用psycopg2作为驱动。
步骤:
1. 安装必要的包:flask, flask-sqlalchemy, psycopg2 (或psycopg2-binary)
2. 创建一个Flask应用,配置数据库连接字符串。
3. 创建一个类,该类使用Flask-SQLAlchemy的db对象,并提供一个方法来读取.sql文件并执行其中的多个SQL语句。
注意:.sql文件中的多个SQL语句通常以分号(;)结尾。但是,SQLAlchemy的execute方法默认一次执行一个语句。
因此,我们需要将整个文件内容分割成多个独立的SQL语句。注意避免分割掉字符串或注释中的分号。
然而,由于SQL的复杂性(例如,存储过程中可能包含多个分号),简单的按分号分割并不总是可靠。
这里我们采用一个折中的方法:如果我们的.sql文件是由多个简单的SQL语句组成(没有存储过程等复杂结构),我们可以按分号分割并逐条执行。
如果文件中有存储过程等,则需要整个执行(因为它们可能包含多个分号,但整个是一个语句)。
因此,我们假设.sql文件中每个语句都是独立的,并且以分号结尾。同时,我们忽略空语句。
另一种方法是使用`psycopg2`的`cursor.execute()`无法直接执行多个语句,但我们可以使用`psycopg2`的`execute_values`或`execute_batch`并不直接支持多语句。
所以,我们选择将整个文件作为一个字符串,然后分割成单个语句。
但是,更可靠的方法是使用SQLAlchemy的`connection`,然后使用`connection.connection`获取原始的psycopg2连接,然后使用它的`cursor`来执行整个脚本(通过一次性提交整个字符串)?
实际上,psycopg2的cursor可以执行多个语句,只要它们用分号分隔并且一次性提交。但是,如果脚本中有GO这样的关键字(在SQL Server中),在PostgreSQL中并不支持。
因此,我们这里假设.sql文件是PostgreSQL兼容的,并且每个语句以分号结束,没有使用其他批处理分隔符。
我们可以这样:
- 读取整个文件内容
- 使用分号分割字符串,然后逐个语句执行(跳过空语句)
但是,如果语句中包含字符串内的分号,这种方法就会出错。所以,我们需要一个更智能的分割方法。
由于实现一个完整的SQL解析器很复杂,我们可以考虑使用`psycopg2`的`split`方法?但是,psycopg2并没有提供这样的函数。
另一种方法是使用`sqlparse`库(第三方)来分割SQL语句。我们可以先安装它:`pip install sqlparse`
使用`sqlparse`可以安全地将SQL脚本分割成多个语句。
因此,我们选择使用`sqlparse`来分割SQL文件中的多个语句。
步骤:
1. 读取.sql文件
2. 使用sqlparse.split()得到语句列表
3. 遍历每个语句,如果语句非空,则执行
注意:openGauss的驱动我们使用psycopg2,所以连接字符串类似:'opengauss://username:password@host:port/database'
创建类:SQLFileExecutor
代码结构:
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
import sqlparse
app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'opengauss://username:password@host:port/database'
db = SQLAlchemy(app)
class SQLFileExecutor:
def __init__(self, db):
self.db = db
def execute_sql_file(self, file_path):
# 读取文件
with open(file_path, 'r') as f:
sql_script = f.read()
# 分割成多个语句
statements = sqlparse.split(sql_script)
# 过滤掉空语句
statements = [stmt.strip() for stmt in statements if stmt.strip()]
# 获取数据库引擎
engine = self.db.engine
connection = engine.raw_connection()
try:
cursor = connection.cursor()
for stmt in statements:
if stmt:
cursor.execute(stmt)
connection.commit()
except Exception as e:
connection.rollback()
raise e
finally:
connection.close()
但是,注意:使用原始游标执行,这样不会触发SQLAlchemy的事件等。但直接执行SQL是可行的。
另外,我们也可以使用SQLAlchemy的text和session.execute,但是同样,一次只能执行一个语句。所以这里使用原始连接更直接。
但是,我们也可以使用SQLAlchemy的`engine.execute()`,不过它也不支持多语句。所以我们需要循环执行每个语句。
使用原始连接的方式,我们使用psycopg2的游标来执行每个语句。
注意:openGauss使用psycopg2连接,所以这样是可行的。
我们也可以使用SQLAlchemy的`connection.execute(text(statement))`,但是同样需要循环。
这里我们使用原始连接,因为这样我们可以直接控制事务(一次提交所有,或者出错回滚)。
但是,我们也可以使用SQLAlchemy的connection.begin()来管理事务,但使用原始连接更直接。
因此,我们采用上述方法。
注意:如果SQL文件很大,可能需要考虑分块执行,但这里我们一次读取整个文件。
使用示例:
executor = SQLFileExecutor(db)
executor.execute_sql_file('path/to/file.sql')
现在,我们编写这个类。
注意:在Flask应用上下文中执行,因为我们需要确保db.engine是已经绑定的。
我们可以在没有请求上下文的情况下使用,但需要确保应用已经初始化。
另一种方式:我们也可以使用`with app.app_context():`来确保上下文,但是我们在类中不强制,由调用者确保。
完整代码:</think>下面是一个使用 Flask-SQLAlchemy 扩展读取并执行 .sql 文件中多行 SQL 脚本的 Python 类实现,适用于 openGauss 数据库:
```python
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
import sqlparse
import re
app = Flask(__name__)
# 配置 openGauss 数据库连接 (示例配置)
app.config['SQLALCHEMY_DATABASE_URI'] = 'opengauss://username:password@host:port/dbname'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
# 初始化 SQLAlchemy
db = SQLAlchemy(app)
class SQLScriptExecutor:
def __init__(self, db_instance):
self.db = db_instance
def execute_sql_file(self, file_path):
"""
执行 SQL 文件中的多个脚本
:param file_path: .sql 文件路径
"""
# 读取 SQL 文件内容
with open(file_path, 'r', encoding='utf-8') as f:
sql_content = f.read()
# 使用 sqlparse 分割 SQL 语句
statements = sqlparse.split(sql_content)
# 过滤空语句和注释
valid_statements = [
s.strip() for s in statements
if s.strip() and not s.strip().startswith('--')
]
# 获取数据库连接
connection = self.db.engine.raw_connection()
try:
cursor = connection.cursor()
# 逐条执行 SQL 语句
for idx, sql in enumerate(valid_statements, 1):
try:
cursor.execute(sql)
print(f"✅ 语句 {idx} 执行成功: {sql[:50]}...")
except Exception as e:
print(f"❌ 语句 {idx} 执行失败: {sql[:50]}...")
print(f"错误信息: {str(e)}")
# 提交事务
connection.commit()
except Exception as e:
connection.rollback()
raise RuntimeError(f"SQL 执行失败: {str(e)}")
finally:
cursor.close()
connection.close()
return len(valid_statements)
# 使用示例
if __name__ == '__main__':
with app.app_context():
executor = SQLScriptExecutor(db)
# 替换为你的 SQL 文件路径
count = executor.execute_sql_file('scripts.sql')
print(f"共执行 {count} 条 SQL 语句")
```
### 代码详解:
1. **Flask 应用配置**
- 配置 openGauss 数据库连接字符串
- 禁用 SQLALCHEMY_TRACK_MODIFICATIONS 避免额外开销
2. **SQLScriptExecutor 类核心功能**
- `__init__`: 接收 Flask-SQLAlchemy 实例
- `execute_sql_file`: 核心执行方法
- 读取 SQL 文件内容
- 使用 `sqlparse` 库分割多条 SQL 语句
- 过滤空语句和注释
- 获取原始数据库连接
- 逐条执行 SQL 并处理异常
- 事务管理(成功提交/失败回滚)
3. **关键处理逻辑**
- **SQL 分割**:使用 `sqlparse.split()` 智能分割 SQL 语句,正确处理存储过程等复杂语句
- **语句过滤**:移除空行和单行注释(`--`)
- **错误处理**:
- 单条语句失败不影响后续执行
- 详细错误日志记录
- 事务级回滚保证一致性
- **资源管理**:使用 `finally` 块确保关闭连接
4. **使用示例**
- 在应用上下文中创建执行器实例
- 调用 `execute_sql_file()` 方法执行 SQL 文件
- 返回成功执行的语句数量
### 依赖安装:
```bash
pip install flask flask-sqlalchemy sqlparse psycopg2-binary
```
### 示例 SQL 文件 (scripts.sql):
```sql
-- 创建表
CREATE TABLE IF NOT EXISTS users (
id SERIAL PRIMARY KEY,
username VARCHAR(50) NOT NULL UNIQUE,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- 插入数据
INSERT INTO users (username) VALUES ('john_doe');
INSERT INTO users (username) VALUES ('jane_smith');
-- 创建索引
CREATE INDEX idx_users_created ON users(created_at);
-- 存储过程示例
CREATE OR REPLACE FUNCTION update_username(user_id INT, new_name VARCHAR)
RETURNS VOID AS $$
BEGIN
UPDATE users SET username = new_name WHERE id = user_id;
END;
$$ LANGUAGE plpgsql;
```
### 注意事项:
1. 确保 openGauss 服务已启动且网络可达
2. 数据库用户需有执行 DDL/DML 的权限
3. 复杂存储过程建议单独执行
4. 大文件建议分批处理
5. 连接参数需替换为实际值:
- `username`: 数据库用户名
- `password`: 密码
- `host`: 数据库地址
- `port`: 服务端口(默认 5432)
- `dbname`: 数据库名称
阅读全文
相关推荐


















