活动介绍

import numpy as np # 参数设置 n_steps = 1000 # 时间步长 dt = 0.1 # 时间间隔(秒) # 初始状态(真实值) p_true = np.zeros((n_steps, 3)) # 位置 (x, y, z) v_true = np.zeros((n_steps, 3)) # 速度 (vx, vy, vz) theta_true = np.zeros((n_steps, 2)) # 方向 (偏航角, 俯仰角) # 假设无人机匀速上升并转弯 v_true[:, 0] = 5.0 # x方向速度 5 m/s v_true[:, 2] = 2.0 # z方向速度 2 m/s(上升) theta_true[:, 0] = np.linspace(0, 2*np.pi, n_steps) # 偏航角匀速旋转 # 生成真实轨迹 for t in range(1, n_steps): p_true[t] = p_true[t-1] + v_true[t-1] * dt theta_true[t] = theta_true[t-1] + [0.1, 0.0] * dt # 简单方向变化

时间: 2025-06-16 11:48:04 浏览: 35
<think>好的,用户想用numpy模拟无人机在三维空间中匀速上升并转弯的运动轨迹,包括位置、速度和方向角的变化。首先,我需要理清楚用户的需求。用户提到的“匀速上升”意味着在垂直方向(可能是z轴)上速度恒定,而“转弯”可能涉及水平面内的运动,比如在x-y平面上做圆周运动或者改变方向角。 接下来,我需要考虑无人机的运动学模型。根据引用[1],运动学模型需要处理位置、速度、加速度等随时间的变化。由于用户要求匀速上升,所以z轴方向的加速度应该为零,速度是恒定的。而在转弯时,如果是水平面的圆周运动,可能需要向心加速度,这会涉及到速度和方向的变化。 对于转弯部分,假设无人机在水平面做匀速圆周运动,那么需要确定转弯半径和角速度。或者,用户可能只是想让无人机改变航向角,比如从某个初始方向逐渐转向,这时候可能需要用方向角随时间变化的函数来处理。 接下来,我需要确定如何表示位置、速度和方向角。在三维空间中,位置可以用三个坐标轴(x, y, z)表示。速度矢量则需要分解到三个轴上。方向角通常包括偏航角(yaw)、俯仰角(pitch)和横滚角(roll),但这里可能主要关注偏航角,因为转弯主要影响水平方向。 假设无人机在转弯过程中保持高度匀速上升,水平方向做匀速圆周运动。那么,在x-y平面上的运动可以用圆周运动的参数方程表示:x = r*cos(ωt + θ0),y = r*sin(ωt + θ0),其中r是半径,ω是角速度,θ0是初始角度。速度在x和y方向的分量则是导数:vx = -rω*sin(ωt + θ0),vy = rω*cos(ωt + θ0)。这样,水平速度的大小是rω,保持恒定,方向随时间变化,符合匀速圆周运动的条件。而z方向的速度是恒定的vz0。 不过,用户提到的是匀速上升并转弯,可能更希望无人机在转弯时方向角随时间线性变化,比如偏航角从0开始逐渐增加,这样无人机的运动轨迹可能是一个螺旋上升的路径。这种情况下,方向角的变化率是恒定的,即角速度ω恒定。 这时候,可以参数化时间t,计算每个时间点的位置。例如,假设转弯时的水平速度为v_horizontal,方向角θ(t) = ω*t,那么x和y方向的速度分量为vx = v_horizontal * cos(θ(t)),vy = v_horizontal * sin(θ(t))。而z方向的速度vz是恒定的。这样,位置随时间积分得到:x = ∫vx dt = (v_horizontal / ω) * sin(ω t),y = (v_horizontal / ω) * (1 - cos(ω t)),z = vz * t。这可能形成一个螺旋形轨迹,但如果是平面转弯,可能需要调整参数。 或者,如果转弯是圆周运动,那么半径r = v_horizontal / ω,这样x = r * sin(ω t),y = r * (1 - cos(ω t)),这可能更直观。但需要根据用户的具体需求来调整模型。 接下来,我需要用numpy来实现这些计算。首先确定时间步长,比如总时间T,采样点数N,生成时间数组t = np.linspace(0, T, N)。然后计算各方向的速度和位置。 对于方向角,偏航角(yaw)θ(t) = ω * t,俯仰角(pitch)可能不变,因为匀速上升,而横滚角(roll)可能在转弯时有一定的倾斜,但用户可能暂时不需要考虑这么复杂,先忽略。 然后,将速度分解到x、y、z轴,积分得到位置。例如,vz是恒定值,z位置就是vz * t。水平方向的速度vx和vy由偏航角决定,所以vx = v_horizontal * np.cos(theta),vy = v_horizontal * np.sin(theta),其中theta随时间变化。如果转弯的角速度是omega,那么theta = omega * t。这样,位置x和y可以通过对vx和vy积分得到,即x = v_horizontal / omega * np.sin(omega * t),y = v_horizontal / omega * (1 - np.cos(omega * t))。这样,当t增加时,x和y会形成圆周运动,而z线性上升,整体轨迹是螺旋形。 不过,用户可能希望无人机在转弯时方向角变化,但水平速度的大小不变,这样就需要保持v_horizontal恒定,同时方向随时间改变。这种情况下,速度的x和y分量会随时间变化,而位置需要通过对速度积分得到,也就是累积和。在离散时间中,可以使用np.cumsum来计算位置。 例如,假设每个时间步长dt,计算每个时间点的速度,然后位置是速度乘以dt的累积和。这可能更准确,特别是当角速度变化或非均匀时间步长时。 现在,具体实现步骤: 1. 设定参数:总时间T,时间步数N,初始位置,上升速度vz,水平速度v_horizontal,转弯角速度omega(即偏航角变化率)。 2. 生成时间数组t = np.linspace(0, T, N) 3. 计算z方向的位置:z = vz * t 4. 计算每个时间点的偏航角theta = omega * t 5. 计算水平速度分量:vx = v_horizontal * np.cos(theta),vy = v_horizontal * np.sin(theta) 6. 计算x和y的位置:需要对vx和vy进行积分,即累积和乘以时间步长dt。因为时间步长是均匀的,dt = T/(N-1)。所以x = np.cumsum(vx) * dt,同样y = np.cumsum(vy) * dt。不过初始位置可能不为零,需要加上初始位置。 但这里有个问题,如果使用np.cumsum,初始位置应该是初始值,比如x[0] = x0,然后x[i] = x[i-1] + vx[i] * dt。但使用cumsum的话,如果vx是每个时间点的速度,那么积分应该是vx * dt的累积。例如: dt = t[1] - t[0] x = x0 + np.cumsum(vx) * dt 但这样第一个元素x[0] = x0 + vx[0] * dt,而实际上,初始位置x0应在t=0时,x[0]=x0,然后x[1] = x0 + vx[0]*dt,x[2] = x0 + vx[0]*dt + vx[1]*dt,依此类推。因此,正确的计算应该是x = x0 + np.concatenate([[0], np.cumsum(vx[:-1] * dt)]) 不过这样处理可能比较复杂。另一种方法是使用初始位置,然后在每个时间步加上当前速度乘以dt。例如: x = np.zeros_like(t) x[0] = x0 for i in range(1, N): x[i] = x[i-1] + vx[i-1] * dt 同样处理y。不过使用循环可能效率不高,对于numpy来说,可以用cumsum来处理,但需要注意起始点。或者,假设初始速度为vx[0],在t=0到t=dt时间段内,速度是vx[0],那么x[0] = x0 + vx[0] * dt/2?这可能需要更精确的积分方法,比如梯形法则,但用户可能只需要简单的欧拉积分,即每个时间步的速度乘以dt,得到位置的增量。 所以,正确的离散积分应该是x[i] = x[i-1] + vx[i-1] * dt,因为vx在时间i-1到i期间是恒定的。因此,vx数组的长度是N,而时间步数是N-1。或者,如果t有N个点,则dt = T/(N-1),每个时间步长是dt,而速度数组的长度也是N,这样在计算位置时,需要使用vx的前N-1个元素乘以dt,然后累积求和。 例如: dt = t[1] - t[0] # 假设t是均匀划分的 x = x0 + np.cumsum(vx[:-1] * dt) x = np.concatenate([[x0], x]) 这样,x的长度是N。或者,更简单的方法是生成时间数组为N个点,每个时间点的速度对应的是该点开始的速度,那么位置的计算应该从x0开始,每个步骤加上vx[i] * dt。例如: x = np.zeros(N) x[0] = x0 for i in range(1, N): x[i] = x[i-1] + vx[i-1] * dt 同样处理y和z。不过z方向是匀速,所以z = z0 + vz * t,这样更直接,不需要积分,因为速度恒定。 所以,可能对于z方向直接计算z = z0 + vz * t,而对于x和y方向,由于速度随时间变化,需要用积分。 综上,代码的大致步骤: 1. 设定参数: T = 10 # 总时间,秒 N = 1000 # 时间点数 vz = 2.0 # 上升速度,m/s v_horizontal = 5.0 # 水平速度,m/s omega = np.pi / 5 # 角速度,rad/s,控制转弯速率 x0, y0, z0 = 0, 0, 0 # 初始位置 2. 生成时间数组: t = np.linspace(0, T, N) dt = t[1] - t[0] 3. 计算z方向位置: z = z0 + vz * t 4. 计算偏航角theta随时间的变化: theta = omega * t # 假设初始偏航角为0 5. 计算水平速度分量vx和vy: vx = v_horizontal * np.cos(theta) vy = v_horizontal * np.sin(theta) 6. 计算x和y方向的位置: x = np.zeros(N) y = np.zeros(N) x[0] = x0 y[0] = y0 for i in range(1, N): x[i] = x[i-1] + vx[i-1] * dt y[i] = y[i-1] + vy[i-1] * dt 或者使用numpy的累积和: dx = np.concatenate([[0], vx[:-1] * dt]) dy = np.concatenate([[0], vy[:-1] * dt]) x = x0 + np.cumsum(dx) y = y0 + np.cumsum(dy) 这样可以避免循环,提高效率。 7. 方向角的变化:theta = omega * t,即每个时间点的偏航角。 最后,将位置、速度和方向角保存或输出。可以用matplotlib绘制三维轨迹。 用户可能需要的是位置、速度、方向角随时间的变化数据,所以代码需要生成这些数组。 现在,检查是否满足用户的要求:三维空间中的匀速上升(z方向速度恒定),并转弯(水平方向速度方向变化)。方向角的变化是偏航角随时间线性增加,符合转弯的需求。 可能存在的问题:当水平速度v_horizontal和角速度omega的关系,如果转弯半径r = v_horizontal / omega,这样轨迹的半径是恒定的。例如,v_horizontal = 5 m/s,omega = π/5 rad/s,则r = 5 / (π/5) ≈ 25/π ≈ 7.96米。这会形成一个水平面的圆周运动,同时z方向匀速上升,整体是螺旋形轨迹。 用户的问题是否希望这样的螺旋轨迹?或者只是简单的转弯,比如从某个方向转向另一个方向,而轨迹可能是一个弧线?如果用户的问题中的转弯是指改变航向,比如从北向东转,而保持直线运动,则可能不需要圆周运动。但通常,转弯意味着方向持续变化,形成曲线轨迹。 另外,方向角θ(t) = omega * t,即随时间线性增加,这会导致无人机在水平面做圆周运动,因为角速度恒定,水平速度大小恒定,方向变化,从而产生向心加速度,这样符合匀速圆周运动的条件。 因此,这样的模型是正确的。 现在,编写代码: import numpy as np import matplotlib.pyplot as plt # 参数设置 T = 10 # 总时间(秒) N = 1000 # 时间步数 vz = 2.0 # 上升速度(m/s) v_horizontal = 5.0 # 水平速度(m/s) omega = np.pi / 5 # 角速度(rad/s) # 初始位置 x0, y0, z0 = 0, 0, 0 # 生成时间数组 t = np.linspace(0, T, N) dt = t[1] - t[0] # 计算z轴位置 z = z0 + vz * t # 计算偏航角(水平面转角) theta = omega * t # 计算水平速度分量 vx = v_horizontal * np.cos(theta) vy = v_horizontal * np.sin(theta) # 计算x和y轴位置(积分速度) x = x0 + np.cumsum(np.concatenate([[0], vx[:-1] * dt])) y = y0 + np.cumsum(np.concatenate([[0], vy[:-1] * dt])) # 方向角(偏航角)即为theta数组 # 可视化 fig = plt.figure() ax = fig.add_subplot(111, projection='3d') ax.plot(x, y, z) ax.set_xlabel('X (m)') ax.set_ylabel('Y (m)') ax.set_zlabel('Z (m)') plt.title('无人机三维运动轨迹') plt.show() 这段代码应该能生成一个螺旋上升的轨迹。用户可能需要调整参数,比如omega,来改变转弯的紧密度。 关于方向角的变化,theta是偏航角,这里假设初始角度为0,随时间线性增加,导致无人机在水平面顺时针旋转(因为角速度是正的,而坐标系的定义可能影响方向,需要确认)。如果用户希望不同的初始角度,可以调整theta的初始值。 另外,速度的大小在水平方向是恒定的,因为v_horizontal固定,而vx和vy的平方和等于v_horizontal^2。所以速度矢量的大小保持不变,方向变化,符合匀速圆周运动的条件。 最后,用户可能需要输出位置、速度、方向角的数据,因此可以将它们保存为数组,或者进一步处理。</think>下面是用numpy模拟无人机三维匀速上升并转弯运动的实现方法: ### 1. 建立运动模型 无人机在垂直方向匀速上升,水平面做匀速圆周运动。设上升速度$v_z$恒定,水平速度$v_{xy}$大小恒定且方向持续变化,符合运动学几何约束[^1]。 ### 2. 参数设定 ```python import numpy as np # 基础参数 T = 10.0 # 总时间(s) N = 1000 # 采样点数 v_z = 2.0 # 上升速度(m/s) omega = np.pi # 转弯角速度(rad/s) r = 5.0 # 水平转弯半径(m) ``` ### 3. 计算运动轨迹 $$ \begin{cases} x(t) = r\cos(\omega t) \\ y(t) = r\sin(\omega t) \\ z(t) = v_z t \end{cases} $$ ```python # 生成时间序列 t = np.linspace(0, T, N) # 计算三维坐标 x = r * np.cos(omega * t) y = r * np.sin(omega * t) z = v_z * t # 计算速度分量 v_x = -r * omega * np.sin(omega * t) v_y = r * omega * np.cos(omega * t) v_z_arr = np.full_like(t, v_z) # 计算偏航角(水平面方向角) yaw = np.arctan2(v_y, v_x) ``` ### 4. 可视化轨迹 ```python import matplotlib.pyplot as plt fig = plt.figure() ax = fig.add_subplot(111, projection='3d') ax.plot(x, y, z, lw=1) ax.set_xlabel('X(m)'); ax.set_ylabel('Y(m)'); ax.set_zlabel('Z(m)') plt.show() ``` ### 5. 运动特性分析 - 位置变化:螺旋上升轨迹 - 速度特征: $$ \begin{cases} \|v_{xy}\| = r\omega = \text{恒定} \\ v_z = \text{恒定} \end{cases} $$ - 方向角:偏航角$\psi(t) = \omega t + \pi/2$(因速度矢量与位置矢量相位差$\pi/2$)
阅读全文

相关推荐

class HeadingFilter: def __init__(self): self.current_heading = 0.0 # 当前融合后的航向角(度) self.last_gyro_time = None # 上次陀螺仪更新时间戳 self.temp_angle = 0.1 # 修正步长(需根据实际调试) def update_gyro(self, gyro_z_raw, timestamp): """更新陀螺仪数据(5ms周期调用) Args: gyro_z_raw: 陀螺仪Z轴原始数据 timestamp: 当前时间(秒) """ # 转换原始数据为角速度(°/s) gyro_z = gyro_z_raw / 16.4 # 计算时间间隔 if self.last_gyro_time is None: dt = 0.005 # 默认5ms周期 else: dt = timestamp - self.last_gyro_time self.last_gyro_time = timestamp # 陀螺仪积分更新航向 self.current_heading += gyro_z * dt def update_gps(self, gps_heading): """更新GPS航向角(100ms周期调用) Args: gps_heading: GPS测量的航向角(度) """ # 一阶互补滤波修正 if gps_heading < self.current_heading: self.current_heading += self.temp_angle else: self.current_heading -= self.temp_angle def get_heading(self): """获取当前融合后的航向角""" return self.current_heading # ================= 使用示例 ================= if __name__ == "__main__": import time import numpy as np # 初始化滤波器 filter = HeadingFilter() # 模拟数据(陀螺仪漂移+0.5°/s,真实航向90°) start_time = time.time() for i in range(1000): # 模拟5秒数据(1000*5ms) # 模拟陀螺仪数据(含漂移) gyro_z_raw = np.random.normal(1640, 50) # 模拟±2000dps量程数据 # 更新陀螺仪(5ms周期) t = start_time + i * 0.005 filter.update_gyro(gyro_z_raw, t) # 模拟GPS更新(100ms周期) if i % 20 == 0: true_heading = 90.0 # 真实航向 gps_noise = np.random.normal(0, 1) # GPS噪声 filter.update_gps(true_heading + gps_noise) # 每100ms输出一次 if i % 20 == 0: print(f"Time: {t-start_time:.3f}s, " f"Filtered Heading: {filter.get_heading():.2f}°") 请将该python代码,改成c语言代码

