
```
dataset = []
for data in np.arange(0, 3, .01):
    data = math.sin(data * math.pi)
    dataset.append(data)
dataset = np.array(dataset)
dataset = dataset.astype('float32')
max_value = np.max(dataset)
min_value = np.min(dataset)
scalar = max_value - min_value
dataset = list(map(lambda x: x / scalar, dataset))

def create_dataset(dataset, look_back=3):
    dataX, dataY = [], []
    for i in range(len(dataset) - look_back):
        a = dataset[i:(i + look_back)]
        dataX.append(a)
        dataY.append(dataset[i + look_back])
    return np.array(dataX), np.array(dataY)
```
Using this code as a starting point, write RNN code that predicts the sine function, and plot the true and predicted values.
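For reference, `create_dataset` is a plain sliding window: a series of length N yields N − look_back input/target pairs. A quick sanity check (illustrative only, using the function defined above):

```
# series [0,1,2,3,4] with look_back=3 yields
# X = [[0,1,2],[1,2,3]] and y = [3,4]
X, y = create_dataset([0, 1, 2, 3, 4], look_back=3)
print(X.shape, y.shape)  # (2, 3) (2,)
```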

Posted: 2024-03-17 12:46:08 · Views: 120
Sure. Here is RNN code for predicting the sine function, built on the code you provided:

```
import numpy as np
import math
import tensorflow as tf
import matplotlib.pyplot as plt

# Generate the data
dataset = []
for data in np.arange(0, 3, .01):
    data = math.sin(data * math.pi)
    dataset.append(data)
dataset = np.array(dataset)
dataset = dataset.astype('float32')
max_value = np.max(dataset)
min_value = np.min(dataset)
scalar = max_value - min_value
dataset = list(map(lambda x: x / scalar, dataset))

# Sliding-window dataset (same function as in the question)
def create_dataset(dataset, look_back=3):
    dataX, dataY = [], []
    for i in range(len(dataset) - look_back):
        dataX.append(dataset[i:(i + look_back)])
        dataY.append(dataset[i + look_back])
    return np.array(dataX), np.array(dataY)

look_back = 5
trainX, trainY = create_dataset(dataset, look_back)

# The LSTM expects 3D input: (samples, time steps, features)
trainX = np.reshape(trainX, (trainX.shape[0], look_back, 1))

# Convert to tensors
trainX = tf.convert_to_tensor(trainX)
trainY = tf.convert_to_tensor(trainY)

# Define the model
model = tf.keras.Sequential([
    tf.keras.layers.LSTM(32, input_shape=(look_back, 1)),
    tf.keras.layers.Dense(1)
])

# Compile the model
model.compile(optimizer=tf.keras.optimizers.Adam(),
              loss=tf.keras.losses.MeanSquaredError())

# Train the model
history = model.fit(trainX, trainY, epochs=100, batch_size=16, verbose=2)

# Predict
predicted = []
for i in range(len(dataset) - look_back):
    x = np.array(dataset[i:i + look_back])
    x = np.reshape(x, (1, look_back, 1))
    y = model.predict(x)
    predicted.append(y[0][0])

# Undo the normalization on BOTH series so their scales match
predicted = np.array(predicted) * scalar
true_values = np.array(dataset[look_back:]) * scalar

# Plot
plt.plot(true_values, label='True Data')
plt.plot(predicted, label='Predicted Data')
plt.legend()
plt.show()
```

In this RNN model, an LSTM layer learns the temporal structure of the series, and a fully connected layer produces the one-step prediction. Training uses the Adam optimizer with a mean-squared-error loss. Finally, the model is run over the whole series and the predictions are plotted against the true values on the same figure for comparison.
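As a side note, the per-sample `model.predict` loop above is slow; since all windows are already available, a single batched call gives the same result. A minimal sketch using the variables defined above:

```
# Batched alternative to the per-sample predict loop (purely illustrative)
allX, _ = create_dataset(dataset, look_back)
allX = np.reshape(allX, (-1, look_back, 1))
predicted = model.predict(allX, verbose=0).flatten() * scalar
```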

Related questions

```
import numpy as np
import matplotlib.pyplot as plt
import math
import torch
from torch import nn
import os

os.environ['KMP_DUPLICATE_LIB_OK'] = 'True'

dataset = []
for data in np.arange(0, 3, .01):
    data = math.sin(data * math.pi)
    dataset.append(data)
dataset = np.array(dataset)
dataset = dataset.astype('float32')
max_value = np.max(dataset)
min_value = np.min(dataset)
scalar = max_value - min_value
dataset = list(map(lambda x: x / scalar, dataset))

def create_dataset(dataset, look_back=3):
    dataX, dataY = [], []
    for i in range(len(dataset) - look_back):
        a = dataset[i:(i + look_back)]
        dataX.append(a)
        dataY.append(dataset[i + look_back])
    return np.array(dataX), np.array(dataY)

data_X, data_Y = create_dataset(dataset)

# Train/test split, ratio 0.8 (note: test_X must come from data_X, not data_Y)
train_X, train_Y = data_X[:int(0.8 * len(data_X))], data_Y[:int(0.8 * len(data_Y))]
test_X, test_Y = data_X[int(0.8 * len(data_X)):], data_Y[int(0.8 * len(data_Y)):]

# targets are scalars, so train_Y is (N, 1), not (N, 1, 3)
train_X = train_X.reshape(-1, 1, 3).astype('float32')
train_Y = train_Y.reshape(-1, 1).astype('float32')
test_X = test_X.reshape(-1, 1, 3).astype('float32')

train_X = torch.from_numpy(train_X)
train_Y = torch.from_numpy(train_Y)
test_X = torch.from_numpy(test_X)

class RNN(nn.Module):
    def __init__(self, input_size, hidden_size, output_size=1, num_layer=2):
        super(RNN, self).__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.output_size = output_size
        self.num_layer = num_layer
        self.rnn = nn.RNN(input_size, hidden_size, batch_first=True)
        self.linear = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        # completed forward pass: out is (batch, seq_len, hidden);
        # take the last time step, not out[0]
        out, h = self.rnn(x)
        out = self.linear(out[:, -1, :])
        return out

net = RNN(3, 20)
criterion = nn.MSELoss(reduction='mean')
optimizer = torch.optim.Adam(net.parameters(), lr=1e-2)

train_loss = []
for e in range(1000):
    pred = net(train_X)
    loss = criterion(pred, train_Y)
    optimizer.zero_grad()
    # backpropagation
    loss.backward()
    optimizer.step()
    if (e + 1) % 100 == 0:
        print('Epoch:{}, loss:{:.10f}'.format(e + 1, loss.item()))
    train_loss.append(loss.item())

plt.plot(train_loss, label='train_loss')
plt.legend()
plt.show()
```
Plot the predicted values against the true values.
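A minimal way to produce the requested plot, assuming the training loop above has run (names follow the snippet; predictions are denormalized by multiplying by `scalar`):

```
# Evaluate on the full series and compare with ground truth (sketch)
net.eval()
with torch.no_grad():
    all_X = torch.from_numpy(data_X.reshape(-1, 1, 3).astype('float32'))
    pred_all = net(all_X).squeeze(-1).numpy()

plt.plot(np.array(data_Y) * scalar, label='true')
plt.plot(pred_all * scalar, label='predicted')
plt.axvline(int(0.8 * len(data_X)), linestyle='--', color='gray')  # train/test boundary
plt.legend()
plt.show()
```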

```
import numpy as np
import matplotlib.pyplot as plt
import math
import torch
from torch import nn
import os

os.environ['KMP_DUPLICATE_LIB_OK'] = 'True'

dataset = []
for data in np.arange(0, 3, .01):
    data = math.sin(data * math.pi)
    dataset.append(data)
dataset = np.array(dataset)
dataset = dataset.astype('float32')
max_value = np.max(dataset)
min_value = np.min(dataset)
scalar = max_value - min_value
print(scalar)
dataset = list(map(lambda x: x / scalar, dataset))

def create_dataset(dataset, look_back=3):
    dataX, dataY = [], []
    for i in range(len(dataset) - look_back):
        a = dataset[i:(i + look_back)]
        dataX.append(a)
        dataY.append(dataset[i + look_back])
    return np.array(dataX), np.array(dataY)

data_X, data_Y = create_dataset(dataset)

# fixed: test_X is taken from data_X (the original sliced data_Y twice)
train_X, train_Y = data_X[:int(0.8 * len(data_X))], data_Y[:int(0.8 * len(data_Y))]
test_X, test_Y = data_X[int(0.8 * len(data_X)):], data_Y[int(0.8 * len(data_Y)):]

# fixed: targets are scalars, so reshape train_Y to (N, 1)
train_X = train_X.reshape(-1, 1, 3).astype('float32')
train_Y = train_Y.reshape(-1, 1).astype('float32')
test_X = test_X.reshape(-1, 1, 3).astype('float32')

train_X = torch.from_numpy(train_X)
train_Y = torch.from_numpy(train_Y)
test_X = torch.from_numpy(test_X)

class RNN(nn.Module):
    def __init__(self, input_size, hidden_size, output_size=1, num_layer=2):
        super(RNN, self).__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.output_size = output_size
        self.num_layer = num_layer
        self.rnn = nn.RNN(input_size, hidden_size, batch_first=True)
        self.linear = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        out, h = self.rnn(x)
        # fixed: take the last time step instead of out[0]
        out = self.linear(out[:, -1, :])
        return out

net = RNN(3, 20)
criterion = nn.MSELoss(reduction='mean')
optimizer = torch.optim.Adam(net.parameters(), lr=1e-2)

train_loss = []
for e in range(1000):
    pred = net(train_X)
    loss = criterion(pred, train_Y)
    optimizer.zero_grad()
    # backpropagation
    loss.backward()
    optimizer.step()
    if (e + 1) % 100 == 0:
        print('Epoch:{}, loss:{:.10f}'.format(e + 1, loss.item()))
    train_loss.append(loss.item())

plt.plot(train_loss, label='train_loss')
plt.legend()
plt.show()
```
Please fix the code where needed and add the code that produces the predicted and true values.
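A sketch of the requested prediction-vs-truth code for the held-out test portion, assuming the corrected snippet above (multiplying by `scalar` undoes the normalization):

```
# Test-set prediction vs ground truth (illustrative; names follow the snippet)
net.eval()
with torch.no_grad():
    test_pred = net(test_X).squeeze(-1).numpy()

plt.plot(test_Y * scalar, label='true (test)')
plt.plot(test_pred * scalar, label='predicted (test)')
plt.legend()
plt.show()
```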

```
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, TensorDataset
from sklearn.preprocessing import MinMaxScaler
import numpy as np
import math
import pickle

# ==== 1. Data loading and normalization ====
data = pd.read_csv('SBS_MI_fidelity_results.csv')
input_cols = ['Delta_f', 'r', 'Delta_nu_L']
target_cols = ['T_SBS', 'G_MI_max', 'D_fidelity']
X = data[input_cols].values.astype(np.float64)
Y = data[target_cols].values.astype(np.float64)
scaler_x = MinMaxScaler()
scaler_y = MinMaxScaler()
X_norm = scaler_x.fit_transform(X)
Y_norm = scaler_y.fit_transform(Y)
X_torch = torch.tensor(X_norm, dtype=torch.float64)
Y_torch = torch.tensor(Y_norm, dtype=torch.float64)

# ==== 2. Time-step embedding ====
class TimeStepEmbedding(nn.Module):
    def __init__(self, embed_dim):
        super().__init__()
        self.embed_dim = embed_dim

    def forward(self, t):
        half = self.embed_dim // 2
        emb = torch.exp(torch.arange(half, dtype=torch.float64, device=t.device)
                        * (-math.log(10000.0) / (half - 1)))
        emb = t.float().unsqueeze(1) * emb.unsqueeze(0)
        emb = torch.cat([torch.sin(emb), torch.cos(emb)], dim=1)
        return emb

# ==== 3. TabDDPM network (deep & wide + dropout + residual) ====
class TabDDPMNet(nn.Module):
    def __init__(self, x_dim, y_dim, t_dim=256, hidden=1024, n_layers=12, dropout=0.05):
        super().__init__()
        self.t_embed = TimeStepEmbedding(t_dim)
        self.hidden = hidden
        self.n_layers = n_layers
        self.x_dim = x_dim
        self.linears = nn.ModuleList([
            nn.Linear(x_dim + y_dim + t_dim if i == 0 else hidden,
                      hidden if i < n_layers - 1 else x_dim,
                      dtype=torch.float64)
            for i in range(n_layers)
        ])
        self.norms = nn.ModuleList([
            nn.LayerNorm(hidden, elementwise_affine=False) if i < n_layers - 1 else nn.Identity()
            for i in range(n_layers)
        ])
        self.dropouts = nn.ModuleList([
            nn.Dropout(dropout) if i < n_layers - 1 else nn.Identity()
            for i in range(n_layers)
        ])
        self.final_act = nn.Tanh()

    def forward(self, x_noisy, y, t):
        t_emb = self.t_embed(t)
        h = torch.cat([x_noisy, y, t_emb], dim=-1)
        for i in range(self.n_layers):
            h_pre = h
            h = self.linears[i](h)
            h = self.norms[i](h)
            h = self.dropouts[i](h)
            if i < self.n_layers - 1:
                h = F.relu(h)
            # residual connection every 3 layers (only when shapes match)
            if i > 0 and i % 3 == 0 and h_pre.shape == h.shape:
                h = h + h_pre
        return self.final_act(h)

# ==== 4. Diffusion process ====
class Diffusion:
    def __init__(self, model, x_dim, y_dim, timesteps=1000, device='cpu'):
        self.model = model
        self.timesteps = timesteps
        self.device = device
        self.betas = self.cosine_beta_schedule(timesteps).to(device)
        self.alphas = 1. - self.betas
        self.alphas_cumprod = torch.cumprod(self.alphas, dim=0).to(device)

    def cosine_beta_schedule(self, timesteps, s=0.008):
        steps = timesteps + 1
        x = torch.linspace(0, timesteps, steps, dtype=torch.float64)
        alphas_cumprod = torch.cos(((x / timesteps) + s) / (1 + s) * math.pi * 0.5) ** 2
        alphas_cumprod = alphas_cumprod / alphas_cumprod[0]
        betas = 1 - (alphas_cumprod[1:] / alphas_cumprod[:-1])
        return torch.clip(betas, 0, 0.999)

    def q_sample(self, x0, t, noise=None):
        if noise is None:
            noise = torch.randn_like(x0)
        t_device = t.to(self.alphas_cumprod.device)
        sqrt_alphas_cumprod_t = torch.sqrt(self.alphas_cumprod[t_device]).unsqueeze(-1)
        sqrt_one_minus_alphas_cumprod_t = torch.sqrt(1. - self.alphas_cumprod[t_device]).unsqueeze(-1)
        return sqrt_alphas_cumprod_t * x0 + sqrt_one_minus_alphas_cumprod_t * noise

    def train_step(self, x0, y, optimizer, accumulation_steps=4):
        batch = x0.shape[0]
        t = torch.randint(0, self.timesteps, (batch,), device=x0.device)
        noise = torch.randn_like(x0)
        x_noisy = self.q_sample(x0, t, noise)
        pred = self.model(x_noisy, y, t)
        loss = F.mse_loss(pred, noise)
        loss = loss / accumulation_steps
        loss.backward()
        return loss.item()

    @torch.no_grad()
    def sample(self, y_cond, n_sample=1):
        y_cond = y_cond.to(self.device)
        x = torch.randn(n_sample, self.model.x_dim, device=self.device, dtype=torch.float64)
        for t in reversed(range(self.timesteps)):
            t_batch = torch.full((n_sample,), t, dtype=torch.long, device=self.device)
            pred_noise = self.model(x, y_cond, t_batch)
            alpha = self.alphas[t]
            alpha_bar = self.alphas_cumprod[t]
            beta = self.betas[t]
            if t > 0:
                noise = torch.randn_like(x)
            else:
                noise = torch.zeros_like(x)
            x = (x - beta * pred_noise / torch.sqrt(1 - alpha_bar)) / torch.sqrt(alpha)
            x = x + torch.sqrt(beta) * noise
            x = torch.clamp(x, -1.0, 1.0)
        return x

# ==== 5. Training setup ====
batch_size = 64
epochs = 1200
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

model = TabDDPMNet(
    x_dim=len(input_cols), y_dim=len(target_cols),
    t_dim=256, hidden=1024, n_layers=12, dropout=0.05
).to(device)
diffusion = Diffusion(model, x_dim=len(input_cols), y_dim=len(target_cols),
                      timesteps=1000, device=device)

optimizer = torch.optim.AdamW(
    model.parameters(),
    lr=5e-4,            # moderate learning rate
    weight_decay=1e-6
)
scheduler = torch.optim.lr_scheduler.MultiStepLR(
    optimizer, milestones=[400, 800, 1000], gamma=0.4
)

inv_dataset = TensorDataset(X_torch, Y_torch)
inv_loader = DataLoader(inv_dataset, batch_size=batch_size, shuffle=True)

min_loss = float('inf')
early_stop_count = 0
patience = 100

print("Starting training...")
for epoch in range(epochs):
    model.train()
    total_loss = 0
    optimizer.zero_grad()
    for i, (x0, y_cond) in enumerate(inv_loader):
        x0, y_cond = x0.to(device), y_cond.to(device)
        loss = diffusion.train_step(x0, y_cond, optimizer, accumulation_steps=4)
        total_loss += loss
        # gradient accumulation: one optimizer step every 4 batches
        if (i + 1) % 4 == 0 or (i + 1) == len(inv_loader):
            nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
            optimizer.step()
            optimizer.zero_grad()
    avg_loss = total_loss / len(inv_loader)
    scheduler.step()
    current_lr = optimizer.param_groups[0]['lr']
    if avg_loss < min_loss:
        min_loss = avg_loss
        early_stop_count = 0
        torch.save(model.state_dict(), 'best_tabddpm.pth')
        with open('scaler_x.pkl', 'wb') as f:
            pickle.dump(scaler_x, f)
        with open('scaler_y.pkl', 'wb') as f:
            pickle.dump(scaler_y, f)
    else:
        early_stop_count += 1
    if epoch % 10 == 0:
        print(f"Epoch {epoch:4d} | Loss: {avg_loss:.8f} | LR: {current_lr:.2e} | min_loss: {min_loss:.8f}")
    if early_stop_count >= patience:
        print(f"Early stopping triggered at epoch {epoch}")
        break
print("Training finished; best model saved to 'best_tabddpm.pth'")

# ==== 6. Reverse sampling & denormalization ====
def inverse_design(model, diffusion, scaler_x, scaler_y, target_y, device, n_sample=1):
    model.eval()
    target_y_norm = scaler_y.transform(target_y)
    target_y_tensor = torch.tensor(target_y_norm, dtype=torch.float64, device=device)
    with torch.no_grad():
        gen_x_norm = diffusion.sample(target_y_tensor, n_sample=n_sample)
    gen_x_norm = (gen_x_norm.cpu().numpy() + 1) / 2
    gen_x_norm = np.clip(gen_x_norm, 0, 1)
    gen_x = scaler_x.inverse_transform(gen_x_norm)
    return gen_x

# ==== 7. Inverse-design demo ====
model.load_state_dict(torch.load('best_tabddpm.pth'))
with open('scaler_x.pkl', 'rb') as f:
    scaler_x = pickle.load(f)
with open('scaler_y.pkl', 'rb') as f:
    scaler_y = pickle.load(f)
target_y = [[18, 0.01, 1.0]]
gen_x = inverse_design(model, diffusion, scaler_x, scaler_y, target_y, device, n_sample=1)
print('Optimal design parameters (inverse design):', dict(zip(input_cols, gen_x[0])))
```
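For reference, the update inside `sample` is the standard DDPM ancestral step in the noise-prediction parameterization, which the code above implements term by term:

$$x_{t-1} = \frac{1}{\sqrt{\alpha_t}}\left(x_t - \frac{\beta_t}{\sqrt{1-\bar{\alpha}_t}}\,\epsilon_\theta(x_t, y, t)\right) + \sqrt{\beta_t}\, z,\qquad z \sim \mathcal{N}(0, I)$$

with $\alpha_t = 1-\beta_t$ and $\bar{\alpha}_t = \prod_{s\le t}\alpha_s$ taken from the cosine schedule.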

```
import os
import glob
import open3d as o3d
import numpy as np
from scapy.all import rdpcap
import struct
import math

# === Paths ===
pcd_folder = r"H:\DataSet\0310\17\1"
pcap_file = r"H:\DataSet\0310\17\1\2025-03-10-14-39-21_Velodyne-VLP-16-Data.pcap"
output_folder = r"H:\DataSet\0310\17\1\ply"
os.makedirs(output_folder, exist_ok=True)

# ====== Collect timestamps (ascending) ======
timestamps = sorted([float(os.path.splitext(os.path.basename(f))[0])
                     for f in glob.glob(os.path.join(pcd_folder, "*.pcd"))])

# ====== Read the pcap file ======
print("Reading pcap file... (this can be slow)")
packets = rdpcap(pcap_file)
print(f"Done, {len(packets)} packets")

# === Decode a single Velodyne packet (VLP-16) ===
def decode_velodyne_packet(pkt):
    payload = bytes(pkt.load)
    if len(payload) != 1206:
        return None
    DIST_RES = 0.002  # distance resolution (m)
    ROT_RES = 0.01    # azimuth resolution (deg)
    VERT_ANGLES = [-15, 1, -13, 3, -11, 5, -9, 7, -7, 9, -5, 11, -3, 13, -1, 15]
    points = []
    for block_id in range(12):
        block_offset = block_id * 100
        flag, azimuth = struct.unpack_from("<HH", payload, block_offset)
        if flag != 0xFFEE:
            continue  # skip invalid blocks
        azimuth = azimuth * ROT_RES
        for firing in range(2):
            for laser_id in range(16):
                offset = block_offset + 4 + firing * 48 + laser_id * 3
                distance, intensity = struct.unpack_from("<HB", payload, offset)
                distance = distance * DIST_RES
                if distance == 0:
                    continue  # skip empty returns
                vert_angle = VERT_ANGLES[laser_id]
                vert_rad = math.radians(vert_angle)
                azi_rad = math.radians(azimuth)
                x = distance * math.cos(vert_rad) * math.sin(azi_rad)
                y = distance * math.cos(vert_rad) * math.cos(azi_rad)
                z = distance * math.sin(vert_rad)
                points.append([x, y, z])
    return np.array(points, dtype=np.float64)
```
This LiDAR point-cloud decoding is not right.
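One likely culprit, assuming a standard VLP-16 stream: each data block starts with the two bytes FF EE on the wire, so a little-endian `"<H"` unpack reads them as 0xEEFF. The check `flag != 0xFFEE` therefore rejects every block and the function returns an empty array. A minimal sketch of the fix (secondary refinements, such as interpolating the azimuth for the second firing sequence, are left aside):

```
# little-endian read of bytes FF EE gives 0xEEFF, so compare against that
flag, azimuth = struct.unpack_from("<HH", payload, block_offset)
if flag != 0xEEFF:
    continue
```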

```
import os
import time
import itertools
import math
import numpy as np
import scipy as sp
import scipy.sparse as sps
from scipy.sparse.linalg import splu
import torch
import torch.nn as nn
import pyamg
from scipy.sparse import csr_matrix, isspmatrix_csr, diags, SparseEfficiencyWarning
from pyamg.multilevel import multilevel_solver
from warnings import warn
from pyamg.relaxation.smoothing import change_smoothers

device = 'cpu'

# ========== Helper functions ==========
def prolongation_fn(grid_size):
    res_stencil = np.zeros((3, 3), dtype=np.double)
    k = 16
    res_stencil[0, 0] = 1/k
    res_stencil[0, 1] = 2/k
    res_stencil[0, 2] = 1/k
    res_stencil[1, 0] = 2/k
    res_stencil[1, 1] = 4/k
    res_stencil[1, 2] = 2/k
    res_stencil[2, 0] = 1/k
    res_stencil[2, 1] = 2/k
    res_stencil[2, 2] = 1/k
    P_stencils = np.zeros((grid_size//2, grid_size//2, 3, 3))
    for i in range(grid_size//2):
        for j in range(grid_size//2):
            P_stencils[i, j, :, :] = res_stencil
    return compute_p2(P_stencils, grid_size).astype(np.double)

def compute_p2(P_stencil, grid_size):
    indexes = get_p_matrix_indices_one(grid_size)
    P = csr_matrix((P_stencil.reshape(-1), (indexes[:, 1], indexes[:, 0])),
                   shape=((grid_size//2) ** 2, (grid_size) ** 2))
    return P

def get_p_matrix_indices_one(grid_size):
    K = map_2_to_1(grid_size=grid_size)
    indices = []
    for ic in range(grid_size // 2):
        i = 2 * ic + 1
        for jc in range(grid_size // 2):
            j = 2 * jc + 1
            J = int(grid_size // 2 * jc + ic)
            for k in range(3):
                for m in range(3):
                    I = int(K[i, j, k, m])
                    indices.append([I, J])
    return np.array(indices)

def map_2_to_1(grid_size=8):
    k = np.zeros((grid_size, grid_size, 3, 3))
    M = np.reshape(np.arange(grid_size ** 2), (grid_size, grid_size)).T
    M = np.concatenate([M, M], axis=0)
    M = np.concatenate([M, M], axis=1)
    for i in range(3):
        I = (i - 1) % grid_size
        for j in range(3):
            J = (j - 1) % grid_size
            k[:, :, i, j] = M[I:I + grid_size, J:J + grid_size]
    return k

def diffusion_stencil_2d(epsilon=1.0, theta=0.0, type='FD'):
    eps = float(epsilon)
    theta = float(theta)
    C = np.cos(theta)
    S = np.sin(theta)
    CS = C*S
    CC = C**2
    SS = S**2
    if type == 'FE':
        a = (-1*eps - 1)*CC + (-1*eps - 1)*SS + (3*eps - 3)*CS
        b = (2*eps - 4)*CC + (-4*eps + 2)*SS
        c = (-1*eps - 1)*CC + (-1*eps - 1)*SS + (-3*eps + 3)*CS
        d = (-4*eps + 2)*CC + (2*eps - 4)*SS
        e = (8*eps + 8)*CC + (8*eps + 8)*SS
        stencil = np.array([[a, b, c], [d, e, d], [c, b, a]]) / 6.0
    elif type == 'FD':
        a = -0.5*(eps - 1)*CS
        b = -(eps*SS + CC)
        c = -a
        d = -(eps*CC + SS)
        e = 2.0*(eps + 1)
        stencil = np.array([[a, d, c], [b, e, b], [c, d, a]])
    return stencil

def coo_to_tensor(coo):
    values = coo.data.astype(np.float64)
    indices = np.vstack((coo.row, coo.col))
    i = torch.LongTensor(indices)
    v = torch.DoubleTensor(values)
    shape = coo.shape
    return torch.sparse_coo_tensor(i, v, torch.Size(shape)).to(device)

# ========== Smoothing operator ==========
def neural_smoother(net, size, mixed=0):
    # returns a PyTorch tensor rather than a SciPy matrix
    if mixed == 1:
        I = torch.eye(size*size, dtype=torch.double, device=device)
        x0 = I
        for conv_layer in net.convLayers1:
            kernel = conv_layer.weight.detach().view(3, 3)
            M = toeplitz_conv(kernel, size)
            x0 = torch.mm(M, x0)
        return x0
    else:
        I = torch.eye(size*size, dtype=torch.double, device=device)
        x0 = I
        for conv_layer in net.convLayers1:
            kernel = conv_layer.weight.detach().view(3, 3)
            M = toeplitz_conv(kernel, size)
            x0 = torch.mm(M, x0)
        kernel2 = net.convLayers2[0].weight.detach().view(3, 3)
        M2 = toeplitz_conv(kernel2, size)
        y = x0 + (2/3) * M2
        return y

def toeplitz_conv(kernel, size):
    # expand a 3x3 convolution kernel into a (size^2, size^2) Toeplitz matrix
    full_size = size * size
    M = torch.zeros(full_size, full_size, dtype=torch.double, device=device)
    for i in range(size):
        for j in range(size):
            idx = i * size + j
            for di in [-1, 0, 1]:
                for dj in [-1, 0, 1]:
                    ni, nj = i + di, j + dj
                    if 0 <= ni < size and 0 <= nj < size:
                        nidx = ni * size + nj
                        k_val = kernel[di+1, dj+1]
                        M[idx, nidx] = k_val
    return M

# ========== Level creation ==========
def create_levels(eps, theta, n):
    mxl = 5  # max levels
    levels = []
    # finest level
    s = diffusion_stencil_2d(eps, theta * np.pi / 180, 'FD') * 2
    A = pyamg.gallery.stencil_grid(s, (n, n)).tocsr()
    # first level -- use PyAMG's level class rather than a dict
    level0 = multilevel_solver.level()
    level0.A = A
    level0.N = n
    level0.l = A.shape[0]
    levels.append(level0)
    current_n = n
    for i in range(1, mxl):  # start from 1 since one level already exists
        # current finest level (the last one)
        fine_level = levels[-1]
        current_n = fine_level.N
        # restriction operator
        R = prolongation_fn(current_n)
        # interpolation is the transpose of restriction
        P = R.T * 4
        # store on the fine level
        fine_level.R = R
        fine_level.P = P
        # Galerkin product for the next (coarse) level
        A_coarse = R @ fine_level.A @ P
        coarse_level = multilevel_solver.level()
        coarse_level.A = A_coarse
        coarse_level.N = current_n // 2  # grid size halves
        coarse_level.l = A_coarse.shape[0]
        levels.append(coarse_level)
        # stop at the minimum grid size
        if coarse_level.N < 8:
            break
    return levels

# ========== Problem class ==========
class Problem:
    def __init__(self, eps, theta, grid_size, k=20, initial_ground_truth=None,
                 initial_u=None, levels=None, net_trained=None, mxl=0):
        self.eps = eps
        self.theta = theta
        self.grid_size = grid_size
        if levels is None:
            levels = create_levels(eps, theta, grid_size)
        self.levels = levels
        N = levels[0].N
        l = levels[0].l
        # ground-truth solution
        if initial_ground_truth is None:
            self.ground_truth = torch.rand(l, 1, dtype=torch.double, device=device,
                                           requires_grad=False)
        else:
            self.ground_truth = initial_ground_truth.detach().requires_grad_(False)
        # initial guess
        if initial_u is None:
            self.initial_u = torch.rand(l, 1, dtype=torch.double, device=device,
                                        requires_grad=False)
        else:
            self.initial_u = initial_u.detach().requires_grad_(False)
        self.k = k
        self.N = N
        self.mxl = mxl
        self.net_trained = net_trained or []
        # freeze the pretrained networks
        for net in self.net_trained:
            for param in net.parameters():
                param.requires_grad = False
        # right-hand side, computed with the SciPy sparse matrix
        A_sparse = self.levels[0].A
        gt_numpy = self.ground_truth.detach().cpu().numpy().flatten()
        f_numpy = A_sparse @ gt_numpy
        self.f = torch.tensor(f_numpy, dtype=torch.double,
                              device=device).view(-1, 1).requires_grad_(False)

    def compute_solution(self, net):
        with torch.no_grad():  # gradients disabled
            A_sparse = self.levels[0].A  # SciPy sparse matrix
            b = self.f.detach().cpu().numpy().flatten()
            # build the multigrid solver
            solver_a_CNN = multigrid_solver(A_sparse, self.grid_size,
                                            {'smoother': 'a-CNN', 'eps': self.eps,
                                             'theta': self.theta},
                                            net, self.net_trained, self.mxl)
            u_solution = solver_a_CNN.solve(b, maxiter=10, tol=1e-6)
            return torch.tensor(u_solution, dtype=torch.double, device=device).view(-1, 1)

# ========== Solver ==========
def multigrid_solver(A, size, args, net, net_trained, mxl):
    solver = geometric_solver(A, prolongation_fn, max_levels=5, coarse_solver='splu')
    if net_trained != 0:
        nets = [net] + net_trained
    else:
        nets = [net]
    if args['smoother'] == 'a-CNN':
        # mxl is at most 5, so i in range(4): 0 1 2 3
        for i in range(mxl - 1):
            # smoothing operator for this level
            M = neural_smoother(nets[i], size // (2 ** i))

            # smoothing function (revised version)
            def relax(A, x, b, M_new=M):
                # residual (NumPy sparse operations)
                r = b - A.dot(x)
                # matrix product in PyTorch
                r_tensor = torch.tensor(r, dtype=torch.double, device='cpu').view(-1, 1)
                correction = M_new @ r_tensor
                # back to NumPy, update the iterate in place
                x += correction.view(-1).cpu().numpy()

            solver.levels[i].presmoother = relax
            solver.levels[i].postsmoother = relax
    return solver

def geometric_solver(A, prolongation_function,
                     presmoother=('gauss_seidel', {'sweep': 'forward'}),
                     postsmoother=('gauss_seidel', {'sweep': 'forward'}),
                     max_levels=5, max_coarse=10, coarse_solver='splu', **kwargs):
    levels = [multilevel_solver.level()]
    # convert A to csr
    if not isspmatrix_csr(A):
        try:
            A = csr_matrix(A)
            warn("Implicit conversion of A to CSR", SparseEfficiencyWarning)
        except BaseException:
            raise TypeError('Argument A must have type csr_matrix, or be convertible to csr_matrix')
    # preprocess A
    A = A.asfptype()
    if A.shape[0] != A.shape[1]:
        raise ValueError('expected square matrix')
    levels[-1].A = A
    while len(levels) < max_levels and levels[-1].A.shape[0] > max_coarse:
        extend_hierarchy(levels, prolongation_function)
    # use MultilevelSolver instead of the deprecated multilevel_solver
    ml = pyamg.multilevel.MultilevelSolver(levels, **kwargs)
    change_smoothers(ml, presmoother, postsmoother)
    return ml

# internal function
def extend_hierarchy(levels, prolongation_fn):
    """Extend the multigrid hierarchy."""
    A = levels[-1].A
    N = A.shape[0]
    n = int(math.sqrt(N))
    R = prolongation_fn(n)
    P = R.T.tocsr() * 4
    levels[-1].P = P  # prolongation operator
    levels[-1].R = R  # restriction operator
    levels.append(multilevel_solver.level())
    # form the next level through the Galerkin product
    A = R * A * P
    A = A.astype(np.float64)  # convert from complex numbers, should have A.imag==0
    levels[-1].A = A

# ========== Neural network model ==========
class _ConvNet_(nn.Module):
    def __init__(self, initial=5, kernel_size=3, initial_kernel=0.1):
        super(_ConvNet_, self).__init__()
        self.convLayers1 = nn.ModuleList([
            nn.Conv2d(1, 1, kernel_size, padding=kernel_size//2, bias=False).double()
            for _ in range(5)
        ])
        self.convLayers2 = nn.ModuleList([
            nn.Conv2d(1, 1, kernel_size, padding=kernel_size//2, bias=False).double()
            for _ in range(2)
        ])
        # weight initialization
        initial_weights = torch.zeros(1, 1, kernel_size, kernel_size, dtype=torch.double)
        initial_weights[0, 0, kernel_size//2, kernel_size//2] = initial_kernel
        for net in self.convLayers1:
            net.weight = nn.Parameter(initial_weights.clone())
        for net in self.convLayers2:
            net.weight = nn.Parameter(initial_weights.clone())

    def forward(self, x):
        y1 = x
        y2 = x
        for net in self.convLayers1:
            y1 = torch.tanh(net(y1))
        for net in self.convLayers2:
            y2 = torch.tanh(net(y2))
        return y1 + (2/3) * y2

def compute_loss(net, problem_instances):
    loss = torch.zeros(1, device=device, requires_grad=True)
    for problem in problem_instances:
        # make sure the computation graph is connected
        with torch.set_grad_enabled(True):
            u_pred = problem.compute_solution(net)
            u_true = problem.ground_truth
            u_pred.requires_grad_(True)
            u_true.requires_grad_(False)
            # relative error as the loss
            diff = u_pred - u_true
            norm_diff = torch.norm(diff)
            norm_true = torch.norm(u_true)
            loss = loss + norm_diff / norm_true
    return loss

def chunks(l, n):
    for i in range(0, len(l), n):
        yield l[i:i + n]

def set_seed(seed):
    torch.manual_seed(seed)
    np.random.seed(seed)

# ========== AlphaCNN ==========
class alphaCNN:
    def __init__(self, net=None, batch_size=1, learning_rate=1e-6, max_epochs=1000,
                 nb_layers=5, tol=1e-6, stable_count=50, optimizer='SGD',
                 check_spectral_radius=False, random_seed=None, kernel_size=3,
                 initial_kernel=0.1):
        if random_seed is not None:
            set_seed(random_seed)
        if net is None:
            self.net = _ConvNet_(initial=5, kernel_size=kernel_size,
                                 initial_kernel=initial_kernel).to(device)
        else:
            self.net = net
        # make sure the parameters require gradients
        for param in self.net.parameters():
            param.requires_grad = True
        self.learning_rate = learning_rate
        if optimizer == 'Adadelta':
            self.optim = torch.optim.Adadelta(self.net.parameters(), lr=learning_rate)
        elif optimizer == 'Adam':
            self.optim = torch.optim.Adam(self.net.parameters(), lr=learning_rate)
        else:
            self.optim = torch.optim.SGD(self.net.parameters(), lr=learning_rate)
        self.batch_size = batch_size
        self.max_epochs = max_epochs
        self.tol = tol
        self.stable_count = stable_count

    def _optimization_step_(self, problem_instances):
        shuffled_problem_instances = np.random.permutation(problem_instances)
        for problem_chunk in chunks(shuffled_problem_instances, self.batch_size):
            self.optim.zero_grad()
            loss = compute_loss(self.net, problem_chunk)
            # check that a gradient exists
            if loss.grad_fn is None:
                raise RuntimeError("Loss has no gradient. Check the computation graph.")
            loss.backward()
            self.optim.step()
            # a second, manual parameter update on top of optim.step()
            with torch.no_grad():
                for param in self.net.parameters():
                    if param.grad is not None:
                        param -= self.learning_rate * param.grad

    def fit(self, problem_instances):
        losses = []
        prev_total_loss = compute_loss(self.net, problem_instances).item()
        convergence_counter = 0
        problem_number = len(problem_instances)
        for n_epoch in range(self.max_epochs):
            start_time = time.time()
            self._optimization_step_(problem_instances)
            total_loss = compute_loss(self.net, problem_instances).item()
            losses.append(total_loss)
            if np.abs(total_loss - prev_total_loss) < self.tol * problem_number:
                convergence_counter += 1
                if convergence_counter >= self.stable_count:
                    print(f"Converged after {n_epoch} epochs")
                    break
            else:
                convergence_counter = 0
            prev_total_loss = total_loss
            epoch_time = time.time() - start_time
            if n_epoch % 10 == 0:
                print(f"Epoch: {n_epoch:>3} Loss: {total_loss:>10.6f} Time: {epoch_time:.2f}s")
        self.losses = losses
        print(f"Training completed. Final loss: {total_loss:.6f}")
        return self

# ========== Model training ==========
def train_and_save_model(eps, theta, coarsening='full'):
    n = 33  # grid size
    # model directory
    model_dir = f'./models/theta_{theta}_eps_{eps}'
    if not os.path.isdir(model_dir):
        os.makedirs(model_dir)
    # level hierarchy
    levels = create_levels(eps, theta, n)
    # level-1 training (coarsest)
    problem_instances1 = [Problem(eps, theta, n, k=k, levels=levels, mxl=1)
                          for k in range(1, 13)]
    model1 = alphaCNN(batch_size=8, learning_rate=1e-8, max_epochs=1000, nb_layers=5,
                      tol=1e-6, stable_count=10, optimizer='Adam', random_seed=9,
                      initial_kernel=0.1)
    model1.fit(problem_instances1)
    torch.save(model1.net.state_dict(),
               os.path.join(model_dir, f'theta_{theta}_eps_{eps}_level1.pth'))
    # level-2 training
    problem_instances2 = [Problem(eps, theta, n, k=k, levels=levels, mxl=2,
                                  net_trained=[model1.net]) for k in range(1, 15)]
    model2 = alphaCNN(batch_size=8, learning_rate=1e-8, max_epochs=1000, nb_layers=5,
                      tol=1e-6, stable_count=10, optimizer='Adam', random_seed=9,
                      initial_kernel=0.02/3)
    model2.fit(problem_instances2)
    torch.save(model2.net.state_dict(),
               os.path.join(model_dir, f'theta_{theta}_eps_{eps}_level2.pth'))
    # level-3 training
    problem_instances3 = [Problem(eps, theta, n, k=k, levels=levels, mxl=3,
                                  net_trained=[model1.net, model2.net])
                          for k in range(1, 17)]
    model3 = alphaCNN(batch_size=8, learning_rate=1e-8, max_epochs=1000, nb_layers=5,
                      tol=1e-6, stable_count=10, optimizer='Adam', random_seed=9,
                      initial_kernel=0.002/3)
    model3.fit(problem_instances3)
    torch.save(model3.net.state_dict(),
               os.path.join(model_dir, f'theta_{theta}_eps_{eps}_level3.pth'))
    # level-4 training (finest)
    problem_instances4 = [Problem(eps, theta, n, k=k, levels=levels, mxl=4,
                                  net_trained=[model1.net, model2.net, model3.net])
                          for k in range(1, 20)]
    model4 = alphaCNN(batch_size=8, learning_rate=1e-8, max_epochs=1000, nb_layers=5,
                      tol=1e-6, stable_count=10, optimizer='Adam', random_seed=9,
                      initial_kernel=0.002/3)
    model4.fit(problem_instances4)
    torch.save(model4.net.state_dict(),
               os.path.join(model_dir, f'theta_{theta}_eps_{eps}_level4.pth'))

# train the models
if __name__ == "__main__":
    train_and_save_model(100, 75)
```
The loss is far too large. Please help me fix the code and check for mistakes.
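Two things stand out, if I'm reading the training path correctly. First, `Problem.compute_solution` runs the entire multigrid solve under `torch.no_grad()` and returns a freshly constructed tensor, so the loss in `compute_loss` has no path back to the CNN weights (which is exactly what the "Loss has no gradient" check would trip on); calling `u_pred.requires_grad_(True)` afterwards does not reconnect the graph. Second, `_optimization_step_` applies a manual SGD update after `optim.step()`, i.e. two conflicting updates per batch. A minimal sketch of the optimizer-step fix (the no-grad issue needs the solve itself to be rewritten with differentiable tensor ops):

```
def _optimization_step_(self, problem_instances):
    # one optimizer step per chunk; no extra manual "param -= lr * grad"
    shuffled = np.random.permutation(problem_instances)
    for chunk in chunks(shuffled, self.batch_size):
        self.optim.zero_grad()
        loss = compute_loss(self.net, chunk)
        loss.backward()
        self.optim.step()
```

With Adam, a learning rate of 1e-8 is also likely far too small to move the loss at all; something in the 1e-3 to 1e-4 range is a more usual starting point.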

```
import numpy as np
import pywt
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.preprocessing import MinMaxScaler
from torch.utils.data import Dataset, DataLoader
from matplotlib import cm

# random seeds
np.random.seed(0)
torch.manual_seed(0)

# Step 1: synthetic time series
# trend + seasonality + high-frequency noise (e.g. temperature, flow)
T = 1000
x = np.arange(T)
trend = 0.01 * x
season = np.sin(2 * np.pi * x / 50)
noise = 0.3 * np.random.randn(T)
data = trend + season + noise

# plot: original series
plt.figure(figsize=(10, 4))
plt.plot(x, data, color='darkorange')
plt.title('Original Time Series')
plt.xlabel('Time')
plt.ylabel('Value')
plt.grid(True)
plt.tight_layout()
plt.show()
plt.close()

# Step 2: wavelet decomposition + scaling
wavelet = 'db4'
coeffs = pywt.wavedec(data, wavelet, level=2)
a2, d2, d1 = coeffs
new_coeffs = [np.zeros_like(a2), np.zeros_like(d2), d1]

# reconstruct the frequency bands
low_freq = pywt.waverec([a2, None, None], wavelet)
mid_freq = pywt.waverec([None, d2, None], wavelet)
high_freq = pywt.waverec(new_coeffs, wavelet)

# align lengths
min_len = min(len(low_freq), T)
components = np.stack([low_freq[:min_len], mid_freq[:min_len], high_freq[:min_len]], axis=1)

# scaling
scaler = MinMaxScaler()
components_scaled = scaler.fit_transform(components)
target = data[2:2 + min_len]  # 2-step lag to avoid leakage

# Step 3: PyTorch dataset
class TimeSeriesDataset(Dataset):
    def __init__(self, X, y, window_size):
        self.X = []
        self.y = []
        for i in range(len(X) - window_size):
            self.X.append(X[i:i + window_size])
            self.y.append(y[i + window_size])
        self.X = torch.tensor(self.X, dtype=torch.float32)
        self.y = torch.tensor(self.y, dtype=torch.float32)

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]

window_size = 32
dataset = TimeSeriesDataset(components_scaled, target, window_size)
train_size = int(len(dataset) * 0.8)
train_dataset, test_dataset = torch.utils.data.random_split(
    dataset, [train_size, len(dataset) - train_size])
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32)

# Step 4: Transformer model
class WaveletTransformer(nn.Module):
    def __init__(self, input_dim, d_model=64, nhead=4, num_layers=2):
        super().__init__()
        self.linear_in = nn.Linear(input_dim, d_model)
        encoder_layer = nn.TransformerEncoderLayer(d_model=d_model, nhead=nhead,
                                                   batch_first=True)
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)
        self.decoder = nn.Sequential(
            nn.Linear(d_model, 32),
            nn.ReLU(),
            nn.Linear(32, 1)
        )

    def forward(self, x):
        x = self.linear_in(x)
        x = self.transformer(x)
        x = x[:, -1, :]
        out = self.decoder(x).squeeze(-1)
        return out

model = WaveletTransformer(input_dim=3)

# Step 5: training
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
epochs = 50
train_losses = []
test_losses = []
for epoch in range(epochs):
    model.train()
    total_loss = 0
    for X_batch, y_batch in train_loader:
        optimizer.zero_grad()
        output = model(X_batch)
        loss = criterion(output, y_batch)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    train_losses.append(total_loss / len(train_loader))
    model.eval()
    with torch.no_grad():
        test_loss = sum(criterion(model(X), y).item() for X, y in test_loader) / len(test_loader)
    test_losses.append(test_loss)

# plot: training curves
plt.figure(figsize=(8, 4))
plt.plot(train_losses, label='Train Loss', color='royalblue')
plt.plot(test_losses, label='Test Loss', color='tomato')
plt.title('Loss Curve')
plt.xlabel('Epoch')
plt.ylabel('MSE')
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.show()

# Step 6: prediction and analysis
model.eval()
preds = []
y_true = []
with torch.no_grad():
    for X, y in test_loader:
        pred = model(X)
        preds.extend(pred.numpy())
        y_true.extend(y.numpy())

# plot: prediction vs ground truth
plt.figure(figsize=(10, 4))
plt.plot(y_true, label='True', color='green')
plt.plot(preds, label='Predicted', color='purple')
plt.title('Prediction vs Ground Truth')
plt.xlabel('Sample Index')
plt.ylabel('Value')
plt.legend()
plt.tight_layout()
plt.show()

# plot: frequency components
plt.figure(figsize=(10, 6))
plt.subplot(3, 1, 1)
plt.plot(components[:min_len, 0], color='blue')
plt.title('Low Frequency Component')
plt.subplot(3, 1, 2)
plt.plot(components[:min_len, 1], color='orange')
plt.title('Mid Frequency Component')
plt.subplot(3, 1, 3)
plt.plot(components[:min_len, 2], color='red')
plt.title('High Frequency Component')
plt.tight_layout()
plt.show()

# plot: prediction error distribution
errors = np.array(preds) - np.array(y_true)
plt.figure(figsize=(8, 4))
plt.hist(errors, bins=40, color='steelblue', edgecolor='black')
plt.title('Prediction Error Distribution')
plt.xlabel('Error')
plt.ylabel('Count')
plt.grid(True)
plt.tight_layout()
plt.show()
```
Find the bugs in this code.
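The most concrete bug I can see is a length mismatch: with `T = 1000`, `target = data[2:2 + min_len]` has `min_len - 2` elements while `components_scaled` has `min_len` rows, so `TimeSeriesDataset` eventually reads `y[i + window_size]` past the end of `target` and raises an IndexError. A sketch of a fix (names follow the snippet):

```
# Trim the feature matrix to the lagged target so X and y stay aligned
target = data[2:2 + min_len]
components_scaled = components_scaled[:len(target)]
```

Two further caveats worth checking: `random_split` scatters overlapping windows across train and test, so nearby (nearly duplicate) windows leak between the splits and the test loss looks optimistic; a chronological split avoids this. And building `self.X` as a Python list of arrays before `torch.tensor(...)` works but is slow; `np.stack` first is the idiomatic route.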

```
import tkinter as tk
from tkinter import ttk, filedialog, messagebox
import pandas as pd
import numpy as np
import matplotlib as mpl
import matplotlib.pyplot as plt
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Dense, Lambda
from tensorflow.keras.optimizers import Adam
from sklearn.preprocessing import MinMaxScaler
import os
import time
import warnings
import matplotlib.dates as mdates

warnings.filterwarnings('ignore', category=UserWarning, module='tensorflow')
mpl.rcParams['font.sans-serif'] = ['SimHei', 'Microsoft YaHei', 'Arial Unicode MS']
mpl.rcParams['axes.unicode_minus'] = False  # key fix: use the ASCII minus sign
# Chinese font support
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False


class PINNModel(tf.keras.Model):
    def __init__(self, num_layers=4, hidden_units=32, dropout_rate=0.1, **kwargs):
        super(PINNModel, self).__init__(**kwargs)
        self.dense_layers = []
        self.dropout_layers = []
        # hidden layers, each paired with a Dropout layer
        for _ in range(num_layers):
            self.dense_layers.append(Dense(hidden_units, activation='tanh'))
            self.dropout_layers.append(tf.keras.layers.Dropout(dropout_rate))
        self.final_layer = Dense(1, activation='linear')
        # constrained physical parameters
        # base decay coefficient
        self.k1_raw = tf.Variable(0.1, trainable=True, dtype=tf.float32, name='k1_raw')
        self.k1 = tf.math.sigmoid(self.k1_raw) * 0.5      # constrained to 0-0.5
        # water-level-dependent decay coefficient
        self.k2_raw = tf.Variable(0.01, trainable=True, dtype=tf.float32, name='k2_raw')
        self.k2 = tf.math.sigmoid(self.k2_raw) * 0.1      # constrained to 0-0.1
        # nonlinear term coefficient
        self.alpha_raw = tf.Variable(0.1, trainable=True, dtype=tf.float32, name='alpha_raw')
        self.alpha = tf.math.sigmoid(self.alpha_raw) * 1.0  # constrained to 0-1.0
        # external influence coefficient (rainfall, temperature, ...)
        self.beta_raw = tf.Variable(0.05, trainable=True, dtype=tf.float32, name='beta_raw')
        self.beta = tf.math.sigmoid(self.beta_raw) * 0.2    # constrained to 0-0.2

    def call(self, inputs, training=False):
        # unpack the input features
        year, month_sin, month_cos, day_sin, day_cos, h, dt_norm, log_dt_norm = inputs
        # combined time features
        time_features = tf.concat([
            year, month_sin, month_cos, day_sin, day_cos,
            dt_norm, log_dt_norm, h * dt_norm, h * log_dt_norm
        ], axis=1)
        # time, water level and step size as input features
        # NOTE: t, h, dt, interaction are not defined in this scope (bug)
        x = tf.concat([t, h, dt, interaction], axis=1)
        # pass through the hidden layers; Dropout only active while training
        for dense_layer, dropout_layer in zip(self.dense_layers, self.dropout_layers):
            x = dense_layer(x)
            x = dropout_layer(x, training=training)
        return self.final_layer(x)

    def physics_loss(self, h_current_raw, dt_raw, training=False):
        """Physics loss computed in the raw (unnormalized) space."""
        # physical parameters
        k1 = tf.math.sigmoid(self.k1_raw) * 0.5
        k2 = tf.math.sigmoid(self.k2_raw) * 0.1
        alpha = tf.math.sigmoid(self.alpha_raw) * 1.0
        beta = tf.math.sigmoid(self.beta_raw) * 0.2
        # physical equation
        exponent = -(k1 + k2 * h_current_raw) * dt_raw
        exponent = tf.clip_by_value(exponent, -50.0, 50.0)
        decay_term = h_current_raw * tf.exp(exponent)
        beta_exp = -beta * dt_raw
        beta_exp = tf.clip_by_value(beta_exp, -50.0, 50.0)
        external_term = alpha * (1 - tf.exp(beta_exp))
        # denormalized prediction
        # NOTE: self.scaler_h and self.model_output do not exist on the model (bug)
        h_next_pred = self.scaler_h.inverse_transform(self.model_output)
        residual = h_next_pred - (decay_term + external_term)
        return tf.reduce_mean(tf.square(residual))


class DamSeepageModel:
    def __init__(self, root):
        self.root = root
        self.root.title("Dam Seepage Prediction Model (PINNs)")
        self.root.geometry("1200x800")
        # data
        self.train_df = None  # training set
        self.test_df = None   # test set
        self.model = None
        self.scaler_t = MinMaxScaler(feature_range=(0, 1))
        self.scaler_h = MinMaxScaler(feature_range=(0, 1))
        self.scaler_dt = MinMaxScaler(feature_range=(0, 1))
        # additional scalers
        self.scaler_year = MinMaxScaler(feature_range=(0, 1))
        self.scaler_month = MinMaxScaler(feature_range=(0, 1))
        self.scaler_day = MinMaxScaler(feature_range=(0, 1))
        self.scaler_log_dt = MinMaxScaler(feature_range=(0, 1))
        self.evaluation_metrics = {}
        # build the UI
        self.create_widgets()

    def create_widgets(self):
        # main frame
        main_frame = ttk.Frame(self.root, padding=10)
        main_frame.pack(fill=tk.BOTH, expand=True)
        # left control panel
        control_frame = ttk.LabelFrame(main_frame, text="Model Controls", padding=10)
        control_frame.pack(side=tk.LEFT, fill=tk.Y, padx=5, pady=5)
        # file selection
        file_frame = ttk.LabelFrame(control_frame, text="Data Files", padding=10)
        file_frame.pack(fill=tk.X, pady=5)
        # training set
        ttk.Label(file_frame, text="Training set:").grid(row=0, column=0, sticky=tk.W, pady=5)
        self.train_file_var = tk.StringVar()
        ttk.Entry(file_frame, textvariable=self.train_file_var, width=30,
                  state='readonly').grid(row=0, column=1, padx=5)
        ttk.Button(file_frame, text="Browse",
                   command=lambda: self.select_file("train")).grid(row=0, column=2)
        # test set
        ttk.Label(file_frame, text="Test set:").grid(row=1, column=0, sticky=tk.W, pady=5)
        self.test_file_var = tk.StringVar()
        ttk.Entry(file_frame, textvariable=self.test_file_var, width=30,
                  state='readonly').grid(row=1, column=1, padx=5)
        ttk.Button(file_frame, text="Browse",
                   command=lambda: self.select_file("test")).grid(row=1, column=2)
        # PINNs parameters
        param_frame = ttk.LabelFrame(control_frame, text="PINNs Parameters", padding=10)
        param_frame.pack(fill=tk.X, pady=10)
        # validation split ratio
        ttk.Label(param_frame, text="Validation split:").grid(row=0, column=0, sticky=tk.W, pady=5)
        self.split_ratio_var = tk.DoubleVar(value=0.2)
        ttk.Spinbox(param_frame, from_=0, to=1, increment=0.05,
                    textvariable=self.split_ratio_var, width=10).grid(row=0, column=1, padx=5)
        # number of hidden layers
        ttk.Label(param_frame, text="Network layers:").grid(row=1, column=0, sticky=tk.W, pady=5)
        self.num_layers_var = tk.IntVar(value=4)
        ttk.Spinbox(param_frame, from_=2, to=8, increment=1,
                    textvariable=self.num_layers_var, width=10).grid(row=1, column=1, padx=5)
        # neurons per layer
        ttk.Label(param_frame, text="Neurons/layer:").grid(row=2, column=0, sticky=tk.W, pady=5)
        self.hidden_units_var = tk.IntVar(value=32)
        ttk.Spinbox(param_frame, from_=16, to=128, increment=4,
                    textvariable=self.hidden_units_var, width=10).grid(row=2, column=1, padx=5)
        # epochs
        ttk.Label(param_frame, text="Epochs:").grid(row=3, column=0, sticky=tk.W, pady=5)
        self.epochs_var = tk.IntVar(value=500)
        ttk.Spinbox(param_frame, from_=100, to=2000, increment=100,
                    textvariable=self.epochs_var, width=10).grid(row=3, column=1, padx=5)
        # physics loss weight
        ttk.Label(param_frame, text="Physics loss weight:").grid(row=4, column=0, sticky=tk.W, pady=5)
        self.physics_weight_var = tk.DoubleVar(value=0.5)
        ttk.Spinbox(param_frame, from_=0.1, to=1.0, increment=0.1,
                    textvariable=self.physics_weight_var, width=10).grid(row=4, column=1, padx=5)
        # buttons
        btn_frame = ttk.Frame(control_frame)
        btn_frame.pack(fill=tk.X, pady=10)
        ttk.Button(btn_frame, text="Train Model", command=self.train_model).pack(side=tk.LEFT, padx=5)
        ttk.Button(btn_frame, text="Predict", command=self.predict).pack(side=tk.LEFT, padx=5)
        ttk.Button(btn_frame, text="Save Results", command=self.save_results).pack(side=tk.LEFT, padx=5)
        ttk.Button(btn_frame, text="Reset", command=self.reset).pack(side=tk.RIGHT, padx=5)
        # status bar
        self.status_var = tk.StringVar(value="Ready")
        status_bar = ttk.Label(control_frame, textvariable=self.status_var,
                               relief=tk.SUNKEN, anchor=tk.W)
        status_bar.pack(fill=tk.X, side=tk.BOTTOM)
        # right-hand results area
        result_frame = ttk.Frame(main_frame)
        result_frame.pack(side=tk.RIGHT, fill=tk.BOTH, expand=True, padx=5, pady=5)
        # notebook tabs
        self.notebook = ttk.Notebook(result_frame)
        self.notebook.pack(fill=tk.BOTH, expand=True)
        # training loss tab
        self.loss_frame = ttk.Frame(self.notebook)
        self.notebook.add(self.loss_frame, text="Training Loss")
        # prediction tab
        self.prediction_frame = ttk.Frame(self.notebook)
        self.notebook.add(self.prediction_frame, text="Predictions")
        # metrics display
        self.metrics_var = tk.StringVar()
        metrics_label = ttk.Label(
            self.prediction_frame, textvariable=self.metrics_var,
            font=('TkDefaultFont', 10, 'bold'), relief='ridge', padding=5
        )
        metrics_label.pack(fill=tk.X, padx=5, pady=5)
        # chart container frame
        chart_frame = ttk.Frame(self.prediction_frame)
        chart_frame.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)
        # plotting area
        self.fig, self.ax = plt.subplots(figsize=(10, 6))
        self.canvas = FigureCanvasTkAgg(self.fig, master=chart_frame)
        self.canvas.get_tk_widget().pack(side=tk.TOP, fill=tk.BOTH, expand=True)
        # Matplotlib toolbar (zoom, pan, ...)
        from matplotlib.backends.backend_tkagg import NavigationToolbar2Tk
        self.toolbar = NavigationToolbar2Tk(self.canvas, chart_frame)
        self.toolbar.update()
        self.canvas.get_tk_widget().pack(side=tk.TOP, fill=tk.BOTH, expand=True)
        # loss-curve canvas (with its own toolbar)
        loss_chart_frame = ttk.Frame(self.loss_frame)
        loss_chart_frame.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)
        self.loss_fig, self.loss_ax = plt.subplots(figsize=(10, 4))
        self.loss_canvas = FigureCanvasTkAgg(self.loss_fig, master=loss_chart_frame)
        self.loss_canvas.get_tk_widget().pack(side=tk.TOP, fill=tk.BOTH, expand=True)
        self.loss_toolbar = NavigationToolbar2Tk(self.loss_canvas, loss_chart_frame)
        self.loss_toolbar.update()
        self.loss_canvas.get_tk_widget().pack(side=tk.TOP, fill=tk.BOTH, expand=True)

    def preprocess_data(self, df, is_training=False):
        """Enhanced time-feature processing."""
        # build a timestamp
        if 'datetime' not in df.columns:
            time_cols = ['year', 'month', 'day']
            # fill missing time units
            for col in ['hour', 'minute', 'second']:
                if col not in df.columns:
                    df[col] = 0
            df['datetime'] = pd.to_datetime(df[time_cols])
            df = df.set_index('datetime')
        # time step in days
        if 'dt' not in df.columns:
            df['dt'] = df.index.to_series().diff().dt.total_seconds() / 86400
            df['dt'] = df['dt'].fillna(df['dt'].mean())
        # handle abnormal time steps
        dt_mean = df['dt'].mean()
        df.loc[df['dt'] <= 0, 'dt'] = dt_mean
        df.loc[df['dt'] > 30, 'dt'] = dt_mean  # clamp very large gaps
        # log-transformed time step
        df['log_dt'] = np.log1p(df['dt'])
        # periodic time features
        df['year_norm'] = df['year']
        df['month_sin'] = np.sin(2 * np.pi * df['month'] / 12)
        df['month_cos'] = np.cos(2 * np.pi * df['month'] / 12)
        df['day_sin'] = np.sin(2 * np.pi * df['day'] / 31)
        df['day_cos'] = np.cos(2 * np.pi * df['day'] / 31)
        # normalization
        if is_training:
            self.scaler_year.fit(df[['year_norm']])
            self.scaler_month.fit(df[['month_sin', 'month_cos']])
            self.scaler_day.fit(df[['day_sin', 'day_cos']])
            self.scaler_dt.fit(df[['dt']])
            self.scaler_log_dt.fit(df[['log_dt']])
            self.scaler_h.fit(df[['水位']])  # water-level scaler
        # apply the scalers
        df[['year_norm']] = self.scaler_year.transform(df[['year_norm']])
        df[['month_sin', 'month_cos']] = self.scaler_month.transform(df[['month_sin', 'month_cos']])
        df[['day_sin', 'day_cos']] = self.scaler_day.transform(df[['day_sin', 'day_cos']])
        df[['dt_norm']] = self.scaler_dt.transform(df[['dt']])
        df[['log_dt_norm']] = self.scaler_log_dt.transform(df[['log_dt']])
        df[['水位_norm']] = self.scaler_h.transform(df[['水位']])  # normalized water level
        return df

    def select_file(self, file_type):
        """Pick an Excel file and run preprocessing."""
        try:
            file_path = filedialog.askopenfilename(...)
            if not file_path:
                return
            df = pd.read_excel(file_path)
            # validate required columns ('水位' = water level)
            required_cols = ['year', 'month', 'day', '水位']
            missing_cols = [col for col in required_cols if col not in df.columns]
            if missing_cols:
                messagebox.showerror("Column error", f"Missing required columns: {', '.join(missing_cols)}")
                return
            # preprocess
            is_training = (file_type == "train")
            df = self.preprocess_data(df, is_training=is_training)
            # store
            if file_type == "train":
                self.train_df = df
                self.train_file_var.set(os.path.basename(file_path))
                self.status_var.set(f"Training set loaded: {len(self.train_df)} rows")
            else:
                self.test_df = df
                self.test_file_var.set(os.path.basename(file_path))
                self.status_var.set(f"Test set loaded: {len(self.test_df)} rows")
        except Exception as e:
            error_msg = (f"Failed to read the file: {str(e)}\n\nPlease make sure:\n"
                         f"1. the file is not open elsewhere\n2. the format is correct\n"
                         f"3. the required time and water-level columns are present")
            messagebox.showerror("File error", error_msg)

    def calculate_metrics(self, y_true, y_pred):
        """Evaluation metrics."""
        from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
        mse = mean_squared_error(y_true, y_pred)
        rmse = np.sqrt(mse)
        mae = mean_absolute_error(y_true, y_pred)
        non_zero_idx = np.where(y_true != 0)[0]
        if len(non_zero_idx) > 0:
            mape = np.mean(np.abs((y_true[non_zero_idx] - y_pred[non_zero_idx])
                                  / y_true[non_zero_idx])) * 100
        else:
            mape = float('nan')
        r2 = r2_score(y_true, y_pred)
        return {
            'MSE': mse,
            'RMSE': rmse,
            'MAE': mae,
            'MAPE': mape,  # corrected key name
            'R2': r2
        }

    def train_model(self):
        """Train the PINNs model (early stopping + metric monitoring)."""
        if self.train_df is None:
            messagebox.showwarning("Warning", "Please select a training file first")
            return
        try:
            self.status_var.set("Preprocessing data...")
            self.root.update()
            # chronological split into training and validation subsets
            split_ratio = 1 - self.split_ratio_var.get()
            split_idx = int(len(self.train_df) * split_ratio)
            train_subset = self.train_df.iloc[:split_idx]
            valid_subset = self.train_df.iloc[split_idx:]
            # check data volume
            if len(train_subset) < 2 or len(valid_subset) < 2:
                messagebox.showerror("Data error", "Not enough training data (at least 2 time steps needed)")
                return
            # preprocessing: normalize each feature separately
            # NOTE: preprocess_data never creates a 'days' column (bug)
            t_train = train_subset['days'].values[1:].reshape(-1, 1)
            self.scaler_t.fit(t_train)
            t_train_scaled = self.scaler_t.transform(t_train).astype(np.float32)
            # water level
            h_train = train_subset['水位'].values[:-1].reshape(-1, 1)
            self.scaler_h.fit(h_train)
            h_train_scaled = self.scaler_h.transform(h_train).astype(np.float32)
            # time step
            dt_train = train_subset['dt'].values[1:].reshape(-1, 1)
            self.scaler_dt.fit(dt_train)
            dt_train_scaled = self.scaler_dt.transform(dt_train).astype(np.float32)
            # labels: next-step water level
            h_next_train = train_subset['水位'].values[1:].reshape(-1, 1)
            h_next_train_scaled = self.scaler_h.transform(h_next_train).astype(np.float32)
            # validation data, same normalization
            t_valid = valid_subset['days'].values[1:].reshape(-1, 1)
            t_valid_scaled = self.scaler_t.transform(t_valid).astype(np.float32)
            h_valid = valid_subset['水位'].values[:-1].reshape(-1, 1)
            h_valid_scaled = self.scaler_h.transform(h_valid).astype(np.float32)
            dt_valid = valid_subset['dt'].values[1:].reshape(-1, 1)
            dt_valid_scaled = self.scaler_dt.transform(dt_valid).astype(np.float32)
            h_next_valid_scaled = self.scaler_h.transform(
                valid_subset['水位'].values[1:].reshape(-1, 1)
            ).astype(np.float32)
            # raw values for metric calculation
            h_next_train_true = h_next_train
            h_next_valid_true = valid_subset['水位'].values[1:].reshape(-1, 1)
            # model and optimizer
            self.model = PINNModel(
                num_layers=self.num_layers_var.get(),
                hidden_units=self.hidden_units_var.get()
            )
            # learning-rate schedule
            initial_lr = 0.001
            lr_schedule = tf.keras.optimizers.schedules.ExponentialDecay(
                initial_learning_rate=initial_lr,
                decay_steps=100,   # decay every 100 steps
                decay_rate=0.95,
                staircase=True
            )
            optimizer = Adam(learning_rate=lr_schedule)
            # training datasets built from the normalized arrays
            train_dataset = tf.data.Dataset.from_tensor_slices(
                ((t_train_scaled, h_train_scaled, dt_train_scaled), h_next_train_scaled)
            )
            train_dataset = train_dataset.shuffle(buffer_size=1024).batch(32)
            valid_dataset = tf.data.Dataset.from_tensor_slices(
                ((t_valid_scaled, h_valid_scaled, dt_valid_scaled), h_next_valid_scaled)
            )
            valid_dataset = valid_dataset.batch(32)
            # training history
            train_data_loss_history = []
            physics_loss_history = []
            valid_data_loss_history = []
            train_metrics_history = []
            valid_metrics_history = []
            # early stopping
            patience = int(self.epochs_var.get() / 3)
            min_delta = 1e-4
            best_valid_loss = float('inf')
            wait = 0
            best_epoch = 0
            best_weights = None
            start_time = time.time()
            # custom training loop
            for epoch in range(self.epochs_var.get()):
                # current learning rate (attribute access, not a call)
                current_lr = optimizer.learning_rate.numpy()
                # training phase
                epoch_train_data_loss = []
                epoch_physics_loss = []
                train_pred_scaled = []  # normalized training predictions
                for step, ((t_batch, h_batch, dt_batch), h_next_batch) in enumerate(train_dataset):
                    with tf.GradientTape() as tape:
                        # predict the next water level
                        h_pred = self.model([t_batch, h_batch, dt_batch], training=True)
                        data_loss = tf.reduce_mean(tf.square(h_next_batch - h_pred))
                        # dynamically weighted physics loss
                        # NOTE: self.epoch_var does not exist (bug)
                        physics_weight = self.physics_weight_var.get() * (1 - self.epoch_var.get() * 0.5)
                        # NOTE: h_current_raw and dt_raw are not defined here (bug)
                        physics_loss = self.model.physics_loss(h_current_raw, dt_raw, training=True)
                        loss = data_loss + physics_weight * physics_loss
                    grads = tape.gradient(loss, self.model.trainable_variables)
                    optimizer.apply_gradients(zip(grads, self.model.trainable_variables))
                    epoch_train_data_loss.append(data_loss.numpy())
                    epoch_physics_loss.append(physics_loss.numpy())
                    train_pred_scaled.append(h_pred.numpy())
                # merge normalized training predictions and denormalize
                train_pred_scaled = np.concatenate(train_pred_scaled, axis=0)
                train_pred_true = self.scaler_h.inverse_transform(train_pred_scaled)
                # training metrics on raw values
                train_metrics = self.calculate_metrics(
                    y_true=h_next_train_true.flatten(),
                    y_pred=train_pred_true.flatten()
                )
                train_metrics_history.append(train_metrics)
                # validation phase
                epoch_valid_data_loss = []
                valid_pred_scaled = []
                for ((t_v_batch, h_v_batch, dt_v_batch), h_v_next_batch) in valid_dataset:
                    h_v_pred = self.model([t_v_batch, h_v_batch, dt_v_batch],
                                          training=False)  # no Dropout at validation
                    valid_data_loss = tf.reduce_mean(tf.square(h_v_next_batch - h_v_pred))
                    epoch_valid_data_loss.append(valid_data_loss.numpy())
                    valid_pred_scaled.append(h_v_pred.numpy())
                valid_pred_scaled = np.concatenate(valid_pred_scaled, axis=0)
                valid_pred_true = self.scaler_h.inverse_transform(valid_pred_scaled)
                valid_metrics = self.calculate_metrics(
                    y_true=h_next_valid_true.flatten(),
                    y_pred=valid_pred_true.flatten()
                )
                valid_metrics_history.append(valid_metrics)
                # average losses
                avg_train_data_loss = np.mean(epoch_train_data_loss)
                avg_physics_loss = np.mean(epoch_physics_loss)
                avg_valid_data_loss = np.mean(epoch_valid_data_loss)
                train_data_loss_history.append(avg_train_data_loss)
                physics_loss_history.append(avg_physics_loss)
                valid_data_loss_history.append(avg_valid_data_loss)
                # early stopping logic
                current_valid_loss = avg_valid_data_loss
                if current_valid_loss < best_valid_loss - min_delta:
                    best_valid_loss = current_valid_loss
                    best_epoch = epoch + 1
                    wait = 0
                    best_weights = self.model.get_weights()
                else:
                    wait += 1
                    if wait >= patience:
                        self.status_var.set(
                            f"Early stopping! Best epoch: {best_epoch}, "
                            f"best validation loss: {best_valid_loss:.4f}")
                        if best_weights is not None:
                            self.model.set_weights(best_weights)
                        break  # leave the loop here
                # status update (including current learning rate)
                if epoch % 1 == 0:
                    train_rmse = train_metrics['RMSE']
                    valid_rmse = valid_metrics['RMSE']
                    train_r2 = train_metrics['R2']
                    valid_r2 = valid_metrics['R2']
                    elapsed = time.time() - start_time
                    self.status_var.set(
                        f"Training | epoch: {epoch + 1}/{self.epochs_var.get()} | "
                        f"LR: {current_lr:.6f} | "
                        f"train RMSE: {train_rmse:.4f} | valid RMSE: {valid_rmse:.4f} | "
                        f"train R²: {train_r2:.4f} | valid R²: {valid_r2:.4f} | "
                        f"k1: {self.model.k1.numpy():.6f}, k2: {self.model.k2.numpy():.6f} | "
                        f"time: {elapsed:.1f}s | early-stop wait: {wait}/{patience}"
                    )
                    self.root.update()
            # plot the loss curves
            self.loss_ax.clear()
            epochs_range = range(1, len(train_data_loss_history) + 1)
            self.loss_ax.plot(epochs_range, train_data_loss_history, 'b-', label='training data loss')
            self.loss_ax.plot(epochs_range, physics_loss_history, 'r--', label='physics loss')
            self.loss_ax.plot(epochs_range, valid_data_loss_history, 'g-.', label='validation data loss')
            self.loss_ax.set_title('PINNs training and validation loss')
            self.loss_ax.set_xlabel('Epoch')
            self.loss_ax.set_ylabel('Loss', rotation=0)
            self.loss_ax.legend()
            self.loss_ax.grid(True, alpha=0.3)
            self.loss_ax.set_yscale('log')
            self.loss_canvas.draw()
            # completion message
            elapsed = time.time() - start_time
            if wait >= patience:
                completion_msg = (
                    f"Early stopped | best epoch: {best_epoch} | "
                    f"best validation loss: {best_valid_loss:.4f} | "
                    f"best validation RMSE: {valid_metrics_history[best_epoch - 1]['RMSE']:.4f} | "
                    f"total time: {elapsed:.1f}s"
                )
            else:
                completion_msg = (
                    f"Training complete | epochs: {self.epochs_var.get()} | "
                    f"final train RMSE: {train_metrics_history[-1]['RMSE']:.4f} | "
                    f"final valid RMSE: {valid_metrics_history[-1]['RMSE']:.4f} | "
                    f"final train R²: {train_metrics_history[-1]['R2']:.4f} | "
                    f"final valid R²: {valid_metrics_history[-1]['R2']:.4f} | "
                    f"total time: {elapsed:.1f}s"
                )
            # save the training history
            self.train_history = {
                'train_data_loss': train_data_loss_history,
                'physics_loss': physics_loss_history,
                'valid_data_loss': valid_data_loss_history,
                'train_metrics': train_metrics_history,
                'valid_metrics': valid_metrics_history
            }
            # save the learned physical parameters
            self.learned_params = {
                "k1": self.model.k1.numpy(),
                "k2": self.model.k2.numpy(),
                "alpha": self.model.alpha.numpy(),
                "beta": self.model.beta.numpy()
            }
            self.status_var.set(completion_msg)
            messagebox.showinfo("Training complete",
                                f"PINNs model trained successfully!\n{completion_msg}")
        except Exception as e:
            messagebox.showerror("Training error", f"Model training failed:\n{str(e)}")
            self.status_var.set("Training failed")

    def predict(self):
        """Recursive prediction with Teacher Forcing and Monte Carlo Dropout."""
        if self.model is None:
            messagebox.showwarning("Warning", "Please train the model first")
            return
        if self.test_df is None:
            messagebox.showwarning("Warning", "Please select a test file first")
            return
        try:
            self.status_var.set("Generating predictions (Teacher Forcing + MC Dropout)...")
            self.root.update()
            # normalize test data (NOTE: relies on the missing 'days' column)
            t_test = self.test_df['days'].values.reshape(-1, 1)
            t_test_scaled = self.scaler_t.transform(t_test).astype(np.float32)
            dt_test = self.test_df['dt'].values.reshape(-1, 1)
            dt_test_scaled = self.scaler_dt.transform(dt_test).astype(np.float32)
            h_test = self.test_df['水位'].values.reshape(-1, 1)
            h_test_scaled = self.scaler_h.transform(h_test).astype(np.float32)
            # recursive-prediction parameters
            n = len(t_test)
            mc_iterations = 100
            adaptive_forcing = True  # adaptive teacher forcing
            # Monte Carlo sample storage
            mc_predictions_scaled = np.zeros((mc_iterations, n, 1), dtype=np.float32)
            for mc_iter in range(mc_iterations):
                predicted_scaled = np.zeros((n, 1), dtype=np.float32)
                predicted_scaled[0] = h_test_scaled[0]  # first point uses the true value
                # recursive prediction with adaptive teacher forcing
                for i in range(1, n):
                    if adaptive_forcing:
                        # 70% true-value probability early, up to 90% later
                        teacher_forcing_prob = 0.7 + 0.2 * min(1.0, i / (0.7 * n))
                    else:
                        teacher_forcing_prob = 0.7
                    # true value or previous prediction?
                    use_actual = np.random.rand() < teacher_forcing_prob
                    if use_actual and i < n - 1:  # never use future values
                        h_prev = h_test_scaled[i - 1:i]
                    else:
                        h_prev = predicted_scaled[i - 1:i]
                    t_prev = t_test_scaled[i - 1:i]
                    dt_i = dt_test_scaled[i:i + 1]
                    # network prediction (training=True keeps Dropout active for MC)
                    h_pred = self.model([t_prev, h_prev, dt_i], training=True)
                    # physics-model prediction used as a constraint
                    k1 = self.learned_params['k1']
                    k2 = self.learned_params['k2']
                    alpha = self.learned_params['alpha']
                    beta = self.learned_params['beta']
                    exponent = -(k1 + k2 * h_prev) * dt_i
                    decay_term = h_prev * np.exp(exponent)
                    external_term = alpha * (1 - np.exp(-beta * dt_i))
                    physics_pred = decay_term + external_term
                    # blend: weighted average of network and physics predictions
                    physics_weight = 0.3
                    final_pred = physics_weight * physics_pred + (1 - physics_weight) * h_pred.numpy()
                    predicted_scaled[i] = final_pred[0][0]
                mc_predictions_scaled[mc_iter] = predicted_scaled
            # prediction statistics
            mean_pred_scaled = np.mean(mc_predictions_scaled, axis=0)
            std_pred_scaled = np.std(mc_predictions_scaled, axis=0)
            # denormalize
            predictions = self.scaler_h.inverse_transform(mean_pred_scaled)
            uncertainty = self.scaler_h.inverse_transform(std_pred_scaled) * 1.96  # 95% CI
            actual_values = h_test
            test_time = self.test_df.index
            # clear the chart
            self.ax.clear()
            # sensible y range centered on the bulk of the data
            median_val = np.median(actual_values)
            data_range = np.std(actual_values) * 4  # 4 standard deviations covers most points
            y_center = median_val
            y_half_range = max(data_range, 10)  # minimum total range of 20 units
            y_min_adjusted = y_center - y_half_range
            y_max_adjusted = y_center + y_half_range
            if y_max_adjusted - y_min_adjusted < 1:  # never a zero-width range
                y_min_adjusted -= 5
                y_max_adjusted += 5
            # plot with confidence band
            self.ax.plot(test_time, actual_values, 'b-', label='true', linewidth=2)
            self.ax.plot(test_time, predictions, 'r--', label='predicted mean', linewidth=2)
            self.ax.fill_between(
                test_time,
                (predictions - uncertainty).flatten(),
                (predictions + uncertainty).flatten(),
                color='orange', alpha=0.3, label='95% confidence interval'
            )
            self.ax.set_ylim(y_min_adjusted, y_max_adjusted)
            self.ax.set_title('Dam seepage water-level prediction (PINNs with MC Dropout)')
            self.ax.set_xlabel('Time')
            self.ax.set_ylabel('Piezometric level', rotation=0)
            self.ax.legend(loc='best')
            # time-axis ticks
            self.ax.xaxis.set_major_locator(mdates.YearLocator())
            self.ax.xaxis.set_major_formatter(mdates.DateFormatter('%Y'))
            self.ax.xaxis.set_minor_locator(mdates.MonthLocator(interval=2))
            self.ax.grid(which='minor', axis='x', linestyle=':', color='gray', alpha=0.3)
            self.ax.grid(which='major', axis='y', linestyle='-', color='lightgray', alpha=0.5)
            self.ax.tick_params(axis='x', which='major', rotation=0, labelsize=9)
            self.ax.tick_params(axis='x', which='minor', length=2)
            # metrics (excluding the seeded first point)
            eval_actual = actual_values[1:].flatten()
            eval_pred = predictions[1:].flatten()
            self.evaluation_metrics = self.calculate_metrics(eval_actual, eval_pred)
            # uncertainty metrics
            avg_uncertainty = np.mean(uncertainty)
            max_uncertainty = np.max(uncertainty)
            self.evaluation_metrics['Avg Uncertainty'] = avg_uncertainty
            self.evaluation_metrics['Max Uncertainty'] = max_uncertainty
            metrics_text = (
                f"MSE: {self.evaluation_metrics['MSE']:.4f} | "
                f"RMSE: {self.evaluation_metrics['RMSE']:.4f} | "
                f"MAE: {self.evaluation_metrics['MAE']:.4f} | "
                f"MAPE: {self.evaluation_metrics['MAPE']:.2f}% | "
                f"R²: {self.evaluation_metrics['R2']:.4f}\n"
                f"Avg uncertainty: {avg_uncertainty:.4f} | Max uncertainty: {max_uncertainty:.4f}"
            )
            self.metrics_var.set(metrics_text)
            # show the metrics on the chart
            self.ax.text(
                0.5, 1.05, metrics_text, transform=self.ax.transAxes,
                ha='center', fontsize=8, bbox=dict(facecolor='white', alpha=0.8)
            )
            params_text = (
                f"Physical parameters: k1={self.learned_params['k1']:.4f}, "
                f"k2={self.learned_params['k2']:.4f}, "
                f"alpha={self.learned_params['alpha']:.4f}, "
                f"beta={self.learned_params['beta']:.4f} | "
                f"Teacher forcing probability: {teacher_forcing_prob}"
            )
            self.ax.text(
                0.5, 1.12, params_text, transform=self.ax.transAxes,
                ha='center', fontsize=8, bbox=dict(facecolor='white', alpha=0.8)
            )
            plt.tight_layout(pad=2.0)
            self.canvas.draw()
            # store the prediction results
            self.predictions = predictions
            self.uncertainty = uncertainty
            self.actual_values = actual_values
            self.test_time = test_time
            self.mc_predictions = mc_predictions_scaled
            self.status_var.set(f"Prediction finished ({mc_iterations} MC Dropout samples)")
        except Exception as e:
            messagebox.showerror("Prediction error", f"Prediction failed:\n{str(e)}")
            self.status_var.set("Prediction failed")
            import traceback
            traceback.print_exc()

    def save_results(self):
        """Save the predictions and training history."""
        if not hasattr(self, 'predictions') or not hasattr(self, 'train_history'):
            messagebox.showwarning("Warning", "Generate predictions and finish training first")
            return
        # choose save path
        save_path = filedialog.asksaveasfilename(
            defaultextension=".xlsx",
            filetypes=[("Excel files", "*.xlsx"), ("All files", "*.*")],
            title="Save results"
        )
        if not save_path:
            return
        try:
            # 1. predictions
            result_df = pd.DataFrame({
                'Time': self.test_time,
                'Actual level': self.actual_values.flatten(),
                'Predicted level': self.predictions.flatten()
            })
            # 2. evaluation metrics
            metrics_df = pd.DataFrame([self.evaluation_metrics])
            # 3. training history
            history_data = {
                'Epoch': list(range(1, len(self.train_history['train_data_loss']) + 1)),
                'Train data loss': self.train_history['train_data_loss'],
                'Physics loss': self.train_history['physics_loss'],
                'Valid data loss': self.train_history['valid_data_loss']
            }
            for metric in ['MSE', 'RMSE', 'MAE', 'MAPE', 'R2']:
                history_data[f'train_{metric}'] = [item[metric]
                                                   for item in self.train_history['train_metrics']]
            for metric in ['MSE', 'RMSE', 'MAE', 'MAPE', 'R2']:
                history_data[f'valid_{metric}'] = [item[metric]
                                                   for item in self.train_history['valid_metrics']]
            history_df = pd.DataFrame(history_data)
            # write Excel
            with pd.ExcelWriter(save_path) as writer:
                result_df.to_excel(writer, sheet_name='Predictions', index=False)
                metrics_df.to_excel(writer, sheet_name='Metrics', index=False)
                history_df.to_excel(writer, sheet_name='Training History', index=False)
            # save the charts
            chart_path = os.path.splitext(save_path)[0] + "_chart.png"
            self.fig.savefig(chart_path, dpi=300)
            loss_path = os.path.splitext(save_path)[0] + "_loss.png"
            self.loss_fig.savefig(loss_path, dpi=300)
            self.status_var.set(f"Results saved to: {os.path.basename(save_path)}")
            messagebox.showinfo("Saved",
                                f"Results and charts saved to:\n"
                                f"main file: {save_path}\n"
                                f"prediction chart: {chart_path}\n"
                                f"loss curve: {loss_path}")
        except Exception as e:
            messagebox.showerror("Save error", f"Saving failed:\n{str(e)}")

    def reset(self):
        """Reset the application state."""
        # reset the scalers
        self.scaler_t = MinMaxScaler(feature_range=(0, 1))
        self.scaler_h = MinMaxScaler(feature_range=(0, 1))
        self.scaler_dt = MinMaxScaler(feature_range=(0, 1))
        self.train_df = None
        self.test_df = None
        self.model = None
        self.train_file_var.set("")
        self.test_file_var.set("")
        # clear the training history
        if hasattr(self, 'train_history'):
            del self.train_history
        # clear the charts
        if hasattr(self, 'ax'):
            self.ax.clear()
        if hasattr(self, 'loss_ax'):
            self.loss_ax.clear()
        # redraw the canvases
        if hasattr(self, 'canvas'):
            self.canvas.draw()
        if hasattr(self, 'loss_canvas'):
            self.loss_canvas.draw()
        # clear the status
        self.status_var.set("Reset; please select new data")
        # clear the predictions
        if hasattr(self, 'predictions'):
            del self.predictions
        # clear the metrics text
        if hasattr(self, 'metrics_var'):
            self.metrics_var.set("")
        messagebox.showinfo("Reset", "The application has been reset; you can start a new analysis")


if __name__ == "__main__":
    root = tk.Tk()
    app = DamSeepageModel(root)
    root.mainloop()
```
Check for errors and correct them.

import tkinter as tk from tkinter import ttk, filedialog, messagebox import pandas as pd import numpy as np import matplotlib as mpl import matplotlib.pyplot as plt from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg import tensorflow as tf from tensorflow.keras.models import Model from tensorflow.keras.layers import Input, Dense, Lambda from tensorflow.keras.optimizers import Adam from sklearn.preprocessing import MinMaxScaler import os import time import warnings import matplotlib.dates as mdates warnings.filterwarnings('ignore', category=UserWarning, module='tensorflow') mpl.rcParams['font.sans-serif'] = ['SimHei', 'Microsoft YaHei', 'Arial Unicode MS'] mpl.rcParams['axes.unicode_minus'] = False # 关键修复:使用 ASCII 减号 # 设置中文字体支持 plt.rcParams['font.sans-serif'] = ['SimHei'] plt.rcParams['axes.unicode_minus'] = False class PINNModel(tf.keras.Model): def __init__(self, num_layers=4, hidden_units=32, dropout_rate=0.1, l2_reg=0.001, **kwargs): super(PINNModel, self).__init__(**kwargs) # 增加输入维度处理能力 self.dense_layers = [] self.bn_layers = [] # 批量归一化层 self.dropout_layers = [] # 添加L2正则化 self.l2_reg = l2_reg # 创建更深的网络结构 for i in range(num_layers): # 第一层使用更大的维度 units = hidden_units * 2 if i == 0 else hidden_units self.dense_layers.append( Dense(units, activation='swish', # 使用Swish激活函数 kernel_regularizer=tf.keras.regularizers.l2(l2_reg)) ) self.bn_layers.append(tf.keras.layers.BatchNormalization()) self.dropout_layers.append(tf.keras.layers.Dropout(dropout_rate)) # 添加跳跃连接层 self.residual_layer = Dense(hidden_units, activation='linear') # 最终输出层 self.final_layer = Dense(1, activation='linear', kernel_regularizer=tf.keras.regularizers.l2(l2_reg)) # 物理参数优化 - 使用更灵活的参数化方法 # 基本衰减系数 (使用指数确保正值) self.k1_log = tf.Variable(tf.math.log(0.1), trainable=True, dtype=tf.float32, name='k1_log') # 水位依赖的衰减系数 (使用softplus确保正值) self.k2_raw = tf.Variable(0.01, trainable=True, dtype=tf.float32, name='k2_raw') # 非线性项系数 (使用sigmoid约束在[0,1]) self.alpha_raw = tf.Variable(0.1, trainable=True, dtype=tf.float32, name='alpha_raw') # 外部影响系数 (使用tanh约束在[-0.2,0.2]) self.beta_raw = tf.Variable(0.05, trainable=True, dtype=tf.float32, name='beta_raw') @property def k1(self): """指数变换确保正值""" return tf.exp(self.k1_log) @property def k2(self): """Softplus变换确保正值""" return tf.math.softplus(self.k2_raw) * 0.1 # 约束在0-0.1之间 @property def alpha(self): """Sigmoid约束在[0,1]""" return tf.math.sigmoid(self.alpha_raw) @property def beta(self): """Tanh约束在[-0.2,0.2]""" return tf.math.tanh(self.beta_raw) * 0.2 def call(self, inputs, training=False): # 解包输入 t, h, dt, lag1, lag3, lag7, month_feats, season_feats = inputs # 组合所有特征 x = tf.concat([ t, h, dt, lag1, lag3, lag7, month_feats, season_feats, t * h, h * dt, t * dt, (t * h * dt), h * lag1, h * lag3, h * lag7, dt * lag1, dt * lag3, dt * lag7 ], axis=1) # 初始投影层 x_proj = Dense(64, activation='swish')(x) # 残差块 residual = self.residual_layer(x_proj) # 通过所有隐藏层(带批量归一化和dropout) for i, (dense_layer, bn_layer, dropout_layer) in enumerate(zip( self.dense_layers, self.bn_layers, self.dropout_layers)): # 第一层使用残差连接 if i == 0: x = dense_layer(x_proj) x = bn_layer(x, training=training) x = dropout_layer(x, training=training) x = x + residual # 残差连接 else: x = dense_layer(x) x = bn_layer(x, training=training) x = dropout_layer(x, training=training) # 注意力机制 - 增强重要特征 attention = Dense(x.shape[-1], activation='sigmoid')(x) x = x * attention return self.final_layer(x) def physics_loss(self, t, h_current, dt, training=False): """改进的物理损失计算""" # 创建零值占位符 batch_size = tf.shape(t)[0] zeros = tf.zeros((batch_size, 1), 
dtype=tf.float32) zeros2 = tf.zeros((batch_size, 2), dtype=tf.float32) # 预测下一时刻水位 h_next_pred = self([t, h_current, dt, zeros, zeros, zeros, zeros2, zeros2], training=training) # 物理方程计算 - 使用改进的公式 # 非线性衰减项 decay_factor = self.k1 + self.k2 * h_current exponent = -decay_factor * dt exponent = tf.clip_by_value(exponent, -50.0, 50.0) decay_term = h_current * tf.exp(exponent) # 外部影响项(考虑时间依赖性) external_factor = self.alpha * self.beta * dt external_factor = tf.clip_by_value(external_factor, -10.0, 10.0) external_term = self.alpha * (1 - tf.exp(-external_factor)) # 残差计算 residual = h_next_pred - (decay_term + external_term) # 添加物理参数正则化(鼓励简单物理模型) param_reg = 0.01 * (tf.abs(self.k1) + tf.abs(self.k2) + tf.abs(self.alpha) + tf.abs(self.beta)) return tf.reduce_mean(tf.square(residual)) + param_reg class DamSeepageModel: def __init__(self, root): self.root = root self.root.title("大坝渗流预测模型(PINNs)") self.root.geometry("1200x800") # 初始化数据 self.train_df = None # 训练集 self.test_df = None # 测试集 self.model = None self.scaler_t = MinMaxScaler(feature_range=(0, 1)) self.scaler_h = MinMaxScaler(feature_range=(0, 1)) self.scaler_dt = MinMaxScaler(feature_range=(0, 1)) self.evaluation_metrics = {} # 创建主界面 self.create_widgets() def create_widgets(self): # 创建主框架 main_frame = ttk.Frame(self.root, padding=10) main_frame.pack(fill=tk.BOTH, expand=True) # 左侧控制面板 control_frame = ttk.LabelFrame(main_frame, text="模型控制", padding=10) control_frame.pack(side=tk.LEFT, fill=tk.Y, padx=5, pady=5) # 文件选择部分 file_frame = ttk.LabelFrame(control_frame, text="数据文件", padding=10) file_frame.pack(fill=tk.X, pady=5) # 训练集选择 ttk.Label(file_frame, text="训练集:").grid(row=0, column=0, sticky=tk.W, pady=5) self.train_file_var = tk.StringVar() ttk.Entry(file_frame, textvariable=self.train_file_var, width=30, state='readonly').grid( row=0, column=1, padx=5) ttk.Button(file_frame, text="选择文件", command=lambda: self.select_file("train")).grid(row=0, column=2) # 测试集选择 ttk.Label(file_frame, text="测试集:").grid(row=1, column=0, sticky=tk.W, pady=5) self.test_file_var = tk.StringVar() ttk.Entry(file_frame, textvariable=self.test_file_var, width=30, state='readonly').grid(row=1, column=1, padx=5) ttk.Button(file_frame, text="选择文件", command=lambda: self.select_file("test")).grid(row=1, column=2) # PINNs参数设置 param_frame = ttk.LabelFrame(control_frame, text="PINNs参数", padding=10) param_frame.pack(fill=tk.X, pady=10) # 验证集切分比例 ttk.Label(param_frame, text="验证集比例:").grid(row=0, column=0, sticky=tk.W, pady=5) self.split_ratio_var = tk.DoubleVar(value=0.2) ttk.Spinbox(param_frame, from_=0, to=1, increment=0.05, textvariable=self.split_ratio_var, width=10).grid(row=0, column=1, padx=5) # 隐藏层数量 ttk.Label(param_frame, text="网络层数:").grid(row=1, column=0, sticky=tk.W, pady=5) self.num_layers_var = tk.IntVar(value=4) ttk.Spinbox(param_frame, from_=2, to=8, increment=1, textvariable=self.num_layers_var, width=10).grid(row=1, column=1, padx=5) # 每层神经元数量 ttk.Label(param_frame, text="神经元数/层:").grid(row=2, column=0, sticky=tk.W, pady=5) self.hidden_units_var = tk.IntVar(value=32) ttk.Spinbox(param_frame, from_=16, to=128, increment=4, textvariable=self.hidden_units_var, width=10).grid(row=2, column=1, padx=5) # 训练轮次 ttk.Label(param_frame, text="训练轮次:").grid(row=3, column=0, sticky=tk.W, pady=5) self.epochs_var = tk.IntVar(value=500) ttk.Spinbox(param_frame, from_=100, to=2000, increment=100, textvariable=self.epochs_var, width=10).grid(row=3, column=1, padx=5) # 物理损失权重 ttk.Label(param_frame, text="物理损失权重:").grid(row=4, column=0, sticky=tk.W, pady=5) self.physics_weight_var = 
tk.DoubleVar(value=0.5) ttk.Spinbox(param_frame, from_=0.1, to=1.0, increment=0.1, textvariable=self.physics_weight_var, width=10).grid(row=4, column=1, padx=5) # 控制按钮 btn_frame = ttk.Frame(control_frame) btn_frame.pack(fill=tk.X, pady=10) ttk.Button(btn_frame, text="训练模型", command=self.train_model).pack(side=tk.LEFT, padx=5) ttk.Button(btn_frame, text="预测结果", command=self.predict).pack(side=tk.LEFT, padx=5) ttk.Button(btn_frame, text="保存结果", command=self.save_results).pack(side=tk.LEFT, padx=5) ttk.Button(btn_frame, text="重置", command=self.reset).pack(side=tk.RIGHT, padx=5) # 状态栏 self.status_var = tk.StringVar(value="就绪") status_bar = ttk.Label(control_frame, textvariable=self.status_var, relief=tk.SUNKEN, anchor=tk.W) status_bar.pack(fill=tk.X, side=tk.BOTTOM) # 右侧结果显示区域 result_frame = ttk.Frame(main_frame) result_frame.pack(side=tk.RIGHT, fill=tk.BOTH, expand=True, padx=5, pady=5) # 创建标签页 self.notebook = ttk.Notebook(result_frame) self.notebook.pack(fill=tk.BOTH, expand=True) # 损失曲线标签页 self.loss_frame = ttk.Frame(self.notebook) self.notebook.add(self.loss_frame, text="训练损失") # 预测结果标签页 self.prediction_frame = ttk.Frame(self.notebook) self.notebook.add(self.prediction_frame, text="预测结果") # 指标显示 self.metrics_var = tk.StringVar() metrics_label = ttk.Label( self.prediction_frame, textvariable=self.metrics_var, font=('TkDefaultFont', 10, 'bold'), relief='ridge', padding=5 ) metrics_label.pack(fill=tk.X, padx=5, pady=5) # 初始化绘图区域 self.fig, self.ax = plt.subplots(figsize=(10, 6)) self.canvas = FigureCanvasTkAgg(self.fig, master=self.prediction_frame) self.canvas.get_tk_widget().pack(fill=tk.BOTH, expand=True) # 损失曲线画布 self.loss_fig, self.loss_ax = plt.subplots(figsize=(10, 4)) self.loss_canvas = FigureCanvasTkAgg(self.loss_fig, master=self.loss_frame) self.loss_canvas.get_tk_widget().pack(fill=tk.BOTH, expand=True) def select_file(self, file_type): """选择Excel文件并计算时间步长""" try: file_path = filedialog.askopenfilename( title=f"选择{file_type}集Excel文件", filetypes=[("Excel文件", "*.xlsx *.xls"), ("所有文件", "*.*")] ) if not file_path: return df = pd.read_excel(file_path) # 验证必需列是否存在 required_cols = ['year', 'month', 'day', '水位'] missing_cols = [col for col in required_cols if col not in df.columns] if missing_cols: messagebox.showerror("列名错误", f"缺少必需列: {', '.join(missing_cols)}") return # 时间特征处理 time_features = ['year', 'month', 'day'] missing_time_features = [feat for feat in time_features if feat not in df.columns] if missing_time_features: messagebox.showerror("列名错误", f"Excel文件缺少预处理后的时间特征列: {', '.join(missing_time_features)}") return # 创建时间戳列 (增强兼容性) time_cols = ['year', 'month', 'day'] if 'hour' in df.columns: time_cols.append('hour') if 'minute' in df.columns: time_cols.append('minute') if 'second' in df.columns: time_cols.append('second') # 填充缺失的时间单位 for col in ['hour', 'minute', 'second']: if col not in df.columns: df[col] = 0 df['datetime'] = pd.to_datetime(df[time_cols]) # 设置时间索引 df = df.set_index('datetime') # 计算相对时间(天) df['days'] = (df.index - df.index[0]).days # 新增:计算时间步长dt(单位:天) df['dt'] = df.index.to_series().diff().dt.total_seconds() / 86400 # 精确到秒级 # 处理时间步长异常值 if len(df) > 1: # 计算有效时间步长(排除<=0的值) valid_dt = df['dt'][df['dt'] > 0] if len(valid_dt) > 0: avg_dt = valid_dt.mean() else: avg_dt = 1.0 else: avg_dt = 1.0 # 替换非正值 df.loc[df['dt'] <= 0, 'dt'] = avg_dt # 填充缺失值 df['dt'] = df['dt'].fillna(avg_dt) # 添加滞后特征 (1天、3天、7天的水位) df['水位_lag1'] = df['水位'].shift(1) df['水位_lag3'] = df['水位'].shift(3) df['水位_lag7'] = df['水位'].shift(7) # 填充缺失的滞后值(用第一个有效值向后填充) lag_cols = ['水位_lag1', '水位_lag3', '水位_lag7'] 
df[lag_cols] = df[lag_cols].fillna(method='bfill') # 添加周期性特征 df['month'] = df.index.month df['day_of_year'] = df.index.dayofyear # 月份的正弦/余弦变换 df['month_sin'] = np.sin(2 * np.pi * df['month'] / 12) df['month_cos'] = np.cos(2 * np.pi * df['month'] / 12) # 季节特征(每3个月一个季节) seasons = [(12, 1, 2), (3, 4, 5), (6, 7, 8), (9, 10, 11)] season_map = {} for i, months in enumerate(seasons): for month in months: season_map[month] = i df['season'] = df['month'].map(season_map) # 季节的正弦/余弦变换 df['season_sin'] = np.sin(2 * np.pi * df['season'] / 4) df['season_cos'] = np.cos(2 * np.pi * df['season'] / 4) # 保存数据 if file_type == "train": self.train_df = df self.train_file_var.set(os.path.basename(file_path)) self.status_var.set(f"已加载训练集: {len(self.train_df)}条数据") else: self.test_df = df self.test_file_var.set(os.path.basename(file_path)) self.status_var.set(f"已加载测试集: {len(self.test_df)}条数据") except Exception as e: error_msg = f"文件读取失败: {str(e)}\n\n请确保:\n1. 文件不是打开状态\n2. 文件格式正确\n3. 包含必需的时间和水位列" messagebox.showerror("文件错误", error_msg) def calculate_metrics(self, y_true, y_pred): """计算评估指标""" from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score mse = mean_squared_error(y_true, y_pred) rmse = np.sqrt(mse) mae = mean_absolute_error(y_true, y_pred) non_zero_idx = np.where(y_true != 0)[0] if len(non_zero_idx) > 0: mape = np.mean(np.abs((y_true[non_zero_idx] - y_pred[non_zero_idx]) / y_true[non_zero_idx])) * 100 else: mape = float('nan') r2 = r2_score(y_true, y_pred) return { 'MSE': mse, 'RMSE': rmse, 'MAE': mae, 'MAPE': mape, # 修正键名 'R2': r2 } def train_model(self): """训练PINNs模型(带早停机制+训练指标监控)""" if self.train_df is None: messagebox.showwarning("警告", "请先选择训练集文件") return try: self.status_var.set("正在预处理数据...") self.root.update() # 从训练集中切分训练子集和验证子集(时间顺序切分) split_ratio = 1 - self.split_ratio_var.get() split_idx = int(len(self.train_df) * split_ratio) train_subset = self.train_df.iloc[:split_idx] valid_subset = self.train_df.iloc[split_idx:] # 检查数据量是否足够 if len(train_subset) < 2 or len(valid_subset) < 2: messagebox.showerror("数据错误", "训练集数据量不足(至少需要2个时间步)") return # ===== 新增:创建特征归一化器 ===== self.scaler_lag1 = MinMaxScaler(feature_range=(0, 1)) self.scaler_lag3 = MinMaxScaler(feature_range=(0, 1)) self.scaler_lag7 = MinMaxScaler(feature_range=(0, 1)) self.scaler_month = MinMaxScaler(feature_range=(0, 1)) self.scaler_season = MinMaxScaler(feature_range=(0, 1)) # 数据预处理 - 分别归一化不同特征 # 时间特征 t_train = train_subset['days'].values[1:].reshape(-1, 1) self.scaler_t.fit(t_train) t_train_scaled = self.scaler_t.transform(t_train).astype(np.float32) # 水位特征 h_train = train_subset['水位'].values[:-1].reshape(-1, 1) self.scaler_h.fit(h_train) h_train_scaled = self.scaler_h.transform(h_train).astype(np.float32) # 时间步长特征 dt_train = train_subset['dt'].values[1:].reshape(-1, 1) self.scaler_dt.fit(dt_train) dt_train_scaled = self.scaler_dt.transform(dt_train).astype(np.float32) # ===== 新增:归一化滞后特征 ===== lag1_train = train_subset['水位_lag1'].values[:-1].reshape(-1, 1) self.scaler_lag1.fit(lag1_train) lag1_train_scaled = self.scaler_lag1.transform(lag1_train).astype(np.float32) lag3_train = train_subset['水位_lag3'].values[:-1].reshape(-1, 1) self.scaler_lag3.fit(lag3_train) lag3_train_scaled = self.scaler_lag3.transform(lag3_train).astype(np.float32) lag7_train = train_subset['水位_lag7'].values[:-1].reshape(-1, 1) self.scaler_lag7.fit(lag7_train) lag7_train_scaled = self.scaler_lag7.transform(lag7_train).astype(np.float32) # ===== 新增:归一化周期性特征 ===== month_sin_train = train_subset['month_sin'].values[:-1].reshape(-1, 1) month_cos_train = 
train_subset['month_cos'].values[:-1].reshape(-1, 1) month_features_train = np.hstack([month_sin_train, month_cos_train]) self.scaler_month.fit(month_features_train) month_features_train_scaled = self.scaler_month.transform(month_features_train).astype(np.float32) season_sin_train = train_subset['season_sin'].values[:-1].reshape(-1, 1) season_cos_train = train_subset['season_cos'].values[:-1].reshape(-1, 1) season_features_train = np.hstack([season_sin_train, season_cos_train]) self.scaler_season.fit(season_features_train) season_features_train_scaled = self.scaler_season.transform(season_features_train).astype(np.float32) # 归一化标签(下一时刻水位) h_next_train = train_subset['水位'].values[1:].reshape(-1, 1) h_next_train_scaled = self.scaler_h.transform(h_next_train).astype(np.float32) # 准备验证数据(同样进行归一化) t_valid = valid_subset['days'].values[1:].reshape(-1, 1) t_valid_scaled = self.scaler_t.transform(t_valid).astype(np.float32) h_valid = valid_subset['水位'].values[:-1].reshape(-1, 1) h_valid_scaled = self.scaler_h.transform(h_valid).astype(np.float32) dt_valid = valid_subset['dt'].values[1:].reshape(-1, 1) dt_valid_scaled = self.scaler_dt.transform(dt_valid).astype(np.float32) h_next_valid_scaled = self.scaler_h.transform( valid_subset['水位'].values[1:].reshape(-1, 1) ).astype(np.float32) # 原始值用于指标计算 h_next_train_true = h_next_train h_next_valid_true = valid_subset['水位'].values[1:].reshape(-1, 1) # 创建模型和优化器 self.model = PINNModel( num_layers=self.num_layers_var.get(), hidden_units=self.hidden_units_var.get() ) # 创建动态学习率调度器 initial_lr = 0.001 lr_schedule = tf.keras.optimizers.schedules.ExponentialDecay( initial_learning_rate=initial_lr, decay_steps=100, # 每100步衰减一次 decay_rate=0.95, # 衰减率 staircase=True # 阶梯式衰减 ) optimizer = Adam(learning_rate=lr_schedule) # ===== 新增:验证集的滞后特征归一化 ===== lag1_valid = valid_subset['水位_lag1'].values[:-1].reshape(-1, 1) lag1_valid_scaled = self.scaler_lag1.transform(lag1_valid).astype(np.float32) lag3_valid = valid_subset['水位_lag3'].values[:-1].reshape(-1, 1) lag3_valid_scaled = self.scaler_lag3.transform(lag3_valid).astype(np.float32) lag7_valid = valid_subset['水位_lag7'].values[:-1].reshape(-1, 1) lag7_valid_scaled = self.scaler_lag7.transform(lag7_valid).astype(np.float32) # ===== 新增:验证集的周期性特征归一化 ===== month_sin_valid = valid_subset['month_sin'].values[:-1].reshape(-1, 1) month_cos_valid = valid_subset['month_cos'].values[:-1].reshape(-1, 1) month_features_valid = np.hstack([month_sin_valid, month_cos_valid]) month_features_valid_scaled = self.scaler_month.transform(month_features_valid).astype(np.float32) season_sin_valid = valid_subset['season_sin'].values[:-1].reshape(-1, 1) season_cos_valid = valid_subset['season_cos'].values[:-1].reshape(-1, 1) season_features_valid = np.hstack([season_sin_valid, season_cos_valid]) season_features_valid_scaled = self.scaler_season.transform(season_features_valid).astype(np.float32) # 在训练循环中,使用归一化后的数据 train_dataset = tf.data.Dataset.from_tensor_slices( ((t_train_scaled, h_train_scaled, dt_train_scaled, lag1_train_scaled, lag3_train_scaled, lag7_train_scaled, month_features_train_scaled, season_features_train_scaled), h_next_train_scaled) ) train_dataset = train_dataset.shuffle(buffer_size=1024).batch(32) valid_dataset = tf.data.Dataset.from_tensor_slices( ((t_valid_scaled, h_valid_scaled, dt_valid_scaled, lag1_valid_scaled, lag3_valid_scaled, lag7_valid_scaled, month_features_valid_scaled, season_features_valid_scaled), h_next_valid_scaled) ) valid_dataset = valid_dataset.batch(32) # 初始化训练历史记录列表 train_data_loss_history = [] 
physics_loss_history = [] valid_data_loss_history = [] train_metrics_history = [] valid_metrics_history = [] # 早停机制参数 patience = int(self.epochs_var.get() / 3) min_delta = 1e-4 best_valid_loss = float('inf') wait = 0 best_epoch = 0 best_weights = None start_time = time.time() # 自定义训练循环 for epoch in range(self.epochs_var.get()): # 获取当前学习率 current_lr = optimizer.learning_rate.numpy() # 训练阶段 epoch_train_data_loss = [] epoch_physics_loss = [] train_pred_scaled = [] # 修改后的解包方式 # 在训练循环中: for step, (inputs, h_next_batch) in enumerate(train_dataset): t_batch, h_batch, dt_batch, lag1_batch, lag3_batch, lag7_batch, month_feats_batch, season_feats_batch = inputs with tf.GradientTape() as tape: # 预测下一时刻水位 h_pred = self.model([ t_batch, h_batch, dt_batch, lag1_batch, lag3_batch, lag7_batch, month_feats_batch, season_feats_batch ], training=True) data_loss = tf.reduce_mean(tf.square(h_next_batch - h_pred)) # 计算物理损失 physics_loss = self.model.physics_loss(t_batch, h_batch, dt_batch) # 使用设置的物理损失权重 current_physics_weight = self.physics_weight_var.get() # 添加L2正则化损失 l2_loss = tf.reduce_sum(self.model.losses) # 总损失 loss = data_loss + current_physics_weight * physics_loss + l2_loss grads = tape.gradient(loss, self.model.trainable_variables) optimizer.apply_gradients(zip(grads, self.model.trainable_variables)) epoch_train_data_loss.append(data_loss.numpy()) epoch_physics_loss.append(physics_loss.numpy()) train_pred_scaled.append(h_pred.numpy()) # 合并训练预测值 train_pred_scaled = np.concatenate(train_pred_scaled, axis=0) train_pred_true = self.scaler_h.inverse_transform(train_pred_scaled) train_metrics = self.calculate_metrics( y_true=h_next_train_true.flatten(), y_pred=train_pred_true.flatten() ) train_metrics_history.append(train_metrics) # 验证阶段 epoch_valid_data_loss = [] valid_pred_scaled = [] for (inputs, h_v_next_batch) in valid_dataset: t_v_batch, h_v_batch, dt_v_batch, lag1_v_batch, lag3_v_batch, lag7_v_batch, month_feats_v_batch, season_feats_v_batch = inputs h_v_pred = self.model([ t_v_batch, h_v_batch, dt_v_batch, lag1_v_batch, lag3_v_batch, lag7_v_batch, month_feats_v_batch, season_feats_v_batch ], training=False) valid_data_loss = tf.reduce_mean(tf.square(h_v_next_batch - h_v_pred)) epoch_valid_data_loss.append(valid_data_loss.numpy()) valid_pred_scaled.append(h_v_pred.numpy()) # 合并验证预测值(归一化后) valid_pred_scaled = np.concatenate(valid_pred_scaled, axis=0) # 反归一化得到原始预测值 valid_pred_true = self.scaler_h.inverse_transform(valid_pred_scaled) # 计算验证集指标(使用原始真实值和预测值) valid_metrics = self.calculate_metrics( y_true=h_next_valid_true.flatten(), y_pred=valid_pred_true.flatten() ) valid_metrics_history.append(valid_metrics) # 计算平均损失 avg_train_data_loss = np.mean(epoch_train_data_loss) avg_physics_loss = np.mean(epoch_physics_loss) avg_valid_data_loss = np.mean(epoch_valid_data_loss) # 记录损失 train_data_loss_history.append(avg_train_data_loss) physics_loss_history.append(avg_physics_loss) valid_data_loss_history.append(avg_valid_data_loss) # 早停机制逻辑 current_valid_loss = avg_valid_data_loss # 早停机制逻辑 current_valid_loss = avg_valid_data_loss if current_valid_loss < best_valid_loss - min_delta: best_valid_loss = current_valid_loss best_epoch = epoch + 1 wait = 0 best_weights = self.model.get_weights() else: wait += 1 if wait >= patience: self.status_var.set(f"触发早停!最佳轮次: {best_epoch},最佳验证损失: {best_valid_loss:.4f}") if best_weights is not None: self.model.set_weights(best_weights) break # 确保在此处退出循环 # 更新状态(添加当前学习率显示) if epoch % 1 == 0: # 提取当前训练/验证的关键指标 train_rmse = train_metrics['RMSE'] valid_rmse = valid_metrics['RMSE'] train_r2 
= train_metrics['R2'] valid_r2 = valid_metrics['R2'] elapsed = time.time() - start_time self.status_var.set( f"训练中 | 轮次: {epoch + 1}/{self.epochs_var.get()} | " f"学习率: {current_lr:.6f} | " f"训练RMSE: {train_rmse:.4f} | 验证RMSE: {valid_rmse:.4f} | " f"训练R²: {train_r2:.4f} | 验证R²: {valid_r2:.4f} | " f"k1: {self.model.k1.numpy():.6f}, k2: {self.model.k2.numpy():.6f} | 时间: {elapsed:.1f}秒 | 早停等待: {wait}/{patience}" ) self.root.update() # 绘制损失曲线 self.loss_ax.clear() epochs_range = range(1, len(train_data_loss_history) + 1) self.loss_ax.plot(epochs_range, train_data_loss_history, 'b-', label='训练数据损失') self.loss_ax.plot(epochs_range, physics_loss_history, 'r--', label='物理损失') self.loss_ax.plot(epochs_range, valid_data_loss_history, 'g-.', label='验证数据损失') self.loss_ax.set_title('PINNs训练与验证损失') self.loss_ax.set_xlabel('轮次') self.loss_ax.set_ylabel('损失', rotation=0) self.loss_ax.legend() self.loss_ax.grid(True, alpha=0.3) self.loss_ax.set_yscale('log') self.loss_canvas.draw() # 训练完成提示 elapsed = time.time() - start_time if wait >= patience: completion_msg = ( f"早停触发 | 最佳轮次: {best_epoch} | 最佳验证损失: {best_valid_loss:.4f} | " f"最佳验证RMSE: {valid_metrics_history[best_epoch - 1]['RMSE']:.4f} | " f"总时间: {elapsed:.1f}秒" ) else: completion_msg = ( f"训练完成 | 总轮次: {self.epochs_var.get()} | " f"最终训练RMSE: {train_metrics_history[-1]['RMSE']:.4f} | " f"最终验证RMSE: {valid_metrics_history[-1]['RMSE']:.4f} | " f"最终训练R²: {train_metrics_history[-1]['R2']:.4f} | " f"最终验证R²: {valid_metrics_history[-1]['R2']:.4f} | " f"总时间: {elapsed:.1f}秒" ) # 保存训练历史 self.train_history = { 'train_data_loss': train_data_loss_history, 'physics_loss': physics_loss_history, 'valid_data_loss': valid_data_loss_history, 'train_metrics': train_metrics_history, 'valid_metrics': valid_metrics_history } # 保存学习到的物理参数 self.learned_params = { "k1": self.model.k1.numpy(), "k2": self.model.k2.numpy(), "alpha": self.model.alpha.numpy(), "beta": self.model.beta.numpy() } self.status_var.set(completion_msg) messagebox.showinfo("训练完成", f"PINNs模型训练成功完成!\n{completion_msg}") except Exception as e: messagebox.showerror("训练错误", f"模型训练失败:\n{str(e)}") self.status_var.set("训练失败") def predict(self): """使用PINNs模型进行递归预测(带Teacher Forcing和蒙特卡洛Dropout)""" if self.model is None: messagebox.showwarning("警告", "请先训练模型") return if self.test_df is None: messagebox.showwarning("警告", "请先选择测试集文件") return try: self.status_var.set("正在生成预测(使用Teacher Forcing和MC Dropout)...") self.root.update() # 预处理测试数据 - 归一化 t_test = self.test_df['days'].values.reshape(-1, 1) t_test_scaled = self.scaler_t.transform(t_test).astype(np.float32) dt_test = self.test_df['dt'].values.reshape(-1, 1) dt_test_scaled = self.scaler_dt.transform(dt_test).astype(np.float32) h_test = self.test_df['水位'].values.reshape(-1, 1) h_test_scaled = self.scaler_h.transform(h_test).astype(np.float32) # ===== 新增:归一化测试集的滞后特征 ===== lag1_test = self.test_df['水位_lag1'].values.reshape(-1, 1) lag1_test_scaled = self.scaler_lag1.transform(lag1_test).astype(np.float32) lag3_test = self.test_df['水位_lag3'].values.reshape(-1, 1) lag3_test_scaled = self.scaler_lag3.transform(lag3_test).astype(np.float32) lag7_test = self.test_df['水位_lag7'].values.reshape(-1, 1) lag7_test_scaled = self.scaler_lag7.transform(lag7_test).astype(np.float32) # ===== 新增:归一化测试集的周期性特征 ===== month_sin_test = self.test_df['month_sin'].values.reshape(-1, 1) month_cos_test = self.test_df['month_cos'].values.reshape(-1, 1) month_features_test = np.hstack([month_sin_test, month_cos_test]) month_features_test_scaled = 
self.scaler_month.transform(month_features_test).astype(np.float32) season_sin_test = self.test_df['season_sin'].values.reshape(-1, 1) season_cos_test = self.test_df['season_cos'].values.reshape(-1, 1) season_features_test = np.hstack([season_sin_test, season_cos_test]) season_features_test_scaled = self.scaler_season.transform(season_features_test).astype(np.float32) # 改进的递归预测参数 n = len(t_test) mc_iterations = 100 adaptive_forcing = True # 存储蒙特卡洛采样结果 mc_predictions_scaled = np.zeros((mc_iterations, n, 1), dtype=np.float32) # 进行多次蒙特卡洛采样 for mc_iter in range(mc_iterations): predicted_scaled = np.zeros((n, 1), dtype=np.float32) predicted_scaled[0] = h_test_scaled[0] # 第一个点使用真实值 # 递归预测(带自适应教师强制) for i in range(1, n): # 自适应教师强制:后期阶段增加真实值使用频率 if adaptive_forcing: # 前期70%概率使用真实值,后期提高到90% teacher_forcing_prob = 0.7 + 0.2 * min(1.0, i / (0.7 * n)) else: teacher_forcing_prob = 0.7 # 决定使用真实值还是预测值 use_actual = np.random.rand() < teacher_forcing_prob if use_actual and i < n - 1: # 不能使用未来值 h_prev = h_test_scaled[i - 1:i] else: h_prev = predicted_scaled[i - 1:i] t_prev = t_test_scaled[i - 1:i] dt_i = dt_test_scaled[i:i + 1] # 准备输入特征 inputs = ( t_test_scaled[i - 1:i], # 时间 h_prev, # 当前水位 dt_test_scaled[i:i + 1], # 时间步长 lag1_test_scaled[i - 1:i], lag3_test_scaled[i - 1:i], lag7_test_scaled[i - 1:i], month_features_test_scaled[i - 1:i], season_features_test_scaled[i - 1:i] ) # 直接传递整个输入元组 h_pred = self.model(inputs, training=True) # 物理模型预测值(用于约束) k1 = self.learned_params['k1'] k2 = self.learned_params['k2'] alpha = self.learned_params['alpha'] beta = self.learned_params['beta'] # 物理方程预测 exponent = - (k1 + k2 * h_prev) * dt_i decay_term = h_prev * np.exp(exponent) external_term = alpha * (1 - np.exp(-beta * dt_i)) physics_pred = decay_term + external_term # 混合预测:神经网络预测与物理模型预测加权平均 physics_weight = 0.3 # 物理模型权重 final_pred = physics_weight * physics_pred + (1 - physics_weight) * h_pred.numpy() predicted_scaled[i] = final_pred[0][0] mc_predictions_scaled[mc_iter] = predicted_scaled # 计算预测统计量 mean_pred_scaled = np.mean(mc_predictions_scaled, axis=0) std_pred_scaled = np.std(mc_predictions_scaled, axis=0) # 反归一化结果 predictions = self.scaler_h.inverse_transform(mean_pred_scaled) uncertainty = self.scaler_h.inverse_transform(std_pred_scaled) * 1.96 # 95%置信区间 actual_values = h_test test_time = self.test_df.index # 清除现有图表 self.ax.clear() # 计算合理的y轴范围 - 基于数据集中区域 # 获取实际值和预测值的中位数 median_val = np.median(actual_values) # 计算数据的波动范围(标准差) data_range = np.std(actual_values) * 4 # 4倍标准差覆盖大部分数据 # 设置y轴范围为中心值±数据波动范围 y_center = median_val y_half_range = max(data_range, 10) # 确保最小范围为20个单位 y_min_adjusted = y_center - y_half_range y_max_adjusted = y_center + y_half_range # 确保范围不为零 if y_max_adjusted - y_min_adjusted < 1: y_min_adjusted -= 5 y_max_adjusted += 5 # 绘制结果(带置信区间) self.ax.plot(test_time, actual_values, 'b-', label='真实值', linewidth=2) self.ax.plot(test_time, predictions, 'r--', label='预测均值', linewidth=2) self.ax.fill_between( test_time, (predictions - uncertainty).flatten(), (predictions + uncertainty).flatten(), color='orange', alpha=0.3, label='95%置信区间' ) # 设置自动调整的y轴范围 self.ax.set_ylim(y_min_adjusted, y_max_adjusted) self.ax.set_title('大坝渗流水位预测(PINNs with MC Dropout)') self.ax.set_xlabel('时间') self.ax.set_ylabel('测压管水位', rotation=0) self.ax.legend(loc='best') # 自动选择最佳位置 # 优化时间轴刻度 self.ax.xaxis.set_major_locator(mdates.YearLocator()) self.ax.xaxis.set_major_formatter(mdates.DateFormatter('%Y')) self.ax.xaxis.set_minor_locator(mdates.MonthLocator(interval=2)) self.ax.grid(which='minor', axis='x', linestyle=':', color='gray', 
alpha=0.3) self.ax.grid(which='major', axis='y', linestyle='-', color='lightgray', alpha=0.5) self.ax.tick_params(axis='x', which='major', rotation=0, labelsize=9) self.ax.tick_params(axis='x', which='minor', length=2) # 计算评估指标(排除第一个点) eval_actual = actual_values[1:].flatten() eval_pred = predictions[1:].flatten() self.evaluation_metrics = self.calculate_metrics(eval_actual, eval_pred) # 添加不确定性指标 avg_uncertainty = np.mean(uncertainty) max_uncertainty = np.max(uncertainty) self.evaluation_metrics['Avg Uncertainty'] = avg_uncertainty self.evaluation_metrics['Max Uncertainty'] = max_uncertainty metrics_text = ( f"MSE: {self.evaluation_metrics['MSE']:.4f} | " f"RMSE: {self.evaluation_metrics['RMSE']:.4f} | " f"MAE: {self.evaluation_metrics['MAE']:.4f} | " f"MAPE: {self.evaluation_metrics['MAPE']:.2f}% | " f"R²: {self.evaluation_metrics['R2']:.4f}\n" f"平均不确定性: {avg_uncertainty:.4f} | 最大不确定性: {max_uncertainty:.4f}" ) self.metrics_var.set(metrics_text) # 在图表上添加指标 self.ax.text( 0.5, 1.05, metrics_text, transform=self.ax.transAxes, ha='center', fontsize=8, bbox=dict(facecolor='white', alpha=0.8) ) params_text = ( f"物理参数: k1={self.learned_params['k1']:.4f}, " f"k2={self.learned_params['k2']:.4f}, " f"alpha={self.learned_params['alpha']:.4f}, " f"beta={self.learned_params['beta']:.4f} | " f"Teacher Forcing概率: {teacher_forcing_prob}" ) self.ax.text( 0.5, 1.12, params_text, transform=self.ax.transAxes, ha='center', fontsize=8, bbox=dict(facecolor='white', alpha=0.8) ) # 调整布局 plt.tight_layout(pad=2.0) self.canvas.draw() # 保存预测结果 self.predictions = predictions self.uncertainty = uncertainty self.actual_values = actual_values self.test_time = test_time self.mc_predictions = mc_predictions_scaled self.status_var.set(f"预测完成(MC Dropout采样{mc_iterations}次)") except Exception as e: messagebox.showerror("预测错误", f"预测失败:\n{str(e)}") self.status_var.set("预测失败") import traceback traceback.print_exc() def save_results(self): """保存预测结果和训练历史数据""" if not hasattr(self, 'predictions') or not hasattr(self, 'train_history'): messagebox.showwarning("警告", "请先生成预测结果并完成训练") return # 选择保存路径 save_path = filedialog.asksaveasfilename( defaultextension=".xlsx", filetypes=[("Excel文件", "*.xlsx"), ("所有文件", "*.*")], title="保存结果" ) if not save_path: return try: # 1. 创建预测结果DataFrame result_df = pd.DataFrame({ '时间': self.test_time, '实际水位': self.actual_values.flatten(), '预测水位': self.predictions.flatten() }) # 2. 创建评估指标DataFrame metrics_df = pd.DataFrame([self.evaluation_metrics]) # 3. 
创建训练历史DataFrame history_data = { '轮次': list(range(1, len(self.train_history['train_data_loss']) + 1)), '训练数据损失': self.train_history['train_data_loss'], '物理损失': self.train_history['physics_loss'], '验证数据损失': self.train_history['valid_data_loss'] } # 添加训练集指标 for metric in ['MSE', 'RMSE', 'MAE', 'MAPE', 'R2']: history_data[f'训练集_{metric}'] = [item[metric] for item in self.train_history['train_metrics']] # 添加验证集指标 for metric in ['MSE', 'RMSE', 'MAE', 'MAPE', 'R2']: history_data[f'验证集_{metric}'] = [item[metric] for item in self.train_history['valid_metrics']] history_df = pd.DataFrame(history_data) # 保存到Excel with pd.ExcelWriter(save_path) as writer: result_df.to_excel(writer, sheet_name='预测结果', index=False) metrics_df.to_excel(writer, sheet_name='评估指标', index=False) history_df.to_excel(writer, sheet_name='训练历史', index=False) # 保存图表 chart_path = os.path.splitext(save_path)[0] + "_chart.png" self.fig.savefig(chart_path, dpi=300) # 保存损失曲线图 loss_path = os.path.splitext(save_path)[0] + "_loss.png" self.loss_fig.savefig(loss_path, dpi=300) self.status_var.set(f"结果已保存至: {os.path.basename(save_path)}") messagebox.showinfo("保存成功", f"预测结果和图表已保存至:\n" f"主文件: {save_path}\n" f"预测图表: {chart_path}\n" f"损失曲线: {loss_path}") except Exception as e: messagebox.showerror("保存错误", f"保存结果失败:\n{str(e)}") def reset(self): # 重置归一化器 self.scaler_t = MinMaxScaler(feature_range=(0, 1)) self.scaler_h = MinMaxScaler(feature_range=(0, 1)) self.scaler_dt = MinMaxScaler(feature_range=(0, 1)) """重置程序状态""" self.train_df = None self.test_df = None self.model = None self.train_file_var.set("") self.test_file_var.set("") # 清除训练历史 if hasattr(self, 'train_history'): del self.train_history # 清除图表 if hasattr(self, 'ax'): self.ax.clear() if hasattr(self, 'loss_ax'): self.loss_ax.clear() # 重绘画布 if hasattr(self, 'canvas'): self.canvas.draw() if hasattr(self, 'loss_canvas'): self.loss_canvas.draw() # 清除状态 self.status_var.set("已重置,请选择新数据") # 清除预测结果 if hasattr(self, 'predictions'): del self.predictions # 清除指标文本 if hasattr(self, 'metrics_var'): self.metrics_var.set("") messagebox.showinfo("重置", "程序已重置,可以开始新的分析") if __name__ == "__main__": root = tk.Tk() app = DamSeepageModel(root) root.mainloop() 我这个代码是用来训练水位预测模型的,帮我检查错误并修改
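这一版代码最严重的问题在 `call()` 内部:`x_proj = Dense(64, activation='swish')(x)` 和 `attention = Dense(...)(x)` 每次前向传播都会新建一个带随机权重的层,这些权重既不持久也得不到训练;同时第一个隐藏层输出 `hidden_units * 2` 维,与 `residual_layer` 输出的 `hidden_units` 维残差直接相加会形状不匹配。所有层都应在 `__init__` 中创建一次、在 `call()` 中复用。下面是按此思路重写的最小骨架(仅示意网络结构,省略了物理参数、批归一化和 Dropout):

```python
import tensorflow as tf
from tensorflow.keras.layers import Dense

class PINNBackbone(tf.keras.Model):
    """最小骨架:所有层在 __init__ 中创建,call() 只负责复用。"""

    def __init__(self, num_layers=4, hidden_units=32):
        super().__init__()
        self.input_proj = Dense(hidden_units, activation='swish')  # 统一投影维度
        self.hidden = [Dense(hidden_units, activation='swish')
                       for _ in range(num_layers)]
        self.attention = Dense(hidden_units, activation='sigmoid')  # 注意力门控层
        self.out = Dense(1)

    def call(self, x, training=False):
        x = self.input_proj(x)
        residual = x  # 与后续隐藏层同维度,残差可直接相加
        for i, layer in enumerate(self.hidden):
            x = layer(x)
            if i == 0:
                x = x + residual
        x = x * self.attention(x)  # 复用同一注意力层,而不是每次新建
        return self.out(x)

model = PINNBackbone()
y = model(tf.random.normal((8, 20)))  # 20 维对应原代码 concat 后的特征数
print(y.shape)  # (8, 1)
```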
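训练循环里还有两个较隐蔽的问题:一是 `train_dataset` 经过 `shuffle` 后批次顺序被打乱,把各批预测拼接后再与未打乱的 `h_next_train_true` 逐点比较,训练指标是错位的,应当与标签同批收集;二是把 `ExponentialDecay` 调度器传给优化器后,在多数 TF2 版本中 `optimizer.learning_rate` 返回的是调度器对象本身,直接调用 `.numpy()` 会抛异常,取当前学习率更稳妥的做法是按当前步数对调度器求值。另外,早停判断的 `current_valid_loss = avg_valid_data_loss` 两行被原样复制了一遍,可删去一份。下面的最小示例演示这两处的安全写法:

```python
import numpy as np
import tensorflow as tf

x = np.random.rand(100, 3).astype(np.float32)
y = np.random.rand(100, 1).astype(np.float32)
ds = tf.data.Dataset.from_tensor_slices((x, y)).shuffle(100).batch(16)

model = tf.keras.Sequential([tf.keras.layers.Dense(1)])
lr_schedule = tf.keras.optimizers.schedules.ExponentialDecay(1e-3, 100, 0.95)
optimizer = tf.keras.optimizers.Adam(learning_rate=lr_schedule)

preds, trues = [], []
for x_b, y_b in ds:
    preds.append(model(x_b, training=False).numpy())
    trues.append(y_b.numpy())  # 关键:与预测同批收集标签,顺序天然对齐

preds = np.concatenate(preds)
trues = np.concatenate(trues)
rmse = float(np.sqrt(np.mean((preds - trues) ** 2)))

current_lr = float(lr_schedule(optimizer.iterations))  # 按步数对调度器求值
print(f"RMSE={rmse:.4f}, lr={current_lr:.6f}")
```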
