用python写一个类,读取.sql文件中的多行多个脚本,到openGauss数据库中去执行
时间: 2025-07-21 18:59:51 浏览: 4
<think>我们需要一个类,它能够读取一个.sql文件,该文件中包含多个SQL语句(可能由分号分隔),然后连接到openGauss数据库并执行这些语句。
步骤:
1. 读取.sql文件内容。
2. 将文件内容分割成多个SQL语句(注意:SQL语句可能跨行,且分号作为结束符,但要注意如存储过程等可能包含分号的情况,但这里我们假设是简单的脚本,每个语句以分号结束)。
3. 连接openGauss数据库(使用psycopg2库)。
4. 依次执行每个SQL语句。
注意:openGauss兼容PostgreSQL协议,所以我们可以使用psycopg2来连接。
但是,我们需要考虑事务:我们可以选择自动提交,或者手动控制事务(例如,所有语句在一个事务中执行,如果有一个失败则回滚)。
这里我们提供两种方式:默认情况下,我们将每个语句单独执行并提交(即自动提交模式),也可以选择在一个事务中执行所有语句(全部成功则提交,否则回滚)。
我们将允许在初始化时传入连接参数,并且可以选择是否在一个事务中执行。
另外,注意SQL文件可能包含注释,但分割时我们只以分号作为分隔符,并且需要去掉空语句。
我们将创建一个类:SQLFileExecutor
初始化参数:
dbname: 数据库名
user: 用户名
password: 密码
host: 主机
port: 端口,默认为5432
autocommit: 是否自动提交(即每个语句单独执行并提交),默认为False。如果为False,则所有语句在一个事务中执行。
方法:
execute_sql_file(file_path): 执行指定路径的SQL文件。
注意:连接参数还可以使用其他方式传递(如连接字符串),但这里我们使用基本参数。
安装依赖:需要安装psycopg2-binary(或psycopg2)库。
我们假设SQL文件使用UTF-8编码。
分割SQL语句:我们使用分号作为分隔符,然后过滤掉空语句和只包含空白符的语句。注意,分号后面可能还有注释,但我们只按分号分割。
但是,注意:有些SQL语句可能没有分号(比如某些DDL语句在PG中也可以不加分号),但通常我们要求每个语句以分号结束。因此,我们假设每个语句都以分号结束。
实现步骤:
1. 读取整个文件内容。
2. 用分号分割字符串,得到语句列表。
3. 去除每个语句两端的空白,如果去除后为空则跳过。
4. 注意:最后一条语句可能没有分号,但我们也会将其加入(如果非空)。但通常文件末尾会有一个分号,所以最后一个可能是空字符串。
更好的做法:我们可以使用一个简单的状态机,但这里我们假设文件格式良好,直接按分号分割即可。
但是,如果SQL中包含存储过程或函数,它们内部会有多个分号,这样分割就会出错。因此,这种方法只适用于简单的SQL脚本(每个语句都是独立的,没有包含多个分号的块语句)。
对于复杂的脚本(包含函数、存储过程等),我们可能需要使用更高级的解析器,或者要求每个函数/存储过程作为一个单独的语句(即整个函数定义作为一个字符串,中间的分号不分割)。但是,在SQL标准中,函数定义等是以CREATE OR REPLACE FUNCTION ... AS $$ ... $$ LANGUAGE plpgsql; 这样的形式,我们可以利用美元符引号来避免分割,但这里我们不做这么复杂。
因此,我们提醒用户:这个工具适用于由简单语句组成的SQL文件,每个语句以分号结束。如果包含存储过程等,请确保它们被美元符引号包围,并且整个定义是一个字符串(这样在分割时不会被拆开),或者我们要求用户将每个存储过程单独放在一个文件中。
另一种做法:我们可以使用psycopg2的execute_batch或者使用它的execute方法一次执行整个脚本?但psycopg2的execute方法一次只能执行一个语句。所以我们需要分割。
但是,psycopg2提供了执行脚本的方法:connection.cursor().execute()只能执行一个语句,但我们可以使用psycopg2的extensions模块中的split_query(但注意,这个函数在psycopg2中可能不是公开的,不建议使用)或者使用psycopg2的sql模块也没有直接提供分割多个语句的功能。
因此,我们采用简单分割的方法,并注意如果遇到包含美元符引号的语句,我们可能无法正确处理。所以,如果我们的脚本包含存储过程等,我们可以将整个文件内容作为一条语句执行?但这样也不行,因为存储过程定义中可能有多个语句,而整个存储过程本身是一个CREATE语句,所以应该作为一个语句执行。
因此,我们改变策略:我们不用分号分割,而是尝试寻找不在引号内的分号(包括单引号和美元符引号)来分割。但这比较复杂。
考虑到时间,我们这里只处理简单的情况:每个语句以分号结束,并且没有包含在字符串中的分号(除了在单引号字符串中,但我们也不处理,因为可能还有转义)。所以,我们只按分号分割,并希望用户确保分号只出现在语句结尾。
如果用户需要执行复杂的脚本(包含存储过程等),建议将每个存储过程单独放在一个文件中,或者使用其他工具(如gsql命令行)。
因此,我们按照以下步骤分割:
statements = [stmt.strip() for stmt in content.split(';') if stmt.strip()]
然后,对于每个语句,我们执行它。
但是,注意:在存储过程中,可能会使用到分号,但整个存储过程是一个语句。所以,如果我们的脚本中有存储过程,那么存储过程中的分号也会被分割,导致错误。
因此,我们提供另一种方式:如果用户知道整个文件是一个大的语句(比如包含存储过程),那么他可以设置一个参数,不进行分割,整个文件作为一个语句执行。我们在类中增加一个参数`split_statements`,默认为True(即分割语句)。如果设置为False,则整个文件内容作为一条语句执行。
修改:在execute_sql_file方法中增加一个可选参数split_statements,默认为True。
类初始化时也可以设置一个实例属性split_statements,但为了灵活性,我们在方法中设置参数。
所以,方法签名:def execute_sql_file(self, file_path, split_statements=True):
执行过程:
- 连接数据库
- 如果autocommit为True,则设置连接为自动提交模式(每个语句单独提交),否则开启事务。
- 创建游标
- 如果split_statements为True,则读取文件,分割并执行每个语句;否则,读取整个文件并作为一个语句执行。
- 执行过程中,如果发生错误,且autocommit为False(即在一个事务中),则回滚并抛出异常;如果autocommit为True,则只抛出异常(之前的语句已经提交,当前语句失败)。
- 最后关闭游标和连接。
注意:我们每次执行一个文件就连接一次数据库,执行完关闭。如果频繁执行多个文件,可能效率不高,但这里我们只考虑一个文件。
代码实现如下:</think>以下是一个使用 Python 编写的类,用于读取 `.sql` 文件中的多行 SQL 脚本并在 openGauss 数据库中执行:
```python
import psycopg2
import re
class OpenGaussSQLExecutor:
def __init__(self, host, port, database, user, password):
"""
初始化数据库连接参数
:param host: 数据库主机地址
:param port: 数据库端口
:param database: 数据库名称
:param user: 用户名
:param password: 密码
"""
self.conn_params = {
'host': host,
'port': port,
'database': database,
'user': user,
'password': password
}
self.connection = None
self.cursor = None
def connect(self):
"""建立到 openGauss 的数据库连接"""
try:
self.connection = psycopg2.connect(**self.conn_params)
self.cursor = self.connection.cursor()
print("Successfully connected to openGauss database")
except Exception as e:
print(f"Connection failed: {str(e)}")
raise
def disconnect(self):
"""关闭数据库连接"""
if self.cursor:
self.cursor.close()
if self.connection:
self.connection.close()
print("Database connection closed")
def _parse_sql_scripts(self, file_path):
"""
解析 SQL 文件,分割成独立的 SQL 语句
:param file_path: SQL 文件路径
:return: 包含独立 SQL 语句的列表
"""
with open(file_path, 'r', encoding='utf-8') as f:
sql_content = f.read()
# 移除注释(支持 -- 和 /* */ 两种格式)
sql_content = re.sub(r'/\*.*?\*/', '', sql_content, flags=re.DOTALL)
sql_content = re.sub(r'--.*', '', sql_content)
# 按分号分割并清理空白语句
statements = [stmt.strip() for stmt in sql_content.split(';') if stmt.strip()]
return statements
def execute_sql_file(self, file_path, autocommit=False):
"""
执行 SQL 文件中的所有脚本
:param file_path: SQL 文件路径
:param autocommit: 是否自动提交事务(默认为 False)
"""
try:
self.connect()
statements = self._parse_sql_scripts(file_path)
for i, statement in enumerate(statements, 1):
try:
print(f"Executing statement {i}/{len(statements)}")
self.cursor.execute(statement)
if autocommit:
self.connection.commit()
except Exception as e:
print(f"Error executing statement {i}: {str(e)}")
print(f"Failed statement: {statement[:100]}...") # 打印前100字符
if not autocommit:
self.connection.rollback()
raise
if not autocommit:
self.connection.commit()
print("All SQL statements executed successfully")
except Exception as e:
print(f"Execution failed: {str(e)}")
if self.connection and not autocommit:
self.connection.rollback()
raise
finally:
self.disconnect()
# 使用示例
if __name__ == "__main__":
# 配置数据库连接参数
executor = OpenGaussSQLExecutor(
host="127.0.0.1",
port="5432",
database="mydb",
user="myuser",
password="mypassword"
)
# 执行 SQL 文件
executor.execute_sql_file(
file_path="scripts.sql",
autocommit=False # 设置为 True 时每个语句独立提交
)
```
### 代码详细解释:
#### 1. 类初始化 (`__init__`)
- 存储 openGauss 数据库连接参数(主机、端口、数据库名、用户名、密码)
- 初始化连接对象和游标对象为 `None`
#### 2. 数据库连接管理
- `connect()`:使用 `psycopg2` 建立到 openGauss 的连接
- `disconnect()`:安全关闭游标和连接
#### 3. SQL 文件解析 (`_parse_sql_scripts`)
- 读取 SQL 文件内容
- 使用正则表达式移除两种 SQL 注释:
- `/* ... */` 多行注释
- `--` 单行注释
- 按分号 `;` 分割 SQL 语句
- 过滤空语句和空白字符
#### 4. SQL 执行逻辑 (`execute_sql_file`)
- 建立数据库连接
- 解析 SQL 文件获取语句列表
- 遍历执行所有 SQL 语句:
- 显示当前执行进度
- 捕获并处理执行异常
- 根据 `autocommit` 参数决定提交方式
- 事务管理:
- `autocommit=False`(默认):所有语句在单个事务中执行(出错时回滚)
- `autocommit=True`:每条语句独立提交
- 异常处理:
- 打印错误语句的前100个字符(避免输出过长)
- 发生错误时回滚未提交的事务
- 最终关闭数据库连接
### 使用说明:
1. 安装依赖:
```bash
pip install psycopg2-binary
```
2. 准备 SQL 文件示例 (`scripts.sql`):
```sql
-- 创建表
CREATE TABLE employees (
id SERIAL PRIMARY KEY,
name VARCHAR(100) NOT NULL,
department VARCHAR(50),
salary NUMERIC
);
/* 插入示例数据 */
INSERT INTO employees (name, department, salary) VALUES
('Alice', 'Engineering', 75000),
('Bob', 'Marketing', 68000);
-- 更新数据
UPDATE employees SET salary = 80000 WHERE name = 'Alice';
-- 查询验证
SELECT * FROM employees;
```
3. 执行注意事项:
- 确保 openGauss 服务已启动
- 正确配置防火墙规则允许访问
- 具有执行 SQL 文件的必要权限
- 对于包含事务控制的 SQL 文件(如 `BEGIN`/`COMMIT`),建议设置 `autocommit=False`
### 输出示例:
```
Successfully connected to openGauss database
Executing statement 1/4
Executing statement 2/4
Executing statement 3/4
Executing statement 4/4
All SQL statements executed successfully
Database connection closed
```
### 错误处理示例:
```
Executing statement 1/3
Executing statement 2/3
Error executing statement 2: relation "employee" does not exist
Failed statement: UPDATE employee SET salary = 80000 WHERE name = 'Alice'...
Execution failed: relation "employee" does not exist
Database connection closed
```
阅读全文
相关推荐


