import numpy as np import matplotlib.pyplot as plt def fractional_runge_kutta(alpha, f, y0, t_span, h): """ 使用改进的分数阶龙格-库塔方法求数值解。 参数: alpha (float): 分数阶导数阶次 f (function): 微分方程右端项函数 y0 (array-like): 初始条件 t_span (tuple): 时间区间 [t_start, t_end] h (float): 步长 返回: ts (list): 时间序列 ys (list of lists): 解的时间演化 """ t_start, t_end = t_span n_steps = int((t_end - t_start) / h) ts = np.linspace(t_start, t_end, n_steps + 1) ys = np.zeros((n_steps + 1, len(y0))) ys[0] = y0 # 计算权重系数(Grünwald-Letnikov) weights = [] w_prev = 1.0 for k in range(n_steps + 1): weight = (-alpha * w_prev) / k if k != 0 else 1.0 weights.append(weight) w_prev = weight for i in range(1, n_steps + 1): sum_term = 0 for j in range(i): sum_term += weights[i-j-1] * f(ts[j], ys[j]) ys[i] = ys[i-1] + h**alpha * sum_term / np.math.gamma(2-alpha) return ts, ys # 蔡氏混沌系统的定义 def chua_system(t, state, a=15.6, b=28, c=-7/5): x, y, z = state g_x = -(c/7)*x + (c/2)*(abs(x+1)-abs(x-1)) dx_dt = a*(y-x-g_x) dy_dt = x-y+z dz_dt = -b*y-z return np.array([dx_dt, dy_dt, dz_dt]) # 参数设置 initial_state = [0.1, 0.1, 0.1] time_interval = (0, 100) step_size = 0.01 fractional_order = 0.95 # 求解 ts, states = fractional_runge_kutta(fractional_order, lambda t, s: chua_system(t, s), initial_state, time_interval, step_size) # 提取变量轨迹 xs, ys, zs = states[:, 0], states[:, 1], states[:, 2] # 绘制时间序列图 plt.figure(figsize=(12, 6)) plt.plot(ts, xs, label="x(t)", color='blue') plt.plot(ts, ys, label="y(t)", color='green', linestyle="--") plt.plot(ts, zs, label="z(t)", color='red', linestyle="-.") plt.title("Chua System Time Evolution", fontsize=16) plt.xlabel("Time", fontsize=14) plt.ylabel("State Variables", fontsize=14) plt.legend() plt.grid(True) plt.show() # 绘制相空间图 fig = plt.figure(figsize=(12, 8)) ax = fig.add_subplot(projection='3d') ax.plot(xs, ys, zs, lw=0.5, color='purple') ax.set_title("Phase Space Trajectory of Chua System", fontsize=16) ax.set_xlabel('X Axis', fontsize=14) ax.set_ylabel('Y Axis', fontsize=14) ax.set_zlabel('Z Axis', fontsize=14) plt.show() # 分叉图分析 param_values = np.linspace(10, 20, 500) # 可变参数范围 last_points = [] for param in param_values: def modified_chua_system(t, state): return chua_system(t, state, a=param) _, temp_states = fractional_runge_kutta(fractional_order, lambda t, s: modified_chua_system(t, s), initial_state, time_interval, step_size) last_point = temp_states[-1][0] # 获取最终状态的第一个坐标 last_points.append(last_point) plt.figure(figsize=(12, 6)) plt.scatter(param_values, last_points, s=1, color='black') plt.title("Bifurcation Diagram of Chua System", fontsize=16) plt.xlabel("Parameter 'a'", fontsize=14) plt.ylabel("Last Point of X", fontsize=14) plt.grid(True) plt.show()

import os import time import numpy as np import pandas as pd import matplotlib.pyplot as plt import seaborn as sns from sklearn.preprocessing import StandardScaler from sklearn.model_selection import train_test_split from sklearn.metrics import mean_absolute_error, mean_squared_error import torch import torch.nn as nn import torch.optim as optim from torch.utils.data import Dataset, DataLoader, TensorDataset from torch.optim.lr_scheduler import ReduceLROnPlateau # 设置随机种子确保结果可复现 torch.manual_seed(42) np.random.seed(42) sns.set_style('whitegrid') # 设备配置 device = torch.device("cuda" if torch.cuda.is_available() else "cpu") print(f"使用设备: {device}") # 1. 数据加载与预处理函数 # -------------------------------------------------- def load_and_preprocess_data(): """加载并预处理所有数据源""" print("开始数据加载与预处理...") start_time = time.time() # 加载EC气象数据 ec_df = pd.read_csv('阿拉山口风电场_EC_data.csv', parse_dates=['生成日期', '预测日期']) ec_df = ec_df[ec_df['场站名'] == '阿拉山口风电场'] # 计算EC风速和风向 ec_df['EC风速(m/s)'] = np.sqrt(ec_df['U风分量(m/s)']**2 + ec_df['V风分量(m/s)']**2) ec_df['EC风向(度)'] = np.degrees(np.arctan2(ec_df['V风分量(m/s)'], ec_df['U风分量(m/s)'])) % 360 # 添加EC数据可用时间(生成时间+12小时) ec_df['可用时间'] = ec_df['生成日期'] + pd.Timedelta(hours=12) # 选择关键特征 ec_features = [ '可用时间', '预测日期', 'EC风速(m/s)', 'EC风向(度)', '位势高度_850hPa(gpm)', '温度_850hPa(K)', '相对湿度_850hPa(%)', '位势高度_500hPa(gpm)', '温度_500hPa(K)' ] ec_df = ec_df[ec_features] # 加载风机数据 turbine_df = pd.read_csv('阿拉山口风电场风机数据.csv', encoding='gbk', parse_dates=[0]) turbine_df.columns = ['timestamp', 'wind_speed', 'active_power'] # 加载远动数据 scada_df = pd.read_csv('阿拉山口风电场远动数据.csv', encoding='gbk', parse_dates=[0]) scada_df.columns = ['timestamp', 'active_power_total'] # 合并风机和远动数据 power_df = pd.merge(turbine_df[['timestamp', 'wind_speed']], scada_df, on='timestamp', how='outer') # 按时间排序并填充缺失值 power_df.sort_values('timestamp', inplace=True) power_df['active_power_total'].ffill(inplace=True) power_df['wind_speed'].ffill(inplace=True) # 创建完整的时间序列索引(15分钟间隔) full_range = pd.date_range( start=power_df['timestamp'].min(), end=power_df['timestamp'].max(), freq='15T' ) power_df = power_df.set_index('timestamp').reindex(full_range).reset_index() power_df.rename(columns={'index': 'timestamp'}, inplace=True) power_df[['wind_speed', 'active_power_total']] = power_df[['wind_speed', 'active_power_total']].ffill() # 合并EC数据到主数据集 ec_data = [] for idx, row in power_df.iterrows(): ts = row['timestamp'] # 获取可用的EC预测(可用时间 <= 当前时间) available_ec = ec_df[ec_df['可用时间'] <= ts] if not available_ec.empty: # 获取最近发布的EC数据 latest_gen = available_ec['可用时间'].max() latest_ec = available_ec[available_ec['可用时间'] == latest_gen] # 找到最接近当前时间点的预测 time_diff = (latest_ec['预测日期'] - ts).abs() closest_idx = time_diff.idxmin() ec_point = latest_ec.loc[closest_idx].copy() ec_point['timestamp'] = ts ec_data.append(ec_point) # 创建EC数据DataFrame并合并 ec_ts_df = pd.DataFrame(ec_data) merged_df = pd.merge(power_df, ec_ts_df, on='timestamp', how='left') # 填充缺失的EC数据 ec_cols = [col for col in ec_ts_df.columns if col not in ['timestamp', '可用时间', '预测日期']] for col in ec_cols: merged_df[col] = merged_df[col].interpolate(method='time') # 添加时间特征 merged_df['hour'] = merged_df['timestamp'].dt.hour merged_df['day_of_week'] = merged_df['timestamp'].dt.dayofweek merged_df['day_of_year'] = merged_df['timestamp'].dt.dayofyear merged_df['month'] = merged_df['timestamp'].dt.month # 计算实际风向(如果有测风塔数据,这里使用EC风向) merged_df['风向(度)'] = merged_df['EC风向(度)'] # 移除包含NaN的行 merged_df.dropna(inplace=True) # 特征选择 feature_cols = [ 'wind_speed', 'active_power_total', 'EC风速(m/s)', '风向(度)', '位势高度_850hPa(gpm)', '温度_850hPa(K)', '相对湿度_850hPa(%)', '位势高度_500hPa(gpm)', '温度_500hPa(K)', 'hour', 'day_of_week', 'day_of_year', 'month' ] target_col = 'active_power_total' print(f"数据处理完成! 耗时: {time.time()-start_time:.2f}秒") print(f"数据集形状: {merged_df.shape}") print(f"特征数量: {len(feature_cols)}") return merged_df[feature_cols], merged_df[target_col], merged_df['timestamp'] # 2. 数据准备类 (PyTorch Dataset) # -------------------------------------------------- class WindPowerDataset(Dataset): """风功率预测数据集类""" def __init__(self, X, y, look_back, forecast_steps): """ :param X: 特征数据 (n_samples, n_features) :param y: 目标数据 (n_samples,) :param look_back: 回溯时间步长 :param forecast_steps: 预测步长 """ self.X = X self.y = y self.look_back = look_back self.forecast_steps = forecast_steps self.n_samples = len(X) - look_back - forecast_steps + 1 def __len__(self): return self.n_samples def __getitem__(self, idx): # 获取历史序列 x_seq = self.X[idx:idx+self.look_back] # 获取未来目标序列 y_seq = self.y[idx+self.look_back:idx+self.look_back+self.forecast_steps] # 转换为PyTorch张量 x_tensor = torch.tensor(x_seq, dtype=torch.float32) y_tensor = torch.tensor(y_seq, dtype=torch.float32) return x_tensor, y_tensor # 3. LSTM模型 (PyTorch实现) # -------------------------------------------------- class WindPowerLSTM(nn.Module): """风功率预测LSTM模型""" def __init__(self, input_size, hidden_size, num_layers, output_steps): """ :param input_size: 输入特征维度 :param hidden_size: LSTM隐藏层大小 :param num_layers: LSTM层数 :param output_steps: 输出步长 """ super(WindPowerLSTM, self).__init__() self.hidden_size = hidden_size self.num_layers = num_layers self.output_steps = output_steps # LSTM层 self.lstm = nn.LSTM( input_size=input_size, hidden_size=hidden_size, num_layers=num_layers, batch_first=True, dropout=0.2 if num_layers > 1 else 0 ) # 全连接层 self.fc = nn.Sequential( nn.Linear(hidden_size, 128), nn.ReLU(), nn.Dropout(0.3), nn.Linear(128, 64), nn.ReLU(), nn.Dropout(0.2), nn.Linear(64, output_steps) ) def forward(self, x): # 初始化隐藏状态 h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(device) c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(device) # 前向传播LSTM out, _ = self.lstm(x, (h0, c0)) # 只取最后一个时间步的输出 out = out[:, -1, :] # 全连接层 out = self.fc(out) return out # 4. 训练和评估函数 # -------------------------------------------------- def train_model(model, train_loader, val_loader, criterion, optimizer, scheduler, epochs, model_name): """训练模型""" print(f"\n开始训练 {model_name} 模型...") start_time = time.time() best_val_loss = float('inf') history = {'train_loss': [], 'val_loss': []} for epoch in range(epochs): # 训练阶段 model.train() train_loss = 0.0 for inputs, targets in train_loader: inputs, targets = inputs.to(device), targets.to(device) # 前向传播 outputs = model(inputs) loss = criterion(outputs, targets) # 反向传播和优化 optimizer.zero_grad() loss.backward() optimizer.step() train_loss += loss.item() * inputs.size(0) # 验证阶段 model.eval() val_loss = 0.0 with torch.no_grad(): for inputs, targets in val_loader: inputs, targets = inputs.to(device), targets.to(device) outputs = model(inputs) loss = criterion(outputs, targets) val_loss += loss.item() * inputs.size(0) # 计算平均损失 train_loss = train_loss / len(train_loader.dataset) val_loss = val_loss / len(val_loader.dataset) history['train_loss'].append(train_loss) history['val_loss'].append(val_loss) # 更新学习率 if scheduler: scheduler.step(val_loss) # 保存最佳模型 if val_loss < best_val_loss: best_val_loss = val_loss torch.save(model.state_dict(), f'best_{model_name}_model.pth') # 打印进度 if (epoch + 1) % 5 == 0: print(f"Epoch [{epoch+1}/{epochs}] - Train Loss: {train_loss:.6f}, Val Loss: {val_loss:.6f}") print(f"训练完成! 耗时: {time.time()-start_time:.2f}秒") print(f"最佳验证损失: {best_val_loss:.6f}") return history def evaluate_model(model, test_loader, scaler, model_name): """评估模型性能""" model.eval() actuals = [] predictions = [] with torch.no_grad(): for inputs, targets in test_loader: inputs = inputs.to(device) # 预测 outputs = model(inputs) # 反归一化 outputs_np = outputs.cpu().numpy() targets_np = targets.numpy() # 反归一化 outputs_inv = scaler.inverse_transform(outputs_np) targets_inv = scaler.inverse_transform(targets_np.reshape(-1, 1)).flatten() # 收集结果 actuals.extend(targets_inv) predictions.extend(outputs_inv.flatten()) # 转换为numpy数组 actuals = np.array(actuals) predictions = np.array(predictions) # 计算性能指标 mae = mean_absolute_error(actuals, predictions) rmse = np.sqrt(mean_squared_error(actuals, predictions)) print(f"\n{model_name} 模型评估结果:") print(f"MAE: {mae:.2f} kW") print(f"RMSE: {rmse:.2f} kW") return actuals, predictions, mae, rmse # 5. 可视化函数 # -------------------------------------------------- def plot_predictions(actuals, predictions, timestamps, model_name, forecast_steps, mae): """可视化预测结果""" # 创建结果DataFrame results = pd.DataFrame({ 'timestamp': timestamps, 'actual': actuals, 'predicted': predictions }) # 设置时间索引 results.set_index('timestamp', inplace=True) # 选择一段代表性的时间序列展示 sample = results.iloc[1000:1300] plt.figure(figsize=(15, 7)) # 绘制实际值 plt.plot(sample.index, sample['actual'], label='实际功率', color='blue', alpha=0.7, linewidth=2) # 绘制预测值 plt.plot(sample.index, sample['predicted'], label='预测功率', color='red', alpha=0.7, linestyle='--', linewidth=2) plt.title(f'{model_name}风功率预测 (预测步长: {forecast_steps}步, MAE: {mae:.2f} kW)', fontsize=14) plt.xlabel('时间', fontsize=12) plt.ylabel('有功功率 (kW)', fontsize=12) plt.legend(fontsize=12) plt.grid(True, linestyle='--', alpha=0.7) plt.xticks(rotation=45) plt.tight_layout() plt.savefig(f'{model_name}_prediction_plot.png', dpi=300) plt.show() return results def plot_training_history(history, model_name): """绘制训练过程中的损失曲线""" plt.figure(figsize=(12, 6)) # 绘制训练损失 plt.plot(history['train_loss'], label='训练损失') # 绘制验证损失 if 'val_loss' in history: plt.plot(history['val_loss'], label='验证损失') plt.title(f'{model_name} 训练过程', fontsize=14) plt.xlabel('训练轮次', fontsize=12) plt.ylabel('损失 (MSE)', fontsize=12) plt.legend(fontsize=12) plt.grid(True, linestyle='--', alpha=0.7) plt.tight_layout() plt.savefig(f'{model_name}_training_history.png', dpi=300) plt.show() # 6. 主函数 # -------------------------------------------------- def main(): # 加载数据 X, y, timestamps = load_and_preprocess_data() # 定义预测配置 ULTRA_SHORT_CONFIG = { 'name': '超短期', 'look_back': 24, # 6小时历史 (24*15min) 'forecast_steps': 16, # 4小时预测 (16*15min) 'batch_size': 64, 'hidden_size': 128, 'num_layers': 2, 'epochs': 100, 'lr': 0.001 } SHORT_TERM_CONFIG = { 'name': '短期', 'look_back': 96, # 24小时历史 (96*15min) 'forecast_steps': 288, # 72小时预测 (288*15min) 'batch_size': 32, 'hidden_size': 256, 'num_layers': 3, 'epochs': 150, 'lr': 0.0005 } # 准备超短期预测数据集 print("\n准备超短期预测数据集...") # 数据标准化 X_scaler = StandardScaler() y_scaler = StandardScaler() X_scaled = X_scaler.fit_transform(X) y_scaled = y_scaler.fit_transform(y.values.reshape(-1, 1)).flatten() # 创建数据集 dataset = WindPowerDataset(X_scaled, y_scaled, ULTRA_SHORT_CONFIG['look_back'], ULTRA_SHORT_CONFIG['forecast_steps']) # 划分数据集 train_size = int(0.8 * len(dataset)) val_size = int(0.1 * len(dataset)) test_size = len(dataset) - train_size - val_size train_dataset, val_dataset, test_dataset = torch.utils.data.random_split( dataset, [train_size, val_size, test_size] ) # 创建数据加载器 train_loader = DataLoader(train_dataset, batch_size=ULTRA_SHORT_CONFIG['batch_size'], shuffle=True) val_loader = DataLoader(val_dataset, batch_size=ULTRA_SHORT_CONFIG['batch_size']) test_loader = DataLoader(test_dataset, batch_size=ULTRA_SHORT_CONFIG['batch_size']) # 创建模型 model_ultra = WindPowerLSTM( input_size=X.shape[1], hidden_size=ULTRA_SHORT_CONFIG['hidden_size'], num_layers=ULTRA_SHORT_CONFIG['num_layers'], output_steps=ULTRA_SHORT_CONFIG['forecast_steps'] ).to(device) # 损失函数和优化器 criterion = nn.MSELoss() optimizer = optim.Adam(model_ultra.parameters(), lr=ULTRA_SHORT_CONFIG['lr']) scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=5, verbose=True) # 训练模型 history_ultra = train_model( model_ultra, train_loader, val_loader, criterion, optimizer, scheduler, ULTRA_SHORT_CONFIG['epochs'], 'ultra_short' ) # 评估模型 actuals_ultra, preds_ultra, mae_ultra, rmse_ultra = evaluate_model( model_ultra, test_loader, y_scaler, '超短期' ) # 可视化结果 plot_training_history(history_ultra, '超短期模型') # 获取对应的时间戳 ultra_timestamps = timestamps[ULTRA_SHORT_CONFIG['look_back']:][:len(actuals_ultra)] results_ultra = plot_predictions( actuals_ultra, preds_ultra, ultra_timestamps, '超短期', ULTRA_SHORT_CONFIG['forecast_steps'], mae_ultra ) # 准备短期预测数据集 print("\n准备短期预测数据集...") # 创建数据集 dataset_short = WindPowerDataset(X_scaled, y_scaled, SHORT_TERM_CONFIG['look_back'], SHORT_TERM_CONFIG['forecast_steps']) # 划分数据集 train_size_short = int(0.8 * len(dataset_short)) val_size_short = int(0.1 * len(dataset_short)) test_size_short = len(dataset_short) - train_size_short - val_size_short train_dataset_short, val_dataset_short, test_dataset_short = torch.utils.data.random_split( dataset_short, [train_size_short, val_size_short, test_size_short] ) # 创建数据加载器 train_loader_short = DataLoader(train_dataset_short, batch_size=SHORT_TERM_CONFIG['batch_size'], shuffle=True) val_loader_short = DataLoader(val_dataset_short, batch_size=SHORT_TERM_CONFIG['batch_size']) test_loader_short = DataLoader(test_dataset_short, batch_size=SHORT_TERM_CONFIG['batch_size']) # 创建模型 model_short = WindPowerLSTM( input_size=X.shape[1], hidden_size=SHORT_TERM_CONFIG['hidden_size'], num_layers=SHORT_TERM_CONFIG['num_layers'], output_steps=SHORT_TERM_CONFIG['forecast_steps'] ).to(device) # 损失函数和优化器 optimizer_short = optim.Adam(model_short.parameters(), lr=SHORT_TERM_CONFIG['lr']) scheduler_short = ReduceLROnPlateau(optimizer_short, mode='min', factor=0.5, patience=10, verbose=True) # 训练模型 history_short = train_model( model_short, train_loader_short, val_loader_short, criterion, optimizer_short, scheduler_short, SHORT_TERM_CONFIG['epochs'], 'short_term' ) # 评估模型 actuals_short, preds_short, mae_short, rmse_short = evaluate_model( model_short, test_loader_short, y_scaler, '短期' ) # 可视化结果 plot_training_history(history_short, '短期模型') # 获取对应的时间戳 short_timestamps = timestamps[SHORT_TERM_CONFIG['look_back']:][:len(actuals_short)] results_short = plot_predictions( actuals_short, preds_short, short_timestamps, '短期', SHORT_TERM_CONFIG['forecast_steps'], mae_short ) # 最终报告 print("\n" + "="*50) print("风功率预测模型训练完成!") print("="*50) print(f"超短期模型 (4小时预测):") print(f" - 回溯步长: {ULTRA_SHORT_CONFIG['look_back']} (6小时)") print(f" - 预测步长: {ULTRA_SHORT_CONFIG['forecast_steps']} (4小时)") print(f" - 测试集MAE: {mae_ultra:.2f} kW") print(f" - 测试集RMSE: {rmse_ultra:.2f} kW") print(f"\n短期模型 (72小时预测):") print(f" - 回溯步长: {SHORT_TERM_CONFIG['look_back']} (24小时)") print(f" - 预测步长: {SHORT_TERM_CONFIG['forecast_steps']} (72小时)") print(f" - 测试集MAE: {mae_short:.2f} kW") print(f" - 测试集RMSE: {rmse_short:.2f} kW") print("="*50) # 保存预测结果 results_df = pd.DataFrame({ 'timestamp': short_timestamps, '实际功率': actuals_short, '超短期预测': results_ultra['predicted'].values[:len(actuals_short)], '短期预测': preds_short }) results_df.to_csv('风功率预测结果.csv', index=False) print("预测结果已保存到 '风功率预测结果.csv'") if __name__ == "__main__": main() 请在上述的代码基础上修改,本次修改要求只使用"阿拉山口风电场_EC_data"和"阿拉山口风电场风机数据"这两个数据集进行训练和预测。

