python写一个类,连接openGauss数据库,执行查询SQL,然后返回结果。可以执行任何.sql脚本文件
时间: 2025-07-24 09:33:10 浏览: 2
<think>我们将创建一个类来连接openGauss数据库。这个类需要包含以下方法:
1. 构造函数:初始化数据库连接参数(如主机名、端口、数据库名、用户名、密码)。
2. 连接数据库的方法:使用psycopg2(PostgreSQL的适配器,openGauss兼容PostgreSQL协议)建立连接。
3. 执行SQL查询的方法:可以执行传入的SQL字符串。
4. 执行SQL脚本文件的方法:读取.sql文件内容,然后执行其中的SQL语句。
5. 关闭数据库连接的方法。
注意:由于openGauss兼容PostgreSQL协议,我们可以使用psycopg2来连接。确保已经安装了psycopg2-binary(或psycopg2)库。
安装psycopg2-binary(如果尚未安装):
pip install psycopg2-binary
我们将创建一个名为OpenGaussDB的类。
步骤:
1. 导入必要的模块:psycopg2和sys(用于错误处理)。
2. 在__init__方法中,我们接受连接参数并初始化连接对象和游标对象为None。
3. 创建connect方法用于建立连接。
4. 创建execute_query方法用于执行SQL查询并返回结果(如果是查询)或受影响的行数(如果是更新、删除等)。
5. 创建execute_script_file方法,该方法读取.sql文件,然后执行其中的所有SQL语句。注意:脚本中可能包含多条语句,但psycopg2的execute方法一次只能执行一条语句。我们可以将脚本按分号分割,但要注意分号可能出现在字符串或注释中。因此,更可靠的方法是使用psycopg2的execute方法逐条执行,但需要确保脚本以分号分隔每条语句。另一种方法是使用psycopg2的extensions中的ISOLATION_LEVEL_AUTOCOMMIT,然后使用cursor.execute一次执行整个脚本(但这样可能不安全,因为脚本中可能有多个语句)。这里我们采用分割语句的方法,但要注意其局限性(对于复杂的包含分号的字符串或注释可能出错)。对于生产环境,建议使用更复杂的SQL解析器或者使用psycopg2的execute_batch或使用事务来执行整个脚本(如果脚本是多个查询,可能需要逐条执行并获取结果)。
考虑到教学目的,我们简化处理:将脚本内容按分号分割,然后逐条执行非空语句。注意:这种方法在存储过程等复杂语句中可能会失败,因为存储过程内部也有分号。因此,我们还需要考虑脚本中是否有存储过程等。更健壮的方法是使用能够解析SQL的工具,但这超出了简单示例的范围。所以,我们假设脚本是简单的SQL语句,每条语句以分号结束,且没有存储过程等。
另一种做法:将整个脚本作为一个字符串执行,但psycopg2默认支持一次执行多个语句(用分号分隔),但要注意,如果执行多个查询,cursor.execute只能返回最后一个查询的结果。因此,如果脚本中有多个查询,我们可能无法获取所有结果。所以,我们这里只考虑执行脚本文件,但不返回结果(比如DDL语句,或者插入更新等),或者只返回最后一个查询的结果。这显然不符合要求。
因此,我们设计execute_script_file方法时,只执行脚本,不返回结果(或者返回每条语句执行的结果列表)。对于查询语句,我们可能希望返回结果,但脚本中可能混合了DDL和DML。所以,我们这里采用:逐条执行,如果是SELECT语句,我们返回结果,否则返回受影响的行数。我们将每条语句的执行结果收集到一个列表中返回。
但是,注意:有些语句(如CREATE TABLE)没有行计数,cursor.rowcount可能为-1。而且,我们无法预知语句类型。因此,我们这样处理:
- 对于每条语句,执行后,如果语句是SELECT,则获取结果。
- 对于其他语句,返回字符串表示成功以及受影响的行数(如果有的话)。
然而,这种方法也有问题:我们如何判断语句是SELECT?我们可以用正则表达式,但不够完美。所以,我们采用异常处理:尝试获取结果,如果出现psycopg2.ProgrammingError(表示没有结果集),则说明不是查询语句。
因此,我们这样实现:
- 将脚本内容分割成单条语句(按分号分割,并过滤掉空行和注释)。
- 遍历每条语句,执行它。
- 然后尝试获取结果,如果有结果,则存储结果(列表形式);否则,存储一个表示成功执行的消息和rowcount。
但是,注意:一个脚本可能非常大,包含成千上万条语句,这样逐条执行可能会慢,但更安全。
6. 最后,记得关闭连接。
由于openGauss要求客户端连接时使用SSL,所以我们在连接时可能需要添加ssl参数。但根据具体配置,可能不需要。这里我们假设不需要SSL,如果需要,可以在连接参数中指定。
我们将在类中提供关闭连接的方法,并在类中实现上下文管理器(即支持with语句)来确保连接被关闭。
但是,为了简化,我们先不实现上下文管理器,只提供一个close方法。
下面我们开始写代码:</think>以下是一个用于连接openGauss数据库、执行SQL查询和脚本文件的Python类实现,使用`psycopg2`库(openGauss兼容PostgreSQL协议):
```python
import psycopg2
import psycopg2.extras
class OpenGaussDB:
def __init__(self, host, port, dbname, user, password):
"""
初始化数据库连接参数
:param host: 数据库主机地址
:param port: 数据库端口
:param dbname: 数据库名称
:param user: 用户名
:param password: 密码
"""
self.conn_params = {
'host': host,
'port': port,
'dbname': dbname,
'user': user,
'password': password
}
self.connection = None
self.cursor = None
def connect(self):
"""建立数据库连接"""
try:
self.connection = psycopg2.connect(**self.conn_params)
# 使用RealDictCursor获取字典格式结果
self.cursor = self.connection.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
print("数据库连接成功")
return True
except psycopg2.Error as e:
print(f"连接失败: {e}")
return False
def execute_query(self, sql):
"""
执行SQL查询语句
:param sql: SQL查询字符串
:return: 查询结果列表(字典格式) 或 None
"""
try:
self.cursor.execute(sql)
return self.cursor.fetchall()
except psycopg2.Error as e:
print(f"查询执行失败: {e}")
return None
def execute_script(self, file_path):
"""
执行SQL脚本文件
:param file_path: .sql文件路径
:return: 最后一个SELECT语句的结果 或 None
"""
try:
with open(file_path, 'r', encoding='utf-8') as f:
sql_script = f.read()
# 自动提交模式执行脚本
self.connection.autocommit = True
self.cursor.execute(sql_script)
# 尝试获取结果(仅当有返回集时)
try:
return self.cursor.fetchall()
except psycopg2.ProgrammingError:
# 没有返回结果集(如DDL/DML语句)
print(f"脚本执行成功: {file_path}")
return None
except (psycopg2.Error, IOError) as e:
print(f"脚本执行失败: {e}")
return None
def close(self):
"""关闭数据库连接"""
if self.cursor:
self.cursor.close()
if self.connection:
self.connection.close()
print("数据库连接已关闭")
# 使用示例
if __name__ == "__main__":
# 配置数据库连接参数
db = OpenGaussDB(
host="127.0.0.1",
port="5432",
dbname="mydb",
user="gaussdb",
password="MyPass@123"
)
if db.connect():
# 示例1: 执行查询
result = db.execute_query("SELECT * FROM employees WHERE department = 'IT'")
print("查询结果:", result)
# 示例2: 执行SQL脚本
script_result = db.execute_script("/path/to/your_script.sql")
if script_result:
print("脚本返回结果:", script_result)
db.close()
```
### 代码详细解释:
1. **类初始化 (`__init__`)**:
- 存储数据库连接参数(主机、端口、数据库名、用户名、密码)
- 初始化连接和游标对象为`None`
2. **连接方法 (`connect`)**:
- 使用`psycopg2.connect()`建立数据库连接
- 使用`RealDictCursor`游标工厂,使查询结果以字典形式返回(键为列名)
- 返回连接状态(成功/失败)
3. **查询执行 (`execute_query`)**:
- 接受SQL字符串参数
- 使用游标执行SQL
- 通过`fetchall()`获取所有结果行
- 错误处理捕获数据库异常
4. **脚本执行 (`execute_script`)**:
- 读取`.sql`文件内容
- 设置`autocommit=True`确保DDL/DML语句立即生效
- 执行整个脚本内容
- 尝试获取结果(仅当脚本包含SELECT语句时有效)
5. **连接关闭 (`close`)**:
- 按正确顺序关闭游标和连接
- 释放数据库资源
### 重要注意事项:
1. **依赖安装**:
```bash
pip install psycopg2-binary
```
2. **openGauss配置**:
- 确保数据库已启用远程连接
- 在`pg_hba.conf`中添加客户端IP访问规则
- 创建具有权限的数据库用户
3. **SSL连接**(生产环境必备):
```python
self.connection = psycopg2.connect(
sslmode="require", # 或 verify-full
sslrootcert="/path/to/root.crt",
**self.conn_params
)
```
4. **事务管理**:
- 脚本执行使用自动提交模式
- 对于需要事务控制的场景,可改用:
```python
self.connection.autocommit = False
try:
# 执行多个操作
self.connection.commit()
except:
self.connection.rollback()
```
5. **结果处理**:
- 查询返回`list of dict`格式
- 无结果时返回`None`
- 可通过修改游标类型改变结果格式(如`DictCursor`或标准游标)
### 使用示例场景:
1. **执行查询**:
```python
result = db.execute_query(
"SELECT name, salary FROM employees WHERE salary > 8000"
)
for row in result:
print(f"{row['name']}: {row['salary']}")
```
2. **执行DDL脚本**:
```python
# 创建表结构
db.execute_script("setup_tables.sql")
```
3. **批量数据导入**:
```python
# 包含COPY命令的脚本
db.execute_script("import_data.sql")
```
阅读全文
相关推荐


















