from datetime import datetime import mysql.connector import pandas as pd from tqdm import tqdm class A(object): def __init__(self): self.db = mysql.connector.connect( host='127.0.0.1', port='3306', user='root', password='123456' ) self.cursor = self.db.cursor() self.cursor.execute("USE luwang") self.data = None def ReadMySQL(self): sql = """SELECT `省份`, `区县`, `时间` FROM xiandengguang""" self.cursor.execute(sql) self.data = self.cursor.fetchall() def insert(self): self.ReadMySQL() insert_sql = """ INSERT INTO xiandengguang ( `省份`, `地级`, `区县`, `县级行政代码`, `时间`, `COUNT`, `MIN`, `MAX`, `MEAN`, `SUM` ) VALUES ( %s, %s, %s, %s, %s, %s, %s, %s, %s, %s ); """ excel_file = '四象科技-全国区县级灯光指数产品-2025年2月.xlsx' all_tables = pd.read_excel(excel_file, sheet_name=None, engine='openpyxl') df = all_tables df.columns = ['省份', '地级', '区县', '县级行政代码', '时间', 'COUNT', 'MIN', 'MAX', 'MEAN', 'SUM'] df_list = list(df) for i in range(0, len(df)): dataList = [] for Column_name in df_list: if (str(df.iloc[i][Column_name]) != 'None' and str(df.iloc[i][Column_name]) != 'nan' and str( df.iloc[i][Column_name]) != ''): dataList.append(str(df.iloc[i][Column_name])) else: dataList.append(None) self.cursor.execute(insert_sql, dataList) self.db.commit() self.cursor.close() self.db.close() if __name__ == '__main__': a =A() a.ReadMySQL() a.insert()
时间: 2025-07-05 07:44:04 浏览: 9
<think>好的,用户想改进他们的Python代码来连接MySQL数据库并插入Excel数据。首先,我需要看看他们提供的代码可能存在的问题,然后根据最佳实践给出建议。
根据用户提供的引用,特别是引用[2]、[4]和[5],他们可能使用了mysql.connector或pymysql库。但用户示例中的代码可能没有错误处理,比如没有使用try-except块,这可能导致连接或查询失败时程序崩溃。另外,引用[5]提到处理大Excel文件,可能需要批量插入和事务管理来提高效率。
接下来,用户可能没有使用参数化查询,容易引发SQL注入风险。引用[2]中创建表的例子使用了字符串拼接,但插入数据时应该使用占位符。此外,数据库连接没有使用上下文管理器(with语句),可能导致资源未正确释放。
另外,引用[5]提到Excel文件很大,需要高效读取。可能需要使用pandas的chunksize分块读取,避免内存不足。同时,确保字段类型匹配,比如日期或字符串格式是否正确。
用户还可能没有处理重复数据的问题,比如使用ON DUPLICATE KEY UPDATE语句。或者,数据库表的引擎是否适合大量插入,比如使用InnoDB而不是MyISAM。
最后,代码的可读性和结构可能需要改进,比如将配置信息放在单独的文件中,使用函数封装数据库操作,方便维护和重用。</think>以下是根据最佳实践改进后的Python代码,用于高效连接MySQL数据库并插入Excel数据:
```python
import pandas as pd
import pymysql
from sqlalchemy import create_engine, exc
from openpyxl import load_workbook
# 数据库配置(建议使用配置文件)
DB_CONFIG = {
"host": "localhost",
"port": 3306,
"user": "your_username",
"password": "your_password",
"database": "your_database",
"charset": "utf8mb4"
}
def excel_to_mysql(excel_path, table_name, batch_size=10000):
"""
将Excel数据批量插入MySQL数据库
改进点:
1. 使用SQLAlchemy连接池
2. 分块读取大文件
3. 事务批量提交
4. 类型转换处理
5. 完善的错误处理
"""
try:
# 创建SQLAlchemy引擎(带连接池)
engine = create_engine(
f"mysql+pymysql://{DB_CONFIG['user']}:{DB_CONFIG['password']}@{DB_CONFIG['host']}:{DB_CONFIG['port']}/{DB_CONFIG['database']}?charset={DB_CONFIG['charset']}",
pool_size=5,
max_overflow=10
)
# 分块读取Excel文件
wb = load_workbook(excel_path, read_only=True)
ws = wb.active
columns = [cell.value for cell in ws[1]]
with engine.connect() as conn, conn.begin():
for row_chunk in pd.read_excel(
excel_path,
chunksize=batch_size,
engine="openpyxl"
):
# 数据清洗
chunk = row_chunk.where(pd.notnull(row_chunk), None)
# 批量插入(使用executemany)
chunk.to_sql(
name=table_name,
con=conn,
if_exists="append",
index=False,
method="multi"
)
print(f"成功插入数据到{table_name}表")
except exc.SQLAlchemyError as e:
print(f"数据库错误: {str(e)}")
except Exception as e:
print(f"系统错误: {str(e)}")
finally:
engine.dispose()
# 使用示例
excel_to_mysql("large_data.xlsx", "sales_records")
```
**主要改进点说明:**
1. **使用SQLAlchemy连接池**
通过`create_engine`创建连接池(引用[4]的原始连接方式升级),设置`pool_size`和`max_overflow`参数管理连接资源[^2][^4]
2. **分块读取大文件**
使用`pd.read_excel`的`chunksize`参数避免内存溢出,特别适合处理60万+行的大文件(引用[5]的应用场景)[^5]
3. **事务批量提交**
`with conn.begin()`上下文管理器确保事务完整性,批量提交提高性能
4. **类型安全处理**
- 将NaN值转换为NULL
- 自动匹配数据库字段类型
- 使用预处理语句防止SQL注入
5. **异常处理增强**
捕获特定数据库异常和通用异常,确保连接正确关闭
6. **性能优化**
- `to_sql`的`method='multi'`启用批量插入
- 可调节的`batch_size`参数(默认10000行/批)
**
阅读全文
相关推荐
