import numpy as np import matplotlib.pyplot as plt from scipy.sparse import diags, kron from scipy.sparse.linalg import spsolve # 参数设置 L = 2.0 # 空间域边长 [-1,1] Nx, Ny = 101, 101 # 空间网格数 dx = L/(Nx-1) # 空间步长 dy = L/(Ny-1) T = 1.0 # 总时间 Nt = 100 # 时间步数 dt = T/Nt # 时间步长 x = np.linspace(-1,1,Nx) y = np.linspace(-1,1,Ny) X, Y = np.meshgrid(x, y) # 系数矩阵生成函数 def build_system_matrix(N, a_func, dx): main_diag = np.zeros(N) off_diag = np.zeros(N-1) for i in range(1,N-1): a_L = 0.5*(a_func[i] + a_func[i-1]) a_R = 0.5*(a_func[i] + a_func[i+1]) main_diag[i] = -(a_L + a_R)/dx**2 off_diag[i-1] = a_R/dx**2 return diags([off_diag, main_diag, off_diag], [-1, 0, 1]) # 定义变系数函数 a = np.exp(-(2*X-1)**2 - (2*Y-1)**2) # 初始化解矩阵 u = np.sin(np.pi*X) * np.sin(np.pi*Y) # 初始条件 u = u.reshape(-1) # 展平为一维向量 # 构建x方向和y方向的离散矩阵 A_x = build_system_matrix(Nx, a[0,:], dx) A_y = build_system_matrix(Ny, a[:,0], dy) A_total = kron(A_y, np.eye(Nx)) + kron(np.eye(Ny), A_x) # 隐式时间推进 I = diags([np.ones(Nx*Ny)], [0]) for n in range(Nt): rhs = u + 0.1*dt*(A_total.dot(u)) u = spsolve(I - 0.1*dt*A_total, rhs) # 结果可视化 u_plot = u.reshape(Ny, Nx) fig = plt.figure(figsize=(12,5)) ax1 = fig.add_subplot(121, projection='3d') ax1.plot_surface(X, Y, u_plot, cmap='coolwarm') ax1.set_title('Numerical Solution') ax2 = fig.add_subplot(122) levels = np.linspace(u_plot.min(), u_plot.max(), 20) contour = ax2.contourf(X, Y, u_plot, levels=levels, cmap='coolwarm') plt.colorbar(contour) ax2.set_title('Heatmap') plt.show()解释这个代码中的方程并检查错误

