import datetime import numpy as np import oracledb import pandas as pd import matplotlib.pyplot as plt from pptx import Presentation from pptx.util import Inches import os import matplotlib.colors from matplotlib.patches import Patch import time import logging import test import concurrent.futures # 获取当前时间 now = datetime.datetime.now() # 将时间转换为字符串,格式为YYYY-MM-DD_HH-MM-SS timestamp = now.strftime("%Y-%m-%d") logger = logging.getLogger() logger.setLevel(logging.INFO) format = logging.Formatter('%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s') logFile = f'{timestamp}-log.txt' fh = logging.FileHandler(logFile, mode='a', encoding='utf-8') fh.setLevel(logging.INFO) fh.setFormatter(format) logger.addHandler(fh) start_time1 = time.time() # 数据库连接信息 dsn = ( "(DESCRIPTION=" "(ADDRESS_LIST=(ADDRESS=(PROTOCOL=TCP)(HOST=10.10.164.60)(PORT=1521)))" "(CONNECT_DATA=(SERVICE_NAME=YMSDB01)))" ) username = 'acme' password = 'acme' oracledb.init_oracle_client(lib_dir=r"C:\Users\Q06412\Downloads\instantclient-basic-windows.x64-21.17.0.0.0dbru\instantclient_21_17") # 创建连接 connection = oracledb.connect(user=username, password=password, dsn=dsn) try: # 创建一个游标 cursor = connection.cursor() logging.info("SQL查询5个状态为0的lot任务---开始") # SQL 查询语句 sql_query = """ SELECT DISTINCT DRB_ID FROM ( SELECT DISTINCT DRB_ID,LOT_ID,ROW_NUMBER () OVER (ORDER BY CREATE_TIME ASC) AS RN FROM DRB_INFO_CONFIG dic WHERE STATUS=0 ) DRB_CNT WHERE RN <=5 """ # 执行查询 cursor.execute(sql_query) # 获取所有结果 lotresults = cursor.fetchall() logging.info("SQL查询5个状态为0的lot任务---结束") # print(len(results)) ==5 遍历每个drblot for i in range(0, len(lotresults)): # 5个lot查出来立马修改status=2,表示正在进程中 sql_query = """ update DRB_INFO_CONFIG set STATUS = :1 where DRB_ID = :2 """ cursor.execute(sql_query, (2, lotresults[i][0])) connection.commit() except Exception as e: logging.info(e) finally: # 关闭游标和连接 cursor.close() connection.close() def execute_query(sql_query): # 每个线程独立创建连接(关键!避免线程安全问题) # 创建连接 connection = oracledb.connect(user=username, password=password, dsn=dsn) cursor = connection.cursor() try: cursor.execute(sql_query) results = cursor.fetchall() return results # 返回结果供主线程收集 except Exception as ee: logging.info(ee) finally: connection.close() # 确保关闭连接 with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor: # 控制并发数 # 提交所有查询任务 future_to_query = { executor.submit(execute_query, test.sql_query): test for test in lotresults } # 收集结果(按完成顺序) results = [] for future in concurrent.futures.as_completed(future_to_query): test = future_to_query[future] try: result = future.result() results.append(result) # 存储结果 except Exception as e: print(f"Query {test.sql_query} failed: {e}") end_time1 = time.time() run_time = end_time1 - start_time1 logging.info("代码运行时间为: %s 秒", run_time)
时间: 2025-03-21 13:08:53 浏览: 34
<think>嗯,用户给了一段Python代码,还让我介绍一下里面的各个导入的库和整体功能。首先,我需要仔细看一下代码结构,了解各个部分的作用。
首先,用户导入了很多库,比如datetime、numpy、oracledb、pandas、matplotlib等等。这些都是Python中常用的数据处理、数据库连接、可视化相关的库。然后,代码里还涉及到生成PPT、日志记录、多线程处理,看起来像是一个自动化处理数据的脚本,可能用于从数据库提取数据,生成报告之类的。
接下来,代码里获取当前时间,设置日志记录,连接到Oracle数据库。这里用了oracledb库,可能需要客户端库,所以用户指定了init_oracle_client的路径。然后执行SQL查询,获取5个状态为0的DRB_ID,之后更新这些记录的状态为2,表示正在处理。这部分可能涉及到任务队列的处理,防止重复处理同一个任务。
然后,定义了一个execute_query函数,用来执行SQL查询,每个线程独立创建连接,避免线程安全问题。这里用了concurrent.futures的ThreadPoolExecutor来并发执行查询,最多5个线程同时进行。提交任务后,收集结果,最后计算整个代码的运行时间,并记录日志。
不过,用户的问题是要介绍这些导入的库,以及整个代码的功能。可能用户是想了解这段代码的作用,或者学习这些库的用途。需要注意代码中可能存在的问题,比如多线程处理数据库连接是否正确,是否有资源泄露,或者异常处理是否完善。
比如,在更新状态为2的时候,是在for循环里执行cursor.execute,但这里可能有问题,因为循环中的每个lotresults[i][0]是否都被正确处理?另外,多线程部分,每个线程都创建自己的连接,这是正确的做法,因为数据库连接不是线程安全的。但需要确认是否正确关闭了连接,在execute_query函数里用了connection.close(),应该没问题。
另外,用户可能没有注意到,在并发查询部分,test.sql_query可能有问题,因为test是从lotresults中取的,而lotresults的元素是之前查询得到的DRB_ID。这里的test变量可能并不是一个包含sql_query属性的对象,这部分代码可能有错误。因为lotresults的每个元素是从SELECT DISTINCT DRB_ID获取的,所以lotresults[i][0]是DRB_ID的值,而后面在提交任务时,循环的是test in lotresults,而test是元组,可能没有sql_query属性。这可能是个bug,需要指出。
此外,日志记录的部分是否正确配置,比如fh的处理程序是否添加正确,是否重复添加了handler导致日志重复输出。还有,在多线程环境下,日志记录是否线程安全,但logging模块本身是线程安全的,所以应该没问题。
总结下来,这段代码的主要功能是从Oracle数据库获取待处理的任务,更新状态为处理中,然后并发执行多个查询,收集结果,记录运行时间。可能用于批量处理任务,生成报告或进行数据分析。但代码中存在可能的错误,比如test.sql_query的使用,需要用户检查这部分是否正确。</think>我将分步骤解析这段代码的功能和结构,并着重说明主要模块的作用:
一、模块导入部分(技术选型分析)
1. 数据处理核心库
- `numpy`:用于科学计算,处理多维数组 $shape = (m,n)$
- `pandas`:数据清洗/分析的利器,DataFrame 结构 $df = pd.DataFrame(data)$
- `matplotlib`:数据可视化核心库,生成 $y = f(x)$ 等图表
2. 数据库交互
- `oracledb`:Oracle 官方 Python 驱动,支持最新数据库特性
- 连接池配置:`init_oracle_client` 指定客户端库路径
3. 自动化办公
- `pptx`:实现 PPT 自动生成,通过 `Inches` 控制元素布局
- `os`:文件路径操作,保障跨平台兼容性
4. 辅助工具
- `datetime` + `time`:时间戳生成 $t = \frac{1}{2}gt^2$ 和性能监控
- `logging`:标准化日志记录,格式为 `%(asctime)s %(filename)s...`
- `concurrent.futures`:实现多线程并发查询
二、核心执行流程(架构图示意)
$$
\begin{aligned}
1.\ & \text{初始化日志系统} \\
2.\ & \text{建立数据库连接} \\
3.\ & \text{查询待处理任务(状态0的DRB_ID)} \\
4.\ & \text{批量更新任务状态为处理中(状态2)} \\
5.\ & \text{使用线程池并发执行查询} \\
6.\ & \text{结果收集与性能分析}
\end{aligned}
$$
三、关键技术亮点
1. 多线程优化
- 采用 `ThreadPoolExecutor` 实现并发查询
- 每个线程独立创建连接(解决线程安全问题)
- 最大并发数控制为5(`max_workers=5`)
2. 数据库操作安全
```python
try:
cursor.execute(sql)
connection.commit() # 显式提交事务
except Exception as e:
logger.error(e) # 异常捕获
finally:
connection.close() # 资源释放保障
```
四、潜在优化建议
1. 连接池管理:建议使用 `SessionPool` 替代频繁创建连接
2. 异常处理增强:增加重试机制应对网络波动
3. 配置分离:将数据库连接参数提取到配置文件中
4. SQL 注入防护:对 `lotresults[i][0]` 进行参数化校验
五、性能监控设计
- 使用时间差计算执行时长:
$$ \Delta t = t_{end} - t_{start} $$
- 日志记录运行时长:`logging.info("代码运行时间为:%s 秒", run_time)`
该代码实现了从 Oracle 数据库获取任务、并行处理数据到生成分析报告的完整自动化流程,适用于制造业数据分析、质量监控等场景。建议重点检查 `test.sql_query` 的引用是否正确(当前代码存在未定义风险)。
阅读全文
相关推荐



