1. 时间步长归一化问题 问题:dt的数值范围可能与其他特征(如t、h)差异巨大,但未做归一化处理 例如:t(天)≈1e3,h(水位)≈1e1,dt(天)≈1e-2~1e2 这会导致神经网络难以学习有效的权重 2. 物理方程离散化误差 问题:原代码假设Δt=1,离散方程为 h_{t+1} = h_t - k·h_t 改为动态dt后,方程变为 h_{t+1} = h_t - k·h_t·dt 当dt较大时(如多日间隔),线性衰减假设可能不成立 物理损失项可能主导训练,压制数据拟合项 3. 递归预测误差累积 问题:在predict()中采用自回归预测时,每个时间步都使用前一步的预测值作为输入 动态dt会放大误差传播(尤其当dt不稳定时) 4. 时间步长计算异常 问题:数据中可能存在dt=0或极端值(如仪器故障导致的时间戳重复) 5. 模型架构不适应动态dt 现象:简单的全连接网络难以学习t、h、dt之间的复杂关系 请你根据上述几点对下面代码进行修改,只给出修改部分即可import tkinter as tk from tkinter import ttk, filedialog, messagebox import pandas as pd import numpy as np import matplotlib as mpl import matplotlib.pyplot as plt from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg import tensorflow as tf from tensorflow.keras.models import Model from tensorflow.keras.layers import Input, Dense, Lambda from tensorflow.keras.optimizers import Adam from sklearn.preprocessing import MinMaxScaler import os import time import warnings warnings.filterwarnings('ignore', category=UserWarning, module='tensorflow') mpl.rcParams['font.sans-serif'] = ['SimHei', 'Microsoft YaHei', 'Arial Unicode MS'] mpl.rcParams['axes.unicode_minus'] = False # 关键修复:使用 ASCII 减号 # 设置中文字体支持 plt.rcParams['font.sans-serif'] = ['SimHei'] plt.rcParams['axes.unicode_minus'] = False class PINNModel(tf.keras.Model): def __init__(self, num_layers=4, hidden_units=32, **kwargs): super(PINNModel, self).__init__(**kwargs) self.dense_layers = [Dense(hidden_units, activation='tanh') for _ in range(num_layers)] self.final_layer = Dense(1, activation='linear') # 添加带约束的物理参数 self.k_raw = tf.Variable(0.001, trainable=True, dtype=tf.float32, name='k_raw') self.k = tf.math.sigmoid(self.k_raw) * 0.5 # 约束在0-0.5之间 def call(self, inputs): t, h, dt = inputs # 修改:添加dt作为输入 # 将时间、水位和时间步长作为输入特征 x = tf.concat([t, h, dt], axis=1) for layer in self.dense_layers: x = layer(x) return self.final_layer(x) def physics_loss(self, t, h_current, dt): """计算物理损失(基于离散渗流方程)""" # 预测下一时刻的水位 h_next_pred = self([t, h_current, dt]) # 离散渗流方程: h_{t+1} = h_t - k * h_t * dt residual = h_next_pred - (h_current - self.k * h_current * dt) return tf.reduce_mean(tf.square(residual)) class DamSeepageModel: def __init__(self, root): self.root = root self.root.title("大坝渗流预测模型(PINNs)") self.root.geometry("1200x800") # 初始化数据 self.train_df = None # 训练集 self.test_df = None # 测试集 self.model = None self.scaler = MinMaxScaler(feature_range=(0, 1)) self.evaluation_metrics = {} # 添加历史记录字典 self.history_records = {} self.current_history_key = None # 创建主界面 self.create_widgets() def create_widgets(self): # 创建主框架 main_frame = ttk.Frame(self.root, padding=10) main_frame.pack(fill=tk.BOTH, expand=True) # 左侧控制面板 control_frame = ttk.LabelFrame(main_frame, text="模型控制", padding=10) control_frame.pack(side=tk.LEFT, fill=tk.Y, padx=5, pady=5) # ====== 新增历史记录UI ====== # 在控制面板中添加历史记录部分 history_frame = ttk.LabelFrame(control_frame, text="历史训练记录", padding=10) history_frame.pack(fill=tk.X, pady=10) # 历史记录选择框 ttk.Label(history_frame, text="选择记录:").grid(row=0, column=0, sticky=tk.W, pady=5) self.history_var = tk.StringVar() self.history_combobox = ttk.Combobox( history_frame, textvariable=self.history_var, width=25, state='readonly' ) self.history_combobox.grid(row=0, column=1, padx=5) self.history_combobox.bind('<<ComboboxSelected>>', self.load_history_record) # 历史记录操作按钮 btn_frame = ttk.Frame(history_frame) btn_frame.grid(row=0, column=2, padx=5) ttk.Button(btn_frame, text="添加当前", command=self.save_current_as_history).pack(side=tk.LEFT, padx=2) ttk.Button(btn_frame, text="删除", command=self.delete_history_record).pack(side=tk.LEFT, padx=2) # 文件选择部分 file_frame = ttk.LabelFrame(control_frame, text="数据文件", padding=10) file_frame.pack(fill=tk.X, pady=5) # 训练集选择 ttk.Label(file_frame, text="训练集:").grid(row=0, column=0, sticky=tk.W, pady=5) self.train_file_var = tk.StringVar() ttk.Entry(file_frame, textvariable=self.train_file_var, width=30, state='readonly').grid(row=0, column=1, padx=5) ttk.Button(file_frame, text="选择文件", command=lambda: self.select_file("train")).grid(row=0, column=2) # 测试集选择 ttk.Label(file_frame, text="测试集:").grid(row=1, column=0, sticky=tk.W, pady=5) self.test_file_var = tk.StringVar() ttk.Entry(file_frame, textvariable=self.test_file_var, width=30, state='readonly').grid(row=1, column=1, padx=5) ttk.Button(file_frame, text="选择文件", command=lambda: self.select_file("test")).grid(row=1, column=2) # PINNs参数设置 param_frame = ttk.LabelFrame(control_frame, text="PINNs参数", padding=10) param_frame.pack(fill=tk.X, pady=10) # 验证集切分比例 ttk.Label(param_frame, text="验证集比例:").grid(row=0, column=0, sticky=tk.W, pady=5) self.split_ratio_var = tk.DoubleVar(value=0.2) ttk.Spinbox(param_frame, from_=0, to=1, increment=0.05, textvariable=self.split_ratio_var, width=10).grid(row=0, column=1, padx=5) # 隐藏层数量 ttk.Label(param_frame, text="网络层数:").grid(row=1, column=0, sticky=tk.W, pady=5) self.num_layers_var = tk.IntVar(value=4) ttk.Spinbox(param_frame, from_=2, to=8, increment=1, textvariable=self.num_layers_var, width=10).grid(row=1, column=1, padx=5) # 每层神经元数量 ttk.Label(param_frame, text="神经元数/层:").grid(row=2, column=0, sticky=tk.W, pady=5) self.hidden_units_var = tk.IntVar(value=32) ttk.Spinbox(param_frame, from_=16, to=128, increment=4, textvariable=self.hidden_units_var, width=10).grid(row=2, column=1, padx=5) # 训练轮次 ttk.Label(param_frame, text="训练轮次:").grid(row=3, column=0, sticky=tk.W, pady=5) self.epochs_var = tk.IntVar(value=500) ttk.Spinbox(param_frame, from_=100, to=2000, increment=100, textvariable=self.epochs_var, width=10).grid(row=3, column=1, padx=5) # 物理损失权重 ttk.Label(param_frame, text="物理损失权重:").grid(row=4, column=0, sticky=tk.W, pady=5) self.physics_weight_var = tk.DoubleVar(value=0.5) ttk.Spinbox(param_frame, from_=0.1, to=1.0, increment=0.1, textvariable=self.physics_weight_var, width=10).grid(row=4, column=1, padx=5) # 控制按钮 btn_frame = ttk.Frame(control_frame) btn_frame.pack(fill=tk.X, pady=10) ttk.Button(btn_frame, text="训练模型", command=self.train_model).pack(side=tk.LEFT, padx=5) ttk.Button(btn_frame, text="预测结果", command=self.predict).pack(side=tk.LEFT, padx=5) ttk.Button(btn_frame, text="保存结果", command=self.save_results).pack(side=tk.LEFT, padx=5) ttk.Button(btn_frame, text="重置", command=self.reset).pack(side=tk.RIGHT, padx=5) # 状态栏 self.status_var = tk.StringVar(value="就绪") status_bar = ttk.Label(control_frame, textvariable=self.status_var, relief=tk.SUNKEN, anchor=tk.W) status_bar.pack(fill=tk.X, side=tk.BOTTOM) # 右侧结果显示区域 result_frame = ttk.Frame(main_frame) result_frame.pack(side=tk.RIGHT, fill=tk.BOTH, expand=True, padx=5, pady=5) # 创建标签页 self.notebook = ttk.Notebook(result_frame) self.notebook.pack(fill=tk.BOTH, expand=True) # 损失曲线标签页 self.loss_frame = ttk.Frame(self.notebook) self.notebook.add(self.loss_frame, text="训练损失") # 预测结果标签页 self.prediction_frame = ttk.Frame(self.notebook) self.notebook.add(self.prediction_frame, text="预测结果") # 指标显示 self.metrics_var = tk.StringVar() metrics_label = ttk.Label( self.prediction_frame, textvariable=self.metrics_var, font=('TkDefaultFont', 10, 'bold'), relief='ridge', padding=5 ) metrics_label.pack(fill=tk.X, padx=5, pady=5) # 初始化绘图区域 self.fig, self.ax = plt.subplots(figsize=(10, 6)) self.canvas = FigureCanvasTkAgg(self.fig, master=self.prediction_frame) self.canvas.get_tk_widget().pack(fill=tk.BOTH, expand=True) # 损失曲线画布 self.loss_fig, self.loss_ax = plt.subplots(figsize=(10, 4)) self.loss_canvas = FigureCanvasTkAgg(self.loss_fig, master=self.loss_frame) self.loss_canvas.get_tk_widget().pack(fill=tk.BOTH, expand=True) def save_current_as_history(self): """将当前训练状态保存为历史记录""" if not hasattr(self, 'train_history') or not hasattr(self, 'predictions'): messagebox.showwarning("警告", "没有可保存的训练记录") return # 生成唯一键(时间戳) timestamp = time.strftime("%Y%m%d-%H%M%S") key = f"记录-{timestamp}" # 保存历史记录 self.history_records[key] = { 'train_df': self.train_df.copy(), 'test_df': self.test_df.copy(), 'train_history': self.train_history.copy(), 'predictions': self.predictions.copy(), 'actual_values': self.actual_values.copy(), 'test_time': self.test_time.copy(), 'evaluation_metrics': self.evaluation_metrics.copy(), 'scaler': self.scaler, 'model_params': { 'num_layers': self.num_layers_var.get(), 'hidden_units': self.hidden_units_var.get(), 'epochs': self.epochs_var.get(), 'physics_weight': self.physics_weight_var.get(), 'split_ratio': self.split_ratio_var.get() }, 'file_paths': { 'train': self.train_file_var.get(), 'test': self.test_file_var.get() } } # 更新下拉框 self.update_history_combobox() self.history_var.set(key) self.status_var.set(f"已保存当前训练为历史记录: {key}") def update_history_combobox(self): """更新历史记录下拉框选项""" records = list(self.history_records.keys()) self.history_combobox['values'] = records def load_history_record(self, event=None): """加载选中的历史记录""" key = self.history_var.get() if not key or key not in self.history_records: return record = self.history_records[key] self.current_history_key = key # 恢复数据集 self.train_df = record['train_df'].copy() self.test_df = record['test_df'].copy() # 恢复模型参数设置 params = record['model_params'] self.num_layers_var.set(params['num_layers']) self.hidden_units_var.set(params['hidden_units']) self.epochs_var.set(params['epochs']) self.physics_weight_var.set(params['physics_weight']) self.split_ratio_var.set(params['split_ratio']) # 恢复文件路径显示 files = record['file_paths'] self.train_file_var.set(files['train']) self.test_file_var.set(files['test']) # 恢复训练历史 self.train_history = record['train_history'].copy() # 恢复预测结果 self.predictions = record['predictions'].copy() self.actual_values = record['actual_values'].copy() self.test_time = record['test_time'].copy() self.evaluation_metrics = record['evaluation_metrics'].copy() # 更新状态 self.status_var.set(f"已加载历史记录: {key}") # 显示预测结果 self.show_prediction_results() # 显示损失曲线 self.show_loss_history() def show_prediction_results(self): """显示历史记录的预测结果""" if not hasattr(self, 'predictions'): return # 清除现有图表 self.ax.clear() # 绘制结果 self.ax.plot(self.test_time, self.actual_values, 'b-', label='真实值') self.ax.plot(self.test_time, self.predictions, 'r--', label='预测值') self.ax.set_title(f'大坝渗流水位预测结果(历史记录: {self.current_history_key})') self.ax.set_xlabel('时间') self.ax.set_ylabel('测压管水位', rotation=0) self.ax.legend() # 设置时间轴格式 import matplotlib.dates as mdates self.ax.xaxis.set_major_locator(mdates.YearLocator()) self.ax.xaxis.set_major_formatter(mdates.DateFormatter('%Y')) self.ax.xaxis.set_minor_locator(mdates.MonthLocator(interval=2)) self.ax.grid(which='minor', axis='x', linestyle='--', color='gray', alpha=0.3) self.ax.grid(which='major', axis='y', linestyle='-', color='lightgray', alpha=0.5) self.ax.tick_params(axis='x', which='major', rotation=0, labelsize=10) self.ax.tick_params(axis='x', which='minor', length=3) # 显示评估指标 metrics_text = ( f"MSE: {self.evaluation_metrics['MSE']:.4f} | " f"RMSE: {self.evaluation_metrics['RMSE']:.4f} | " f"MAE: {self.evaluation_metrics['MAE']:.4f} | " f"MAPE: {self.evaluation_metrics['MAPE']:.2f}% | " f"R²: {self.evaluation_metrics['R2']:.4f}" ) self.metrics_var.set(metrics_text) # 在图表上添加指标 self.ax.text( 0.5, 1.08, metrics_text, transform=self.ax.transAxes, ha='center', fontsize=10, bbox=dict(facecolor='white', alpha=0.8) ) # 调整布局 plt.tight_layout(pad=2.0) self.canvas.draw() def show_loss_history(self): """显示历史记录的损失曲线""" if not hasattr(self, 'train_history') or 'train_data_loss' not in self.train_history: return # 修复:清除现有图表 self.loss_ax.clear() # 修正此行 # 绘制损失曲线 epochs_range = range(1, len(self.train_history['train_data_loss']) + 1) self.loss_ax.plot(epochs_range, self.train_history['train_data_loss'], 'b-', label='训练数据损失') if 'physics_loss' in self.train_history: self.loss_ax.plot(epochs_range, self.train_history['physics_loss'], 'r--', label='物理损失') if 'valid_data_loss' in self.train_history: self.loss_ax.plot(epochs_range, self.train_history['valid_data_loss'], 'g-.', label='验证数据损失') self.loss_ax.set_title(f'PINNs训练损失曲线(历史记录: {self.current_history_key})') self.loss_ax.set_xlabel('轮次') self.loss_ax.set_ylabel('损失', rotation=0) self.loss_ax.legend() self.loss_ax.grid(True, alpha=0.3) self.loss_ax.set_yscale('log') self.loss_canvas.draw() def delete_history_record(self): """删除选中的历史记录""" key = self.history_var.get() if not key or key not in self.history_records: return # 确认删除 if not messagebox.askyesno("确认删除", f"确定要删除历史记录 '{key}' 吗?"): return # 删除记录 del self.history_records[key] # 更新下拉框 self.update_history_combobox() # 清空选择 self.history_var.set('') self.status_var.set(f"已删除历史记录: {key}") def select_file(self, file_type): """选择Excel文件并计算时间步长""" try: file_path = filedialog.askopenfilename( title=f"选择{file_type}集Excel文件", filetypes=[("Excel文件", "*.xlsx *.xls"), ("所有文件", "*.*")] ) if not file_path: return df = pd.read_excel(file_path) # 验证必需列是否存在 required_cols = ['year', 'month', 'day', '水位'] missing_cols = [col for col in required_cols if col not in df.columns] if missing_cols: messagebox.showerror("列名错误", f"缺少必需列: {', '.join(missing_cols)}") return # 时间特征处理 time_features = ['year', 'month', 'day'] missing_time_features = [feat for feat in time_features if feat not in df.columns] if missing_time_features: messagebox.showerror("列名错误", f"Excel文件缺少预处理后的时间特征列: {', '.join(missing_time_features)}") return # 创建时间戳列 (增强兼容性) time_cols = ['year', 'month', 'day'] if 'hour' in df.columns: time_cols.append('hour') if 'minute' in df.columns: time_cols.append('minute') if 'second' in df.columns: time_cols.append('second') # 填充缺失的时间单位 for col in ['hour', 'minute', 'second']: if col not in df.columns: df[col] = 0 df['datetime'] = pd.to_datetime(df[time_cols]) # 设置时间索引 df = df.set_index('datetime') # 计算相对时间(天) df['days'] = (df.index - df.index[0]).days # 新增:计算时间步长dt(单位:天) df['dt'] = df['days'].diff() # 处理第一个时间点的dt(设为平均时间步长或默认值1) if len(df) > 1: avg_dt = df['dt'].iloc[1:].mean() else: avg_dt = 1.0 df['dt'] = df['dt'].fillna(avg_dt) # 保存数据 if file_type == "train": self.train_df = df self.train_file_var.set(os.path.basename(file_path)) self.status_var.set(f"已加载训练集: {len(self.train_df)}条数据") else: self.test_df = df self.test_file_var.set(os.path.basename(file_path)) self.status_var.set(f"已加载测试集: {len(self.test_df)}条数据") except Exception as e : error_msg = f"文件读取失败: {str(e)}\n\n请确保:\n1. 文件不是打开状态\n2. 文件格式正确\n3. 包含必需的时间和水位列" messagebox.showerror("文件错误", error_msg) def calculate_metrics(self, y_true, y_pred): """计算评估指标""" from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score mse = mean_squared_error(y_true, y_pred) rmse = np.sqrt(mse) mae = mean_absolute_error(y_true, y_pred) non_zero_idx = np.where(y_true != 0)[0] if len(non_zero_idx) > 0: mape = np.mean(np.abs((y_true[non_zero_idx] - y_pred[non_zero_idx]) / y_true[non_zero_idx])) * 100 else: mape = float('nan') r2 = r2_score(y_true, y_pred) return { 'MSE': mse, 'RMSE': rmse, 'MAE': mae, 'MAPE': mape, 'R2': r2 } def train_model(self): """训练PINNs模型(带早停机制+训练指标监控,无指标绘图)""" if self.train_df is None: messagebox.showwarning("警告", "请先选择训练集文件") return try: self.status_var.set("正在预处理数据...") self.root.update() # 从训练集中切分训练子集和验证子集(时间顺序切分) split_ratio = 1 - self.split_ratio_var.get() split_idx = int(len(self.train_df) * split_ratio) train_subset = self.train_df.iloc[:split_idx] valid_subset = self.train_df.iloc[split_idx:] # 检查数据量是否足够 if len(train_subset) < 2 or len(valid_subset) < 2: messagebox.showerror("数据错误", "训练集数据量不足(至少需要2个时间步)") return # 数据预处理(训练子集拟合scaler,验证子集用相同scaler) train_subset_scaled = self.scaler.fit_transform(train_subset[['水位']]) valid_subset_scaled = self.scaler.transform(valid_subset[['水位']]) # 准备训练数据(原始值用于指标计算) t_train = train_subset['days'].values[1:].reshape(-1, 1).astype(np.float32) h_train = train_subset_scaled[:-1].astype(np.float32) dt_train = train_subset['dt'].values[1:].reshape(-1, 1).astype(np.float32) # 时间步长 h_next_train_scaled = train_subset_scaled[1:].astype(np.float32) # 归一化后的标签 h_next_train_true = train_subset['水位'].values[1:].reshape(-1, 1) # 原始真实值(反归一化前) # 准备验证数据(原始值用于指标计算) t_valid = valid_subset['days'].values[1:].reshape(-1, 1).astype(np.float32) h_valid = valid_subset_scaled[:-1].astype(np.float32) dt_valid = valid_subset['dt'].values[1:].reshape(-1, 1).astype(np.float32) # 时间步长 h_next_valid_scaled = valid_subset_scaled[1:].astype(np.float32) # 归一化后的标签 h_next_valid_true = valid_subset['水位'].values[1:].reshape(-1, 1) # 原始真实值 # 创建模型和优化器 self.model = PINNModel( num_layers=self.num_layers_var.get(), hidden_units=self.hidden_units_var.get() ) optimizer = Adam(learning_rate=0.001) # 构建训练/验证数据集(现在包含时间步长dt) train_dataset = tf.data.Dataset.from_tensor_slices(((t_train, h_train, dt_train), h_next_train_scaled)) train_dataset = train_dataset.shuffle(buffer_size=1024).batch(32) valid_dataset = tf.data.Dataset.from_tensor_slices(((t_valid, h_valid, dt_valid), h_next_valid_scaled)) valid_dataset = valid_dataset.batch(32) # 验证集无需shuffle # 损失记录(新增指标记录) train_data_loss_history = [] physics_loss_history = [] valid_data_loss_history = [] # 新增:训练集和验证集的指标历史(MSE, RMSE等) train_metrics_history = [] # 每个元素是字典(如{'MSE':..., 'RMSE':...}) valid_metrics_history = [] # 早停机制参数 patience = int(self.epochs_var.get() / 3) min_delta = 1e-4 best_valid_loss = float('inf') wait = 0 best_epoch = 0 best_weights = None start_time = time.time() # 自定义训练循环(新增指标计算) for epoch in range(self.epochs_var.get()): # 训练阶段 epoch_train_data_loss = [] epoch_physics_loss = [] # 收集训练预测值(归一化后) train_pred_scaled = [] for step, ((t_batch, h_batch, dt_batch), h_next_batch) in enumerate(train_dataset): with tf.GradientTape() as tape: # 预测下一时刻水位 h_pred = self.model([t_batch, h_batch, dt_batch]) data_loss = tf.reduce_mean(tf.square(h_next_batch - h_pred)) # 计算物理损失(传入时间步长dt) physics_loss = self.model.physics_loss(t_batch, h_batch, dt_batch) loss = data_loss + self.physics_weight_var.get() * physics_loss grads = tape.gradient(loss, self.model.trainable_variables) optimizer.apply_gradients(zip(grads, self.model.trainable_variables)) epoch_train_data_loss.append(data_loss.numpy()) epoch_physics_loss.append(physics_loss.numpy()) train_pred_scaled.append(h_pred.numpy()) # 保存训练预测值(归一化) # 合并训练预测值(归一化后) train_pred_scaled = np.concatenate(train_pred_scaled, axis=0) # 反归一化得到原始预测值 train_pred_true = self.scaler.inverse_transform(train_pred_scaled) # 计算训练集指标(使用原始真实值和预测值) train_metrics = self.calculate_metrics( y_true=h_next_train_true.flatten(), y_pred=train_pred_true.flatten() ) train_metrics_history.append(train_metrics) # 验证阶段 epoch_valid_data_loss = [] valid_pred_scaled = [] for ((t_v_batch, h_v_batch, dt_v_batch), h_v_next_batch) in valid_dataset: h_v_pred = self.model([t_v_batch, h_v_batch, dt_v_batch]) valid_data_loss = tf.reduce_mean(tf.square(h_v_next_batch - h_v_pred)) epoch_valid_data_loss.append(valid_data_loss.numpy()) valid_pred_scaled.append(h_v_pred.numpy()) # 保存验证预测值(归一化) # 合并验证预测值(归一化后) valid_pred_scaled = np.concatenate(valid_pred_scaled, axis=0) # 反归一化得到原始预测值 valid_pred_true = self.scaler.inverse_transform(valid_pred_scaled) # 计算验证集指标(使用原始真实值和预测值) valid_metrics = self.calculate_metrics( y_true=h_next_valid_true.flatten(), y_pred=valid_pred_true.flatten() ) valid_metrics_history.append(valid_metrics) # 计算平均损失 avg_train_data_loss = np.mean(epoch_train_data_loss) avg_physics_loss = np.mean(epoch_physics_loss) avg_valid_data_loss = np.mean(epoch_valid_data_loss) # 记录损失 train_data_loss_history.append(avg_train_data_loss) physics_loss_history.append(avg_physics_loss) valid_data_loss_history.append(avg_valid_data_loss) # 早停机制逻辑(与原代码一致) current_valid_loss = avg_valid_data_loss if current_valid_loss < best_valid_loss - min_delta: best_valid_loss = current_valid_loss best_epoch = epoch + 1 wait = 0 best_weights = self.model.get_weights() else: wait += 1 if wait >= patience: self.status_var.set(f"触发早停!最佳轮次: {best_epoch},最佳验证损失: {best_valid_loss:.4f}") if best_weights is not None: self.model.set_weights(best_weights) break # 更新状态(新增指标显示) if epoch % 10 == 0: # 提取当前训练/验证的关键指标(如RMSE) train_rmse = train_metrics['RMSE'] valid_rmse = valid_metrics['RMSE'] train_r2 = train_metrics['R2'] valid_r2 = valid_metrics['R2'] k_value = self.model.k.numpy() elapsed = time.time() - start_time self.status_var.set( f"训练中 | 轮次: {epoch + 1}/{self.epochs_var.get()} | " f"训练RMSE: {train_rmse:.4f} | 验证RMSE: {valid_rmse:.4f} | " f"训练R²: {train_r2:.4f} | 验证R²: {valid_r2:.4f} | " f"k: {k_value:.6f} | 时间: {elapsed:.1f}秒 | 早停等待: {wait}/{patience}" ) self.root.update() # 绘制损失曲线(仅保留原始损失曲线) self.loss_ax.clear() epochs_range = range(1, len(train_data_loss_history) + 1) self.loss_ax.plot(epochs_range, train_data_loss_history, 'b-', label='训练数据损失') self.loss_ax.plot(epochs_range, physics_loss_history, 'r--', label='物理损失') self.loss_ax.plot(epochs_range, valid_data_loss_history, 'g-.', label='验证数据损失') self.loss_ax.set_title('PINNs训练与验证损失') self.loss_ax.set_xlabel('轮次') self.loss_ax.set_ylabel('损失', rotation=0) self.loss_ax.legend() self.loss_ax.grid(True, alpha=0.3) self.loss_ax.set_yscale('log') self.loss_canvas.draw() # 训练完成提示(保留指标总结) elapsed = time.time() - start_time if wait >= patience: completion_msg = ( f"早停触发 | 最佳轮次: {best_epoch} | 最佳验证损失: {best_valid_loss:.4f} | " f"最佳验证RMSE: {valid_metrics_history[best_epoch - 1]['RMSE']:.4f} | " f"总时间: {elapsed:.1f}秒" ) else: completion_msg = ( f"训练完成 | 总轮次: {self.epochs_var.get()} | " f"最终训练RMSE: {train_metrics_history[-1]['RMSE']:.4f} | " f"最终验证RMSE: {valid_metrics_history[-1]['RMSE']:.4f} | " f"最终训练R²: {train_metrics_history[-1]['R2']:.4f} | " f"最终验证R²: {valid_metrics_history[-1]['R2']:.4f} | " f"总时间: {elapsed:.1f}秒" ) # 在训练循环结束后,保存训练历史 self.train_history = { 'train_data_loss': train_data_loss_history, 'physics_loss': physics_loss_history, 'valid_data_loss': valid_data_loss_history, 'train_metrics': train_metrics_history, 'valid_metrics': valid_metrics_history } self.status_var.set(completion_msg) messagebox.showinfo("训练完成", f"PINNs模型训练成功完成!\n{completion_msg}") except Exception as e: messagebox.showerror("训练错误", f"模型训练失败:\n{str(e)}") self.status_var.set("训练失败") def predict(self): """使用PINNs模型进行递归预测(自回归预测)""" if self.model is None: messagebox.showwarning("警告", "请先训练模型") return if self.test_df is None: messagebox.showwarning("警告", "请先选择测试集文件") return try: self.status_var.set("正在生成预测...") self.root.update() # 预处理测试数据 test_scaled = self.scaler.transform(self.test_df[['水位']]) # 准备时间特征 t_test = self.test_df['days'].values.reshape(-1, 1).astype(np.float32) dt_test = self.test_df['dt'].values.reshape(-1, 1).astype(np.float32) # 时间步长 # 递归预测(自回归) n = len(t_test) # 初始化预测序列(归一化),第一个点使用真实值 predicted_scaled = np.zeros((n, 1), dtype=np.float32) predicted_scaled[0] = test_scaled[0] # 第一个点使用真实值 # 从第二个时间点开始预测 for i in range(1, n): # 使用上一个时间点的特征 t_prev = t_test[i - 1:i] # 上一个时间点的时间 h_prev = predicted_scaled[i - 1:i] # 上一个时间点的水位(预测值) dt_i = dt_test[i:i + 1] # 当前时间步长 # 预测当前时间点的水位 h_pred = self.model([t_prev, h_prev, dt_i]) predicted_scaled[i] = h_pred.numpy()[0][0] # 反归一化 predictions = self.scaler.inverse_transform(predicted_scaled) actual_values = self.scaler.inverse_transform(test_scaled) # 创建时间索引 test_time = self.test_df.index # 清除现有图表 self.ax.clear() # 绘制结果 self.ax.plot(test_time, actual_values, 'b-', label='真实值') self.ax.plot(test_time, predictions, 'r--', label='预测值') self.ax.set_title('大坝渗流水位预测结果(PINNs)') self.ax.set_xlabel('时间') self.ax.set_ylabel('测压管水位', rotation=0) self.ax.legend() # 优化时间轴刻度 import matplotlib.dates as mdates self.ax.xaxis.set_major_locator(mdates.YearLocator()) self.ax.xaxis.set_major_formatter(mdates.DateFormatter('%Y')) self.ax.xaxis.set_minor_locator(mdates.MonthLocator(interval=2)) self.ax.grid(which='minor', axis='x', linestyle=':', color='gray', alpha=0.3) self.ax.grid(which='major', axis='y', linestyle='-', color='lightgray', alpha=0.5) self.ax.tick_params(axis='x', which='major', rotation=0, labelsize=9) self.ax.tick_params(axis='x', which='minor', length=2) # 计算评估指标(排除第一个点) eval_actual = actual_values[1:].flatten() eval_pred = predictions[1:].flatten() self.evaluation_metrics = self.calculate_metrics(eval_actual, eval_pred) metrics_text = ( f"MSE: {self.evaluation_metrics['MSE']:.4f} | " f"RMSE: {self.evaluation_metrics['RMSE']:.4f} | " f"MAE: {self.evaluation_metrics['MAE']:.4f} | " f"MAPE: {self.evaluation_metrics['MAPE']:.2f}% | " f"R²: {self.evaluation_metrics['R2']:.4f}" ) self.metrics_var.set(metrics_text) # 在图表上添加指标 self.ax.text( 0.5, 1.05, metrics_text, transform=self.ax.transAxes, ha='center', fontsize=9, bbox=dict(facecolor='white', alpha=0.8) ) # 调整布局 plt.tight_layout(pad=2.0) self.canvas.draw() # 保存预测结果 self.predictions = predictions self.actual_values = actual_values self.test_time = test_time self.status_var.set("预测完成,结果已显示") except Exception as e: messagebox.showerror("预测错误", f"预测失败:\n{str(e)}") self.status_var.set("预测失败") # 记录详细错误信息 import traceback traceback.print_exc() def save_results(self): """保存预测结果和训练历史数据""" if not hasattr(self, 'predictions') or not hasattr(self, 'train_history'): messagebox.showwarning("警告", "请先生成预测结果并完成训练") return # 选择保存路径 save_path = filedialog.asksaveasfilename( defaultextension=".xlsx", filetypes=[("Excel文件", "*.xlsx"), ("所有文件", "*.*")], title="保存结果" ) if not save_path: return try: # 1. 创建预测结果DataFrame result_df = pd.DataFrame({ '时间': self.test_time, '实际水位': self.actual_values.flatten(), '预测水位': self.predictions.flatten() }) # 2. 创建评估指标DataFrame metrics_df = pd.DataFrame([self.evaluation_metrics]) # 3. 创建训练历史DataFrame history_data = { '轮次': list(range(1, len(self.train_history['train_data_loss']) + 1)), '训练数据损失': self.train_history['train_data_loss'], '物理损失': self.train_history['physics_loss'], '验证数据损失': self.train_history['valid_data_loss'] } # 添加训练集指标 for metric in ['MSE', 'RMSE', 'MAE', 'MAPE', 'R2']: history_data[f'训练集_{metric}'] = [item[metric] for item in self.train_history['train_metrics']] # 添加验证集指标 for metric in ['MSE', 'RMSE', 'MAE', 'MAPE', 'R2']: history_data[f'验证集_{metric}'] = [item[metric] for item in self.train_history['valid_metrics']] history_df = pd.DataFrame(history_data) # 保存到Excel with pd.ExcelWriter(save_path) as writer: result_df.to_excel(writer, sheet_name='预测结果', index=False) metrics_df.to_excel(writer, sheet_name='评估指标', index=False) history_df.to_excel(writer, sheet_name='训练历史', index=False) # 保存图表 chart_path = os.path.splitext(save_path)[0] + "_chart.png" self.fig.savefig(chart_path, dpi=300) # 保存损失曲线图 loss_path = os.path.splitext(save_path)[0] + "_loss.png" self.loss_fig.savefig(loss_path, dpi=300) self.status_var.set(f"结果已保存至: {os.path.basename(save_path)}") messagebox.showinfo("保存成功", f"预测结果和图表已保存至:\n" f"主文件: {save_path}\n" f"预测图表: {chart_path}\n" f"损失曲线: {loss_path}") except Exception as e: messagebox.showerror("保存错误", f"保存结果失败:\n{str(e)}") def reset(self): """重置程序状态""" self.train_df = None self.test_df = None self.model = None self.train_file_var.set("") self.test_file_var.set("") # 清除训练历史 if hasattr(self, 'train_history'): del self.train_history # 清除图表 if hasattr(self, 'ax'): self.ax.clear() if hasattr(self, 'loss_ax'): self.loss_ax.clear() # 重绘画布 if hasattr(self, 'canvas'): self.canvas.draw() if hasattr(self, 'loss_canvas'): self.loss_canvas.draw() # 清除状态 self.status_var.set("已重置,请选择新数据") # 清除预测结果 if hasattr(self, 'predictions'): del self.predictions # 清除指标文本 if hasattr(self, 'metrics_var'): self.metrics_var.set("") messagebox.showinfo("重置", "程序已重置,可以开始新的分析") if __name__ == "__main__": root = tk.Tk() app = DamSeepageModel(root) root.mainloop()

import sys import cv2 import time import torch import traceback import threading import queue import dxcam import ctypes import os import glob import numpy as np import logitech.lg from PyQt6.QtWidgets import (QApplication, QMainWindow, QWidget, QVBoxLayout, QHBoxLayout, QLabel, QPushButton, QComboBox, QSlider, QSpinBox, QDoubleSpinBox, QLineEdit, QTabWidget, QGroupBox, QTextEdit, QFileDialog, QMessageBox, QSizePolicy, QSplitter, QDialog, QScrollArea) from PyQt6.QtCore import Qt, QTimer, QThread, pyqtSignal from PyQt6.QtGui import QImage, QPixmap, QPainter, QColor, QFont, QIcon, QKeyEvent, QMouseEvent from PyQt6.QtSvg import QSvgRenderer from PIL import Image from ultralytics import YOLO from pynput import mouse class PIDController: """PID控制器""" def __init__(self, kp, ki, kd, output_min=-100, output_max=100): self.kp = kp # 比例增益 self.ki = ki # 积分增益 self.kd = kd # 微分增益 self.output_min = output_min self.output_max = output_max # 状态变量 self.integral = 0.0 self.prev_error = 0.0 self.last_time = time.perf_counter() def compute(self, setpoint, current_value): """计算PID控制输出""" current_time = time.perf_counter() dt = current_time - self.last_time # 防止过小的时间差导致计算问题 MIN_DT = 0.0001 if dt < MIN_DT: dt = MIN_DT # 计算误差 error = setpoint - current_value # 比例项 P = self.kp * error # 积分项(防饱和) self.integral += error * dt I = self.ki * self.integral # 微分项 derivative = (error - self.prev_error) / dt D = self.kd * derivative # 合成输出 output = P + I + D # 输出限幅 if output > self.output_max: output = self.output_max elif output < self.output_min: output = self.output_min # 更新状态 self.prev_error = error self.last_time = current_time return output def reset(self): """重置控制器状态""" self.integral = 0.0 self.prev_error = 0.0 self.last_time = time.perf_counter() class ScreenDetector: def __init__(self, config_path): # 解析配置文件 self._parse_config(config_path) # 设备检测与模型加载 self.device = self._determine_device() self.model = YOLO(self.model_path).to(self.device) # 屏幕信息初始化 self._init_screen_info() # 控制参数初始化 self._init_control_params() # 状态管理 self.stop_event = threading.Event() self.camera_lock = threading.Lock() self.target_lock = threading.Lock() self.offset_lock = threading.Lock() self.button_lock = threading.Lock() # 推理状态控制 self.inference_active = False self.inference_lock = threading.Lock() # 初始化相机 self._init_camera() # 初始化鼠标监听器 self._init_mouse_listener() # 初始化PID控制器 self._init_pid_controllers() def _parse_config(self, config_path): """解析并存储配置参数""" self.cfg = self._parse_txt_config(config_path) # 存储常用参数 self.model_path = self.cfg['model_path'] self.model_device = self.cfg['model_device'] self.screen_target_size = int(self.cfg['screen_target_size']) self.detection_conf_thres = float(self.cfg['detection_conf_thres']) self.detection_iou_thres = float(self.cfg['detection_iou_thres']) self.detection_classes = [int(x) for x in self.cfg['detection_classes'].split(',')] self.visualization_color = tuple(map(int, self.cfg['visualization_color'].split(','))) self.visualization_line_width = int(self.cfg['visualization_line_width']) self.visualization_font_scale = float(self.cfg['visualization_font_scale']) self.visualization_show_conf = bool(self.cfg['visualization_show_conf']) self.fov_horizontal = float(self.cfg.get('move_fov_horizontal', '90')) self.mouse_dpi = int(self.cfg.get('move_mouse_dpi', '400')) self.target_offset_x_percent = float(self.cfg.get('target_offset_x', '50')) self.target_offset_y_percent = 100 - float(self.cfg.get('target_offset_y', '50')) # PID参数 self.pid_kp = float(self.cfg.get('pid_kp', '1.0')) self.pid_ki = float(self.cfg.get('pid_ki', '0.05')) self.pid_kd = float(self.cfg.get('pid_kd', '0.2')) # 贝塞尔曲线参数 self.bezier_steps = int(self.cfg.get('bezier_steps', '100')) self.bezier_duration = float(self.cfg.get('bezier_duration', '0.1')) self.bezier_curve = float(self.cfg.get('bezier_curve', '0.3')) def update_config(self, config_path): """动态更新配置""" try: # 重新解析配置文件 self._parse_config(config_path) # 更新可以直接修改的参数 self.detection_conf_thres = float(self.cfg['detection_conf_thres']) self.detection_iou_thres = float(self.cfg['detection_iou_thres']) self.target_offset_x_percent = float(self.cfg.get('target_offset_x', '50')) self.target_offset_y_percent = 100 - float(self.cfg.get('target_offset_y', '50')) # PID参数更新 self.pid_kp = float(self.cfg.get('pid_kp', '1.0')) self.pid_ki = float(self.cfg.get('pid_ki', '0.05')) self.pid_kd = float(self.cfg.get('pid_kd', '0.2')) # 更新PID控制器 self.pid_x = PIDController(self.pid_kp, self.pid_ki, self.pid_kd) self.pid_y = PIDController(self.pid_kp, self.pid_ki, self.pid_kd) # FOV和DPI更新 self.fov_horizontal = float(self.cfg.get('move_fov_horizontal', '90')) self.mouse_dpi = int(self.cfg.get('move_mouse_dpi', '400')) # 更新贝塞尔曲线参数 self.bezier_steps = int(self.cfg.get('bezier_steps', '100')) self.bezier_duration = float(self.cfg.get('bezier_duration', '0.1')) self.bezier_curve = float(self.cfg.get('bezier_curve', '0.3')) print("配置已动态更新") return True except Exception as e: print(f"更新配置失败: {str(e)}") traceback.print_exc() return False def _parse_txt_config(self, path): """解析TXT格式的配置文件""" config = {} with open(path, 'r', encoding='utf-8') as f: for line in f: line = line.strip() if not line or line.startswith('#'): continue if '=' in line: key, value = line.split('=', 1) config[key.strip()] = value.strip() return config def _init_pid_controllers(self): """初始化PID控制器""" # 创建XY方向的PID控制器 self.pid_x = PIDController(self.pid_kp, self.pid_ki, self.pid_kd) self.pid_y = PIDController(self.pid_kp, self.pid_ki, self.pid_kd) def start_inference(self): """启动推理""" with self.inference_lock: self.inference_active = True def stop_inference(self): """停止推理""" with self.inference_lock: self.inference_active = False def _determine_device(self): """确定运行设备""" if self.model_device == 'auto': return 'cuda' if torch.cuda.is_available() and torch.cuda.device_count() > 0 else 'cpu' return self.model_device def _init_screen_info(self): """初始化屏幕信息""" user32 = ctypes.windll.user32 self.screen_width, self.screen_height = user32.GetSystemMetrics(0), user32.GetSystemMetrics(1) self.screen_center = (self.screen_width // 2, self.screen_height // 2) # 计算截图区域 left = (self.screen_width - self.screen_target_size) // 2 top = (self.screen_height - self.screen_target_size) // 2 self.region = ( max(0, int(left)), max(0, int(top)), min(self.screen_width, int(left + self.screen_target_size)), min(self.screen_height, int(top + self.screen_target_size)) ) def _init_control_params(self): """初始化控制参数""" self.previous_target_info = None self.closest_target_absolute = None self.target_offset = None self.right_button_pressed = False # 改为鼠标右键状态 def _init_camera(self): """初始化相机""" try: with self.camera_lock: self.camera = dxcam.create( output_idx=0, output_color="BGR", region=self.region ) self.camera.start(target_fps=120, video_mode=True) except Exception as e: print(f"相机初始化失败: {str(e)}") try: # 降级模式 with self.camera_lock: self.camera = dxcam.create() self.camera.start(target_fps=60, video_mode=True) except Exception as fallback_e: print(f"降级模式初始化失败: {str(fallback_e)}") self.camera = None def _init_mouse_listener(self): """初始化鼠标监听器""" self.mouse_listener = mouse.Listener( on_click=self.on_mouse_click # 监听鼠标点击事件 ) self.mouse_listener.daemon = True self.mouse_listener.start() def on_mouse_click(self, x, y, button, pressed): """处理鼠标点击事件""" try: if button == mouse.Button.right: # 监听鼠标右键 with self.button_lock: self.right_button_pressed = pressed # 更新状态 # 当右键释放时重置PID控制器 if not pressed: self.pid_x.reset() self.pid_y.reset() except Exception as e: print(f"鼠标事件处理错误: {str(e)}") def calculate_fov_movement(self, dx, dy): """基于FOV算法计算鼠标移动量""" # 计算屏幕对角线长度 screen_diagonal = (self.screen_width ** 2 + self.screen_height ** 2) ** 0.5 # 计算垂直FOV aspect_ratio = self.screen_width / self.screen_height fov_vertical = self.fov_horizontal / aspect_ratio # 计算每像素对应角度 angle_per_pixel_x = self.fov_horizontal / self.screen_width angle_per_pixel_y = fov_vertical / self.screen_height # 计算角度偏移 angle_offset_x = dx * angle_per_pixel_x angle_offset_y = dy * angle_per_pixel_y # 转换为鼠标移动量 move_x = (angle_offset_x / 360) * self.mouse_dpi move_y = (angle_offset_y / 360) * self.mouse_dpi return move_x, move_y def move_mouse_to_target(self): """移动鼠标对准目标点""" if not self.target_offset: return try: # 获取目标点与屏幕中心的偏移量 with self.offset_lock: dx, dy = self.target_offset # 使用FOV算法将像素偏移转换为鼠标移动量 move_x, move_y = self.calculate_fov_movement(dx, dy) # 使用PID计算平滑的移动量 pid_move_x = self.pid_x.compute(0, -move_x) # 将dx取反 pid_move_y = self.pid_y.compute(0, -move_y) # 将dy取反 # 移动鼠标 if pid_move_x != 0 or pid_move_y != 0: logitech.lg.start_mouse_move(int(pid_move_x), int(pid_move_y), self.bezier_steps, self.bezier_duration, self.bezier_curve) except Exception as e: print(f"移动鼠标时出错: {str(e)}") def run(self, frame_queue): """主检测循环""" while not self.stop_event.is_set(): try: # 检查推理状态 with self.inference_lock: if not self.inference_active: time.sleep(0.01) continue # 截图 grab_start = time.perf_counter() screenshot = self._grab_screenshot() grab_time = (time.perf_counter() - grab_start) * 1000 # ms if screenshot is None: time.sleep(0.001) continue # 推理 inference_start = time.perf_counter() results = self._inference(screenshot) inference_time = (time.perf_counter() - inference_start) * 1000 # ms # 处理检测结果 target_info, closest_target_relative, closest_offset = self._process_detection_results(results) # 更新目标信息 self._update_target_info(target_info, closest_offset) # 移动鼠标 self._move_mouse_if_needed() # 可视化处理 annotated_frame = self._visualize_results(results, closest_target_relative) if frame_queue else None # 放入队列 if frame_queue: try: frame_queue.put( (annotated_frame, len(target_info), inference_time, grab_time, target_info), timeout=0.01 ) except queue.Full: pass except Exception as e: print(f"检测循环异常: {str(e)}") traceback.print_exc() self._reset_camera() time.sleep(0.5) def _grab_screenshot(self): """安全获取截图""" with self.camera_lock: if self.camera: return self.camera.grab() return None def _inference(self, screenshot): """执行模型推理""" return self.model.predict( screenshot, conf=self.detection_conf_thres, iou=self.detection_iou_thres, classes=self.detection_classes, device=self.device, verbose=False ) def _process_detection_results(self, results): """处理检测结果""" target_info = [] min_distance = float('inf') closest_target_relative = None closest_target_absolute = None closest_offset = None for box in results[0].boxes: # 获取边界框坐标 x1, y1, x2, y2 = map(int, box.xyxy[0]) # 计算绝对坐标 x1_abs = x1 + self.region[0] y1_abs = y1 + self.region[1] x2_abs = x2 + self.region[0] y2_abs = y2 + self.region[1] # 计算边界框尺寸 width = x2_abs - x1_abs height = y2_abs - y1_abs # 应用偏移百分比计算目标点 target_x = x1_abs + int(width * (self.target_offset_x_percent / 100)) target_y = y1_abs + int(height * (self.target_offset_y_percent / 100)) # 计算偏移量 dx = target_x - self.screen_center[0] dy = target_y - self.screen_center[1] distance = (dx ** 2 + dy ** 2) ** 0.5 # 更新最近目标 if distance < min_distance: min_distance = distance # 计算相对坐标(用于可视化) closest_target_relative = ( x1 + int(width * (self.target_offset_x_percent / 100)), y1 + int(height * (self.target_offset_y_percent / 100)) ) closest_target_absolute = (target_x, target_y) closest_offset = (dx, dy) # 保存目标信息 class_id = int(box.cls) class_name = self.model.names[class_id] target_info.append(f"{class_name}:{x1_abs},{y1_abs},{x2_abs},{y2_abs}") return target_info, closest_target_relative, closest_offset def _update_target_info(self, target_info, closest_offset): """更新目标信息""" # 检查目标信息是否有变化 if target_info != self.previous_target_info: self.previous_target_info = target_info.copy() print(f"{len(target_info)}|{'|'.join(target_info)}") # 更新目标偏移量 with self.offset_lock: self.target_offset = closest_offset def _visualize_results(self, results, closest_target): """可视化处理结果""" frame = results[0].plot( line_width=self.visualization_line_width, font_size=self.visualization_font_scale, conf=self.visualization_show_conf ) # 绘制最近目标 if closest_target: # 绘制目标中心点 cv2.circle( frame, (int(closest_target[0]), int(closest_target[1])), 3, (0, 0, 255), -1 ) # 计算屏幕中心在截图区域内的相对坐标 screen_center_x = self.screen_center[0] - self.region[0] screen_center_y = self.screen_center[1] - self.region[1] # 绘制中心到目标的连线 cv2.line( frame, (int(screen_center_x), int(screen_center_y)), (int(closest_target[0]), int(closest_target[1])), (0, 255, 0), 1 ) return frame def _move_mouse_if_needed(self): """如果需要则移动鼠标""" with self.button_lock: if self.right_button_pressed and self.target_offset: # 使用right_button_pressed self.move_mouse_to_target() def _reset_camera(self): """重置相机""" print("正在重置相机...") try: self._init_camera() except Exception as e: print(f"相机重置失败: {str(e)}") traceback.print_exc() def stop(self): """安全停止检测器""" self.stop_event.set() self._safe_stop() if hasattr(self, 'mouse_listener') and self.mouse_listener.running: # 改为停止鼠标监听器 self.mouse_listener.stop() def _safe_stop(self): """同步释放资源""" print("正在安全停止相机...") try: with self.camera_lock: if self.camera: self.camera.stop() print("相机已停止") except Exception as e: print(f"停止相机时发生错误: {str(e)}") print("屏幕检测器已停止") class DetectionThread(QThread): update_signal = pyqtSignal(object) def __init__(self, detector, frame_queue): super().__init__() self.detector = detector self.frame_queue = frame_queue self.running = True def run(self): self.detector.run(self.frame_queue) def stop(self): self.running = False self.detector.stop() class MainWindow(QMainWindow): def __init__(self, detector): super().__init__() self.detector = detector self.setWindowTitle("EFAI 1.1") self.setGeometry(100, 100, 600, 400) # 添加缺失的属性初始化 self.visualization_enabled = True self.inference_active = False # 初始推理状态为停止 #窗口置顶 self.setWindowFlag(Qt.WindowType.WindowStaysOnTopHint) # 创建帧队列 self.frame_queue = queue.Queue(maxsize=3) # 初始化UI self.init_ui() # 启动检测线程 self.detection_thread = DetectionThread(self.detector, self.frame_queue) self.detection_thread.start() # 启动UI更新定时器 self.update_timer = QTimer() self.update_timer.timeout.connect(self.update_ui) self.update_timer.start(1) # 每1ms更新一次 def toggle_visualization(self): # 实际更新可视化状态属性 self.visualization_enabled = not self.visualization_enabled # 更新按钮文本 if self.visualization_enabled: self.toggle_visualization_btn.setText("禁用可视化") else: self.toggle_visualization_btn.setText("启用可视化") def toggle_inference(self): """切换推理状态""" self.inference_active = not self.inference_active if self.inference_active: self.toggle_inference_btn.setText("停止推理") self.toggle_inference_btn.setStyleSheet(""" QPushButton { background-color: #F44336; color: white; border: none; padding: 8px; border-radius: 4px; font-family: Segoe UI; font-size: 10pt; } """) self.detector.start_inference() else: self.toggle_inference_btn.setText("开始推理") self.toggle_inference_btn.setStyleSheet(""" QPushButton { background-color: #4CAF50; color: white; border: none; padding: 8px; border-radius: 4px; font-family: Segoe UI; font-size: 10pt; } """) self.detector.stop_inference() def init_ui(self): # 主布局 central_widget = QWidget() self.setCentralWidget(central_widget) main_layout = QVBoxLayout(central_widget) # 分割器(左侧图像/目标信息,右侧控制面板) splitter = QSplitter(Qt.Orientation.Horizontal) main_layout.addWidget(splitter) # 左侧区域(图像显示和目标信息) left_widget = QWidget() left_layout = QVBoxLayout(left_widget) # 图像显示区域 self.image_label = QLabel() self.image_label.setAlignment(Qt.AlignmentFlag.AlignCenter) self.image_label.setMinimumSize(320, 320) left_layout.addWidget(self.image_label) # 目标信息区域 self.target_info_text = QTextEdit() self.target_info_text.setReadOnly(True) self.target_info_text.setFixedHeight(150) self.target_info_text.setStyleSheet(""" QTextEdit { background-color: #2D2D30; color: #DCDCDC; font-family: Consolas; font-size: 10pt; border: 1px solid #3F3F46; border-radius: 4px; } """) left_layout.addWidget(self.target_info_text) # 右侧控制面板 right_widget = QWidget() right_layout = QVBoxLayout(right_widget) right_layout.setAlignment(Qt.AlignmentFlag.AlignTop) # 性能信息 perf_group = QGroupBox("性能信息") perf_layout = QVBoxLayout(perf_group) self.target_count_label = QLabel("目标数量: 0") self.inference_time_label = QLabel("推理时间: 0.000s") self.grab_time_label = QLabel("截图时间: 0.000s") for label in [self.target_count_label, self.inference_time_label, self.grab_time_label]: label.setStyleSheet("font-family: Consolas; font-size: 10pt;") perf_layout.addWidget(label) right_layout.addWidget(perf_group) # 系统信息 sys_group = QGroupBox("系统信息") sys_layout = QVBoxLayout(sys_group) # 获取模型名称(只显示文件名) model_name = os.path.basename(self.detector.model_path) # 获取显示器编号(如果配置中有则显示,否则显示默认值0) monitor_index = self.detector.cfg.get('screen_monitor', '0') self.model_label = QLabel(f"模型: {model_name}") self.device_label = QLabel(f"设备: {self.detector.device.upper()}") self.monitor_label = QLabel(f"显示器:{monitor_index}") self.screen_res_label = QLabel(f"屏幕分辨率: {self.detector.screen_width}x{self.detector.screen_height}") self.region_label = QLabel(f"检测区域: {self.detector.region}") for label in [self.model_label, self.device_label, self.monitor_label, self.screen_res_label, self.region_label]: label.setStyleSheet("font-family: Consolas; font-size: 9pt; color: #A0A0A0;") sys_layout.addWidget(label) right_layout.addWidget(sys_group) # 鼠标状态 mouse_group = QGroupBox("自瞄状态") mouse_layout = QVBoxLayout(mouse_group) self.mouse_status = QLabel("未瞄准") self.mouse_status.setStyleSheet(""" QLabel { font-family: Consolas; font-size: 10pt; color: #FF5252; } """) mouse_layout.addWidget(self.mouse_status) right_layout.addWidget(mouse_group) # 控制按钮 btn_group = QGroupBox("控制") btn_layout = QVBoxLayout(btn_group) # 添加推理切换按钮 self.toggle_inference_btn = QPushButton("开始推理") self.toggle_inference_btn.clicked.connect(self.toggle_inference) self.toggle_inference_btn.setStyleSheet(""" QPushButton { background-color: #4CAF50; color: white; border: none; padding: 8px; border-radius: 4px; font-family: Segoe UI; font-size: 10pt; } QPushButton:hover { background-color: #45A049; } QPushButton:pressed { background-color: #3D8B40; } """) btn_layout.addWidget(self.toggle_inference_btn) self.toggle_visualization_btn = QPushButton("禁用可视化") self.toggle_visualization_btn.clicked.connect(self.toggle_visualization) self.settings_btn = QPushButton("设置") self.settings_btn.clicked.connect(self.open_settings) for btn in [self.toggle_visualization_btn, self.settings_btn]: btn.setStyleSheet(""" QPushButton { background-color: #0078D7; color: white; border: none; padding: 8px; border-radius: 4px; font-family: Segoe UI; font-size: 10pt; } QPushButton:hover { background-color: #106EBE; } QPushButton:pressed { background-color: #005A9E; } """) btn_layout.addWidget(btn) right_layout.addWidget(btn_group) # 添加左右区域到分割器 splitter.addWidget(left_widget) splitter.addWidget(right_widget) splitter.setSizes([600, 200]) # 设置样式 self.setStyleSheet(""" QMainWindow { background-color: #252526; } QGroupBox { font-family: Segoe UI; font-size: 10pt; color: #CCCCCC; border: 1px solid #3F3F46; border-radius: 4px; margin-top: 1ex; } QGroupBox::title { subcontrol-origin: margin; left: 10px; padding: 0 5px; background-color: transparent; } """) def open_settings(self): settings_dialog = SettingsDialog(self.detector.cfg, self) settings_dialog.exec() def update_ui(self): try: # 获取最新数据 latest_data = None while not self.frame_queue.empty(): latest_data = self.frame_queue.get_nowait() if latest_data: # 解包数据 frame, targets_count, inference_time, grab_time, target_info = latest_data # 更新性能信息 self.target_count_label.setText(f"目标数量: {targets_count}") self.inference_time_label.setText(f"推理时间: {inference_time / 1000:.3f}s") self.grab_time_label.setText(f"截图时间: {grab_time / 1000:.3f}s") # 更新目标信息 self.display_target_info(target_info) # 更新图像显示 if self.visualization_enabled and frame is not None: # 转换图像为Qt格式 height, width, channel = frame.shape bytes_per_line = 3 * width q_img = QImage(frame.data, width, height, bytes_per_line, QImage.Format.Format_BGR888) pixmap = QPixmap.fromImage(q_img) # 等比例缩放 scaled_pixmap = pixmap.scaled( self.image_label.width(), self.image_label.height(), Qt.AspectRatioMode.KeepAspectRatio, Qt.TransformationMode.SmoothTransformation ) self.image_label.setPixmap(scaled_pixmap) else: # 显示黑色背景 pixmap = QPixmap(self.image_label.size()) pixmap.fill(QColor(0, 0, 0)) self.image_label.setPixmap(pixmap) # 更新鼠标状态 self.update_mouse_status() except Exception as e: print(f"更新UI时出错: {str(e)}") def display_target_info(self, target_info): """在文本框中显示目标信息""" if not target_info: self.target_info_text.setPlainText("无检测目标") return info_text = "目标类别与坐标:\n" for i, data in enumerate(target_info): try: parts = data.split(":", 1) if len(parts) == 2: class_name, coords_str = parts coords = list(map(int, coords_str.split(','))) if len(coords) == 4: display_text = f"{class_name}: [{coords[0]}, {coords[1]}, {coords[2]}, {coords[3]}]" else: display_text = f"坐标格式错误: {data}" else: display_text = f"数据格式错误: {data}" except: display_text = f"解析错误: {data}" info_text += f"{display_text}\n" self.target_info_text.setPlainText(info_text) def update_mouse_status(self): """更新鼠标右键状态显示""" with self.detector.button_lock: if self.detector.right_button_pressed: self.mouse_status.setText("瞄准中") self.mouse_status.setStyleSheet("color: #4CAF50; font-family: Consolas; font-size: 10pt;") else: self.mouse_status.setText("未瞄准") self.mouse_status.setStyleSheet("color: #FF5252; font-family: Consolas; font-size: 10pt;") def closeEvent(self, event): """安全关闭程序""" self.detection_thread.stop() self.detection_thread.wait() event.accept() class SettingsDialog(QDialog): def __init__(self, config, parent=None): super().__init__(parent) self.config = config # 保存原始配置的副本用于比较 self.original_config = config.copy() self.setWindowTitle("设置") self.setGeometry(100, 100, 600, 500) self.init_ui() def init_ui(self): layout = QVBoxLayout() self.setLayout(layout) # 标签页 tabs = QTabWidget() layout.addWidget(tabs) # 检测设置标签页 detection_tab = QWidget() detection_layout = QVBoxLayout(detection_tab) self.create_detection_settings(detection_layout) tabs.addTab(detection_tab, "检测") # 移动设置标签页 move_tab = QWidget() move_layout = QVBoxLayout(move_tab) self.create_move_settings(move_layout) tabs.addTab(move_tab, "FOV") # 目标点设置标签页 target_tab = QWidget() target_layout = QVBoxLayout(target_tab) self.create_target_settings(target_layout) tabs.addTab(target_tab, "目标点") # PID设置标签页 pid_tab = QWidget() pid_layout = QVBoxLayout(pid_tab) self.create_pid_settings(pid_layout) tabs.addTab(pid_tab, "PID") # 贝塞尔曲线设置标签页 bezier_tab = QWidget() bezier_layout = QVBoxLayout(bezier_tab) self.create_bezier_settings(bezier_layout) tabs.addTab(bezier_tab, "贝塞尔曲线") # 按钮区域 btn_layout = QHBoxLayout() layout.addLayout(btn_layout) save_btn = QPushButton("保存配置") save_btn.clicked.connect(self.save_config) cancel_btn = QPushButton("取消") cancel_btn.clicked.connect(self.reject) for btn in [save_btn, cancel_btn]: btn.setStyleSheet(""" QPushButton { background-color: #0078D7; color: white; border: none; padding: 8px 16px; border-radius: 4px; font-family: Segoe UI; font-size: 10pt; } QPushButton:hover { background-color: #106EBE; } QPushButton:pressed { background-color: #005A9E; } """) btn_layout.addWidget(btn) btn_layout.addStretch() def create_detection_settings(self, layout): # 模型选择 model_group = QGroupBox("模型设置") model_layout = QVBoxLayout(model_group) # 获取基础路径 if getattr(sys, 'frozen', False): base_path = sys._MEIPASS else: base_path = os.path.dirname(os.path.abspath(__file__)) # 获取模型文件列表 models_dir = os.path.join(base_path, 'models') model_files = [] if os.path.exists(models_dir): model_files = glob.glob(os.path.join(models_dir, '*.pt')) # 处理模型显示名称 model_display_names = [os.path.basename(f) for f in model_files] if model_files else ["未找到模型文件"] self.model_name_to_path = {os.path.basename(f): f for f in model_files} # 当前配置的模型处理 current_model_path = self.config['model_path'] current_model_name = os.path.basename(current_model_path) # 确保当前模型在列表中 if current_model_name not in model_display_names: model_display_names.append(current_model_name) self.model_name_to_path[current_model_name] = current_model_path # 模型选择下拉框 model_layout.addWidget(QLabel("选择模型:")) self.model_combo = QComboBox() self.model_combo.addItems(model_display_names) self.model_combo.setCurrentText(current_model_name) model_layout.addWidget(self.model_combo) # 设备选择 model_layout.addWidget(QLabel("运行设备:")) self.device_combo = QComboBox() self.device_combo.addItems(['auto', 'cuda', 'cpu']) self.device_combo.setCurrentText(self.config['model_device']) model_layout.addWidget(self.device_combo) layout.addWidget(model_group) # 检测参数 param_group = QGroupBox("检测参数") param_layout = QVBoxLayout(param_group) # 置信度阈值 param_layout.addWidget(QLabel("置信度阈值:")) conf_layout = QHBoxLayout() self.conf_slider = QSlider(Qt.Orientation.Horizontal) self.conf_slider.setRange(10, 100) # 0.1到1.0,步长0.01 self.conf_slider.setValue(int(float(self.config['detection_conf_thres']) * 100)) conf_layout.addWidget(self.conf_slider) self.conf_value = QLabel(f"{float(self.config['detection_conf_thres']):.2f}") self.conf_value.setFixedWidth(50) conf_layout.addWidget(self.conf_value) param_layout.addLayout(conf_layout) # 连接滑块值变化事件 self.conf_slider.valueChanged.connect(lambda value: self.conf_value.setText(f"{value / 100:.2f}")) # IOU阈值 - 改为滑动条 param_layout.addWidget(QLabel("IOU阈值:")) iou_layout = QHBoxLayout() self.iou_slider = QSlider(Qt.Orientation.Horizontal) self.iou_slider.setRange(10, 100) # 0.1到1.0,步长0.01 self.iou_slider.setValue(int(float(self.config['detection_iou_thres']) * 100)) iou_layout.addWidget(self.iou_slider) self.iou_value = QLabel(f"{float(self.config['detection_iou_thres']):.2f}") self.iou_value.setFixedWidth(50) iou_layout.addWidget(self.iou_value) param_layout.addLayout(iou_layout) # 连接滑块值变化事件 self.iou_slider.valueChanged.connect(lambda value: self.iou_value.setText(f"{value / 100:.2f}")) # 检测类别 param_layout.addWidget(QLabel("检测类别 (逗号分隔):")) self.classes_edit = QLineEdit() self.classes_edit.setText(self.config['detection_classes']) param_layout.addWidget(self.classes_edit) layout.addWidget(param_group) # 屏幕设置 screen_group = QGroupBox("屏幕设置") screen_layout = QVBoxLayout(screen_group) # 显示器编号 screen_layout.addWidget(QLabel("显示器编号:")) self.monitor_spin = QSpinBox() self.monitor_spin.setRange(0, 3) # 假设最多支持4个显示器 self.monitor_spin.setValue(int(self.config.get('screen_monitor', '0'))) screen_layout.addWidget(self.monitor_spin) # 屏幕区域大小 screen_layout.addWidget(QLabel("截屏尺寸:")) self.screen_size_spin = QSpinBox() self.screen_size_spin.setRange(100, 2000) self.screen_size_spin.setValue(int(self.config['screen_target_size'])) screen_layout.addWidget(self.screen_size_spin) layout.addWidget(screen_group) layout.addStretch() def create_move_settings(self, layout): group = QGroupBox("鼠标移动参数") group_layout = QVBoxLayout(group) # FOV设置 group_layout.addWidget(QLabel("横向FOV(度):")) self.fov_spin = QDoubleSpinBox() self.fov_spin.setRange(1, 179) self.fov_spin.setValue(float(self.config.get('move_fov_horizontal', '90'))) group_layout.addWidget(self.fov_spin) # 鼠标DPI group_layout.addWidget(QLabel("鼠标DPI:")) self.dpi_spin = QSpinBox() self.dpi_spin.setRange(100, 20000) self.dpi_spin.setValue(int(self.config.get('move_mouse_dpi', '400'))) group_layout.addWidget(self.dpi_spin) layout.addWidget(group) layout.addStretch() def create_target_settings(self, layout): group = QGroupBox("目标点偏移") group_layout = QVBoxLayout(group) # X轴偏移 - 添加百分比显示 group_layout.addWidget(QLabel("X轴偏移:")) x_layout = QHBoxLayout() self.x_offset_slider = QSlider(Qt.Orientation.Horizontal) self.x_offset_slider.setRange(0, 100) self.x_offset_slider.setValue(int(float(self.config.get('target_offset_x', '50')))) x_layout.addWidget(self.x_offset_slider) self.x_offset_value = QLabel(f"{int(float(self.config.get('target_offset_x', '50')))}%") self.x_offset_value.setFixedWidth(50) x_layout.addWidget(self.x_offset_value) group_layout.addLayout(x_layout) # 连接滑块值变化事件 self.x_offset_slider.valueChanged.connect(lambda value: self.x_offset_value.setText(f"{value}%")) # Y轴偏移 - 添加百分比显示 group_layout.addWidget(QLabel("Y轴偏移:")) y_layout = QHBoxLayout() self.y_offset_slider = QSlider(Qt.Orientation.Horizontal) self.y_offset_slider.setRange(0, 100) self.y_offset_slider.setValue(int(float(self.config.get('target_offset_y', '50')))) y_layout.addWidget(self.y_offset_slider) self.y_offset_value = QLabel(f"{int(float(self.config.get('target_offset_y', '50')))}%") self.y_offset_value.setFixedWidth(50) y_layout.addWidget(self.y_offset_value) group_layout.addLayout(y_layout) # 连接滑块值变化事件 self.y_offset_slider.valueChanged.connect(lambda value: self.y_offset_value.setText(f"{value}%")) # 说明 info_label = QLabel("(0% = 左上角, 50% = 中心, 100% = 右下角)") info_label.setStyleSheet("font-size: 9pt; color: #888888;") group_layout.addWidget(info_label) layout.addWidget(group) layout.addStretch() def create_pid_settings(self, layout): group = QGroupBox("PID参数") group_layout = QVBoxLayout(group) # Kp参数 group_layout.addWidget(QLabel("比例增益(Kp):")) kp_layout = QHBoxLayout() self.kp_slider = QSlider(Qt.Orientation.Horizontal) self.kp_slider.setRange(1, 1000) # 0.01到10.0,步长0.01 self.kp_slider.setValue(int(float(self.config.get('pid_kp', '1.0')) * 100)) kp_layout.addWidget(self.kp_slider) self.kp_value = QLabel(f"{float(self.config.get('pid_kp', '1.0')):.2f}") self.kp_value.setFixedWidth(50) kp_layout.addWidget(self.kp_value) group_layout.addLayout(kp_layout) # 连接滑块值变化事件 self.kp_slider.valueChanged.connect(lambda value: self.kp_value.setText(f"{value / 100:.2f}")) # Ki参数 group_layout.addWidget(QLabel("积分增益(Ki):")) ki_layout = QHBoxLayout() self.ki_slider = QSlider(Qt.Orientation.Horizontal) self.ki_slider.setRange(0, 500) # 0.0000到0.1000,步长0.001 self.ki_slider.setValue(int(float(self.config.get('pid_ki', '0.05')) * 10000)) ki_layout.addWidget(self.ki_slider) self.ki_value = QLabel(f"{float(self.config.get('pid_ki', '0.05')):.4f}") self.ki_value.setFixedWidth(50) ki_layout.addWidget(self.ki_value) group_layout.addLayout(ki_layout) # 连接滑块值变化事件 self.ki_slider.valueChanged.connect(lambda value: self.ki_value.setText(f"{value / 10000:.4f}")) # Kd参数 group_layout.addWidget(QLabel("微分增益(Kd):")) kd_layout = QHBoxLayout() self.kd_slider = QSlider(Qt.Orientation.Horizontal) self.kd_slider.setRange(0, 5000) # 0.000到5.000,步长0.001 self.kd_slider.setValue(int(float(self.config.get('pid_kd', '0.2')) * 1000)) kd_layout.addWidget(self.kd_slider) self.kd_value = QLabel(f"{float(self.config.get('pid_kd', '0.2')):.3f}") self.kd_value.setFixedWidth(50) kd_layout.addWidget(self.kd_value) group_layout.addLayout(kd_layout) # 连接滑块值变化事件 self.kd_slider.valueChanged.connect(lambda value: self.kd_value.setText(f"{value / 1000:.3f}")) # 说明 info_text = "建议调整顺序: Kp → Kd → Ki\n\n" \ "先调整Kp至响应迅速但不过冲\n" \ "再增加Kd抑制震荡\n" \ "最后微调Ki消除剩余误差" info_label = QLabel(info_text) info_label.setStyleSheet("font-size: 9pt; color: #888888;") group_layout.addWidget(info_label) layout.addWidget(group) layout.addStretch() # 创建贝塞尔曲线设置 def create_bezier_settings(self, layout): group = QGroupBox("贝塞尔曲线参数") group_layout = QVBoxLayout(group) # 步数设置 group_layout.addWidget(QLabel("步数:")) steps_layout = QHBoxLayout() self.steps_slider = QSlider(Qt.Orientation.Horizontal) self.steps_slider.setRange(1, 500) self.steps_slider.setValue(int(self.config.get('bezier_steps', 100))) steps_layout.addWidget(self.steps_slider) self.steps_value = QLabel(str(self.config.get('bezier_steps', 100))) self.steps_value.setFixedWidth(50) steps_layout.addWidget(self.steps_value) group_layout.addLayout(steps_layout) # 连接滑块值变化事件 self.steps_slider.valueChanged.connect(lambda value: self.steps_value.setText(str(value))) # 总移动时间设置 (秒) group_layout.addWidget(QLabel("总移动时间 (秒):")) duration_layout = QHBoxLayout() self.duration_slider = QSlider(Qt.Orientation.Horizontal) self.duration_slider.setRange(0, 100) # 0.01到1.0,步长0.01 self.duration_slider.setValue(int(float(self.config.get('bezier_duration', 0.1)) * 100)) duration_layout.addWidget(self.duration_slider) self.duration_value = QLabel(f"{float(self.config.get('bezier_duration', 0.1)):.2f}") self.duration_value.setFixedWidth(50) duration_layout.addWidget(self.duration_value) group_layout.addLayout(duration_layout) # 连接滑块值变化事件 self.duration_slider.valueChanged.connect(lambda value: self.duration_value.setText(f"{value / 100:.2f}")) # 控制点偏移幅度 group_layout.addWidget(QLabel("控制点偏移幅度 (0-1):")) curve_layout = QHBoxLayout() self.curve_slider = QSlider(Qt.Orientation.Horizontal) self.curve_slider.setRange(0, 100) # 0.00到1.00,步长0.01 self.curve_slider.setValue(int(float(self.config.get('bezier_curve', 0.3)) * 100)) curve_layout.addWidget(self.curve_slider) self.curve_value = QLabel(f"{float(self.config.get('bezier_curve', 0.3)):.2f}") self.curve_value.setFixedWidth(50) curve_layout.addWidget(self.curve_value) group_layout.addLayout(curve_layout) # 连接滑块值变化事件 self.curve_slider.valueChanged.connect(lambda value: self.curve_value.setText(f"{value / 100:.2f}")) # 说明 info_text = "贝塞尔曲线参数说明:\n\n" \ "• 步数: 鼠标移动的细分步数,值越大移动越平滑\n" \ "• 总移动时间: 鼠标移动的总时间,值越小移动越快\n" \ "• 控制点偏移幅度: 控制贝塞尔曲线的弯曲程度,0为直线,1为最大弯曲" info_label = QLabel(info_text) info_label.setStyleSheet("font-size: 9pt; color: #888888;") group_layout.addWidget(info_label) layout.addWidget(group) layout.addStretch() def save_config(self): try: # 保存配置到字典 model_name = self.model_combo.currentText() model_path = self.model_name_to_path.get(model_name, model_name) self.config['model_path'] = model_path self.config['model_device'] = self.device_combo.currentText() self.config['screen_monitor'] = str(self.monitor_spin.value()) self.config['screen_target_size'] = str(self.screen_size_spin.value()) # 检测参数 self.config['detection_conf_thres'] = str(self.conf_slider.value() / 100) self.config['detection_iou_thres'] = str(self.iou_slider.value() / 100) self.config['detection_classes'] = self.classes_edit.text() # 移动设置 self.config['move_fov_horizontal'] = str(self.fov_spin.value()) self.config['move_mouse_dpi'] = str(self.dpi_spin.value()) # 目标点偏移设置 self.config['target_offset_x'] = str(self.x_offset_slider.value()) self.config['target_offset_y'] = str(self.y_offset_slider.value()) # PID设置 self.config['pid_kp'] = str(self.kp_slider.value() / 100) self.config['pid_ki'] = str(self.ki_slider.value() / 10000) self.config['pid_kd'] = str(self.kd_slider.value() / 1000) # 贝塞尔曲线设置 self.config['bezier_steps'] = str(self.steps_slider.value()) self.config['bezier_duration'] = str(self.duration_slider.value() / 100) self.config['bezier_curve'] = str(self.curve_slider.value() / 100) # 保存为TXT格式 with open('detection_config.txt', 'w', encoding='utf-8') as f: for key, value in self.config.items(): f.write(f"{key} = {value}\n") # 检查需要重启的参数是否被修改 restart_required = False restart_params = [] # 比较模型路径是否变化 if self.config['model_path'] != self.original_config.get('model_path', ''): restart_required = True restart_params.append("模型路径") # 比较设备类型是否变化 if self.config['model_device'] != self.original_config.get('model_device', ''): restart_required = True restart_params.append("设备类型") # 比较屏幕区域大小是否变化 if self.config['screen_target_size'] != self.original_config.get('screen_target_size', ''): restart_required = True restart_params.append("屏幕区域大小") # 比较检测类别是否变化 if self.config['detection_classes'] != self.original_config.get('detection_classes', ''): restart_required = True restart_params.append("检测类别") # 动态更新检测器配置 if self.parent() and hasattr(self.parent(), 'detector'): success = self.parent().detector.update_config('detection_config.txt') if success: if restart_required: # 需要重启的参数已修改 param_list = "、".join(restart_params) QMessageBox.information( self, "配置已保存", f"配置已保存!以下参数需要重启才能生效:\n{param_list}\n\n" "其他参数已实时更新。" ) else: # 所有参数都已实时更新 QMessageBox.information(self, "成功", "配置已实时更新生效!") else: QMessageBox.warning(self, "部分更新", "配置更新失败,请查看日志") else: QMessageBox.information(self, "成功", "配置已保存!部分参数需重启生效") self.accept() except Exception as e: QMessageBox.critical(self, "错误", f"保存配置失败: {str(e)}") if __name__ == "__main__": detector = ScreenDetector('detection_config.txt') print(f"\nDXcam检测器初始化完成 | 设备: {detector.device.upper()}") app = QApplication(sys.argv) # 设置全局样式 app.setStyle("Fusion") app.setStyleSheet(""" QWidget { background-color: #252526; color: #D4D4D4; selection-background-color: #0078D7; selection-color: white; } QPushButton { background-color: #0078D7; color: white; border: none; padding: 5px 10px; border-radius: 4px; } QPushButton:hover { background-color: #106EBE; } QPushButton:pressed { background-color: #005A9E; } QComboBox, QLineEdit, QSpinBox, QDoubleSpinBox, QSlider { background-color: #3C3C40; color: #D4D4D4; border: 1px solid #3F3F46; border-radius: 4px; padding: 3px; } QComboBox:editable { background-color: #3C3C40; } QComboBox QAbstractItemView { background-color: #2D2D30; color: #D4D4D4; selection-background-color: #0078D7; selection-color: white; } QLabel { color: #D4D4D4; } QTabWidget::pane { border: 1px solid #3F3F46; background: #252526; } QTabBar::tab { background: #1E1E1E; color: #A0A0A0; padding: 8px 12px; border-top-left-radius: 4px; border-top-right-radius: 4px; } QTabBar::tab:selected { background: #252526; color: #FFFFFF; border-bottom: 2px solid #0078D7; } QTabBar::tab:hover { background: #2D2D30; } QGroupBox { background-color: #252526; border: 1px solid #3F3F46; border-radius: 4px; margin-top: 1ex; } QGroupBox::title { subcontrol-origin: margin; left: 10px; padding: 0 5px; background-color: transparent; color: #CCCCCC; } """) window = MainWindow(detector) window.show() sys.exit(app.exec()) 重构我的代码

import pandas as pd import numpy as np import matplotlib.pyplot as plt from sklearn.preprocessing import StandardScaler from sklearn.metrics import mean_absolute_error, mean_squared_error from tensorflow.keras.models import Sequential from tensorflow.keras.layers import LSTM, Dense, Dropout from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau import tensorflow as tf import os from datetime import timedelta # 设置随机种子确保结果可复现 tf.random.set_seed(42) np.random.seed(42) # 1. 数据加载与预处理函数 # -------------------------------------------------- def load_and_preprocess_data(): """加载并预处理所有数据源""" # 加载EC气象数据 ec_df = pd.read_csv('阿拉山口风电场_EC_data.csv', parse_dates=['生成日期', '预测日期']) ec_df = ec_df[ec_df['场站名'] == '阿拉山口风电场'] # 计算EC风速和风向 ec_df['EC风速(m/s)'] = np.sqrt(ec_df['U风分量(m/s)']**2 + ec_df['V风分量(m/s)']**2) ec_df['EC风向(度)'] = np.degrees(np.arctan2(ec_df['V风分量(m/s)'], ec_df['U风分量(m/s)'])) % 360 # 添加EC数据可用时间(生成时间+12小时) ec_df['可用时间'] = ec_df['生成日期'] + timedelta(hours=12) # 选择关键特征 ec_features = [ '可用时间', '预测日期', 'EC风速(m/s)', 'EC风向(度)', '位势高度_850hPa(gpm)', '温度_850hPa(K)', '相对湿度_850hPa(%)', '位势高度_500hPa(gpm)', '温度_500hPa(K)' ] ec_df = ec_df[ec_features] # 加载风机数据 turbine_df = pd.read_csv('阿拉山口风电场风机数据.csv', encoding='utf-8', parse_dates=[0]) turbine_df.columns = ['timestamp', 'wind_speed', 'active_power'] # 加载远动数据 scada_df = pd.read_csv('阿拉山口风电场远动数据.csv', encoding='utf-8', parse_dates=[0]) scada_df.columns = ['timestamp', 'active_power_total'] # 合并风机和远动数据 power_df = pd.merge(turbine_df[['timestamp', 'wind_speed']], scada_df, on='timestamp', how='outer') # 按时间排序并填充缺失值 power_df.sort_values('timestamp', inplace=True) power_df['active_power_total'].ffill(inplace=True) power_df['wind_speed'].ffill(inplace=True) # 创建完整的时间序列索引(15分钟间隔) full_range = pd.date_range( start=power_df['timestamp'].min(), end=power_df['timestamp'].max(), freq='15T' ) power_df = power_df.set_index('timestamp').reindex(full_range).reset_index() power_df.rename(columns={'index': 'timestamp'}, inplace=True) power_df[['wind_speed', 'active_power_total']] = power_df[['wind_speed', 'active_power_total']].ffill() # 合并EC数据到主数据集 ec_data = [] for idx, row in power_df.iterrows(): ts = row['timestamp'] # 获取可用的EC预测(可用时间 <= 当前时间) available_ec = ec_df[ec_df['可用时间'] <= ts] if not available_ec.empty: # 获取最近发布的EC数据 latest_gen = available_ec['可用时间'].max() latest_ec = available_ec[available_ec['可用时间'] == latest_gen] # 找到最接近当前时间点的预测 time_diff = (latest_ec['预测日期'] - ts).abs() closest_idx = time_diff.idxmin() ec_point = latest_ec.loc[closest_idx].copy() ec_point['timestamp'] = ts ec_data.append(ec_point) # 创建EC数据DataFrame并合并 ec_ts_df = pd.DataFrame(ec_data) merged_df = pd.merge(power_df, ec_ts_df, on='timestamp', how='left') # 填充缺失的EC数据 ec_cols = [col for col in ec_ts_df.columns if col not in ['timestamp', '可用时间', '预测日期']] for col in ec_cols: merged_df[col] = merged_df[col].interpolate(method='time') # 添加时间特征 merged_df['hour'] = merged_df['timestamp'].dt.hour merged_df['day_of_week'] = merged_df['timestamp'].dt.dayofweek merged_df['day_of_year'] = merged_df['timestamp'].dt.dayofyear merged_df['month'] = merged_df['timestamp'].dt.month # 计算实际风向(如果有测风塔数据,这里使用EC风向) merged_df['风向(度)'] = merged_df['EC风向(度)'] return merged_df # 2. 数据准备函数 # -------------------------------------------------- def prepare_dataset(df, look_back, forecast_steps, target_col='active_power_total'): """ 准备LSTM训练数据集 :param df: 包含所有特征的DataFrame :param look_back: 回溯时间步长 :param forecast_steps: 预测步长 :param target_col: 目标列名 :return: 标准化后的特征和目标数据集 """ # 选择特征列 feature_cols = [ 'wind_speed', 'active_power_total', 'EC风速(m/s)', '风向(度)', '位势高度_850hPa(gpm)', '温度_850hPa(K)', '相对湿度_850hPa(%)', '位势高度_500hPa(gpm)', '温度_500hPa(K)', 'hour', 'day_of_week', 'day_of_year', 'month' ] # 确保目标列在特征中(用于自回归) if target_col not in feature_cols: feature_cols.append(target_col) # 提取特征和目标 features = df[feature_cols].values target = df[target_col].values # 创建时间序列样本 X, y = [], [] for i in range(len(features) - look_back - forecast_steps): X.append(features[i:i+look_back]) y.append(target[i+look_back:i+look_back+forecast_steps]) X = np.array(X) y = np.array(y) return X, y # 3. LSTM模型构建 # -------------------------------------------------- def create_lstm_model(input_shape, output_steps): """ 创建LSTM模型 :param input_shape: 输入形状 (time_steps, features) :param output_steps: 输出时间步长 :return: 编译好的Keras模型 """ model = Sequential([ LSTM(256, return_sequences=True, input_shape=input_shape), Dropout(0.3), LSTM(128, return_sequences=True), Dropout(0.2), LSTM(64, return_sequences=False), Dropout(0.2), Dense(64, activation='relu'), Dense(32, activation='relu'), Dense(output_steps) ]) model.compile(optimizer='adam', loss='mse', metrics=['mae']) return model # 4. 训练和评估函数 # -------------------------------------------------- def train_and_evaluate_model(X_train, y_train, X_test, y_test, look_back, forecast_steps, model_name): """ 训练和评估LSTM模型 :return: 训练好的模型和评估结果 """ # 数据标准化 X_scaler = StandardScaler() y_scaler = StandardScaler() # 重塑数据用于标准化 X_train_reshaped = X_train.reshape(-1, X_train.shape[2]) X_test_reshaped = X_test.reshape(-1, X_test.shape[2]) # 拟合和转换特征 X_train_scaled = X_scaler.fit_transform(X_train_reshaped).reshape(X_train.shape) X_test_scaled = X_scaler.transform(X_test_reshaped).reshape(X_test.shape) # 拟合和转换目标 y_train_scaled = y_scaler.fit_transform(y_train.reshape(-1, 1)).reshape(y_train.shape) y_test_scaled = y_scaler.transform(y_test.reshape(-1, 1)).reshape(y_test.shape) # 创建模型 model = create_lstm_model( (look_back, X_train.shape[2]), forecast_steps ) # 回调函数 callbacks = [ EarlyStopping(monitor='val_loss', patience=15, restore_best_weights=True), ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=5, min_lr=1e-6) ] # 训练模型 history = model.fit( X_train_scaled, y_train_scaled, epochs=100, batch_size=64, validation_split=0.2, callbacks=callbacks, verbose=1 ) # 评估模型 y_pred_scaled = model.predict(X_test_scaled) y_pred = y_scaler.inverse_transform(y_pred_scaled.reshape(-1, 1)).reshape(y_test.shape) # 计算性能指标 mae = mean_absolute_error(y_test.flatten(), y_pred.flatten()) rmse = np.sqrt(mean_squared_error(y_test.flatten(), y_pred.flatten())) print(f"{model_name} 模型评估结果:") print(f"MAE: {mae:.2f} kW") print(f"RMSE: {rmse:.2f} kW") # 保存模型 model.save(f'{model_name}_wind_power_model.h5') return model, y_pred, history, mae, rmse # 5. 可视化函数 # -------------------------------------------------- def plot_results(y_true, y_pred, model_name, forecast_steps, mae): """ 可视化预测结果 :param y_true: 实际值 :param y_pred: 预测值 :param model_name: 模型名称 :param forecast_steps: 预测步长 :param mae: 平均绝对误差 """ # 选择一段代表性的时间序列展示 start_idx = 500 end_idx = start_idx + 200 plt.figure(figsize=(15, 7)) # 绘制实际值 plt.plot(y_true.flatten()[start_idx:end_idx], label='实际功率', color='blue', alpha=0.7, linewidth=2) # 绘制预测值 plt.plot(y_pred.flatten()[start_idx:end_idx], label='预测功率', color='red', alpha=0.7, linestyle='--', linewidth=2) plt.title(f'{model_name}风功率预测 (预测步长: {forecast_steps}步, MAE: {mae:.2f} kW)', fontsize=14) plt.xlabel('时间步', fontsize=12) plt.ylabel('有功功率 (kW)', fontsize=12) plt.legend(fontsize=12) plt.grid(True, linestyle='--', alpha=0.7) plt.tight_layout() plt.savefig(f'{model_name}_prediction_plot.png', dpi=300) plt.show() def plot_training_history(history, model_name): """绘制训练过程中的损失曲线""" plt.figure(figsize=(12, 6)) # 绘制训练损失 plt.plot(history.history['loss'], label='训练损失') # 绘制验证损失 if 'val_loss' in history.history: plt.plot(history.history['val_loss'], label='验证损失') plt.title(f'{model_name} 训练过程', fontsize=14) plt.xlabel('训练轮次', fontsize=12) plt.ylabel('损失 (MSE)', fontsize=12) plt.legend(fontsize=12) plt.grid(True, linestyle='--', alpha=0.7) plt.tight_layout() plt.savefig(f'{model_name}_training_history.png', dpi=300) plt.show() # 6. 主函数 # -------------------------------------------------- def main(): print("开始数据加载与预处理...") df = load_and_preprocess_data() # 定义预测配置 ULTRA_SHORT_CONFIG = { 'name': '超短期', 'look_back': 24, # 6小时历史 (24*15min) 'forecast_steps': 16, # 4小时预测 (16*15min) 'test_size': 0.1 # 最后10%作为测试集 } SHORT_TERM_CONFIG = { 'name': '短期', 'look_back': 96, # 24小时历史 (96*15min) 'forecast_steps': 288, # 72小时预测 (288*15min) 'test_size': 0.05 # 最后5%作为测试集(长期预测需要更多历史数据) } # 准备超短期预测数据集 print("\n准备超短期预测数据集...") X_ultra, y_ultra = prepare_dataset( df, ULTRA_SHORT_CONFIG['look_back'], ULTRA_SHORT_CONFIG['forecast_steps'] ) # 划分训练集和测试集 split_idx_ultra = int(len(X_ultra) * (1 - ULTRA_SHORT_CONFIG['test_size'])) X_train_ultra, X_test_ultra = X_ultra[:split_idx_ultra], X_ultra[split_idx_ultra:] y_train_ultra, y_test_ultra = y_ultra[:split_idx_ultra], y_ultra[split_idx_ultra:] # 准备短期预测数据集 print("\n准备短期预测数据集...") X_short, y_short = prepare_dataset( df, SHORT_TERM_CONFIG['look_back'], SHORT_TERM_CONFIG['forecast_steps'] ) # 划分训练集和测试集 split_idx_short = int(len(X_short) * (1 - SHORT_TERM_CONFIG['test_size'])) X_train_short, X_test_short = X_short[:split_idx_short], X_short[split_idx_short:] y_train_short, y_test_short = y_short[:split_idx_short], y_short[split_idx_short:] print("\n数据集统计信息:") print(f"超短期数据集: 总样本={len(X_ultra)}, 训练集={len(X_train_ultra)}, 测试集={len(X_test_ultra)}") print(f"短期数据集: 总样本={len(X_short)}, 训练集={len(X_train_short)}, 测试集={len(X_test_short)}") # 训练和评估超短期模型 print("\n训练超短期预测模型...") ultra_model, ultra_pred, ultra_history, ultra_mae, ultra_rmse = train_and_evaluate_model( X_train_ultra, y_train_ultra, X_test_ultra, y_test_ultra, ULTRA_SHORT_CONFIG['look_back'], ULTRA_SHORT_CONFIG['forecast_steps'], 'ultra_short_term' ) # 可视化超短期结果 plot_results(y_test_ultra, ultra_pred, '超短期', ULTRA_SHORT_CONFIG['forecast_steps'], ultra_mae) plot_training_history(ultra_history, '超短期模型') # 训练和评估短期模型 print("\n训练短期预测模型...") short_model, short_pred, short_history, short_mae, short_rmse = train_and_evaluate_model( X_train_short, y_train_short, X_test_short, y_test_short, SHORT_TERM_CONFIG['look_back'], SHORT_TERM_CONFIG['forecast_steps'], 'short_term' ) # 可视化短期结果 plot_results(y_test_short, short_pred, '短期', SHORT_TERM_CONFIG['forecast_steps'], short_mae) plot_training_history(short_history, '短期模型') # 最终报告 print("\n" + "="*50) print("风功率预测模型训练完成!") print("="*50) print(f"超短期模型 (4小时预测):") print(f" - 回溯步长: {ULTRA_SHORT_CONFIG['look_back']} (6小时)") print(f" - 预测步长: {ULTRA_SHORT_CONFIG['forecast_steps']} (4小时)") print(f" - 测试集MAE: {ultra_mae:.2f} kW") print(f" - 测试集RMSE: {ultra_rmse:.2f} kW") print(f"\n短期模型 (72小时预测):") print(f" - 回溯步长: {SHORT_TERM_CONFIG['look_back']} (24小时)") print(f" - 预测步长: {SHORT_TERM_CONFIG['forecast_steps']} (72小时)") print(f" - 测试集MAE: {short_mae:.2f} kW") print(f" - 测试集RMSE: {short_rmse:.2f} kW") print("="*50) # 保存预测结果 results_df = pd.DataFrame({ 'timestamp': df['timestamp'].iloc[split_idx_short + SHORT_TERM_CONFIG['look_back']:split_idx_short + SHORT_TERM_CONFIG['look_back'] + len(y_test_short)], '实际功率': y_test_short.flatten(), '超短期预测': ultra_pred.flatten()[:len(y_test_short)], '短期预测': short_pred.flatten() }) results_df.to_csv('风功率预测结果.csv', index=False) print("预测结果已保存到 '风功率预测结果.csv'") if __name__ == "__main__": main() 分析上述代码,并按以下要求重新生成完整代码。 1、数据文件夹路径“G:\桌面映射文件夹\运行调控中心\两个细则\功率预测算法大赛\数据” 2、数据中每个电场只有 ec数据和风电场风机数据是全的,其它两个数据,有的场有,有的场没有,所以EC和风电场数据是必须拿来训练的,其它的根据需要。 3、模型不用tensorflow,用pytorch。

大家在看

recommend-type

ELEC5208 Group project submissions.zip_furniturer4m_smart grid_悉

悉尼大学ELEC5208智能电网project的很多组的报告和code都在里面,供学习和参考
recommend-type

基于python单通道脑电信号的自动睡眠分期研究

【作品名称】:基于python单通道脑电信号的自动睡眠分期研究 【适用人群】:适用于希望学习不同技术领域的小白或进阶学习者。可作为毕设项目、课程设计、大作业、工程实训或初期项目立项。 【项目介绍】:网络结构(具体可查看network.py文件): 网络整体结构类似于TinySleepNet,对RNN部分进行了修改,增加了双向RNN、GRU、Attention等网络结构,可根据参数进行调整选择。 定义了seq_len参数,可以更灵活地调整batch_size与seq_len。 数据集加载(具体可查看dataset.py文件) 直接继承自torch的Dataset,并定义了seq_len和shuffle_seed,方便调整输入,并复现实验。 训练(具体可查看train.py文件): 定义并使用了focal loss损失函数 在实验中有使用wandb,感觉用起来还挺方便的,非常便于实验记录追溯 测试(具体可查看test.py文件): 可以输出accuracy、mf1、recall_confusion_matrics、precision_confusion_matrics、f1
recommend-type

bid格式文件电子标书阅读器.zip

软件介绍: bid格式招投标文件阅读器,可以打开浏览、管理电子招标文件,如果打不开标书文件,请按下面步骤检查:1、请查看招标文件(.bid文件)是否下载完全,请用IE下载工具下载;2、查看IE浏览器版本,如果版本低于IE8,低于IE8版本的请升级为IE8浏览器。
recommend-type

机器翻译WMT14数据集

机器翻译WMT14数据集,ACL2014公布的share task,很多模型都在这上benchmark
recommend-type

高通QXDM使用手册.pdf

高通QXDM使用手册,介绍高通QXDM工具软件的使用,中文版的哦。

最新推荐

recommend-type

C#类库封装:简化SDK调用实现多功能集成,构建地磅无人值守系统

内容概要:本文介绍了利用C#类库封装多个硬件设备的SDK接口,实现一系列复杂功能的一键式调用。具体功能包括身份证信息读取、人证识别、车牌识别(支持臻识和海康摄像头)、LED显示屏文字输出、称重数据读取、二维码扫描以及语音播报。所有功能均被封装为简单的API,极大降低了开发者的工作量和技术门槛。文中详细展示了各个功能的具体实现方式及其应用场景,如身份证读取、人证核验、车牌识别等,并最终将这些功能整合到一起,形成了一套完整的地磅称重无人值守系统解决方案。 适合人群:具有一定C#编程经验的技术人员,尤其是需要快速集成多种硬件设备SDK的应用开发者。 使用场景及目标:适用于需要高效集成多种硬件设备SDK的项目,特别是那些涉及身份验证、车辆管理、物流仓储等领域的企业级应用。通过使用这些封装好的API,可以大大缩短开发周期,降低维护成本,提高系统的稳定性和易用性。 其他说明:虽然封装后的API极大地简化了开发流程,但对于一些特殊的业务需求,仍然可能需要深入研究底层SDK。此外,在实际部署过程中,还需考虑网络环境、硬件兼容性等因素的影响。
recommend-type

基于STM32F1的BLDC无刷直流电机与PMSM永磁同步电机源码解析:传感器与无传感器驱动详解

基于STM32F1的BLDC无刷直流电机和PMSM永磁同步电机的驱动实现方法,涵盖了有传感器和无传感两种驱动方式。对于BLDC电机,有传感器部分采用霍尔传感器进行六步换相,无传感部分则利用反电动势过零点检测实现换相。对于PMSM电机,有传感器部分包括霍尔传感器和编码器的方式,无传感部分则采用了滑模观测器进行矢量控制(FOC)。文中不仅提供了详细的代码片段,还分享了许多调试经验和技巧。 适合人群:具有一定嵌入式系统和电机控制基础知识的研发人员和技术爱好者。 使用场景及目标:适用于需要深入了解和实现BLDC和PMSM电机驱动的开发者,帮助他们掌握不同传感器条件下的电机控制技术和优化方法。 其他说明:文章强调了实际调试过程中可能遇到的问题及其解决方案,如霍尔传感器的中断触发换相、反电动势过零点检测的采样时机、滑模观测器的参数调整以及编码器的ABZ解码等。
recommend-type

基于Java的跨平台图像处理软件ImageJ:多功能图像编辑与分析工具

内容概要:本文介绍了基于Java的图像处理软件ImageJ,详细阐述了它的跨平台特性、多线程处理能力及其丰富的图像处理功能。ImageJ由美国国立卫生研究院开发,能够在多种操作系统上运行,包括Windows、Mac OS、Linux等。它支持多种图像格式,如TIFF、PNG、GIF、JPEG、BMP、DICOM、FITS等,并提供图像栈功能,允许多个图像在同一窗口中进行并行处理。此外,ImageJ还提供了诸如缩放、旋转、扭曲、平滑处理等基本操作,以及区域和像素统计、间距、角度计算等高级功能。这些特性使ImageJ成为科研、医学、生物等多个领域的理想选择。 适合人群:需要进行图像处理的专业人士,如科研人员、医生、生物学家,以及对图像处理感兴趣的普通用户。 使用场景及目标:适用于需要高效处理大量图像数据的场合,特别是在科研、医学、生物学等领域。用户可以通过ImageJ进行图像的编辑、分析、处理和保存,提高工作效率。 其他说明:ImageJ不仅功能强大,而且操作简单,用户无需安装额外的运行环境即可直接使用。其基于Java的开发方式确保了不同操作系统之间的兼容性和一致性。
recommend-type

MATLAB语音识别系统:基于GUI的数字0-9识别及深度学习模型应用 · GUI v1.2

内容概要:本文介绍了一款基于MATLAB的语音识别系统,主要功能是识别数字0到9。该系统采用图形用户界面(GUI),方便用户操作,并配有详尽的代码注释和开发报告。文中详细描述了系统的各个组成部分,包括音频采集、信号处理、特征提取、模型训练和预测等关键环节。此外,还讨论了MATLAB在此项目中的优势及其面临的挑战,如提高识别率和处理背景噪音等问题。最后,通过对各模块的工作原理和技术细节的总结,为未来的研究和发展提供了宝贵的参考资料。 适合人群:对语音识别技术和MATLAB感兴趣的初学者、学生或研究人员。 使用场景及目标:适用于希望深入了解语音识别技术原理的人群,特别是希望通过实际案例掌握MATLAB编程技巧的学习者。目标是在实践中学习如何构建简单的语音识别应用程序。 其他说明:该程序需要MATLAB 2019b及以上版本才能正常运行,建议使用者确保软件环境符合要求。
recommend-type

c语言通讯录管理系统源码.zip

C语言项目源码
recommend-type

Teleport Pro教程:轻松复制网站内容

标题中提到的“复制别人网站的软件”指向的是一种能够下载整个网站或者网站的特定部分,然后在本地或者另一个服务器上重建该网站的技术或工具。这类软件通常被称作网站克隆工具或者网站镜像工具。 描述中提到了一个具体的教程网址,并提到了“天天给力信誉店”,这可能意味着有相关的教程或资源可以在这个网店中获取。但是这里并没有提供实际的教程内容,仅给出了网店的链接。需要注意的是,根据互联网法律法规,复制他人网站内容并用于自己的商业目的可能构成侵权,因此在此类工具的使用中需要谨慎,并确保遵守相关法律法规。 标签“复制 别人 网站 软件”明确指出了这个工具的主要功能,即复制他人网站的软件。 文件名称列表中列出了“Teleport Pro”,这是一款具体的网站下载工具。Teleport Pro是由Tennyson Maxwell公司开发的网站镜像工具,允许用户下载一个网站的本地副本,包括HTML页面、图片和其他资源文件。用户可以通过指定开始的URL,并设置各种选项来决定下载网站的哪些部分。该工具能够帮助开发者、设计师或内容分析人员在没有互联网连接的情况下对网站进行离线浏览和分析。 从知识点的角度来看,Teleport Pro作为一个网站克隆工具,具备以下功能和知识点: 1. 网站下载:Teleport Pro可以下载整个网站或特定网页。用户可以设定下载的深度,例如仅下载首页及其链接的页面,或者下载所有可访问的页面。 2. 断点续传:如果在下载过程中发生中断,Teleport Pro可以从中断的地方继续下载,无需重新开始。 3. 过滤器设置:用户可以根据特定的规则过滤下载内容,如排除某些文件类型或域名。 4. 网站结构分析:Teleport Pro可以分析网站的链接结构,并允许用户查看网站的结构图。 5. 自定义下载:用户可以自定义下载任务,例如仅下载图片、视频或其他特定类型的文件。 6. 多任务处理:Teleport Pro支持多线程下载,用户可以同时启动多个下载任务来提高效率。 7. 编辑和管理下载内容:Teleport Pro具备编辑网站镜像的能力,并可以查看、修改下载的文件。 8. 离线浏览:下载的网站可以在离线状态下浏览,这对于需要测试网站在不同环境下的表现的情况十分有用。 9. 备份功能:Teleport Pro可以用来备份网站,确保重要数据的安全。 在实际使用此类工具时,需要注意以下几点: - 著作权法:复制网站内容可能侵犯原作者的版权,因此在使用此类工具时,必须确保有合法权利去下载和使用目标网站的内容。 - 服务条款:许多网站的服务条款明确禁止未经授权的网站克隆。因此,在使用此类软件之前,应当仔细阅读并遵守目标网站的服务条款。 - 数据隐私:下载含有个人数据的网站可能触及隐私保护法律,特别是在欧洲通用数据保护条例(GDPR)等法规的环境下。 - 网络安全:随意下载网站可能导致恶意软件和病毒的传播,用户应当使用最新的反病毒软件,并在安全的环境中进行操作。 综上所述,虽然“复制别人网站的软件”在技术上是可行的,且可以用于多种正当目的,但在使用此类工具时,必须遵守法律法规和道德规范,不可侵犯他人的合法权益。
recommend-type

【跨平台开发者的必读】:解决Qt5Widgetsd.lib目标计算机类型冲突终极指南

# 1. 跨平台开发与Qt框架简介 跨平台开发已经成为现代软件开发中不可或缺的一部分,它使得开发者能够构建出能够在多种操作系统上运行的应用程序,从而触及更广泛的用户群体。在这其中,Qt框架作为一套成熟的跨平台应用程序和用户界面框架,扮演着关键角色。Qt不仅仅是一个跨平台的工具集,它的设计哲学还包含了模块化和可扩展性,使其能够适应各种规模和复杂度的项目。本章节我们将简要介绍
recommend-type

普通RNN结构和特点

<think>首先,用户的问题是:“我想了解普通RNN的结构及其特点请问RNN循环神经网络结构特点工作原理”。这涉及到RNN的结构、特点和工作原理。根据系统级指令:-所有行内数学表达式必须使用$...$格式。-独立公式必须使用$$...$$格式并单独成段。-LaTeX语法正确。-使用中文回答。-生成相关问题。-回答中引用的段落末尾自然地添加引用标识。用户可见层指令:-回答结构清晰,帮助用户逐步解决问题。-保证回答真实可靠。参考站内引用:-引用[1]:关于RNN的基本介绍,为什么需要RNN。-引用[2]:关于RNN的工作原理、结构图,以及与其他网络的比较。用户上一次的问题和我的回答:用户是第一次
recommend-type

探讨通用数据连接池的核心机制与应用

根据给定的信息,我们能够推断出讨论的主题是“通用数据连接池”,这是一个在软件开发和数据库管理中经常用到的重要概念。在这个主题下,我们可以详细阐述以下几个知识点: 1. **连接池的定义**: 连接池是一种用于管理数据库连接的技术,通过维护一定数量的数据库连接,使得连接的创建和销毁操作更加高效。开发者可以在应用程序启动时预先创建一定数量的连接,并将它们保存在一个池中,当需要数据库连接时,可以直接从池中获取,从而降低数据库连接的开销。 2. **通用数据连接池的概念**: 当提到“通用数据连接池”时,它意味着这种连接池不仅支持单一类型的数据库(如MySQL、Oracle等),而且能够适应多种不同数据库系统。设计一个通用的数据连接池通常需要抽象出一套通用的接口和协议,使得连接池可以兼容不同的数据库驱动和连接方式。 3. **连接池的优点**: - **提升性能**:由于数据库连接创建是一个耗时的操作,连接池能够减少应用程序建立新连接的时间,从而提高性能。 - **资源复用**:数据库连接是昂贵的资源,通过连接池,可以最大化现有连接的使用,避免了连接频繁创建和销毁导致的资源浪费。 - **控制并发连接数**:连接池可以限制对数据库的并发访问,防止过载,确保数据库系统的稳定运行。 4. **连接池的关键参数**: - **最大连接数**:池中能够创建的最大连接数。 - **最小空闲连接数**:池中保持的最小空闲连接数,以应对突发的连接请求。 - **连接超时时间**:连接在池中保持空闲的最大时间。 - **事务处理**:连接池需要能够管理不同事务的上下文,保证事务的正确执行。 5. **实现通用数据连接池的挑战**: 实现一个通用的连接池需要考虑到不同数据库的连接协议和操作差异。例如,不同的数据库可能有不同的SQL方言、认证机制、连接属性设置等。因此,通用连接池需要能够提供足够的灵活性,允许用户配置特定数据库的参数。 6. **数据连接池的应用场景**: - **Web应用**:在Web应用中,为了处理大量的用户请求,数据库连接池可以保证数据库连接的快速复用。 - **批处理应用**:在需要大量读写数据库的批处理作业中,连接池有助于提高整体作业的效率。 - **微服务架构**:在微服务架构中,每个服务可能都需要与数据库进行交互,通用连接池能够帮助简化服务的数据库连接管理。 7. **常见的通用数据连接池技术**: - **Apache DBCP**:Apache的一个Java数据库连接池库。 - **C3P0**:一个提供数据库连接池和控制工具的开源Java框架。 - **HikariCP**:目前性能最好的开源Java数据库连接池之一。 - **BoneCP**:一个高性能的开源Java数据库连接池。 - **Druid**:阿里巴巴开源的一个数据库连接池,提供了对性能监控的高级特性。 8. **连接池的管理与监控**: 为了保证连接池的稳定运行,开发者需要对连接池的状态进行监控,并对其进行适当的管理。监控指标可能包括当前活动的连接数、空闲的连接数、等待获取连接的请求队列长度等。一些连接池提供了监控工具或与监控系统集成的能力。 9. **连接池的配置和优化**: 连接池的性能与连接池的配置密切相关。需要根据实际的应用负载和数据库性能来调整连接池的参数。例如,在高并发的场景下,可能需要增加连接池中连接的数量。另外,适当的线程池策略也可以帮助连接池更好地服务于多线程环境。 10. **连接池的应用案例**: 一个典型的案例是电商平台在大型促销活动期间,用户访问量激增,此时通用数据连接池能够保证数据库操作的快速响应,减少因数据库连接问题导致的系统瓶颈。 总结来说,通用数据连接池是现代软件架构中的重要组件,它通过提供高效的数据库连接管理,增强了软件系统的性能和稳定性。了解和掌握连接池的原理及实践,对于任何涉及数据库交互的应用开发都至关重要。在实现和应用连接池时,需要关注其设计的通用性、配置的合理性以及管理的有效性,确保在不同的应用场景下都能发挥出最大的效能。
recommend-type

【LabVIEW网络通讯终极指南】:7个技巧提升UDP性能和安全性

# 摘要 本文系统介绍了LabVIEW在网络通讯中的应用,尤其是针对UDP协议的研究与优化。首先,阐述了UDP的原理、特点及其在LabVIEW中的基础应用。随后,本文深入探讨了通过调整数据包大小、实现并发通信及优化缓冲区管理等技巧来优化UDP性能的LabVIEW方法。接着,文章聚焦于提升UDP通信安全性,介绍了加密技术和认证授权机制在LabVIEW中的实现,以及防御网络攻击的策略。最后,通过具体案例展示了LabVIEW在实时数据采集和远程控制系统中的高级应用,并展望了LabVIEW与UDP通讯技术的未来发展趋势及新兴技术的影响。 # 关键字 LabVIEW;UDP网络通讯;性能优化;安全性;