活动介绍

6.请定义一个5*3的随机序列DataFrame索引对象,将前3行,第1列以及前2行,第2列设置为NA值,并通过字典形式以及fillna()方法填充0.55和0.66的值。

时间: 2023-06-16 18:02:58 浏览: 137
可以按照以下步骤定义DataFrame并进行处理: ```python import pandas as pd import numpy as np # 定义5*3的随机序列DataFrame索引对象 index = pd.Index(['a', 'b', 'c', 'd', 'e'], name='rows') columns = pd.Index(['A', 'B', 'C'], name='cols') df = pd.DataFrame(np.random.randn(5, 3), index=index, columns=columns) # 将前3行,第1列以及前2行,第2列设置为NA值 df.iloc[:3, 0] = np.nan df.iloc[:2, 1] = np.nan # 通过字典形式填充0.55和0.66的值 fill_dict = {'A': 0.55, 'B': 0.66} df.fillna(fill_dict, inplace=True) # 打印结果 print(df) ``` 输出结果如下: ``` cols A B C rows a 0.550000 0.660000 -0.208848 b 0.550000 0.660000 -1.164982 c 0.550000 -0.546279 0.317201 d -0.292064 -0.216926 -1.478362 e -1.028901 0.140778 1.139728 ``` 可以看到,前3行第1列和前2行第2列的NA值已经被填充为0.55和0.66了。
阅读全文

相关推荐

# -*- coding: utf-8 -*- """ Created on Wed Oct 16 21:45:32 2024 @author: xyhcn """ #(1)箱形图(Box plot),是一种用作显示一组数据分散情况资料的统计图。 ## 箱线图可以深入了解数据的分布特性,其中非异常值最常见的定义是: ## [Q1 - 1.5xIQR, Q3 + 1.5xIQR],如果是区间外的值就被视为outlier并显示在图上。 ## Q1:第一四分位数 ## median:是中位数 ## Q3:第三四分位数 ## IQR=Q3-Q1:四分位间距 #(2)随机生成数据 ## 随机生成一组以正态分布均值等于10的数据 import numpy as np import matplotlib.pyplot as plt import pandas as pd import seaborn as sns ## 解决中文乱码问题 plt.rcParams['font.sans-serif'] = ['Simhei'] data = np.random.normal(loc = 10, scale = 1, size = 1000) ## 在Pandas中,DataFrame.describe() 方法是一个非常有用的工具, ## 它可以生成关于DataFrame数据的描述性统计信息。 ## 这些统计信息包括了数据集分布的中心趋势、离散程度和形状的概括,不包括NaN值。 ## 它可以分析数值和对象序列,以及混合数据类型的DataFrame列集。 df = pd.DataFrame(data) print(df.describe()) ##正态的标准化直方图 sns.displot(data, bins=100, color='b') #(3)寻找异常值 ## 找到异常值所在范围:小于Q1 - 1.5 x IQR,以及大于 Q3 + 1.5 x IQR] ## 直接使用matplotlib.pyplot.boxplot的返回值whiskers获取两极端的非异常数据点 ## 绘图后获取它们,当绘制箱线图完成时,使用get_ydata()方法 ## 它的返回值返回一个二维数组,第二个元素就是我们想要的值(上边缘和下边缘) fig = plt.figure(figsize=(10,8)) ax1 = fig.add_subplot(1,2,1) bp = ax1.boxplot(data) lower_whisker = [item.get_ydata()[1] for item in bp['whiskers']][0] upper_whisker = [item.get_ydata()[1] for item in bp['whiskers']][1] print("上边缘:", upper_whisker) print("下边缘:", lower_whisker) print("非异常范围:", [lower_whisker,upper_whisker]) ##寻找异常值 outlier = data[(data < lower_whisker) | (data > upper_whisker)] print("异常值(离散值):", outlier) #(4)删除异常值 ##删除异常值 not_outlier = data[(data > lower_whisker) & (data < upper_whisker)] ##重新检查是否还有异常值 ax2 = fig.add_subplot(1,2,2,sharey =ax1) ax2.boxplot(not_outlier, boxprops={'color':'red'}, #设置上下边缘的显示样式 capprops={'color':'green', 'lw':4, 'ls':'--'}, #设置异常值的显示样式 flierprops={'marker':'*', 'markersize': 8}) ax1.set_xlabel('删除异常值前') ax2.set_xlabel('删除异常值后')

import torch import torch.nn as nn import torch.nn.functional as F import os import numpy as np import pandas as pd import pickle from torch.utils.data import Dataset, DataLoader, random_split from torch.optim import Adam # 配置文件路径 LABEL_FILE = r'E:\Python_Eswin\growing\dataset_202507_10spec\necking_ingot1_2_match_cleaned_data_above_50rows_cleaned.csv' SPEED_DIR = r'E:\Python_Eswin\growing\dataset_202507_10spec\ingot_tau_cal\Pull Speed Actual_embedded_5' ADC_DIR = r'E:\Python_Eswin\growing\dataset_202507_10spec\ingot_tau_cal\ADC Actual V_embedded_6' MELTGAP_DIR = r'E:\Python_Eswin\growing\dataset_202507_10spec\ingot_tau_cal\Melt_Gap_embedded_5' SPEC_FILE = r'E:\Python_Eswin\growing\dataset_202507_10spec\ingot_10_spec_length_cleaned_above_50rows.csv' MODEL_PATH = r"D:\anaconda\envs\pytorch\GrowingProject\lstm\for_10spec\saved_model\best_model.pth" TEST_SET_PATH = r"D:\anaconda\envs\pytorch\GrowingProject\lstm\for_10spec\saved_model\test_set.pkl" # 确保目录存在 os.makedirs(os.path.dirname(MODEL_PATH), exist_ok=True) class PullingDataset(Dataset): def __init__(self, label_df, speed_dir, adc_dir, meltgap_dir, spec_file): # 保存原始标签DataFrame self.labels = label_df self.speed_dir = speed_dir self.adc_dir = adc_dir self.meltgap_dir = meltgap_dir self.spec_data = self._load_spec_data(spec_file) # 加载全局参数数据 # 预计算所有序列长度并过滤有效样本 self.lengths = [] self.valid_indices = [] # 调试信息:打印全局参数文件中的ID数量 print(f"全局参数文件中找到的铸锭ID数量: {len(self.spec_data)}") print(f"前5个铸锭ID: {list(self.spec_data.keys())[:5]}") for idx, ingot_id in enumerate(self.labels.iloc[:, 0]): # 确保ID为字符串并去除可能的前后空格 ingot_id_str = str(ingot_id).strip() # 调试信息:打印当前处理的ID if idx < 5: # 只打印前5个用于调试 print(f"处理标签文件中的铸锭ID: '{ingot_id_str}' (索引: {idx})") # 检查所有必需文件是否存在 speed_path = os.path.join(speed_dir, f"{ingot_id_str}_embedded_output.csv") adc_path = os.path.join(adc_dir, f"{ingot_id_str}_embedded_output.csv") meltgap_path = os.path.join(meltgap_dir, f"{ingot_id_str}_embedded_output.csv") file_exists = ( os.path.exists(speed_path) and os.path.exists(adc_path) and os.path.exists(meltgap_path) and ingot_id_str in self.spec_data ) if file_exists: try: # 获取各序列长度(跳过第一行索引) speed_df = pd.read_csv(speed_path, header=0) adc_df = pd.read_csv(adc_path, header=0) meltgap_df = pd.read_csv(meltgap_path, header=0) speed_len = len(speed_df) adc_len = len(adc_df) meltgap_len = len(meltgap_df) # 取最小长度 min_len = min(speed_len, adc_len, meltgap_len) if min_len > 0: self.lengths.append(min_len) self.valid_indices.append(idx) else: print(f"警告: {ingot_id_str} 文件有空数据") except Exception as e: print(f"读取文件错误: {ingot_id_str} - {e}") else: missing_files = [] if not os.path.exists(speed_path): missing_files.append("speed") if not os.path.exists(adc_path): missing_files.append("ADC") if not os.path.exists(meltgap_path): missing_files.append("MeltGap") if ingot_id_str not in self.spec_data: missing_files.append("spec_params") # 调试信息:检查为什么找不到spec_params print(f"在全局参数文件中找不到ID: '{ingot_id_str}'") if len(self.spec_data) > 0: print(f"全局参数文件中的ID示例: {list(self.spec_data.keys())[:5]}") print(f"文件缺失: {ingot_id_str} ({', '.join(missing_files)})") # 过滤有效样本 if len(self.valid_indices) > 0: self.labels = self.labels.iloc[self.valid_indices] print(f"有效样本数量: {len(self.labels)}") # 如果没有有效样本,打印更多信息 if len(self.labels) == 0: print("错误: 没有找到有效样本!") print(f"标签文件中的ID示例: {self.labels.iloc[:5, 0].tolist()}") print(f"全局参数文件中的ID示例: {list(self.spec_data.keys())[:5]}") print("请检查ID是否匹配(包括类型和格式)") def _load_spec_data(self, file_path): """加载全局参数文件,返回{ingot_id: 特征向量}""" try: df = pd.read_csv(file_path, header=0) print(f"成功加载全局参数文件: {file_path}") print(f"文件包含 {len(df)} 行数据") except Exception as e: print(f"加载全局参数文件错误: {e}") return {} spec_dict = {} for _, row in df.iterrows(): # 处理ID格式:先转换为浮点数再转为整数,最后转为字符串(去除小数部分) try: ingot_id = str(int(float(row.iloc[0]))) # 关键修改:处理浮点型ID spec_dict[ingot_id] = row.iloc[1:10].values.astype(np.float32) except ValueError as e: print(f"转换ID错误: {row.iloc[0]} - {e}") continue print(f"全局参数文件中找到的铸锭ID数量: {len(spec_dict)}") print(f"处理后ID示例: {list(spec_dict.keys())[:5]}") return spec_dict def __len__(self): return len(self.labels) def __getitem__(self, idx): row = self.labels.iloc[idx] ingot_id = str(row.iloc[0]).strip() # 确保ingot_id是字符串并去除空格 label = row.iloc[11] # 第12列: 标签 (0或1) try: # 加载所有序列数据(跳过第一行索引) speed_data = pd.read_csv( os.path.join(self.speed_dir, f"{ingot_id}_embedded_output.csv"), header=0 ).values.astype(np.float32) adc_data = pd.read_csv( os.path.join(self.adc_dir, f"{ingot_id}_embedded_output.csv"), header=0 ).values.astype(np.float32) meltgap_data = pd.read_csv( os.path.join(self.meltgap_dir, f"{ingot_id}_embedded_output.csv"), header=0 ).values.astype(np.float32) # 获取全局参数向量 spec_vector = self.spec_data[ingot_id] # 取最小长度 min_len = min(len(speed_data), len(adc_data), len(meltgap_data)) # 截取相同长度的序列 speed_data = speed_data[:min_len, :] adc_data = adc_data[:min_len, :] meltgap_data = meltgap_data[:min_len, :] # 创建全局参数矩阵(复制min_len次) spec_matrix = np.tile(spec_vector, (min_len, 1)) # 合并所有特征 combined_data = np.concatenate([ speed_data, # 5维 adc_data, # 6维 meltgap_data, # 5维 spec_matrix # 9维 ], axis=1) # 转换为tensor seq = torch.tensor(combined_data, dtype=torch.float32) label = float(label) label = torch.tensor(label, dtype=torch.float32) return seq, label, min_len except Exception as e: print(f"错误: 无法处理铸锭 {ingot_id}: {e}") return torch.zeros(0), torch.tensor(0.0), 0 def collate_fn(batch): sequences, labels, lengths = zip(*batch) max_len = max(lengths) if lengths else 0 # 过滤掉空序列 valid_indices = [i for i, seq in enumerate(sequences) if len(seq) > 0] if not valid_indices: return torch.zeros(0), torch.zeros(0), torch.zeros(0) sequences = [sequences[i] for i in valid_indices] labels = [labels[i] for i in valid_indices] lengths = [lengths[i] for i in valid_indices] # 获取特征维度 feat_dim = sequences[0].shape[1] if sequences[0].nelement() > 0 else 0 # 填充序列 padded_seqs = torch.zeros(len(sequences), max_len, feat_dim) for i, seq in enumerate(sequences): padded_seqs[i, :lengths[i]] = seq lengths = torch.tensor(lengths, dtype=torch.long) labels = torch.stack(labels) return padded_seqs, labels, lengths def save_test_set(test_dataset, path): test_data = [] for i in range(len(test_dataset)): test_data.append(test_dataset[i]) with open(path, 'wb') as f: pickle.dump(test_data, f) class SpeedLSTM(nn.Module): def __init__(self, input_dim=25, hidden_dim=128, num_layers=2, dropout=0.2): super().__init__() # 添加LayerNorm稳定训练 self.input_norm = nn.LayerNorm(input_dim) self.lstm = nn.LSTM( input_size=input_dim, hidden_size=hidden_dim, num_layers=num_layers, bidirectional=True, batch_first=True, dropout=dropout if num_layers > 1 else 0 ) # 添加梯度裁剪和更稳定的分类器 self.classifier = nn.Sequential( nn.LayerNorm(2 * hidden_dim), nn.Linear(2 * hidden_dim, 64), nn.ReLU(), nn.Dropout(dropout), nn.LayerNorm(64), nn.Linear(64, 32), nn.ReLU(), nn.Dropout(dropout), nn.LayerNorm(32), nn.Linear(32, 1) ) def forward(self, x, lengths): if x.nelement() == 0: return torch.zeros(x.size(0)), None x = self.input_norm(x) # 输入标准化 packed = nn.utils.rnn.pack_padded_sequence(x, lengths.cpu(), batch_first=True, enforce_sorted=False) packed_out, _ = self.lstm(packed) out, _ = nn.utils.rnn.pad_packed_sequence(packed_out, batch_first=True) # 取最后一个有效时间步 last_output = out[torch.arange(out.size(0)), lengths - 1] # 梯度裁剪 torch.nn.utils.clip_grad_norm_(self.parameters(), max_norm=1.0) return self.classifier(last_output).squeeze(-1), None def save_model(model, MODEL_PATH): os.makedirs(os.path.dirname(MODEL_PATH), exist_ok=True) try: torch.save(model.state_dict(), MODEL_PATH) print(f"模型成功保存至: {MODEL_PATH}") except Exception as e: print(f"模型保存失败: {e}") def load_model(path, input_dim=25): model = SpeedLSTM(input_dim) model.load_state_dict(torch.load(path, map_location=torch.device('cpu'))) model.eval() print(f"从 {path} 加载模型") return model def main(): print(f"使用设备: {'GPU' if torch.cuda.is_available() else 'CPU'}") # 加载标签数据 - 跳过第一行索引 try: # 使用header=0跳过第一行索引 labels_df = pd.read_csv(LABEL_FILE, header=0) print(f"加载标签数据: {labels_df.shape[0]} 行") except Exception as e: print(f"加载数据错误: {e}") return # 创建完整数据集 full_dataset = PullingDataset( labels_df, SPEED_DIR, ADC_DIR, MELTGAP_DIR, SPEC_FILE ) if len(full_dataset) == 0: print("错误: 没有有效样本!") return # 检查第一个样本获取输入维度 sample, label, length = full_dataset[0] if len(sample) > 0: input_dim = sample.shape[1] print(f"检测到输入维度: {input_dim} (5拉速 + 6ADC + 5MeltGap + 9全局参数)") else: print("警告: 无法获取样本维度, 使用默认值25") input_dim = 25 # 划分训练/验证集 (80%/20%) train_size = int(0.8 * len(full_dataset)) test_size = len(full_dataset) - train_size train_dataset, test_dataset = random_split(full_dataset, [train_size, test_size]) # 保存测试集 save_test_set(test_dataset, TEST_SET_PATH) print(f"保存测试集到 {TEST_SET_PATH}") # 创建数据加载器 train_loader = DataLoader( train_dataset, batch_size=32, shuffle=True, collate_fn=collate_fn) # 初始化模型 device = torch.device("cuda" if torch.cuda.is_available() else "cpu") model = SpeedLSTM(input_dim=input_dim).to(device) # 计算类别权重 labels = [] for i in range(len(full_dataset)): try: _, label_val, _ = full_dataset[i] labels.append(float(label_val.item())) except Exception as e: print(f"错误: 获取标签值失败: {e}") pos_count = sum(labels) neg_count = len(labels) - pos_count pos_weight = torch.tensor([neg_count / max(pos_count, 1)]).to(device) print(f"类别分布: 负样本 {neg_count}, 正样本 {pos_count}") print(f"正样本权重: {pos_weight.item():.2f}") criterion = nn.BCEWithLogitsLoss(pos_weight=pos_weight) optimizer = Adam(model.parameters(), lr=0.0001, weight_decay=1e-4) torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0) # 训练参数 best_val_loss = float('inf') epochs = 100 early_stop_patience = 20 no_improve_count = 0 print("\n开始训练...") for epoch in range(epochs): model.train() train_loss = 0.0 train_correct = 0 train_total = 0 for inputs, labels, lengths in train_loader: if inputs.nelement() == 0: continue inputs, labels = inputs.to(device), labels.to(device) optimizer.zero_grad() outputs, _ = model(inputs, lengths) loss = criterion(outputs, labels) loss.backward() torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0) optimizer.step() preds = (torch.sigmoid(outputs) > 0.5).float() train_correct += (preds == labels).sum().item() train_total += labels.size(0) train_loss += loss.item() * inputs.size(0) # 计算平均损失和准确率 train_loss = train_loss / train_total if train_total > 0 else 0 train_acc = train_correct / train_total if train_total > 0 else 0 print(f"Epoch {epoch + 1}/{epochs}: " f"Train Loss: {train_loss:.4f}, " f"Acc: {train_acc:.4f}") # 保存最佳模型 if train_loss < best_val_loss: best_val_loss = train_loss save_model(model, MODEL_PATH) no_improve_count = 0 print(f"保存最佳模型 (loss={best_val_loss:.4f})") else: no_improve_count += 1 if no_improve_count >= early_stop_patience: print(f"早停: {early_stop_patience}个epoch无改进") break print(f"\n训练完成! 最佳模型保存至: {MODEL_PATH}") if __name__ == "__main__": main() 我想要优化这个二分类预测模型的准确性,目前准确度在50%左右

帮我改一下下面这串代码,要求把次日(x+1日)收盘价大于次日(x+1日)开盘价百分之五的条件改成次日(x+1日)开盘价比昨日(x-1日)收盘价高出百分之十,且今日(x日)收盘价不高出今日(x日)开盘价百分之五。删除次日(x+1日)最低价不低于次日(x+1日)开盘价百分之九十八的限制条件。然后,输出的预测模型命名为尾盘选股预测模型,输出的聚类模型命名为微盘选股聚类模型。 # -*- coding: utf-8 -*- """ Created on Sun Jul 20 16:00:01 2025 @author: srx20 """ import os import gc import numpy as np import pandas as pd import joblib import talib as ta from tqdm import tqdm import random from sklearn.cluster import MiniBatchKMeans from sklearn.preprocessing import StandardScaler from sklearn.model_selection import RandomizedSearchCV, GroupKFold from sklearn.feature_selection import SelectKBest, f_classif from sklearn.metrics import make_scorer, recall_score, classification_report import lightgbm as lgb import logging import psutil import warnings from scipy import sparse warnings.filterwarnings('ignore') # 设置日志记录 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler('stock_prediction_fixed.log'), logging.StreamHandler() ] ) logger = logging.getLogger(__name__) # ========== 配置类 ========== class StockConfig: def __init__(self): # 数据路径 self.SH_PATH = r"D:\股票量化数据库\股票csv数据\上证" self.SZ_PATH = r"D:\股票量化数据库\股票csv数据\深证" # 时间范围 self.START_DATE = "2011-1-1" self.END_DATE = "2024-1-1" self.TEST_START = "2024-1-1" self.TEST_END = "2025-7-18" # 聚类设置 self.CLUSTER_NUM = 8 self.CLUSTER_FEATURES = [ 'price_change', 'volatility', 'volume_change', 'MA5', 'MA20', 'RSI14', 'MACD_hist' ] # 预测特征 (初始列表,实际使用时会动态更新) self.PREDICT_FEATURES = [ 'open', 'high', 'low', 'close', 'volume', 'price_change', 'volatility', 'volume_change', 'MA5', 'MA20', 'RSI14', 'MACD_hist', 'cluster', 'MOM10', 'ATR14', 'VWAP', 'RSI_diff', 'price_vol_ratio', 'MACD_RSI', 'advance_decline', 'day_of_week', 'month' ] # 模型参数优化范围(内存优化版) self.PARAM_GRID = { 'boosting_type': ['gbdt'], # 减少选项 'num_leaves': [31, 63], # 减少选项 'max_depth': [-1, 7], # 减少选项 'learning_rate': [0.01, 0.05], 'n_estimators': [300, 500], # 减少选项 'min_child_samples': [50], # 固定值 'min_split_gain': [0.0, 0.1], 'reg_alpha': [0, 0.1], 'reg_lambda': [0, 0.1], 'feature_fraction': [0.7, 0.9], 'bagging_fraction': [0.7, 0.9], 'bagging_freq': [1] } # 目标条件 self.MIN_GAIN = 0.05 self.MIN_LOW_RATIO = 0.98 # 调试模式 self.DEBUG_MODE = False self.MAX_STOCKS = 50 if self.DEBUG_MODE else None self.SAMPLE_FRACTION = 0.3 if not self.DEBUG_MODE else 1.0 # 采样比例 # ========== 内存管理工具 (修复版) ========== def reduce_mem_usage(df): """优化DataFrame内存使用,只处理数值列""" start_mem = df.memory_usage().sum() / 1024**2 # 只处理数值列 numeric_cols = df.select_dtypes(include=['int', 'float', 'integer']).columns for col in numeric_cols: col_type = df[col].dtype if col_type != object: c_min = df[col].min() c_max = df[col].max() if str(col_type)[:3] == 'int': if c_min > np.iinfo(np.int8).min and c_max < np.iinfo(np.int8).max: df[col] = df[col].astype(np.int8) elif c_min > np.iinfo(np.int16).min and c_max < np.iinfo(np.int16).max: df[col] = df[col].astype(np.int16) elif c_min > np.iinfo(np.int32).min and c_max < np.iinfo(np.int32).max: df[col] = df[col].astype(np.int32) elif c_min > np.iinfo(np.int64).min and c_max < np.iinfo(np.int64).max: df[col] = df[col].astype(np.int64) else: if c_min > np.finfo(np.float16).min and c_max < np.finfo(np.float16).max: df[col] = df[col].astype(np.float16) elif c_min > np.finfo(np.float32).min and c_max < np.finfo(np.float32).max: df[col] = df[col].astype(np.float32) else: df[col] = df[col].astype(np.float64) end_mem = df.memory_usage().sum() / 1024**2 logger.info(f'内存优化: 从 {start_mem:.2f} MB 减少到 {end_mem:.2f} MB ({100*(start_mem-end_mem)/start_mem:.1f}%)') return df def print_memory_usage(): """打印当前内存使用情况""" process = psutil.Process(os.getpid()) mem = process.memory_info().rss / (1024 ** 2) logger.info(f"当前内存使用: {mem:.2f} MB") # ========== 数据加载 (修复版) ========== def load_stock_data(sh_path, sz_path, start_date, end_date, sample_fraction=1.0, debug_mode=False, max_stocks=None): """加载股票数据,并过滤日期范围(修复随机抽样问题)""" stock_data = {} # 创建文件列表 all_files = [] for exchange, path in [('SH', sh_path), ('SZ', sz_path)]: if os.path.exists(path): csv_files = [f for f in os.listdir(path) if f.endswith('.csv')] for file in csv_files: all_files.append((exchange, path, file)) if not all_files: logger.warning("没有找到任何CSV文件") return stock_data # 随机抽样(修复一维问题) if sample_fraction < 1.0: sample_size = max(1, int(len(all_files) * sample_fraction)) # 使用random.sample代替np.random.choice all_files = random.sample(all_files, sample_size) logger.info(f"抽样 {len(all_files)} 只股票文件 (比例: {sample_fraction})") total_files = len(all_files) pbar = tqdm(total=total_files, desc='加载股票数据') loaded_count = 0 for exchange, path, file in all_files: if max_stocks is not None and loaded_count >= max_stocks: break if file.endswith('.csv'): stock_code = f"{exchange}_{file.split('.')[0]}" file_path = os.path.join(path, file) try: # 读取数据并验证列名 df = pd.read_csv(file_path) # 验证必要的列是否存在 required_cols = ['date', 'open', 'high', 'low', 'close', 'volume'] if not all(col in df.columns for col in required_cols): logger.warning(f"股票 {stock_code} 缺少必要列,跳过") pbar.update(1) continue # 转换日期并过滤 df['date'] = pd.to_datetime(df['date']) df = df[(df['date'] >= start_date) & (df['date'] <= end_date)] if len(df) < 50: # 至少50个交易日 logger.info(f"股票 {stock_code} 数据不足({len(df)}条),跳过") pbar.update(1) continue # 转换数据类型 for col in ['open', 'high', 'low', 'close']: df[col] = pd.to_numeric(df[col], errors='coerce').astype(np.float32) df['volume'] = pd.to_numeric(df['volume'], errors='coerce').astype(np.uint32) # 删除包含NaN的行 df = df.dropna(subset=required_cols) if len(df) > 0: stock_data[stock_code] = df loaded_count += 1 logger.debug(f"成功加载股票 {stock_code},数据条数: {len(df)}") else: logger.warning(f"股票 {stock_code} 过滤后无数据") except Exception as e: logger.error(f"加载股票 {stock_code} 失败: {str(e)}", exc_info=True) pbar.update(1) # 调试模式只处理少量股票 if debug_mode and loaded_count >= 10: logger.info("调试模式: 已加载10只股票,提前结束") break pbar.close() logger.info(f"成功加载 {len(stock_data)} 只股票数据") return stock_data # ========== 特征工程 (修复版) ========== class FeatureEngineer: def __init__(self, config): self.config = config def safe_fillna(self, series, default=0): """安全填充NaN值""" if isinstance(series, pd.Series): return series.fillna(default) elif isinstance(series, np.ndarray): return np.nan_to_num(series, nan=default) return series def transform(self, df): """添加技术指标特征(修复NumPy数组问题)""" try: # 创建临时副本用于TA-Lib计算 df_temp = df.copy() # 将价格列转换为float64以满足TA-Lib要求 for col in ['open', 'high', 'low', 'close']: df_temp[col] = df_temp[col].astype(np.float64) # 基础特征 df['price_change'] = df['close'].pct_change().fillna(0) df['volatility'] = df['close'].rolling(5).std().fillna(0) df['volume_change'] = df['volume'].pct_change().fillna(0) df['MA5'] = df['close'].rolling(5).mean().fillna(0) df['MA20'] = df['close'].rolling(20).mean().fillna(0) # 技术指标 - 修复NumPy数组问题 rsi = ta.RSI(df_temp['close'].values, timeperiod=14) df['RSI14'] = self.safe_fillna(rsi, 50) macd, macd_signal, macd_hist = ta.MACD( df_temp['close'].values, fastperiod=12, slowperiod=26, signalperiod=9 ) df['MACD_hist'] = self.safe_fillna(macd_hist, 0) # 新增特征 mom = ta.MOM(df_temp['close'].values, timeperiod=10) df['MOM10'] = self.safe_fillna(mom, 0) atr = ta.ATR( df_temp['high'].values, df_temp['low'].values, df_temp['close'].values, timeperiod=14 ) df['ATR14'] = self.safe_fillna(atr, 0) # 成交量加权平均价 vwap = (df['volume'] * (df['high'] + df['low'] + df['close']) / 3).cumsum() / df['volume'].cumsum() df['VWAP'] = self.safe_fillna(vwap, 0) # 相对强弱指数差值 df['RSI_diff'] = df['RSI14'] - df['RSI14'].rolling(5).mean().fillna(0) # 价格波动比率 df['price_vol_ratio'] = df['price_change'] / (df['volatility'].replace(0, 1e-8) + 1e-8) # 技术指标组合特征 df['MACD_RSI'] = df['MACD_hist'] * df['RSI14'] # 市场情绪指标 df['advance_decline'] = (df['close'] > df['open']).astype(int).rolling(5).sum().fillna(0) # 时间特征 df['day_of_week'] = df['date'].dt.dayofweek df['month'] = df['date'].dt.month # 处理无穷大和NaN df = df.replace([np.inf, -np.inf], np.nan) df = df.fillna(0) # 优化内存(只处理数值列) return reduce_mem_usage(df) except Exception as e: logger.error(f"特征工程失败: {str(e)}", exc_info=True) # 返回基本特征作为回退方案 df['price_change'] = df['close'].pct_change().fillna(0) df['volatility'] = df['close'].rolling(5).std().fillna(0) df['volume_change'] = df['volume'].pct_change().fillna(0) df['MA5'] = df['close'].rolling(5).mean().fillna(0) df['MA20'] = df['close'].rolling(20).mean().fillna(0) # 填充缺失的技术指标 for col in self.config.PREDICT_FEATURES: if col not in df.columns: df[col] = 0 return df # ========== 聚类模型 (添加保存/加载功能) ========== class StockCluster: def __init__(self, config): self.config = config self.scaler = StandardScaler() self.kmeans = MiniBatchKMeans( n_clusters=config.CLUSTER_NUM, random_state=42, batch_size=1000 ) self.cluster_map = {} # 股票代码到聚类ID的映射 self.model_file = "stock_cluster_model.pkl" # 模型保存路径 def save(self): """保存聚类模型到文件""" # 创建包含所有必要组件的字典 model_data = { 'kmeans': self.kmeans, 'scaler': self.scaler, 'cluster_map': self.cluster_map, 'config_cluster_num': self.config.CLUSTER_NUM } # 使用joblib保存模型 joblib.dump(model_data, self.model_file) logger.info(f"聚类模型已保存到: {self.model_file}") def load(self): """从文件加载聚类模型""" if os.path.exists(self.model_file): model_data = joblib.load(self.model_file) self.kmeans = model_data['kmeans'] self.scaler = model_data['scaler'] self.cluster_map = model_data['cluster_map'] logger.info(f"从 {self.model_file} 加载聚类模型") return True else: logger.warning("聚类模型文件不存在,需要重新训练") return False def fit(self, stock_data): """训练聚类模型""" logger.info("开始股票聚类分析...") cluster_features = [] # 提取每只股票的特征 for stock_code, df in tqdm(stock_data.items(), desc="提取聚类特征"): if len(df) < 50: # 至少50个交易日 continue features = {} for feat in self.config.CLUSTER_FEATURES: if feat in df.columns: # 使用统计特征 features[f"{feat}_mean"] = df[feat].mean() features[f"{feat}_std"] = df[feat].std() else: # 特征缺失时填充0 features[f"{feat}_mean"] = 0 features[f"{feat}_std"] = 0 cluster_features.append(features) if not cluster_features: logger.warning("没有可用的聚类特征,使用默认聚类") # 创建默认聚类映射 self.cluster_map = {code: 0 for code in stock_data.keys()} return self # 创建特征DataFrame feature_df = pd.DataFrame(cluster_features) feature_df = reduce_mem_usage(feature_df) # 标准化特征 scaled_features = self.scaler.fit_transform(feature_df) # 聚类 self.kmeans.fit(scaled_features) clusters = self.kmeans.predict(scaled_features) feature_df['cluster'] = clusters # 创建股票到聚类的映射 stock_codes = list(stock_data.keys())[:len(clusters)] # 确保长度匹配 for i, stock_code in enumerate(stock_codes): self.cluster_map[stock_code] = clusters[i] logger.info("聚类分布统计:") logger.info(feature_df['cluster'].value_counts().to_string()) logger.info(f"股票聚类完成,共分为 {self.config.CLUSTER_NUM} 个类别") # 训练完成后自动保存模型 self.save() return self def transform(self, df, stock_code): """为数据添加聚类特征""" cluster_id = self.cluster_map.get(stock_code, -1) # 默认为-1表示未知聚类 df['cluster'] = cluster_id return df # ========== 目标创建 ========== class TargetCreator: def __init__(self, config): self.config = config def create_targets(self, df): """创建目标变量 - 修改为收盘价高于开盘价5%""" # 计算次日收盘价相对于开盘价的涨幅 df['next_day_open_to_close_gain'] = df['close'].shift(-1) / df['open'].shift(-1) - 1 # 计算次日最低价与开盘价比例 df['next_day_low_ratio'] = df['low'].shift(-1) / df['open'].shift(-1) # 创建复合目标:收盘价比开盘价高5% 且 最低价≥开盘价98% df['target'] = 0 mask = (df['next_day_open_to_close_gain'] > self.config.MIN_GAIN) & \ (df['next_day_low_ratio'] >= self.config.MIN_LOW_RATIO) df.loc[mask, 'target'] = 1 # 删除最后一行(没有次日数据) df = df.iloc[:-1] # 检查目标分布 target_counts = df['target'].value_counts() logger.info(f"目标分布: 0={target_counts.get(0, 0)}, 1={target_counts.get(1, 0)}") # 添加调试信息 if self.config.DEBUG_MODE: sample_targets = df[['open', 'close', 'next_day_open_to_close_gain', 'target']].tail(5) logger.debug(f"目标创建示例:\n{sample_targets}") return df # ========== 模型训练 (内存优化版) ========== class StockModelTrainer: def __init__(self, config): self.config = config self.model_name = "stock_prediction_model" self.feature_importance = None def prepare_dataset(self, stock_data, cluster_model, feature_engineer): """准备训练数据集(内存优化版)""" logger.info("准备训练数据集...") X_list = [] y_list = [] stock_group_list = [] # 用于分组交叉验证 target_creator = TargetCreator(self.config) # 使用生成器减少内存占用 for stock_code, df in tqdm(stock_data.items(), desc="处理股票数据"): try: # 特征工程 df = feature_engineer.transform(df.copy()) # 添加聚类特征 df = cluster_model.transform(df, stock_code) # 创建目标 df = target_creator.create_targets(df) # 只保留所需特征和目标 features = self.config.PREDICT_FEATURES if 'target' not in df.columns: logger.warning(f"股票 {stock_code} 缺少目标列,跳过") continue X = df[features] y = df['target'] # 确保没有NaN值 if X.isnull().any().any(): logger.warning(f"股票 {stock_code} 特征包含NaN值,跳过") continue # 使用稀疏矩阵存储(减少内存) sparse_X = sparse.csr_matrix(X.values.astype(np.float32)) X_list.append(sparse_X) y_list.append(y.values) stock_group_list.extend([stock_code] * len(X)) # 为每个样本添加股票代码作为组标识 # 定期清理内存 if len(X_list) % 100 == 0: gc.collect() print_memory_usage() except Exception as e: logger.error(f"处理股票 {stock_code} 失败: {str(e)}", exc_info=True) if not X_list: logger.error("没有可用的训练数据") return None, None, None # 合并所有数据 X_full = sparse.vstack(X_list) y_full = np.concatenate(y_list) groups = np.array(stock_group_list) logger.info(f"数据集准备完成,样本数: {X_full.shape[0]}") logger.info(f"目标分布: 0={sum(y_full==0)}, 1={sum(y_full==1)}") return X_full, y_full, groups def feature_selection(self, X, y): """执行特征选择(内存优化版)""" logger.info("执行特征选择...") # 使用基模型评估特征重要性 base_model = lgb.LGBMClassifier( n_estimators=100, random_state=42, n_jobs=-1 ) # 分批训练(减少内存占用) batch_size = 100000 for i in range(0, X.shape[0], batch_size): end_idx = min(i + batch_size, X.shape[0]) X_batch = X[i:end_idx].toarray() if sparse.issparse(X) else X[i:end_idx] y_batch = y[i:end_idx] if i == 0: base_model.fit(X_batch, y_batch) else: base_model.fit(X_batch, y_batch, init_model=base_model) # 获取特征重要性 importance = pd.Series(base_model.feature_importances_, index=self.config.PREDICT_FEATURES) importance = importance.sort_values(ascending=False) logger.info("特征重要性:\n" + importance.to_string()) # 选择前K个重要特征 k = min(15, len(self.config.PREDICT_FEATURES)) selected_features = importance.head(k).index.tolist() logger.info(f"选择前 {k} 个特征: {selected_features}") # 更新配置中的特征列表 self.config.PREDICT_FEATURES = selected_features # 转换特征矩阵 if sparse.issparse(X): # 对于稀疏矩阵,我们需要重新索引 feature_indices = [self.config.PREDICT_FEATURES.index(f) for f in selected_features] X_selected = X[:, feature_indices] else: X_selected = X[selected_features] return X_selected, selected_features def train_model(self, X, y, groups): """训练并优化模型(内存优化版)""" if X is None or len(y) == 0: logger.error("训练数据为空,无法训练模型") return None logger.info("开始训练模型...") # 1. 处理类别不平衡 pos_count = sum(y == 1) neg_count = sum(y == 0) scale_pos_weight = neg_count / pos_count if pos_count > 0 else 1.0 logger.info(f"类别不平衡处理: 正样本权重 = {scale_pos_weight:.2f}") # 2. 特征选择 X_selected, selected_features = self.feature_selection(X, y) # 3. 自定义评分函数 - 关注正类召回率 def positive_recall_score(y_true, y_pred): return recall_score(y_true, y_pred, pos_label=1) custom_scorer = make_scorer(positive_recall_score, greater_is_better=True) # 4. 使用分组时间序列交叉验证(减少折数) group_kfold = GroupKFold(n_splits=2) # 减少折数以节省内存 cv = list(group_kfold.split(X_selected, y, groups=groups)) # 5. 创建模型 model = lgb.LGBMClassifier( objective='binary', random_state=42, n_jobs=-1, scale_pos_weight=scale_pos_weight, verbose=-1 ) # 6. 参数搜索(减少迭代次数) search = RandomizedSearchCV( estimator=model, param_distributions=self.config.PARAM_GRID, n_iter=10, # 减少迭代次数以节省内存 scoring=custom_scorer, cv=cv, verbose=2, n_jobs=1, # 减少并行任务以节省内存 pre_dispatch='2*n_jobs', # 控制任务分发 random_state=42 ) logger.info("开始参数搜索...") # 分批处理数据(减少内存占用) if sparse.issparse(X_selected): X_dense = X_selected.toarray() # 转换为密集矩阵用于搜索 else: X_dense = X_selected search.fit(X_dense, y) # 7. 使用最佳参数训练最终模型 best_params = search.best_params_ logger.info(f"最佳参数: {best_params}") logger.info(f"最佳召回率: {search.best_score_}") final_model = lgb.LGBMClassifier( **best_params, objective='binary', random_state=42, n_jobs=-1, scale_pos_weight=scale_pos_weight ) # 使用早停策略训练最终模型 logger.info("训练最终模型...") final_model.fit( X_dense, y, eval_set=[(X_dense, y)], eval_metric='binary_logloss', callbacks=[ lgb.early_stopping(stopping_rounds=50, verbose=False), lgb.log_evaluation(period=100) ] ) # 保存特征重要性 self.feature_importance = pd.Series( final_model.feature_importances_, index=selected_features ).sort_values(ascending=False) # 8. 保存模型 model_path = f"{self.model_name}.pkl" joblib.dump((final_model, selected_features), model_path) logger.info(f"模型已保存到: {model_path}") return final_model def evaluate_model(self, model, X_test, y_test): """评估模型性能""" if model is None or len(X_test) == 0: logger.warning("无法评估模型,缺少数据或模型") return # 预测测试集 y_pred = model.predict(X_test) # 计算召回率 recall = recall_score(y_test, y_pred, pos_label=1) logger.info(f"测试集召回率: {recall:.4f}") # 计算满足条件的样本比例 condition_ratio = sum(y_test == 1) / len(y_test) logger.info(f"满足条件的样本比例: {condition_ratio:.4f}") # 详细分类报告 report = classification_report(y_test, y_pred) logger.info("分类报告:\n" + report) # 特征重要性 if self.feature_importance is not None: logger.info("特征重要性:\n" + self.feature_importance.to_string()) # ========== 主程序 ========== def main(): # 初始化配置 config = StockConfig() logger.info("===== 股票上涨预测程序 (修复版) =====") # 加载训练数据(添加抽样) logger.info(f"加载训练数据: {config.START_DATE} 至 {config.END_DATE}") train_data = load_stock_data( config.SH_PATH, config.SZ_PATH, config.START_DATE, config.END_DATE, sample_fraction=config.SAMPLE_FRACTION, debug_mode=config.DEBUG_MODE, max_stocks=config.MAX_STOCKS ) if not train_data: logger.error("错误: 没有加载到任何股票数据,请检查数据路径和格式") return # 特征工程 feature_engineer = FeatureEngineer(config) # 聚类分析 - 尝试加载现有模型,否则训练新模型 cluster_model = StockCluster(config) if not cluster_model.load(): # 尝试加载模型 try: cluster_model.fit(train_data) except Exception as e: logger.error(f"聚类分析失败: {str(e)}", exc_info=True) # 创建默认聚类映射 cluster_model.cluster_map = {code: 0 for code in train_data.keys()} logger.info("使用默认聚类(所有股票归为同一类)") cluster_model.save() # 保存默认聚类模型 # 准备训练数据 trainer = StockModelTrainer(config) try: X_train, y_train, groups = trainer.prepare_dataset( train_data, cluster_model, feature_engineer ) except Exception as e: logger.error(f"准备训练数据失败: {str(e)}", exc_info=True) return if X_train is None or len(y_train) == 0: logger.error("错误: 没有可用的训练数据") return # 训练模型 model = trainer.train_model(X_train, y_train, groups) if model is None: logger.error("模型训练失败") return # 加载测试数据(添加抽样) logger.info(f"\n加载测试数据: {config.TEST_START} 至 {config.TEST_END}") test_data = load_stock_data( config.SH_PATH, config.SZ_PATH, config.TEST_START, config.TEST_END, sample_fraction=config.SAMPLE_FRACTION, debug_mode=config.DEBUG_MODE, max_stocks=config.MAX_STOCKS ) if test_data: # 准备测试数据 X_test, y_test, _ = trainer.prepare_dataset( test_data, cluster_model, feature_engineer ) if X_test is not None and len(y_test) > 0: # 评估模型 if sparse.issparse(X_test): X_test = X_test.toarray() trainer.evaluate_model(model, X_test, y_test) else: logger.warning("测试数据准备失败,无法评估模型") else: logger.warning("没有测试数据可用") logger.info("===== 程序执行完成 =====") if __name__ == "__main__": main()

帮我给下面这个代码的训练条件加一条,要求后天(T+2)的收盘价不能低于次日(T+1)的收盘价 # -*- coding: utf-8 -*- """ Created on Sun Jul 20 16:00:01 2025 @author: srx20 """ import os import gc import numpy as np import pandas as pd import joblib import talib as ta from tqdm import tqdm import random from sklearn.cluster import MiniBatchKMeans from sklearn.preprocessing import StandardScaler from sklearn.model_selection import RandomizedSearchCV, GroupKFold from sklearn.feature_selection import SelectKBest, f_classif from sklearn.metrics import make_scorer, recall_score, classification_report import lightgbm as lgb import logging import psutil import warnings from scipy import sparse warnings.filterwarnings('ignore') # 设置日志记录 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler('stock_prediction_fixed.log'), logging.StreamHandler() ] ) logger = logging.getLogger(__name__) # ========== 配置类 ========== class StockConfig: def __init__(self): # 数据路径 self.SH_PATH = r"D:\股票量化数据库\股票csv数据\上证" self.SZ_PATH = r"D:\股票量化数据库\股票csv数据\深证" # 时间范围 self.START_DATE = "2011-1-1" self.END_DATE = "2024-1-1" self.TEST_START = "2024-1-1" self.TEST_END = "2025-7-18" # 聚类设置 self.CLUSTER_NUM = 8 self.CLUSTER_FEATURES = [ 'price_change', 'volatility', 'volume_change', 'MA5', 'MA20', 'RSI14', 'MACD_hist' ] # 预测特征 (初始列表,实际使用时会动态更新) self.PREDICT_FEATURES = [ 'open', 'high', 'low', 'close', 'volume', 'price_change', 'volatility', 'volume_change', 'MA5', 'MA20', 'RSI14', 'MACD_hist', 'cluster', 'MOM10', 'ATR14', 'VWAP', 'RSI_diff', 'price_vol_ratio', 'MACD_RSI', 'advance_decline', 'day_of_week', 'month' ] # 模型参数优化范围(内存优化版) self.PARAM_GRID = { 'boosting_type': ['gbdt'], # 减少选项 'num_leaves': [31, 63], # 减少选项 'max_depth': [-1, 7], # 减少选项 'learning_rate': [0.01, 0.05], 'n_estimators': [300, 500], # 减少选项 'min_child_samples': [50], # 固定值 'min_split_gain': [0.0, 0.1], 'reg_alpha': [0, 0.1], 'reg_lambda': [0, 0.1], 'feature_fraction': [0.7, 0.9], 'bagging_fraction': [0.7, 0.9], 'bagging_freq': [1] } # 目标条件 self.MIN_GAIN = 0.05 self.MIN_LOW_RATIO = 0.98 # 调试模式 self.DEBUG_MODE = False self.MAX_STOCKS = 50 if self.DEBUG_MODE else None self.SAMPLE_FRACTION = 0.3 if not self.DEBUG_MODE else 1.0 # 采样比例 # ========== 内存管理工具 (修复版) ========== def reduce_mem_usage(df): """优化DataFrame内存使用,只处理数值列""" start_mem = df.memory_usage().sum() / 1024**2 # 只处理数值列 numeric_cols = df.select_dtypes(include=['int', 'float', 'integer']).columns for col in numeric_cols: col_type = df[col].dtype if col_type != object: c_min = df[col].min() c_max = df[col].max() if str(col_type)[:3] == 'int': if c_min > np.iinfo(np.int8).min and c_max < np.iinfo(np.int8).max: df[col] = df[col].astype(np.int8) elif c_min > np.iinfo(np.int16).min and c_max < np.iinfo(np.int16).max: df[col] = df[col].astype(np.int16) elif c_min > np.iinfo(np.int32).min and c_max < np.iinfo(np.int32).max: df[col] = df[col].astype(np.int32) elif c_min > np.iinfo(np.int64).min and c_max < np.iinfo(np.int64).max: df[col] = df[col].astype(np.int64) else: if c_min > np.finfo(np.float16).min and c_max < np.finfo(np.float16).max: df[col] = df[col].astype(np.float16) elif c_min > np.finfo(np.float32).min and c_max < np.finfo(np.float32).max: df[col] = df[col].astype(np.float32) else: df[col] = df[col].astype(np.float64) end_mem = df.memory_usage().sum() / 1024**2 logger.info(f'内存优化: 从 {start_mem:.2f} MB 减少到 {end_mem:.2f} MB ({100*(start_mem-end_mem)/start_mem:.1f}%)') return df def print_memory_usage(): """打印当前内存使用情况""" process = psutil.Process(os.getpid()) mem = process.memory_info().rss / (1024 ** 2) logger.info(f"当前内存使用: {mem:.2f} MB") # ========== 数据加载 (修复版) ========== def load_stock_data(sh_path, sz_path, start_date, end_date, sample_fraction=1.0, debug_mode=False, max_stocks=None): """加载股票数据,并过滤日期范围(修复随机抽样问题)""" stock_data = {} # 创建文件列表 all_files = [] for exchange, path in [('SH', sh_path), ('SZ', sz_path)]: if os.path.exists(path): csv_files = [f for f in os.listdir(path) if f.endswith('.csv')] for file in csv_files: all_files.append((exchange, path, file)) if not all_files: logger.warning("没有找到任何CSV文件") return stock_data # 随机抽样(修复一维问题) if sample_fraction < 1.0: sample_size = max(1, int(len(all_files) * sample_fraction)) # 使用random.sample代替np.random.choice all_files = random.sample(all_files, sample_size) logger.info(f"抽样 {len(all_files)} 只股票文件 (比例: {sample_fraction})") total_files = len(all_files) pbar = tqdm(total=total_files, desc='加载股票数据') loaded_count = 0 for exchange, path, file in all_files: if max_stocks is not None and loaded_count >= max_stocks: break if file.endswith('.csv'): stock_code = f"{exchange}_{file.split('.')[0]}" file_path = os.path.join(path, file) try: # 读取数据并验证列名 df = pd.read_csv(file_path) # 验证必要的列是否存在 required_cols = ['date', 'open', 'high', 'low', 'close', 'volume'] if not all(col in df.columns for col in required_cols): logger.warning(f"股票 {stock_code} 缺少必要列,跳过") pbar.update(1) continue # 转换日期并过滤 df['date'] = pd.to_datetime(df['date']) df = df[(df['date'] >= start_date) & (df['date'] <= end_date)] if len(df) < 50: # 至少50个交易日 logger.info(f"股票 {stock_code} 数据不足({len(df)}条),跳过") pbar.update(1) continue # 转换数据类型 for col in ['open', 'high', 'low', 'close']: df[col] = pd.to_numeric(df[col], errors='coerce').astype(np.float32) df['volume'] = pd.to_numeric(df['volume'], errors='coerce').astype(np.uint32) # 删除包含NaN的行 df = df.dropna(subset=required_cols) if len(df) > 0: stock_data[stock_code] = df loaded_count += 1 logger.debug(f"成功加载股票 {stock_code},数据条数: {len(df)}") else: logger.warning(f"股票 {stock_code} 过滤后无数据") except Exception as e: logger.error(f"加载股票 {stock_code} 失败: {str(e)}", exc_info=True) pbar.update(1) # 调试模式只处理少量股票 if debug_mode and loaded_count >= 10: logger.info("调试模式: 已加载10只股票,提前结束") break pbar.close() logger.info(f"成功加载 {len(stock_data)} 只股票数据") return stock_data # ========== 特征工程 (修复版) ========== class FeatureEngineer: def __init__(self, config): self.config = config def safe_fillna(self, series, default=0): """安全填充NaN值""" if isinstance(series, pd.Series): return series.fillna(default) elif isinstance(series, np.ndarray): return np.nan_to_num(series, nan=default) return series def transform(self, df): """添加技术指标特征(修复NumPy数组问题)""" try: # 创建临时副本用于TA-Lib计算 df_temp = df.copy() # 将价格列转换为float64以满足TA-Lib要求 for col in ['open', 'high', 'low', 'close']: df_temp[col] = df_temp[col].astype(np.float64) # 基础特征 df['price_change'] = df['close'].pct_change().fillna(0) df['volatility'] = df['close'].rolling(5).std().fillna(0) df['volume_change'] = df['volume'].pct_change().fillna(0) df['MA5'] = df['close'].rolling(5).mean().fillna(0) df['MA20'] = df['close'].rolling(20).mean().fillna(0) # 技术指标 - 修复NumPy数组问题 rsi = ta.RSI(df_temp['close'].values, timeperiod=14) df['RSI14'] = self.safe_fillna(rsi, 50) macd, macd_signal, macd_hist = ta.MACD( df_temp['close'].values, fastperiod=12, slowperiod=26, signalperiod=9 ) df['MACD_hist'] = self.safe_fillna(macd_hist, 0) # 新增特征 mom = ta.MOM(df_temp['close'].values, timeperiod=10) df['MOM10'] = self.safe_fillna(mom, 0) atr = ta.ATR( df_temp['high'].values, df_temp['low'].values, df_temp['close'].values, timeperiod=14 ) df['ATR14'] = self.safe_fillna(atr, 0) # 成交量加权平均价 vwap = (df['volume'] * (df['high'] + df['low'] + df['close']) / 3).cumsum() / df['volume'].cumsum() df['VWAP'] = self.safe_fillna(vwap, 0) # 相对强弱指数差值 df['RSI_diff'] = df['RSI14'] - df['RSI14'].rolling(5).mean().fillna(0) # 价格波动比率 df['price_vol_ratio'] = df['price_change'] / (df['volatility'].replace(0, 1e-8) + 1e-8) # 技术指标组合特征 df['MACD_RSI'] = df['MACD_hist'] * df['RSI14'] # 市场情绪指标 df['advance_decline'] = (df['close'] > df['open']).astype(int).rolling(5).sum().fillna(0) # 时间特征 df['day_of_week'] = df['date'].dt.dayofweek df['month'] = df['date'].dt.month # 处理无穷大和NaN df = df.replace([np.inf, -np.inf], np.nan) df = df.fillna(0) # 优化内存(只处理数值列) return reduce_mem_usage(df) except Exception as e: logger.error(f"特征工程失败: {str(e)}", exc_info=True) # 返回基本特征作为回退方案 df['price_change'] = df['close'].pct_change().fillna(0) df['volatility'] = df['close'].rolling(5).std().fillna(0) df['volume_change'] = df['volume'].pct_change().fillna(0) df['MA5'] = df['close'].rolling(5).mean().fillna(0) df['MA20'] = df['close'].rolling(20).mean().fillna(0) # 填充缺失的技术指标 for col in self.config.PREDICT_FEATURES: if col not in df.columns: df[col] = 0 return df # ========== 聚类模型 (添加保存/加载功能) ========== class StockCluster: def __init__(self, config): self.config = config self.scaler = StandardScaler() self.kmeans = MiniBatchKMeans( n_clusters=config.CLUSTER_NUM, random_state=42, batch_size=1000 ) self.cluster_map = {} # 股票代码到聚类ID的映射 self.model_file = "stock_cluster_model.pkl" # 模型保存路径 def save(self): """保存聚类模型到文件""" # 创建包含所有必要组件的字典 model_data = { 'kmeans': self.kmeans, 'scaler': self.scaler, 'cluster_map': self.cluster_map, 'config_cluster_num': self.config.CLUSTER_NUM } # 使用joblib保存模型 joblib.dump(model_data, self.model_file) logger.info(f"聚类模型已保存到: {self.model_file}") def load(self): """从文件加载聚类模型""" if os.path.exists(self.model_file): model_data = joblib.load(self.model_file) self.kmeans = model_data['kmeans'] self.scaler = model_data['scaler'] self.cluster_map = model_data['cluster_map'] logger.info(f"从 {self.model_file} 加载聚类模型") return True else: logger.warning("聚类模型文件不存在,需要重新训练") return False def fit(self, stock_data): """训练聚类模型""" logger.info("开始股票聚类分析...") cluster_features = [] # 提取每只股票的特征 for stock_code, df in tqdm(stock_data.items(), desc="提取聚类特征"): if len(df) < 50: # 至少50个交易日 continue features = {} for feat in self.config.CLUSTER_FEATURES: if feat in df.columns: # 使用统计特征 features[f"{feat}_mean"] = df[feat].mean() features[f"{feat}_std"] = df[feat].std() else: # 特征缺失时填充0 features[f"{feat}_mean"] = 0 features[f"{feat}_std"] = 0 cluster_features.append(features) if not cluster_features: logger.warning("没有可用的聚类特征,使用默认聚类") # 创建默认聚类映射 self.cluster_map = {code: 0 for code in stock_data.keys()} return self # 创建特征DataFrame feature_df = pd.DataFrame(cluster_features) feature_df = reduce_mem_usage(feature_df) # 标准化特征 scaled_features = self.scaler.fit_transform(feature_df) # 聚类 self.kmeans.fit(scaled_features) clusters = self.kmeans.predict(scaled_features) feature_df['cluster'] = clusters # 创建股票到聚类的映射 stock_codes = list(stock_data.keys())[:len(clusters)] # 确保长度匹配 for i, stock_code in enumerate(stock_codes): self.cluster_map[stock_code] = clusters[i] logger.info("聚类分布统计:") logger.info(feature_df['cluster'].value_counts().to_string()) logger.info(f"股票聚类完成,共分为 {self.config.CLUSTER_NUM} 个类别") # 训练完成后自动保存模型 self.save() return self def transform(self, df, stock_code): """为数据添加聚类特征""" cluster_id = self.cluster_map.get(stock_code, -1) # 默认为-1表示未知聚类 df['cluster'] = cluster_id return df # ========== 目标创建 ========== class TargetCreator: def __init__(self, config): self.config = config def create_targets(self, df): """创建目标变量 - 修改为收盘价高于开盘价5%""" # 计算次日收盘价相对于开盘价的涨幅 df['next_day_open_to_close_gain'] = df['close'].shift(-1) / df['open'].shift(-1) - 1 # 计算次日最低价与开盘价比例 df['next_day_low_ratio'] = df['low'].shift(-1) / df['open'].shift(-1) # 创建复合目标:收盘价比开盘价高5% 且 最低价≥开盘价98% df['target'] = 0 mask = (df['next_day_open_to_close_gain'] > self.config.MIN_GAIN) & \ (df['next_day_low_ratio'] >= self.config.MIN_LOW_RATIO) df.loc[mask, 'target'] = 1 # 删除最后一行(没有次日数据) df = df.iloc[:-1] # 检查目标分布 target_counts = df['target'].value_counts() logger.info(f"目标分布: 0={target_counts.get(0, 0)}, 1={target_counts.get(1, 0)}") # 添加调试信息 if self.config.DEBUG_MODE: sample_targets = df[['open', 'close', 'next_day_open_to_close_gain', 'target']].tail(5) logger.debug(f"目标创建示例:\n{sample_targets}") return df # ========== 模型训练 (内存优化版) ========== class StockModelTrainer: def __init__(self, config): self.config = config self.model_name = "stock_prediction_model" self.feature_importance = None def prepare_dataset(self, stock_data, cluster_model, feature_engineer): """准备训练数据集(内存优化版)""" logger.info("准备训练数据集...") X_list = [] y_list = [] stock_group_list = [] # 用于分组交叉验证 target_creator = TargetCreator(self.config) # 使用生成器减少内存占用 for stock_code, df in tqdm(stock_data.items(), desc="处理股票数据"): try: # 特征工程 df = feature_engineer.transform(df.copy()) # 添加聚类特征 df = cluster_model.transform(df, stock_code) # 创建目标 df = target_creator.create_targets(df) # 只保留所需特征和目标 features = self.config.PREDICT_FEATURES if 'target' not in df.columns: logger.warning(f"股票 {stock_code} 缺少目标列,跳过") continue X = df[features] y = df['target'] # 确保没有NaN值 if X.isnull().any().any(): logger.warning(f"股票 {stock_code} 特征包含NaN值,跳过") continue # 使用稀疏矩阵存储(减少内存) sparse_X = sparse.csr_matrix(X.values.astype(np.float32)) X_list.append(sparse_X) y_list.append(y.values) stock_group_list.extend([stock_code] * len(X)) # 为每个样本添加股票代码作为组标识 # 定期清理内存 if len(X_list) % 100 == 0: gc.collect() print_memory_usage() except Exception as e: logger.error(f"处理股票 {stock_code} 失败: {str(e)}", exc_info=True) if not X_list: logger.error("没有可用的训练数据") return None, None, None # 合并所有数据 X_full = sparse.vstack(X_list) y_full = np.concatenate(y_list) groups = np.array(stock_group_list) logger.info(f"数据集准备完成,样本数: {X_full.shape[0]}") logger.info(f"目标分布: 0={sum(y_full==0)}, 1={sum(y_full==1)}") return X_full, y_full, groups def feature_selection(self, X, y): """执行特征选择(内存优化版)""" logger.info("执行特征选择...") # 使用基模型评估特征重要性 base_model = lgb.LGBMClassifier( n_estimators=100, random_state=42, n_jobs=-1 ) # 分批训练(减少内存占用) batch_size = 100000 for i in range(0, X.shape[0], batch_size): end_idx = min(i + batch_size, X.shape[0]) X_batch = X[i:end_idx].toarray() if sparse.issparse(X) else X[i:end_idx] y_batch = y[i:end_idx] if i == 0: base_model.fit(X_batch, y_batch) else: base_model.fit(X_batch, y_batch, init_model=base_model) # 获取特征重要性 importance = pd.Series(base_model.feature_importances_, index=self.config.PREDICT_FEATURES) importance = importance.sort_values(ascending=False) logger.info("特征重要性:\n" + importance.to_string()) # 选择前K个重要特征 k = min(15, len(self.config.PREDICT_FEATURES)) selected_features = importance.head(k).index.tolist() logger.info(f"选择前 {k} 个特征: {selected_features}") # 更新配置中的特征列表 self.config.PREDICT_FEATURES = selected_features # 转换特征矩阵 if sparse.issparse(X): # 对于稀疏矩阵,我们需要重新索引 feature_indices = [self.config.PREDICT_FEATURES.index(f) for f in selected_features] X_selected = X[:, feature_indices] else: X_selected = X[selected_features] return X_selected, selected_features def train_model(self, X, y, groups): """训练并优化模型(内存优化版)""" if X is None or len(y) == 0: logger.error("训练数据为空,无法训练模型") return None logger.info("开始训练模型...") # 1. 处理类别不平衡 pos_count = sum(y == 1) neg_count = sum(y == 0) scale_pos_weight = neg_count / pos_count if pos_count > 0 else 1.0 logger.info(f"类别不平衡处理: 正样本权重 = {scale_pos_weight:.2f}") # 2. 特征选择 X_selected, selected_features = self.feature_selection(X, y) # 3. 自定义评分函数 - 关注正类召回率 def positive_recall_score(y_true, y_pred): return recall_score(y_true, y_pred, pos_label=1) custom_scorer = make_scorer(positive_recall_score, greater_is_better=True) # 4. 使用分组时间序列交叉验证(减少折数) group_kfold = GroupKFold(n_splits=2) # 减少折数以节省内存 cv = list(group_kfold.split(X_selected, y, groups=groups)) # 5. 创建模型 model = lgb.LGBMClassifier( objective='binary', random_state=42, n_jobs=-1, scale_pos_weight=scale_pos_weight, verbose=-1 ) # 6. 参数搜索(减少迭代次数) search = RandomizedSearchCV( estimator=model, param_distributions=self.config.PARAM_GRID, n_iter=10, # 减少迭代次数以节省内存 scoring=custom_scorer, cv=cv, verbose=2, n_jobs=1, # 减少并行任务以节省内存 pre_dispatch='2*n_jobs', # 控制任务分发 random_state=42 ) logger.info("开始参数搜索...") # 分批处理数据(减少内存占用) if sparse.issparse(X_selected): X_dense = X_selected.toarray() # 转换为密集矩阵用于搜索 else: X_dense = X_selected search.fit(X_dense, y) # 7. 使用最佳参数训练最终模型 best_params = search.best_params_ logger.info(f"最佳参数: {best_params}") logger.info(f"最佳召回率: {search.best_score_}") final_model = lgb.LGBMClassifier( **best_params, objective='binary', random_state=42, n_jobs=-1, scale_pos_weight=scale_pos_weight ) # 使用早停策略训练最终模型 logger.info("训练最终模型...") final_model.fit( X_dense, y, eval_set=[(X_dense, y)], eval_metric='binary_logloss', callbacks=[ lgb.early_stopping(stopping_rounds=50, verbose=False), lgb.log_evaluation(period=100) ] ) # 保存特征重要性 self.feature_importance = pd.Series( final_model.feature_importances_, index=selected_features ).sort_values(ascending=False) # 8. 保存模型 model_path = f"{self.model_name}.pkl" joblib.dump((final_model, selected_features), model_path) logger.info(f"模型已保存到: {model_path}") return final_model def evaluate_model(self, model, X_test, y_test): """评估模型性能""" if model is None or len(X_test) == 0: logger.warning("无法评估模型,缺少数据或模型") return # 预测测试集 y_pred = model.predict(X_test) # 计算召回率 recall = recall_score(y_test, y_pred, pos_label=1) logger.info(f"测试集召回率: {recall:.4f}") # 计算满足条件的样本比例 condition_ratio = sum(y_test == 1) / len(y_test) logger.info(f"满足条件的样本比例: {condition_ratio:.4f}") # 详细分类报告 report = classification_report(y_test, y_pred) logger.info("分类报告:\n" + report) # 特征重要性 if self.feature_importance is not None: logger.info("特征重要性:\n" + self.feature_importance.to_string()) # ========== 主程序 ========== def main(): # 初始化配置 config = StockConfig() logger.info("===== 股票上涨预测程序 (修复版) =====") # 加载训练数据(添加抽样) logger.info(f"加载训练数据: {config.START_DATE} 至 {config.END_DATE}") train_data = load_stock_data( config.SH_PATH, config.SZ_PATH, config.START_DATE, config.END_DATE, sample_fraction=config.SAMPLE_FRACTION, debug_mode=config.DEBUG_MODE, max_stocks=config.MAX_STOCKS ) if not train_data: logger.error("错误: 没有加载到任何股票数据,请检查数据路径和格式") return # 特征工程 feature_engineer = FeatureEngineer(config) # 聚类分析 - 尝试加载现有模型,否则训练新模型 cluster_model = StockCluster(config) if not cluster_model.load(): # 尝试加载模型 try: cluster_model.fit(train_data) except Exception as e: logger.error(f"聚类分析失败: {str(e)}", exc_info=True) # 创建默认聚类映射 cluster_model.cluster_map = {code: 0 for code in train_data.keys()} logger.info("使用默认聚类(所有股票归为同一类)") cluster_model.save() # 保存默认聚类模型 # 准备训练数据 trainer = StockModelTrainer(config) try: X_train, y_train, groups = trainer.prepare_dataset( train_data, cluster_model, feature_engineer ) except Exception as e: logger.error(f"准备训练数据失败: {str(e)}", exc_info=True) return if X_train is None or len(y_train) == 0: logger.error("错误: 没有可用的训练数据") return # 训练模型 model = trainer.train_model(X_train, y_train, groups) if model is None: logger.error("模型训练失败") return # 加载测试数据(添加抽样) logger.info(f"\n加载测试数据: {config.TEST_START} 至 {config.TEST_END}") test_data = load_stock_data( config.SH_PATH, config.SZ_PATH, config.TEST_START, config.TEST_END, sample_fraction=config.SAMPLE_FRACTION, debug_mode=config.DEBUG_MODE, max_stocks=config.MAX_STOCKS ) if test_data: # 准备测试数据 X_test, y_test, _ = trainer.prepare_dataset( test_data, cluster_model, feature_engineer ) if X_test is not None and len(y_test) > 0: # 评估模型 if sparse.issparse(X_test): X_test = X_test.toarray() trainer.evaluate_model(model, X_test, y_test) else: logger.warning("测试数据准备失败,无法评估模型") else: logger.warning("没有测试数据可用") logger.info("===== 程序执行完成 =====") if __name__ == "__main__": main()

给这段代码加上保存聚类模型的功能: import os import gc import numpy as np import pandas as pd import joblib import talib as ta from tqdm import tqdm import random # 新增random模块 from sklearn.cluster import MiniBatchKMeans from sklearn.preprocessing import StandardScaler from sklearn.model_selection import RandomizedSearchCV, GroupKFold from sklearn.feature_selection import SelectKBest, f_classif from sklearn.metrics import make_scorer, recall_score, classification_report import lightgbm as lgb import logging import psutil import warnings from scipy import sparse warnings.filterwarnings('ignore') # 设置日志记录 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler('stock_prediction_fixed.log'), logging.StreamHandler() ] ) logger = logging.getLogger(__name__) # ========== 配置类 ========== class StockConfig: def __init__(self): # 数据路径 self.SH_PATH = r"D:\股票量化数据库\股票csv数据\上证" self.SZ_PATH = r"D:\股票量化数据库\股票csv数据\深证" # 时间范围 self.START_DATE = "2018-01-01" self.END_DATE = "2020-12-31" self.TEST_START = "2021-01-01" self.TEST_END = "2021-12-31" # 聚类设置 self.CLUSTER_NUM = 8 self.CLUSTER_FEATURES = [ 'price_change', 'volatility', 'volume_change', 'MA5', 'MA20', 'RSI14', 'MACD_hist' ] # 预测特征 (初始列表,实际使用时会动态更新) self.PREDICT_FEATURES = [ 'open', 'high', 'low', 'close', 'volume', 'price_change', 'volatility', 'volume_change', 'MA5', 'MA20', 'RSI14', 'MACD_hist', 'cluster', 'MOM10', 'ATR14', 'VWAP', 'RSI_diff', 'price_vol_ratio', 'MACD_RSI', 'advance_decline', 'day_of_week', 'month' ] # 模型参数优化范围(内存优化版) self.PARAM_GRID = { 'boosting_type': ['gbdt'], # 减少选项 'num_leaves': [31, 63], # 减少选项 'max_depth': [-1, 7], # 减少选项 'learning_rate': [0.01, 0.05], 'n_estimators': [300, 500], # 减少选项 'min_child_samples': [50], # 固定值 'min_split_gain': [0.0, 0.1], 'reg_alpha': [0, 0.1], 'reg_lambda': [0, 0.1], 'feature_fraction': [0.7, 0.9], 'bagging_fraction': [0.7, 0.9], 'bagging_freq': [1] } # 目标条件 self.MIN_GAIN = 0.05 self.MIN_LOW_RATIO = 0.98 # 调试模式 self.DEBUG_MODE = False self.MAX_STOCKS = 50 if self.DEBUG_MODE else None self.SAMPLE_FRACTION = 0.3 if not self.DEBUG_MODE else 1.0 # 采样比例 # ========== 内存管理工具 ========== def reduce_mem_usage(df): """优化DataFrame内存使用""" start_mem = df.memory_usage().sum() / 1024**2 for col in df.columns: col_type = df[col].dtype if col_type != object: c_min = df[col].min() c_max = df[col].max() if str(col_type)[:3] == 'int': if c_min > np.iinfo(np.int8).min and c_max < np.iinfo(np.int8).max: df[col] = df[col].astype(np.int8) elif c_min > np.iinfo(np.int16).min and c_max < np.iinfo(np.int16).max: df[col] = df[col].ast(np.int16) elif c_min > np.iinfo(np.int32).min and c_max < np.iinfo(np.int32).max: df[col] = df[col].astype(np.int32) elif c_min > np.iinfo(np.int64).min and c_max < np.iinfo(np.int64).max: df[col] = df[col].astype(np.int64) else: if c_min > np.finfo(np.float16).min and c_max < np.finfo(np.float16).max: df[col] = df[col].astype(np.float16) elif c_min > np.finfo(np.float32).min and c_max < np.finfo(np.float32).max: df[col] = df[col].astype(np.float32) else: df[col] = df[col].astype(np.float64) end_mem = df.memory_usage().sum() / 1024**2 logger.info(f'内存优化: 从 {start_mem:.2f} MB 减少到 {end_mem:.2f} MB ({100*(start_mem-end_mem)/start_mem:.1f}%)') return df def print_memory_usage(): """打印当前内存使用情况""" process = psutil.Process(os.getpid()) mem = process.memory_info().rss / (1024 ** 2) logger.info(f"当前内存使用: {mem:.2f} MB") # ========== 数据加载 (修复版) ========== def load_stock_data(sh_path, sz_path, start_date, end_date, sample_fraction=1.0, debug_mode=False, max_stocks=None): """加载股票数据,并过滤日期范围(修复随机抽样问题)""" stock_data = {} # 创建文件列表 all_files = [] for exchange, path in [('SH', sh_path), ('SZ', sz_path)]: if os.path.exists(path): csv_files = [f for f in os.listdir(path) if f.endswith('.csv')] for file in csv_files: all_files.append((exchange, path, file)) if not all_files: logger.warning("没有找到任何CSV文件") return stock_data # 随机抽样(修复一维问题) if sample_fraction < 1.0: sample_size = max(1, int(len(all_files) * sample_fraction)) # 使用random.sample代替np.random.choice all_files = random.sample(all_files, sample_size) logger.info(f"抽样 {len(all_files)} 只股票文件 (比例: {sample_fraction})") total_files = len(all_files) pbar = tqdm(total=total_files, desc='加载股票数据') loaded_count = 0 for exchange, path, file in all_files: if max_stocks is not None and loaded_count >= max_stocks: break if file.endswith('.csv'): stock_code = f"{exchange}_{file.split('.')[0]}" file_path = os.path.join(path, file) try: # 读取数据并验证列名 df = pd.read_csv(file_path) # 验证必要的列是否存在 required_cols = ['date', 'open', 'high', 'low', 'close', 'volume'] if not all(col in df.columns for col in required_cols): logger.warning(f"股票 {stock_code} 缺少必要列,跳过") pbar.update(1) continue # 转换日期并过滤 df['date'] = pd.to_datetime(df['date']) df = df[(df['date'] >= start_date) & (df['date'] <= end_date)] if len(df) < 100: # 至少100个交易日 logger.info(f"股票 {stock_code} 数据不足({len(df)}条),跳过") pbar.update(1) continue # 转换数据类型 for col in ['open', 'high', 'low', 'close']: df[col] = pd.to_numeric(df[col], errors='coerce').astype(np.float32) df['volume'] = pd.to_numeric(df['volume'], errors='coerce').astype(np.uint32) # 删除包含NaN的行 df = df.dropna(subset=required_cols) if len(df) > 0: stock_data[stock_code] = df loaded_count += 1 logger.debug(f"成功加载股票 {stock_code},数据条数: {len(df)}") else: logger.warning(f"股票 {stock_code} 过滤后无数据") except Exception as e: logger.error(f"加载股票 {stock_code} 失败: {str(e)}", exc_info=True) pbar.update(1) # 调试模式只处理少量股票 if debug_mode and loaded_count >= 10: logger.info("调试模式: 已加载10只股票,提前结束") break pbar.close() logger.info(f"成功加载 {len(stock_data)} 只股票数据") return stock_data # ========== 特征工程 (修复版) ========== class FeatureEngineer: def __init__(self, config): self.config = config def safe_fillna(self, series, default=0): """安全填充NaN值""" if isinstance(series, pd.Series): return series.fillna(default) elif isinstance(series, np.ndarray): return np.nan_to_num(series, nan=default) return series def transform(self, df): """添加技术指标特征(修复NumPy数组问题)""" try: # 创建临时副本用于TA-Lib计算 df_temp = df.copy() # 将价格列转换为float64以满足TA-Lib要求 for col in ['open', 'high', 'low', 'close']: df_temp[col] = df_temp[col].astype(np.float64) # 基础特征 df['price_change'] = df['close'].pct_change().fillna(0) df['volatility'] = df['close'].rolling(5).std().fillna(0) df['volume_change'] = df['volume'].pct_change().fillna(0) df['MA5'] = df['close'].rolling(5).mean().fillna(0) df['MA20'] = df['close'].rolling(20).mean().fillna(0) # 技术指标 - 修复NumPy数组问题 rsi = ta.RSI(df_temp['close'].values, timeperiod=14) df['RSI14'] = self.safe_fillna(rsi, 50) macd, macd_signal, macd_hist = ta.MACD( df_temp['close'].values, fastperiod=12, slowperiod=26, signalperiod=9 ) df['MACD_hist'] = self.safe_fillna(macd_hist, 0) # 新增特征 mom = ta.MOM(df_temp['close'].values, timeperiod=10) df['MOM10'] = self.safe_fillna(mom, 0) atr = ta.ATR( df_temp['high'].values, df_temp['low'].values, df_temp['close'].values, timeperiod=14 ) df['ATR14'] = self.safe_fillna(atr, 0) # 成交量加权平均价 vwap = (df['volume'] * (df['high'] + df['low'] + df['close']) / 3).cumsum() / df['volume'].cumsum() df['VWAP'] = self.safe_fillna(vwap, 0) # 相对强弱指数差值 df['RSI_diff'] = df['RSI14'] - df['RSI14'].rolling(5).mean().fillna(0) # 价格波动比率 df['price_vol_ratio'] = df['price_change'] / (df['volatility'].replace(0, 1e-8) + 1e-8) # 技术指标组合特征 df['MACD_RSI'] = df['MACD_hist'] * df['RSI14'] # 市场情绪指标 df['advance_decline'] = (df['close'] > df['open']).astype(int).rolling(5).sum().fillna(0) # 时间特征 df['day_of_week'] = df['date'].dt.dayofweek df['month'] = df['date'].dt.month # 处理无穷大和NaN df = df.replace([np.inf, -np.inf], np.nan) df = df.fillna(0) # 优化内存 return reduce_mem_usage(df) except Exception as e: logger.error(f"特征工程失败: {str(e)}", exc_info=True) # 返回基本特征作为回退方案 df['price_change'] = df['close'].pct_change().fillna(0) df['volatility'] = df['close'].rolling(5).std().fillna(0) df['volume_change'] = df['volume'].pct_change().fillna(0) df['MA5'] = df['close'].rolling(5).mean().fillna(0) df['MA20'] = df['close'].rolling(20).mean().fillna(0) # 填充缺失的技术指标 for col in self.config.PREDICT_FEATURES: if col not in df.columns: df[col] = 0 return df # ========== 聚类模型 ========== class StockCluster: def __init__(self, config): self.config = config self.scaler = StandardScaler() self.kmeans = MiniBatchKMeans( n_clusters=config.CLUSTER_NUM, random_state=42, batch_size=1000 ) self.cluster_map = {} # 股票代码到聚类ID的映射 def fit(self, stock_data): """训练聚类模型""" logger.info("开始股票聚类分析...") cluster_features = [] # 提取每只股票的特征 for stock_code, df in tqdm(stock_data.items(), desc="提取聚类特征"): if len(df) < 100: # 至少100个交易日 continue features = {} for feat in self.config.CLUSTER_FEATURES: if feat in df.columns: # 使用统计特征 features[f"{feat}_mean"] = df[feat].mean() features[f"{feat}_std"] = df[feat].std() else: # 特征缺失时填充0 features[f"{feat}_mean"] = 0 features[f"{feat}_std"] = 0 cluster_features.append(features) if not cluster_features: logger.warning("没有可用的聚类特征,使用默认聚类") # 创建默认聚类映射 self.cluster_map = {code: 0 for code in stock_data.keys()} return self # 创建特征DataFrame feature_df = pd.DataFrame(cluster_features) feature_df = reduce_mem_usage(feature_df) # 标准化特征 scaled_features = self.scaler.fit_transform(feature_df) # 聚类 self.kmeans.fit(scored_features) clusters = self.kmeans.predict(scaled_features) feature_df['cluster'] = clusters # 创建股票到聚类的映射 stock_codes = list(stock_data.keys())[:len(clusters)] # 确保长度匹配 for i, stock_code in enumerate(stock_codes): self.cluster_map[stock_code] = clusters[i] logger.info("聚类分布统计:") logger.info(feature_df['cluster'].value_counts().to_string()) logger.info(f"股票聚类完成,共分为 {self.config.CLUSTER_NUM} 个类别") return self def transform(self, df, stock_code): """为数据添加聚类特征""" cluster_id = self.cluster_map.get(stock_code, -1) # 默认为-1表示未知聚类 df['cluster'] = cluster_id return df # ========== 目标创建 ========== class TargetCreator: def __init__(self, config): self.config = config def create_targets(self, df): """创建目标变量""" # 计算次日涨幅 df['next_day_gain'] = df['close'].shift(-1) / df['close'] - 1 # 计算次日最低价与开盘价比例 df['next_day_low_ratio'] = df['low'].shift(-1) / df['open'].shift(-1) # 创建复合目标:涨幅>5% 且 最低价≥开盘价98% df['target'] = 0 mask = (df['next_day_gain'] > self.config.MIN_GAIN) & \ (df['next_day_low_ratio'] >= self.config.MIN_LOW_RATIO) df.loc[mask, 'target'] = 1 # 删除最后一行(没有次日数据) df = df.iloc[:-1] # 检查目标分布 target_counts = df['target'].value_counts() logger.info(f"目标分布: 0={target_counts.get(0, 0)}, 1={target_counts.get(1, 0)}") return df # ========== 模型训练 (内存优化版) ========== class StockModelTrainer: def __init__(self, config): self.config = config self.model_name = "stock_prediction_model" self.feature_importance = None def prepare_dataset(self, stock_data, cluster_model, feature_engineer): """准备训练数据集(内存优化版)""" logger.info("准备训练数据集...") X_list = [] y_list = [] stock_group_list = [] # 用于分组交叉验证 target_creator = TargetCreator(self.config) # 使用生成器减少内存占用 for stock_code, df in tqdm(stock_data.items(), desc="处理股票数据"): try: # 特征工程 df = feature_engineer.transform(df.copy()) # 添加聚类特征 df = cluster_model.transform(df, stock_code) # 创建目标 df = target_creator.create_targets(df) # 只保留所需特征和目标 features = self.config.PREDICT_FEATURES if 'target' not in df.columns: logger.warning(f"股票 {stock_code} 缺少目标列,跳过") continue X = df[features] y = df['target'] # 确保没有NaN值 if X.isnull().any().any(): logger.warning(f"股票 {stock_code} 特征包含NaN值,跳过") continue # 使用稀疏矩阵存储(减少内存) sparse_X = sparse.csr_matrix(X.values.astype(np.float32)) X_list.append(sparse_X) y_list.append(y.values) stock_group_list.extend([stock_code] * len(X)) # 为每个样本添加股票代码作为组标识 # 定期清理内存 if len(X_list) % 100 == 0: gc.collect() print_memory_usage() except Exception as e: logger.error(f"处理股票 {stock_code} 失败: {str(e)}", exc_info=True) if not X_list: logger.error("没有可用的训练数据") return None, None, None # 合并所有数据 X_full = sparse.vstack(X_list) y_full = np.concatenate(y_list) groups = np.array(stock_group_list) logger.info(f"数据集准备完成,样本数: {X_full.shape[0]}") logger.info(f"目标分布: 0={sum(y_full==0)}, 1={sum(y_full==1)}") return X_full, y_full, groups def feature_selection(self, X, y): """执行特征选择(内存优化版)""" logger.info("执行特征选择...") # 使用基模型评估特征重要性 base_model = lgb.LGBMClassifier( n_estimators=100, random_state=42, n_jobs=-1 ) # 分批训练(减少内存占用) batch_size = 100000 for i in range(0, X.shape[0], batch_size): end_idx = min(i + batch_size, X.shape[0]) X_batch = X[i:end_idx].toarray() if sparse.issparse(X) else X[i:end_idx] y_batch = y[i:end_idx] if i == 0: base_model.fit(X_batch, y_batch) else: base_model.fit(X_batch, y_batch, init_model=base_model) # 获取特征重要性 importance = pd.Series(base_model.feature_importances_, index=self.config.PREDICT_FEATURES) importance = importance.sort_values(ascending=False) logger.info("特征重要性:\n" + importance.to_string()) # 选择前K个重要特征 k = min(15, len(self.config.PREDICT_FEATURES)) selected_features = importance.head(k).index.tolist() logger.info(f"选择前 {k} 个特征: {selected_features}") # 更新配置中的特征列表 self.config.PREDICT_FEATURES = selected_features # 转换特征矩阵 if sparse.issparse(X): # 对于稀疏矩阵,我们需要重新索引 feature_indices = [self.config.PREDICT_FEATURES.index(f) for f in selected_features] X_selected = X[:, feature_indices] else: X_selected = X[selected_features] return X_selected, selected_features def train_model(self, X, y, groups): """训练并优化模型(内存优化版)""" if X is None or len(y) == 0: logger.error("训练数据为空,无法训练模型") return None logger.info("开始训练模型...") # 1. 处理类别不平衡 pos_count = sum(y == 1) neg_count = sum(y == 0) scale_pos_weight = neg_count / pos_count logger.info(f"类别不平衡处理: 正样本权重 = {scale_pos_weight:.2f}") # 2. 特征选择 X_selected, selected_features = self.feature_selection(X, y) # 3. 自定义评分函数 - 关注正类召回率 def positive_recall_score(y_true, y_pred): return recall_score(y_true, y_pred, pos_label=1) custom_scorer = make_scorer(positive_recall_score, greater_is_better=True) # 4. 使用分组时间序列交叉验证(减少折数) group_kfold = GroupKFold(n_splits=2) # 减少折数以节省内存 cv = list(group_kfold.split(X_selected, y, groups=groups)) # 5. 创建模型 model = lgb.LGBMClassifier( objective='binary', random_state=42, n_jobs=-1, scale_pos_weight=scale_pos_weight, verbose=-1 ) # 6. 参数搜索(减少迭代次数) search = RandomizedSearchCV( estimator=model, param_distributions=self.config.PARAM_GRID, n_iter=10, # 减少迭代次数以节省内存 scoring=custom_scorer, cv=cv, verbose=2, n_jobs=1, # 减少并行任务以节省内存 pre_dispatch='2*n_jobs', # 控制任务分发 random_state=42 ) logger.info("开始参数搜索...") # 分批处理数据(减少内存占用) if sparse.issparse(X_selected): X_dense = X_selected.toarray() # 转换为密集矩阵用于搜索 else: X_dense = X_selected search.fit(X_dense, y) # 7. 使用最佳参数训练最终模型 best_params = search.best_params_ logger.info(f"最佳参数: {best_params}") logger.info(f"最佳召回率: {search.best_score_}") final_model = lgb.LGBMClassifier( **best_params, objective='binary', random_state=42, n_jobs=-1, scale_pos_weight=scale_pos_weight ) # 使用早停策略训练最终模型 logger.info("训练最终模型...") final_model.fit( X_dense, y, eval_set=[(X_dense, y)], eval_metric='binary_logloss', callbacks=[ lgb.early_stopping(stopping_rounds=50, verbose=False), lgb.log_evaluation(period=100) ] ) # 保存特征重要性 self.feature_importance = pd.Series( final_model.feature_importances_, index=selected_features ).sort_values(ascending=False) # 8. 保存模型 model_path = f"{self.model_name}.pkl" joblib.dump((final_model, selected_features), model_path) logger.info(f"模型已保存到: {model_path}") return final_model def evaluate_model(self, model, X_test, y_test): """评估模型性能""" if model is None or len(X_test) == 0: logger.warning("无法评估模型,缺少数据或模型") return # 预测测试集 y_pred = model.predict(X_test) # 计算召回率 recall = recall_score(y_test, y_pred, pos_label=1) logger.info(f"测试集召回率: {recall:.4f}") # 计算满足条件的样本比例 condition_ratio = sum(y_test == 1) / len(y_test) logger.info(f"满足条件的样本比例: {condition_ratio:.4f}") # 详细分类报告 report = classification_report(y_test, y_pred) logger.info("分类报告:\n" + report) # 特征重要性 if self.feature_importance is not None: logger.info("特征重要性:\n" + self.feature_importance.to_string()) # ========== 主程序 ========== def main(): # 初始化配置 config = StockConfig() logger.info("===== 股票上涨预测程序 (修复版) =====") # 加载训练数据(添加抽样) logger.info(f"加载训练数据: {config.START_DATE} 至 {config.END_DATE}") train_data = load_stock_data( config.SH_PATH, config.SZ_PATH, config.START_DATE, config.END_DATE, sample_fraction=config.SAMPLE_FRACTION, debug_mode=config.DEBUG_MODE, max_stocks=config.MAX_STOCKS ) if not train_data: logger.error("错误: 没有加载到任何股票数据,请检查数据路径和格式") return # 特征工程 feature_engineer = FeatureEngineer(config) # 聚类分析 cluster_model = StockCluster(config) try: cluster_model.fit(train_data) except Exception as e: logger.error(f"聚类分析失败: {str(e)}", exc_info=True) # 创建默认聚类映射 cluster_model.cluster_map = {code: 0 for code in train_data.keys()} logger.info("使用默认聚类(所有股票归为同一类)") # 准备训练数据 trainer = StockModelTrainer(config) try: X_train, y_train, groups = trainer.prepare_dataset( train_data, cluster_model, feature_engineer ) except Exception as e: logger.error(f"准备训练数据失败: {str(e)}", exc_info=True) return if X_train is None or len(y_train) == 0: logger.error("错误: 没有可用的训练数据") return # 训练模型 model = trainer.train_model(X_train, y_train, groups) if model is None: logger.error("模型训练失败") return # 加载测试数据(添加抽样) logger.info(f"\n加载测试数据: {config.TEST_START} 至 {config.TEST_END}") test_data = load_stock_data( config.SH_PATH, config.SZ_PATH, config.TEST_START, config.TEST_END, sample_fraction=config.SAMPLE_FRACTION, debug_mode=config.DEBUG_MODE, max_stocks=config.MAX_STOCKS ) if test_data: # 准备测试数据 X_test, y_test, _ = trainer.prepare_dataset( test_data, cluster_model, feature_engineer ) if X_test is not None and len(y_test) > 0: # 评估模型 if sparse.issparse(X_test): X_test = X_test.toarray() trainer.evaluate_model(model, X_test, y_test) else: logger.warning("测试数据准备失败,无法评估模型") else: logger.warning("没有测试数据可用") logger.info("===== 程序执行完成 =====") if __name__ == "__main__": main()

import tkinter as tk from tkinter import ttk, filedialog, messagebox import pandas as pd import numpy as np import matplotlib as mpl import matplotlib.pyplot as plt from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg import tensorflow as tf from tensorflow.keras.models import Model from tensorflow.keras.layers import Input, Dense, Lambda from tensorflow.keras.optimizers import Adam from sklearn.preprocessing import MinMaxScaler import os import time import warnings import matplotlib.dates as mdates warnings.filterwarnings('ignore', category=UserWarning, module='tensorflow') mpl.rcParams['font.sans-serif'] = ['SimHei', 'Microsoft YaHei', 'Arial Unicode MS'] mpl.rcParams['axes.unicode_minus'] = False # 关键修复:使用 ASCII 减号 # 设置中文字体支持 plt.rcParams['font.sans-serif'] = ['SimHei'] plt.rcParams['axes.unicode_minus'] = False class PINNModel(tf.keras.Model): def __init__(self, num_layers=4, hidden_units=32, dropout_rate=0.1, l2_reg=0.001, **kwargs): super(PINNModel, self).__init__(**kwargs) # 增加输入维度处理能力 self.dense_layers = [] self.bn_layers = [] # 批量归一化层 self.dropout_layers = [] # 添加L2正则化 self.l2_reg = l2_reg # 创建更深的网络结构 for i in range(num_layers): # 第一层使用更大的维度 units = hidden_units * 2 if i == 0 else hidden_units self.dense_layers.append( Dense(units, activation='swish', # 使用Swish激活函数 kernel_regularizer=tf.keras.regularizers.l2(l2_reg)) ) self.bn_layers.append(tf.keras.layers.BatchNormalization()) self.dropout_layers.append(tf.keras.layers.Dropout(dropout_rate)) # 添加跳跃连接层 self.residual_layer = Dense(hidden_units, activation='linear') # 最终输出层 self.final_layer = Dense(1, activation='linear', kernel_regularizer=tf.keras.regularizers.l2(l2_reg)) # 物理参数优化 - 使用更灵活的参数化方法 # 基本衰减系数 (使用指数确保正值) self.k1_log = tf.Variable(tf.math.log(0.1), trainable=True, dtype=tf.float32, name='k1_log') # 水位依赖的衰减系数 (使用softplus确保正值) self.k2_raw = tf.Variable(0.01, trainable=True, dtype=tf.float32, name='k2_raw') # 非线性项系数 (使用sigmoid约束在[0,1]) self.alpha_raw = tf.Variable(0.1, trainable=True, dtype=tf.float32, name='alpha_raw') # 外部影响系数 (使用tanh约束在[-0.2,0.2]) self.beta_raw = tf.Variable(0.05, trainable=True, dtype=tf.float32, name='beta_raw') @property def k1(self): """指数变换确保正值""" return tf.exp(self.k1_log) @property def k2(self): """Softplus变换确保正值""" return tf.math.softplus(self.k2_raw) * 0.1 # 约束在0-0.1之间 @property def alpha(self): """Sigmoid约束在[0,1]""" return tf.math.sigmoid(self.alpha_raw) @property def beta(self): """Tanh约束在[-0.2,0.2]""" return tf.math.tanh(self.beta_raw) * 0.2 def call(self, inputs, training=False): # 解包输入 t, h, dt, lag1, lag3, lag7, month_feats, season_feats = inputs # 组合所有特征 x = tf.concat([ t, h, dt, lag1, lag3, lag7, month_feats, season_feats, t * h, h * dt, t * dt, (t * h * dt), h * lag1, h * lag3, h * lag7, dt * lag1, dt * lag3, dt * lag7 ], axis=1) # 初始投影层 x_proj = Dense(64, activation='swish')(x) # 残差块 residual = self.residual_layer(x_proj) # 通过所有隐藏层(带批量归一化和dropout) for i, (dense_layer, bn_layer, dropout_layer) in enumerate(zip( self.dense_layers, self.bn_layers, self.dropout_layers)): # 第一层使用残差连接 if i == 0: x = dense_layer(x_proj) x = bn_layer(x, training=training) x = dropout_layer(x, training=training) x = x + residual # 残差连接 else: x = dense_layer(x) x = bn_layer(x, training=training) x = dropout_layer(x, training=training) # 注意力机制 - 增强重要特征 attention = Dense(x.shape[-1], activation='sigmoid')(x) x = x * attention return self.final_layer(x) def physics_loss(self, t, h_current, dt, training=False): """改进的物理损失计算""" # 创建零值占位符 batch_size = tf.shape(t)[0] zeros = tf.zeros((batch_size, 1), dtype=tf.float32) zeros2 = tf.zeros((batch_size, 2), dtype=tf.float32) # 预测下一时刻水位 h_next_pred = self([t, h_current, dt, zeros, zeros, zeros, zeros2, zeros2], training=training) # 物理方程计算 - 使用改进的公式 # 非线性衰减项 decay_factor = self.k1 + self.k2 * h_current exponent = -decay_factor * dt exponent = tf.clip_by_value(exponent, -50.0, 50.0) decay_term = h_current * tf.exp(exponent) # 外部影响项(考虑时间依赖性) external_factor = self.alpha * self.beta * dt external_factor = tf.clip_by_value(external_factor, -10.0, 10.0) external_term = self.alpha * (1 - tf.exp(-external_factor)) # 残差计算 residual = h_next_pred - (decay_term + external_term) # 添加物理参数正则化(鼓励简单物理模型) param_reg = 0.01 * (tf.abs(self.k1) + tf.abs(self.k2) + tf.abs(self.alpha) + tf.abs(self.beta)) return tf.reduce_mean(tf.square(residual)) + param_reg class DamSeepageModel: def __init__(self, root): self.root = root self.root.title("大坝渗流预测模型(PINNs)") self.root.geometry("1200x800") # 初始化数据 self.train_df = None # 训练集 self.test_df = None # 测试集 self.model = None self.scaler_t = MinMaxScaler(feature_range=(0, 1)) self.scaler_h = MinMaxScaler(feature_range=(0, 1)) self.scaler_dt = MinMaxScaler(feature_range=(0, 1)) self.evaluation_metrics = {} # 创建主界面 self.create_widgets() def create_widgets(self): # 创建主框架 main_frame = ttk.Frame(self.root, padding=10) main_frame.pack(fill=tk.BOTH, expand=True) # 左侧控制面板 control_frame = ttk.LabelFrame(main_frame, text="模型控制", padding=10) control_frame.pack(side=tk.LEFT, fill=tk.Y, padx=5, pady=5) # 文件选择部分 file_frame = ttk.LabelFrame(control_frame, text="数据文件", padding=10) file_frame.pack(fill=tk.X, pady=5) # 训练集选择 ttk.Label(file_frame, text="训练集:").grid(row=0, column=0, sticky=tk.W, pady=5) self.train_file_var = tk.StringVar() ttk.Entry(file_frame, textvariable=self.train_file_var, width=30, state='readonly').grid( row=0, column=1, padx=5) ttk.Button(file_frame, text="选择文件", command=lambda: self.select_file("train")).grid(row=0, column=2) # 测试集选择 ttk.Label(file_frame, text="测试集:").grid(row=1, column=0, sticky=tk.W, pady=5) self.test_file_var = tk.StringVar() ttk.Entry(file_frame, textvariable=self.test_file_var, width=30, state='readonly').grid(row=1, column=1, padx=5) ttk.Button(file_frame, text="选择文件", command=lambda: self.select_file("test")).grid(row=1, column=2) # PINNs参数设置 param_frame = ttk.LabelFrame(control_frame, text="PINNs参数", padding=10) param_frame.pack(fill=tk.X, pady=10) # 验证集切分比例 ttk.Label(param_frame, text="验证集比例:").grid(row=0, column=0, sticky=tk.W, pady=5) self.split_ratio_var = tk.DoubleVar(value=0.2) ttk.Spinbox(param_frame, from_=0, to=1, increment=0.05, textvariable=self.split_ratio_var, width=10).grid(row=0, column=1, padx=5) # 隐藏层数量 ttk.Label(param_frame, text="网络层数:").grid(row=1, column=0, sticky=tk.W, pady=5) self.num_layers_var = tk.IntVar(value=4) ttk.Spinbox(param_frame, from_=2, to=8, increment=1, textvariable=self.num_layers_var, width=10).grid(row=1, column=1, padx=5) # 每层神经元数量 ttk.Label(param_frame, text="神经元数/层:").grid(row=2, column=0, sticky=tk.W, pady=5) self.hidden_units_var = tk.IntVar(value=32) ttk.Spinbox(param_frame, from_=16, to=128, increment=4, textvariable=self.hidden_units_var, width=10).grid(row=2, column=1, padx=5) # 训练轮次 ttk.Label(param_frame, text="训练轮次:").grid(row=3, column=0, sticky=tk.W, pady=5) self.epochs_var = tk.IntVar(value=500) ttk.Spinbox(param_frame, from_=100, to=2000, increment=100, textvariable=self.epochs_var, width=10).grid(row=3, column=1, padx=5) # 物理损失权重 ttk.Label(param_frame, text="物理损失权重:").grid(row=4, column=0, sticky=tk.W, pady=5) self.physics_weight_var = tk.DoubleVar(value=0.5) ttk.Spinbox(param_frame, from_=0.1, to=1.0, increment=0.1, textvariable=self.physics_weight_var, width=10).grid(row=4, column=1, padx=5) # 控制按钮 btn_frame = ttk.Frame(control_frame) btn_frame.pack(fill=tk.X, pady=10) ttk.Button(btn_frame, text="训练模型", command=self.train_model).pack(side=tk.LEFT, padx=5) ttk.Button(btn_frame, text="预测结果", command=self.predict).pack(side=tk.LEFT, padx=5) ttk.Button(btn_frame, text="保存结果", command=self.save_results).pack(side=tk.LEFT, padx=5) ttk.Button(btn_frame, text="重置", command=self.reset).pack(side=tk.RIGHT, padx=5) # 状态栏 self.status_var = tk.StringVar(value="就绪") status_bar = ttk.Label(control_frame, textvariable=self.status_var, relief=tk.SUNKEN, anchor=tk.W) status_bar.pack(fill=tk.X, side=tk.BOTTOM) # 右侧结果显示区域 result_frame = ttk.Frame(main_frame) result_frame.pack(side=tk.RIGHT, fill=tk.BOTH, expand=True, padx=5, pady=5) # 创建标签页 self.notebook = ttk.Notebook(result_frame) self.notebook.pack(fill=tk.BOTH, expand=True) # 损失曲线标签页 self.loss_frame = ttk.Frame(self.notebook) self.notebook.add(self.loss_frame, text="训练损失") # 预测结果标签页 self.prediction_frame = ttk.Frame(self.notebook) self.notebook.add(self.prediction_frame, text="预测结果") # 指标显示 self.metrics_var = tk.StringVar() metrics_label = ttk.Label( self.prediction_frame, textvariable=self.metrics_var, font=('TkDefaultFont', 10, 'bold'), relief='ridge', padding=5 ) metrics_label.pack(fill=tk.X, padx=5, pady=5) # 初始化绘图区域 self.fig, self.ax = plt.subplots(figsize=(10, 6)) self.canvas = FigureCanvasTkAgg(self.fig, master=self.prediction_frame) self.canvas.get_tk_widget().pack(fill=tk.BOTH, expand=True) # 损失曲线画布 self.loss_fig, self.loss_ax = plt.subplots(figsize=(10, 4)) self.loss_canvas = FigureCanvasTkAgg(self.loss_fig, master=self.loss_frame) self.loss_canvas.get_tk_widget().pack(fill=tk.BOTH, expand=True) def select_file(self, file_type): """选择Excel文件并计算时间步长""" try: file_path = filedialog.askopenfilename( title=f"选择{file_type}集Excel文件", filetypes=[("Excel文件", "*.xlsx *.xls"), ("所有文件", "*.*")] ) if not file_path: return df = pd.read_excel(file_path) # 验证必需列是否存在 required_cols = ['year', 'month', 'day', '水位'] missing_cols = [col for col in required_cols if col not in df.columns] if missing_cols: messagebox.showerror("列名错误", f"缺少必需列: {', '.join(missing_cols)}") return # 时间特征处理 time_features = ['year', 'month', 'day'] missing_time_features = [feat for feat in time_features if feat not in df.columns] if missing_time_features: messagebox.showerror("列名错误", f"Excel文件缺少预处理后的时间特征列: {', '.join(missing_time_features)}") return # 创建时间戳列 (增强兼容性) time_cols = ['year', 'month', 'day'] if 'hour' in df.columns: time_cols.append('hour') if 'minute' in df.columns: time_cols.append('minute') if 'second' in df.columns: time_cols.append('second') # 填充缺失的时间单位 for col in ['hour', 'minute', 'second']: if col not in df.columns: df[col] = 0 df['datetime'] = pd.to_datetime(df[time_cols]) # 设置时间索引 df = df.set_index('datetime') # 计算相对时间(天) df['days'] = (df.index - df.index[0]).days # 新增:计算时间步长dt(单位:天) df['dt'] = df.index.to_series().diff().dt.total_seconds() / 86400 # 精确到秒级 # 处理时间步长异常值 if len(df) > 1: # 计算有效时间步长(排除<=0的值) valid_dt = df['dt'][df['dt'] > 0] if len(valid_dt) > 0: avg_dt = valid_dt.mean() else: avg_dt = 1.0 else: avg_dt = 1.0 # 替换非正值 df.loc[df['dt'] <= 0, 'dt'] = avg_dt # 填充缺失值 df['dt'] = df['dt'].fillna(avg_dt) # 添加滞后特征 (1天、3天、7天的水位) df['水位_lag1'] = df['水位'].shift(1) df['水位_lag3'] = df['水位'].shift(3) df['水位_lag7'] = df['水位'].shift(7) # 填充缺失的滞后值(用第一个有效值向后填充) lag_cols = ['水位_lag1', '水位_lag3', '水位_lag7'] df[lag_cols] = df[lag_cols].fillna(method='bfill') # 添加周期性特征 df['month'] = df.index.month df['day_of_year'] = df.index.dayofyear # 月份的正弦/余弦变换 df['month_sin'] = np.sin(2 * np.pi * df['month'] / 12) df['month_cos'] = np.cos(2 * np.pi * df['month'] / 12) # 季节特征(每3个月一个季节) seasons = [(12, 1, 2), (3, 4, 5), (6, 7, 8), (9, 10, 11)] season_map = {} for i, months in enumerate(seasons): for month in months: season_map[month] = i df['season'] = df['month'].map(season_map) # 季节的正弦/余弦变换 df['season_sin'] = np.sin(2 * np.pi * df['season'] / 4) df['season_cos'] = np.cos(2 * np.pi * df['season'] / 4) # 保存数据 if file_type == "train": self.train_df = df self.train_file_var.set(os.path.basename(file_path)) self.status_var.set(f"已加载训练集: {len(self.train_df)}条数据") else: self.test_df = df self.test_file_var.set(os.path.basename(file_path)) self.status_var.set(f"已加载测试集: {len(self.test_df)}条数据") except Exception as e: error_msg = f"文件读取失败: {str(e)}\n\n请确保:\n1. 文件不是打开状态\n2. 文件格式正确\n3. 包含必需的时间和水位列" messagebox.showerror("文件错误", error_msg) def calculate_metrics(self, y_true, y_pred): """计算评估指标""" from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score mse = mean_squared_error(y_true, y_pred) rmse = np.sqrt(mse) mae = mean_absolute_error(y_true, y_pred) non_zero_idx = np.where(y_true != 0)[0] if len(non_zero_idx) > 0: mape = np.mean(np.abs((y_true[non_zero_idx] - y_pred[non_zero_idx]) / y_true[non_zero_idx])) * 100 else: mape = float('nan') r2 = r2_score(y_true, y_pred) return { 'MSE': mse, 'RMSE': rmse, 'MAE': mae, 'MAPE': mape, # 修正键名 'R2': r2 } def train_model(self): """训练PINNs模型(带早停机制+训练指标监控)""" if self.train_df is None: messagebox.showwarning("警告", "请先选择训练集文件") return try: self.status_var.set("正在预处理数据...") self.root.update() # 从训练集中切分训练子集和验证子集(时间顺序切分) split_ratio = 1 - self.split_ratio_var.get() split_idx = int(len(self.train_df) * split_ratio) train_subset = self.train_df.iloc[:split_idx] valid_subset = self.train_df.iloc[split_idx:] # 检查数据量是否足够 if len(train_subset) < 2 or len(valid_subset) < 2: messagebox.showerror("数据错误", "训练集数据量不足(至少需要2个时间步)") return # ===== 新增:创建特征归一化器 ===== self.scaler_lag1 = MinMaxScaler(feature_range=(0, 1)) self.scaler_lag3 = MinMaxScaler(feature_range=(0, 1)) self.scaler_lag7 = MinMaxScaler(feature_range=(0, 1)) self.scaler_month = MinMaxScaler(feature_range=(0, 1)) self.scaler_season = MinMaxScaler(feature_range=(0, 1)) # 数据预处理 - 分别归一化不同特征 # 时间特征 t_train = train_subset['days'].values[1:].reshape(-1, 1) self.scaler_t.fit(t_train) t_train_scaled = self.scaler_t.transform(t_train).astype(np.float32) # 水位特征 h_train = train_subset['水位'].values[:-1].reshape(-1, 1) self.scaler_h.fit(h_train) h_train_scaled = self.scaler_h.transform(h_train).astype(np.float32) # 时间步长特征 dt_train = train_subset['dt'].values[1:].reshape(-1, 1) self.scaler_dt.fit(dt_train) dt_train_scaled = self.scaler_dt.transform(dt_train).astype(np.float32) # ===== 新增:归一化滞后特征 ===== lag1_train = train_subset['水位_lag1'].values[:-1].reshape(-1, 1) self.scaler_lag1.fit(lag1_train) lag1_train_scaled = self.scaler_lag1.transform(lag1_train).astype(np.float32) lag3_train = train_subset['水位_lag3'].values[:-1].reshape(-1, 1) self.scaler_lag3.fit(lag3_train) lag3_train_scaled = self.scaler_lag3.transform(lag3_train).astype(np.float32) lag7_train = train_subset['水位_lag7'].values[:-1].reshape(-1, 1) self.scaler_lag7.fit(lag7_train) lag7_train_scaled = self.scaler_lag7.transform(lag7_train).astype(np.float32) # ===== 新增:归一化周期性特征 ===== month_sin_train = train_subset['month_sin'].values[:-1].reshape(-1, 1) month_cos_train = train_subset['month_cos'].values[:-1].reshape(-1, 1) month_features_train = np.hstack([month_sin_train, month_cos_train]) self.scaler_month.fit(month_features_train) month_features_train_scaled = self.scaler_month.transform(month_features_train).astype(np.float32) season_sin_train = train_subset['season_sin'].values[:-1].reshape(-1, 1) season_cos_train = train_subset['season_cos'].values[:-1].reshape(-1, 1) season_features_train = np.hstack([season_sin_train, season_cos_train]) self.scaler_season.fit(season_features_train) season_features_train_scaled = self.scaler_season.transform(season_features_train).astype(np.float32) # 归一化标签(下一时刻水位) h_next_train = train_subset['水位'].values[1:].reshape(-1, 1) h_next_train_scaled = self.scaler_h.transform(h_next_train).astype(np.float32) # 准备验证数据(同样进行归一化) t_valid = valid_subset['days'].values[1:].reshape(-1, 1) t_valid_scaled = self.scaler_t.transform(t_valid).astype(np.float32) h_valid = valid_subset['水位'].values[:-1].reshape(-1, 1) h_valid_scaled = self.scaler_h.transform(h_valid).astype(np.float32) dt_valid = valid_subset['dt'].values[1:].reshape(-1, 1) dt_valid_scaled = self.scaler_dt.transform(dt_valid).astype(np.float32) h_next_valid_scaled = self.scaler_h.transform( valid_subset['水位'].values[1:].reshape(-1, 1) ).astype(np.float32) # 原始值用于指标计算 h_next_train_true = h_next_train h_next_valid_true = valid_subset['水位'].values[1:].reshape(-1, 1) # 创建模型和优化器 self.model = PINNModel( num_layers=self.num_layers_var.get(), hidden_units=self.hidden_units_var.get() ) # 创建动态学习率调度器 initial_lr = 0.001 lr_schedule = tf.keras.optimizers.schedules.ExponentialDecay( initial_learning_rate=initial_lr, decay_steps=100, # 每100步衰减一次 decay_rate=0.95, # 衰减率 staircase=True # 阶梯式衰减 ) optimizer = Adam(learning_rate=lr_schedule) # ===== 新增:验证集的滞后特征归一化 ===== lag1_valid = valid_subset['水位_lag1'].values[:-1].reshape(-1, 1) lag1_valid_scaled = self.scaler_lag1.transform(lag1_valid).astype(np.float32) lag3_valid = valid_subset['水位_lag3'].values[:-1].reshape(-1, 1) lag3_valid_scaled = self.scaler_lag3.transform(lag3_valid).astype(np.float32) lag7_valid = valid_subset['水位_lag7'].values[:-1].reshape(-1, 1) lag7_valid_scaled = self.scaler_lag7.transform(lag7_valid).astype(np.float32) # ===== 新增:验证集的周期性特征归一化 ===== month_sin_valid = valid_subset['month_sin'].values[:-1].reshape(-1, 1) month_cos_valid = valid_subset['month_cos'].values[:-1].reshape(-1, 1) month_features_valid = np.hstack([month_sin_valid, month_cos_valid]) month_features_valid_scaled = self.scaler_month.transform(month_features_valid).astype(np.float32) season_sin_valid = valid_subset['season_sin'].values[:-1].reshape(-1, 1) season_cos_valid = valid_subset['season_cos'].values[:-1].reshape(-1, 1) season_features_valid = np.hstack([season_sin_valid, season_cos_valid]) season_features_valid_scaled = self.scaler_season.transform(season_features_valid).astype(np.float32) # 在训练循环中,使用归一化后的数据 train_dataset = tf.data.Dataset.from_tensor_slices( ((t_train_scaled, h_train_scaled, dt_train_scaled, lag1_train_scaled, lag3_train_scaled, lag7_train_scaled, month_features_train_scaled, season_features_train_scaled), h_next_train_scaled) ) train_dataset = train_dataset.shuffle(buffer_size=1024).batch(32) valid_dataset = tf.data.Dataset.from_tensor_slices( ((t_valid_scaled, h_valid_scaled, dt_valid_scaled, lag1_valid_scaled, lag3_valid_scaled, lag7_valid_scaled, month_features_valid_scaled, season_features_valid_scaled), h_next_valid_scaled) ) valid_dataset = valid_dataset.batch(32) # 初始化训练历史记录列表 train_data_loss_history = [] physics_loss_history = [] valid_data_loss_history = [] train_metrics_history = [] valid_metrics_history = [] # 早停机制参数 patience = int(self.epochs_var.get() / 3) min_delta = 1e-4 best_valid_loss = float('inf') wait = 0 best_epoch = 0 best_weights = None start_time = time.time() # 自定义训练循环 for epoch in range(self.epochs_var.get()): # 获取当前学习率 current_lr = optimizer.learning_rate.numpy() # 训练阶段 epoch_train_data_loss = [] epoch_physics_loss = [] train_pred_scaled = [] # 修改后的解包方式 # 在训练循环中: for step, (inputs, h_next_batch) in enumerate(train_dataset): t_batch, h_batch, dt_batch, lag1_batch, lag3_batch, lag7_batch, month_feats_batch, season_feats_batch = inputs with tf.GradientTape() as tape: # 预测下一时刻水位 h_pred = self.model([ t_batch, h_batch, dt_batch, lag1_batch, lag3_batch, lag7_batch, month_feats_batch, season_feats_batch ], training=True) data_loss = tf.reduce_mean(tf.square(h_next_batch - h_pred)) # 计算物理损失 physics_loss = self.model.physics_loss(t_batch, h_batch, dt_batch, training=True) # 使用设置的物理损失权重 current_physics_weight = self.physics_weight_var.get() # 添加L2正则化损失 l2_loss = tf.reduce_sum(self.model.losses) # 总损失 loss = data_loss + current_physics_weight * physics_loss + l2_loss grads = tape.gradient(loss, self.model.trainable_variables) optimizer.apply_gradients(zip(grads, self.model.trainable_variables)) epoch_train_data_loss.append(data_loss.numpy()) epoch_physics_loss.append(physics_loss.numpy()) train_pred_scaled.append(h_pred.numpy()) # 合并训练预测值 train_pred_scaled = np.concatenate(train_pred_scaled, axis=0) train_pred_true = self.scaler_h.inverse_transform(train_pred_scaled) train_metrics = self.calculate_metrics( y_true=h_next_train_true.flatten(), y_pred=train_pred_true.flatten() ) train_metrics_history.append(train_metrics) # 验证阶段 epoch_valid_data_loss = [] valid_pred_scaled = [] for (inputs, h_v_next_batch) in valid_dataset: t_v_batch, h_v_batch, dt_v_batch, lag1_v_batch, lag3_v_batch, lag7_v_batch, month_feats_v_batch, season_feats_v_batch = inputs h_v_pred = self.model([ t_v_batch, h_v_batch, dt_v_batch, lag1_v_batch, lag3_v_batch, lag7_v_batch, month_feats_v_batch, season_feats_v_batch ], training=False) valid_data_loss = tf.reduce_mean(tf.square(h_v_next_batch - h_v_pred)) epoch_valid_data_loss.append(valid_data_loss.numpy()) valid_pred_scaled.append(h_v_pred.numpy()) # 合并验证预测值(归一化后) valid_pred_scaled = np.concatenate(valid_pred_scaled, axis=0) # 反归一化得到原始预测值 valid_pred_true = self.scaler_h.inverse_transform(valid_pred_scaled) # 计算验证集指标(使用原始真实值和预测值) valid_metrics = self.calculate_metrics( y_true=h_next_valid_true.flatten(), y_pred=valid_pred_true.flatten() ) valid_metrics_history.append(valid_metrics) # 计算平均损失 avg_train_data_loss = np.mean(epoch_train_data_loss) avg_physics_loss = np.mean(epoch_physics_loss) avg_valid_data_loss = np.mean(epoch_valid_data_loss) # 记录损失 train_data_loss_history.append(avg_train_data_loss) physics_loss_history.append(avg_physics_loss) valid_data_loss_history.append(avg_valid_data_loss) # 早停机制逻辑 current_valid_loss = avg_valid_data_loss # 早停机制逻辑 current_valid_loss = avg_valid_data_loss if current_valid_loss < best_valid_loss - min_delta: best_valid_loss = current_valid_loss best_epoch = epoch + 1 wait = 0 best_weights = self.model.get_weights() else: wait += 1 if wait >= patience: self.status_var.set(f"触发早停!最佳轮次: {best_epoch},最佳验证损失: {best_valid_loss:.4f}") if best_weights is not None: self.model.set_weights(best_weights) break # 确保在此处退出循环 # 更新状态(添加当前学习率显示) if epoch % 1 == 0: # 提取当前训练/验证的关键指标 train_rmse = train_metrics['RMSE'] valid_rmse = valid_metrics['RMSE'] train_r2 = train_metrics['R2'] valid_r2 = valid_metrics['R2'] elapsed = time.time() - start_time self.status_var.set( f"训练中 | 轮次: {epoch + 1}/{self.epochs_var.get()} | " f"学习率: {current_lr:.6f} | " f"训练RMSE: {train_rmse:.4f} | 验证RMSE: {valid_rmse:.4f} | " f"训练R²: {train_r2:.4f} | 验证R²: {valid_r2:.4f} | " f"k1: {self.model.k1.numpy():.6f}, k2: {self.model.k2.numpy():.6f} | 时间: {elapsed:.1f}秒 | 早停等待: {wait}/{patience}" ) self.root.update() # 绘制损失曲线 self.loss_ax.clear() epochs_range = range(1, len(train_data_loss_history) + 1) self.loss_ax.plot(epochs_range, train_data_loss_history, 'b-', label='训练数据损失') self.loss_ax.plot(epochs_range, physics_loss_history, 'r--', label='物理损失') self.loss_ax.plot(epochs_range, valid_data_loss_history, 'g-.', label='验证数据损失') self.loss_ax.set_title('PINNs训练与验证损失') self.loss_ax.set_xlabel('轮次') self.loss_ax.set_ylabel('损失', rotation=0) self.loss_ax.legend() self.loss_ax.grid(True, alpha=0.3) self.loss_ax.set_yscale('log') self.loss_canvas.draw() # 训练完成提示 elapsed = time.time() - start_time if wait >= patience: completion_msg = ( f"早停触发 | 最佳轮次: {best_epoch} | 最佳验证损失: {best_valid_loss:.4f} | " f"最佳验证RMSE: {valid_metrics_history[best_epoch - 1]['RMSE']:.4f} | " f"总时间: {elapsed:.1f}秒" ) else: completion_msg = ( f"训练完成 | 总轮次: {self.epochs_var.get()} | " f"最终训练RMSE: {train_metrics_history[-1]['RMSE']:.4f} | " f"最终验证RMSE: {valid_metrics_history[-1]['RMSE']:.4f} | " f"最终训练R²: {train_metrics_history[-1]['R2']:.4f} | " f"最终验证R²: {valid_metrics_history[-1]['R2']:.4f} | " f"总时间: {elapsed:.1f}秒" ) # 保存训练历史 self.train_history = { 'train_data_loss': train_data_loss_history, 'physics_loss': physics_loss_history, 'valid_data_loss': valid_data_loss_history, 'train_metrics': train_metrics_history, 'valid_metrics': valid_metrics_history } # 保存学习到的物理参数 self.learned_params = { "k1": self.model.k1.numpy(), "k2": self.model.k2.numpy(), "alpha": self.model.alpha.numpy(), "beta": self.model.beta.numpy() } self.status_var.set(completion_msg) messagebox.showinfo("训练完成", f"PINNs模型训练成功完成!\n{completion_msg}") except Exception as e: messagebox.showerror("训练错误", f"模型训练失败:\n{str(e)}") self.status_var.set("训练失败") def predict(self): """使用PINNs模型进行递归预测(带Teacher Forcing和蒙特卡洛Dropout)""" if self.model is None: messagebox.showwarning("警告", "请先训练模型") return if self.test_df is None: messagebox.showwarning("警告", "请先选择测试集文件") return try: self.status_var.set("正在生成预测(使用Teacher Forcing和MC Dropout)...") self.root.update() # 预处理测试数据 - 归一化 t_test = self.test_df['days'].values.reshape(-1, 1) t_test_scaled = self.scaler_t.transform(t_test).astype(np.float32) dt_test = self.test_df['dt'].values.reshape(-1, 1) dt_test_scaled = self.scaler_dt.transform(dt_test).astype(np.float32) h_test = self.test_df['水位'].values.reshape(-1, 1) h_test_scaled = self.scaler_h.transform(h_test).astype(np.float32) # ===== 新增:归一化测试集的滞后特征 ===== lag1_test = self.test_df['水位_lag1'].values.reshape(-1, 1) lag1_test_scaled = self.scaler_lag1.transform(lag1_test).astype(np.float32) lag3_test = self.test_df['水位_lag3'].values.reshape(-1, 1) lag3_test_scaled = self.scaler_lag3.transform(lag3_test).astype(np.float32) lag7_test = self.test_df['水位_lag7'].values.reshape(-1, 1) lag7_test_scaled = self.scaler_lag7.transform(lag7_test).astype(np.float32) # ===== 新增:归一化测试集的周期性特征 ===== month_sin_test = self.test_df['month_sin'].values.reshape(-1, 1) month_cos_test = self.test_df['month_cos'].values.reshape(-1, 1) month_features_test = np.hstack([month_sin_test, month_cos_test]) month_features_test_scaled = self.scaler_month.transform(month_features_test).astype(np.float32) season_sin_test = self.test_df['season_sin'].values.reshape(-1, 1) season_cos_test = self.test_df['season_cos'].values.reshape(-1, 1) season_features_test = np.hstack([season_sin_test, season_cos_test]) season_features_test_scaled = self.scaler_season.transform(season_features_test).astype(np.float32) # 改进的递归预测参数 n = len(t_test) mc_iterations = 100 adaptive_forcing = True # 存储蒙特卡洛采样结果 mc_predictions_scaled = np.zeros((mc_iterations, n, 1), dtype=np.float32) # 进行多次蒙特卡洛采样 for mc_iter in range(mc_iterations): predicted_scaled = np.zeros((n, 1), dtype=np.float32) predicted_scaled[0] = h_test_scaled[0] # 第一个点使用真实值 # 递归预测(带自适应教师强制) for i in range(1, n): # 自适应教师强制:后期阶段增加真实值使用频率 if adaptive_forcing: # 前期70%概率使用真实值,后期提高到90% teacher_forcing_prob = 0.7 + 0.2 * min(1.0, i / (0.7 * n)) else: teacher_forcing_prob = 0.7 # 决定使用真实值还是预测值 use_actual = np.random.rand() < teacher_forcing_prob if use_actual and i < n - 1: # 不能使用未来值 h_prev = h_test_scaled[i - 1:i] else: h_prev = predicted_scaled[i - 1:i] t_prev = t_test_scaled[i - 1:i] dt_i = dt_test_scaled[i:i + 1] # 准备输入特征 inputs = ( t_test_scaled[i - 1:i], # 时间 h_prev, # 当前水位 dt_test_scaled[i:i + 1], # 时间步长 lag1_test_scaled[i - 1:i], lag3_test_scaled[i - 1:i], lag7_test_scaled[i - 1:i], month_features_test_scaled[i - 1:i], season_features_test_scaled[i - 1:i] ) # 直接传递整个输入元组 h_pred = self.model(inputs, training=True) # 物理模型预测值(用于约束) k1 = self.learned_params['k1'] k2 = self.learned_params['k2'] alpha = self.learned_params['alpha'] beta = self.learned_params['beta'] # 物理方程预测 exponent = - (k1 + k2 * h_prev) * dt_i decay_term = h_prev * np.exp(exponent) external_term = alpha * (1 - np.exp(-beta * dt_i)) physics_pred = decay_term + external_term # 混合预测:神经网络预测与物理模型预测加权平均 physics_weight = 0.3 # 物理模型权重 final_pred = physics_weight * physics_pred + (1 - physics_weight) * h_pred.numpy() predicted_scaled[i] = final_pred[0][0] mc_predictions_scaled[mc_iter] = predicted_scaled # 计算预测统计量 mean_pred_scaled = np.mean(mc_predictions_scaled, axis=0) std_pred_scaled = np.std(mc_predictions_scaled, axis=0) # 反归一化结果 predictions = self.scaler_h.inverse_transform(mean_pred_scaled) uncertainty = self.scaler_h.inverse_transform(std_pred_scaled) * 1.96 # 95%置信区间 actual_values = h_test test_time = self.test_df.index # 清除现有图表 self.ax.clear() # 计算合理的y轴范围 - 基于数据集中区域 # 获取实际值和预测值的中位数 median_val = np.median(actual_values) # 计算数据的波动范围(标准差) data_range = np.std(actual_values) * 4 # 4倍标准差覆盖大部分数据 # 设置y轴范围为中心值±数据波动范围 y_center = median_val y_half_range = max(data_range, 10) # 确保最小范围为20个单位 y_min_adjusted = y_center - y_half_range y_max_adjusted = y_center + y_half_range # 确保范围不为零 if y_max_adjusted - y_min_adjusted < 1: y_min_adjusted -= 5 y_max_adjusted += 5 # 绘制结果(带置信区间) self.ax.plot(test_time, actual_values, 'b-', label='真实值', linewidth=2) self.ax.plot(test_time, predictions, 'r--', label='预测均值', linewidth=2) self.ax.fill_between( test_time, (predictions - uncertainty).flatten(), (predictions + uncertainty).flatten(), color='orange', alpha=0.3, label='95%置信区间' ) # 设置自动调整的y轴范围 self.ax.set_ylim(y_min_adjusted, y_max_adjusted) self.ax.set_title('大坝渗流水位预测(PINNs with MC Dropout)') self.ax.set_xlabel('时间') self.ax.set_ylabel('测压管水位', rotation=0) self.ax.legend(loc='best') # 自动选择最佳位置 # 优化时间轴刻度 self.ax.xaxis.set_major_locator(mdates.YearLocator()) self.ax.xaxis.set_major_formatter(mdates.DateFormatter('%Y')) self.ax.xaxis.set_minor_locator(mdates.MonthLocator(interval=2)) self.ax.grid(which='minor', axis='x', linestyle=':', color='gray', alpha=0.3) self.ax.grid(which='major', axis='y', linestyle='-', color='lightgray', alpha=0.5) self.ax.tick_params(axis='x', which='major', rotation=0, labelsize=9) self.ax.tick_params(axis='x', which='minor', length=2) # 计算评估指标(排除第一个点) eval_actual = actual_values[1:].flatten() eval_pred = predictions[1:].flatten() self.evaluation_metrics = self.calculate_metrics(eval_actual, eval_pred) # 添加不确定性指标 avg_uncertainty = np.mean(uncertainty) max_uncertainty = np.max(uncertainty) self.evaluation_metrics['Avg Uncertainty'] = avg_uncertainty self.evaluation_metrics['Max Uncertainty'] = max_uncertainty metrics_text = ( f"MSE: {self.evaluation_metrics['MSE']:.4f} | " f"RMSE: {self.evaluation_metrics['RMSE']:.4f} | " f"MAE: {self.evaluation_metrics['MAE']:.4f} | " f"MAPE: {self.evaluation_metrics['MAPE']:.2f}% | " f"R²: {self.evaluation_metrics['R2']:.4f}\n" f"平均不确定性: {avg_uncertainty:.4f} | 最大不确定性: {max_uncertainty:.4f}" ) self.metrics_var.set(metrics_text) # 在图表上添加指标 self.ax.text( 0.5, 1.05, metrics_text, transform=self.ax.transAxes, ha='center', fontsize=8, bbox=dict(facecolor='white', alpha=0.8) ) params_text = ( f"物理参数: k1={self.learned_params['k1']:.4f}, " f"k2={self.learned_params['k2']:.4f}, " f"alpha={self.learned_params['alpha']:.4f}, " f"beta={self.learned_params['beta']:.4f} | " f"Teacher Forcing概率: {teacher_forcing_prob}" ) self.ax.text( 0.5, 1.12, params_text, transform=self.ax.transAxes, ha='center', fontsize=8, bbox=dict(facecolor='white', alpha=0.8) ) # 调整布局 plt.tight_layout(pad=2.0) self.canvas.draw() # 保存预测结果 self.predictions = predictions self.uncertainty = uncertainty self.actual_values = actual_values self.test_time = test_time self.mc_predictions = mc_predictions_scaled self.status_var.set(f"预测完成(MC Dropout采样{mc_iterations}次)") except Exception as e: messagebox.showerror("预测错误", f"预测失败:\n{str(e)}") self.status_var.set("预测失败") import traceback traceback.print_exc() def save_results(self): """保存预测结果和训练历史数据""" if not hasattr(self, 'predictions') or not hasattr(self, 'train_history'): messagebox.showwarning("警告", "请先生成预测结果并完成训练") return # 选择保存路径 save_path = filedialog.asksaveasfilename( defaultextension=".xlsx", filetypes=[("Excel文件", "*.xlsx"), ("所有文件", "*.*")], title="保存结果" ) if not save_path: return try: # 1. 创建预测结果DataFrame result_df = pd.DataFrame({ '时间': self.test_time, '实际水位': self.actual_values.flatten(), '预测水位': self.predictions.flatten() }) # 2. 创建评估指标DataFrame metrics_df = pd.DataFrame([self.evaluation_metrics]) # 3. 创建训练历史DataFrame history_data = { '轮次': list(range(1, len(self.train_history['train_data_loss']) + 1)), '训练数据损失': self.train_history['train_data_loss'], '物理损失': self.train_history['physics_loss'], '验证数据损失': self.train_history['valid_data_loss'] } # 添加训练集指标 for metric in ['MSE', 'RMSE', 'MAE', 'MAPE', 'R2']: history_data[f'训练集_{metric}'] = [item[metric] for item in self.train_history['train_metrics']] # 添加验证集指标 for metric in ['MSE', 'RMSE', 'MAE', 'MAPE', 'R2']: history_data[f'验证集_{metric}'] = [item[metric] for item in self.train_history['valid_metrics']] history_df = pd.DataFrame(history_data) # 保存到Excel with pd.ExcelWriter(save_path) as writer: result_df.to_excel(writer, sheet_name='预测结果', index=False) metrics_df.to_excel(writer, sheet_name='评估指标', index=False) history_df.to_excel(writer, sheet_name='训练历史', index=False) # 保存图表 chart_path = os.path.splitext(save_path)[0] + "_chart.png" self.fig.savefig(chart_path, dpi=300) # 保存损失曲线图 loss_path = os.path.splitext(save_path)[0] + "_loss.png" self.loss_fig.savefig(loss_path, dpi=300) self.status_var.set(f"结果已保存至: {os.path.basename(save_path)}") messagebox.showinfo("保存成功", f"预测结果和图表已保存至:\n" f"主文件: {save_path}\n" f"预测图表: {chart_path}\n" f"损失曲线: {loss_path}") except Exception as e: messagebox.showerror("保存错误", f"保存结果失败:\n{str(e)}") def reset(self): # 重置归一化器 self.scaler_t = MinMaxScaler(feature_range=(0, 1)) self.scaler_h = MinMaxScaler(feature_range=(0, 1)) self.scaler_dt = MinMaxScaler(feature_range=(0, 1)) """重置程序状态""" self.train_df = None self.test_df = None self.model = None self.train_file_var.set("") self.test_file_var.set("") # 清除训练历史 if hasattr(self, 'train_history'): del self.train_history # 清除图表 if hasattr(self, 'ax'): self.ax.clear() if hasattr(self, 'loss_ax'): self.loss_ax.clear() # 重绘画布 if hasattr(self, 'canvas'): self.canvas.draw() if hasattr(self, 'loss_canvas'): self.loss_canvas.draw() # 清除状态 self.status_var.set("已重置,请选择新数据") # 清除预测结果 if hasattr(self, 'predictions'): del self.predictions # 清除指标文本 if hasattr(self, 'metrics_var'): self.metrics_var.set("") messagebox.showinfo("重置", "程序已重置,可以开始新的分析") if __name__ == "__main__": root = tk.Tk() app = DamSeepageModel(root) root.mainloop() 检查错误并改正

file_path = r'E:\博二资料\作业\data_21_21.txt' df = pd.read_csv(file_path, sep='\s+', header=0) # 选择小时为12的数据 df_12 = df[df['hour'] == 12].copy() # 选择前3年的数据 df_12 = df_12[df_12['year'] < 1997] # 假设数据从1994年开始,选择1994, 1995, 1996年 # 选择前3天的数据 df_12 = df_12[df_12['day'] <= 3] # 假设 'day' 列包含天数 df_12 = df_12[df_12['latitude'] < 54.5] df_12 = df_12[df_12['longitude'] < 81.5] # 定义函数以获取网格索引 def get_grid_index(value, start, step, grid_size): index = int(round((value - start) / step)) return max(1, min(index + 1, grid_size)) df_12['k'] = df_12['latitude'].apply(lambda x: get_grid_index(x, 54, 0.1, 5)) df_12['l'] = df_12['longitude'].apply(lambda x: get_grid_index(x, 81, 0.1, 5)) df_12['n'] = df_12['year'].apply(lambda x: get_grid_index(x, 1994, 1, 3)) # 输出k, l, n列及其对应的温度数据 print("计算后的k, l, n列及对应的温度数据:") result = df_12[['k', 'l', 'n', 'air_temperature']] result = result.applymap(lambda x: np.floor(x * 1e7) / 1e7) print(result) # 计算 Год-Среднемесячная температура A μ_n = df_12.groupby(['year', 'k', 'l'])['air_temperature'].mean().reset_index() μ_n = μ_n.applymap(lambda x: np.floor(x * 1e7) / 1e7) # 使用 np.floor 来确保不进行四舍五入 # 计算均值并保留到小数点后7位,不进行四舍五入 μ = μ_n.groupby(['k', 'l'])['air_temperature'].mean().unstack() μ = μ.applymap(lambda x: np.floor(x * 1e7) / 1e7) # 使用 np.floor 来确保不进行四舍五入 μ.index = μ.index - 1 # 将行索引减去 1 μ.columns = μ.columns - 1 # 将列索引减去 1 print("μ:") print(μ) # 初始化一个数组来存储ξ[i, n, k, l]的值 xi = np.zeros((3, 3, 5, 5)) for index, row in df_12.iterrows(): day = int(row['day']) - 1 n = int(row['n']) - 1 k = int(row['k']) - 1 l = int(row['l']) - 1 if 0 <= day < 3 and 0 <= n < 3 and 0 <= k < 5 and 0 <= l < 5: xi[day, n, k, l] = row['air_temperature'] print("xi:") print(xi) correlation_matrix = np.zeros((5, 5)) center_k, center_l = 2, 2 求每个坐标点到中心点的相关系数

帮我运行下面这段代码,并展示给我看输出的结果(图片)import pandas as pd import numpy as np import matplotlib.pyplot as plt from scipy.interpolate import lagrange from statsmodels.tsa.arima.model import ARIMA from sklearn.ensemble import RandomForestRegressor # 生成模拟数据(含趋势和季节性的时间序列) np.random.seed(42) x = np.linspace(0, 10, 100) original = np.sin(x) + 0.5*x + np.random.normal(0, 0.3, 100) # 创建缺失数据集 missing_indices = np.sort(np.random.choice(100, 20, replace=False)) data_missing = original.copy() data_missing[missing_indices] = np.nan # 拉格朗日插值法 def lagrange_interp(s): for idx in missing_indices: window = s[max(0,idx-3):min(100,idx+4)] valid = window[~np.isnan(window)] if len(valid) >= 2: poly = lagrange(np.where(~np.isnan(window))[0], valid) s[idx] = poly(np.where(np.isnan(window))[0][0]) return s lagrange_filled = lagrange_interp(data_missing.copy()) # ARIMA插值 arima_filled = data_missing.copy() model = ARIMA(arima_filled[~np.isnan(arima_filled)], order=(2,1,1)) model_fit = model.fit() predictions = model_fit.predict(start=missing_indices.min(), end=missing_indices.max()) arima_filled[missing_indices] = predictions[:len(missing_indices)] # 随机森林插值 rf_filled = data_missing.copy() for idx in missing_indices: X = pd.DataFrame({ 'lag1': np.roll(rf_filled, 1), 'lag2': np.roll(rf_filled, 2), 'lead1': np.roll(rf_filled, -1) }) y = rf_filled train_mask = ~np.isnan(X).any(axis=1) & ~np.isnan(y) model = RandomForestRegressor(n_estimators=100) model.fit(X[train_mask], y[train_mask]) if not np.isnan(X[idx]).any(): rf_filled[idx] = model.predict([X[idx]])[0] # 绘制对比图 plt.figure(figsize=(12, 6), dpi=100) plt.plot(original, 'k--', label='Original Values', alpha=0.8, linewidth=2) plt.plot(missing_indices, original[missing_indices], 'ro', label='Deleted Points', markersize=8, zorder=10) plt.plot(lagrange_filled, 'g-', label='Lagrange', alpha=0.7, linewidth=1.5) plt.plot(arima_filled, 'b-', label='ARIMA', alpha=0.7, linewidth=1.5) plt.plot(rf_filled, 'm-', label='Random Forest', alpha=0.7, linewidth=1.5) plt.title('Interpolation Methods Comparison\n(Simulated Elevation Displacement)', fontsize=14) plt.xlabel('Data Index', fontsize=12) plt.ylabel('Displacement Value', fontsize=12) plt.legend(loc='upper left', fontsize=10) plt.grid(True, alpha=0.3) plt.tight_layout() plt.show()

最新推荐

recommend-type

【电子设计竞赛】2018年电子设计大赛A题失真度分析仪:从理论到代码实现全解析

内容概要:本文深入解析了2018年电子设计大赛A题——失真度分析仪的设计与实现。文章首先介绍了题目的背景与要求,包括谐波计算、数据显示和无线传输三个核心任务。接着详细阐述了解题思路,涵盖信号采集(ADC)、FFT分析、失真度计算、显示与无线传输等方面的技术要点。硬件设计部分重点讲解了信号调理电路、ADC电路、显示电路和无线传输电路的具体实现方法。最后提供了软件代码实现,包括ADC采样、FFT计算、失真度计算、数据显示与无线传输的代码示例。; 适合人群:对电子设计感兴趣的初学者、电子工程专业的学生及有一定基础的电子爱好者。; 使用场景及目标:①帮助读者理解失真度分析仪的工作原理和技术实现;②为准备参加类似电子设计竞赛的人提供参考;③通过实例代码加深对电子电路、信号处理和编程的理解。; 其他说明:本文不仅涵盖了理论知识,还提供了详细的代码实现,有助于读者在实践中学习和掌握相关技能。同时,文中提到的一些优化方向也为进一步探索电子设计提供了思路。
recommend-type

Python打造的Slaee管理系统升级版发布

由于提供的文件信息中,文件名《基于python的slaee管理系统 (15).zip》与描述《基于python的slaee管理系统 (15).zip》相同,并且给出的压缩包文件名称列表中只有一个文件《基于python的slaee管理系统 (14).zip》,该信息表明我们正在讨论两个不同版本的Python系统管理软件的压缩包。以下知识点将根据这些信息详细展开: 知识点一:Python编程语言基础 Python是一种高级编程语言,以其简洁的语法和强大的库支持而闻名。它是解释型语言,具有动态类型系统和垃圾回收功能,适用于多种编程范式,包括面向对象、命令式、函数式和过程式编程。Python广泛应用于系统管理、网络服务器、开发脚本、科学计算、数据挖掘和人工智能等领域。 知识点二:系统管理相关知识 系统管理指的是对计算机系统进行配置、监控和维护的过程,包括硬件资源、软件资源和数据资源的管理。在Python中,系统管理通常涉及操作系统级别的任务,如进程管理、文件系统管理、网络配置、系统日志监控等。Python的系统管理库(例如psutil、fabric、paramiko等)提供了丰富的API来简化这些任务。 知识点三:项目版本控制 从文件名《基于python的slaee管理系统 (14).zip》和《基于python的slaee管理系统 (15).zip》可以看出,这是一个项目在不同版本之间的迭代。版本控制是一种记录一个或多个文件随时间变化的方式,它允许用户可以回到特定版本。在软件开发中,版本控制非常重要,它有助于团队协作、代码合并、分支管理和错误跟踪。常见的版本控制系统包括Git、Subversion (SVN)、Mercurial等。 知识点四:打包与部署 提到“压缩包子文件”,这通常意味着文件已经被压缩打包成一个ZIP文件。在软件开发中,打包是为了便于文件传输、存档保存和分发。在Python项目中,打包也是部署过程的一部分。一个Python项目通常需要包含源代码、依赖关系、配置文件和安装脚本等。打包成ZIP文件后,可以通过各种方式部署到服务器上运行,如使用Fabric或Ansible等自动化部署工具。 知识点五:项目命名及版本命名规则 文件命名中的“基于python的slaee管理系统”表明这是一个与Python语言相关的系统管理项目。而数字“15”和“14”则代表着项目的版本号,这表明项目在持续发展,不同的数字代表了项目在不同时期的迭代。版本号的命名规则通常遵循语义化版本控制(SemVer),这种版本控制系统以 MAJOR.MINOR.PATCH 的形式表示版本号。 知识点六:文件压缩与解压缩技术 ZIP是一种常用的文件压缩格式,它通过减少文件大小来提高存储效率和传输速度。ZIP文件通常是无损压缩,意味着文件在压缩和解压缩的过程中不会丢失信息。Python标准库提供了zipfile模块,允许用户在Python程序中创建ZIP文件、读取ZIP文件内容、提取ZIP文件中的文件等操作。用户可以使用各种图形界面工具(如WinRAR、7-Zip)或命令行工具来处理ZIP文件。 总结以上内容,从文件信息中我们可以得知该内容涉及了Python编程语言、系统管理知识、版本控制、打包与部署、命名规则以及文件压缩技术等多方面的知识点。这些知识点对于理解和应用Python进行系统管理软件开发以及软件的版本迭代、打包与部署均具有重要意义。
recommend-type

深入解析PCB走线传输延时:关键因素与实用公式

# 1. PCB走线传输延时基础 在电子工程领域,尤其是在印刷电路板(PCB)设计中,走线传输延时是一个不可忽视的重要概念。传输延时主要指的是电信号通过PCB上的导电路径从源头传送到目的地所耗费的时间。本章将介绍PCB走线传输延时的基础知识点,为后续章节的深入探讨打下坚实的基础。 首先,了解什么是传输延时是至关重要的。简单来说,它反
recommend-type

gpio很弱是什么意思

<think>嗯,用户的问题是关于GPIO信号弱的概念及其影响。用户似乎对硬件或嵌入式系统有一定了解,但需要更深入的解释。从用户提到的"信号弱"来看,ta可能遇到了实际电路设计中的驱动能力问题,或者在学习GPIO原理时遇到了术语困惑。 用户引用了四篇资料,其中提到GPIO的模拟输入输出模式、施密特触发器的作用、上拉下拉电阻的配置,以及信号线串联电阻的作用。这些内容都与GPIO的驱动能力和信号质量相关。特别是引用[4]中提到的"信号线串联小电阻"和"低频电路不考虑反射",暗示用户可能正在处理实际电路中的信号完整性问题。 用户真正想知道的可能是:为什么我的GPIO输出无法正确驱动某个设备?或者
recommend-type

Python打造的Slaee管理系统升级版发布

标题中的“基于python的slaee管理系统”表明这是一个使用Python编程语言开发的系统。Python是一种广泛使用的高级编程语言,以其易读性和简洁的语法而闻名。SLAEE管理系统可能是指一个特定类型的管理软件,但由于没有给出缩写的完整解释,我们可以假设SLAEE可能是某机构或系统名称的缩写。 从标题和描述来看,存在一处笔误:“基于python的slaee管理系统 (19).zip”和“基于python的slaee管理系统 (18).zip”所指的似乎是同一软件系统,只是版本号不同。根据文件名称列表中的两个文件名,可以推断系统至少有两个版本,一个是版本18,一个是版本19。通常情况下,版本号的增加表示软件进行了更新或改进。 接下来,根据这些信息,我们可以阐述一些相关的知识点: 1. Python编程基础:Python是一种解释型、面向对象、高级编程语言。Python支持多种编程范式,包括过程式、面向对象和函数式编程。Python由于其简洁和易于学习的特性,被广泛应用于网络开发、数据分析、人工智能、机器学习和科学计算等领域。 2. 文件压缩与打包:文件压缩是将文件的大小减小以节省存储空间或网络传输时间的技术。常见的文件压缩格式包括ZIP、RAR、7Z等。文件打包通常指的是将多个文件或文件夹压缩成一个单独的文件。这在数据备份、软件分发和档案管理中非常常见。 3. 版本控制:在软件开发中,“版本”通常指软件的特定状态,版本号则用来标识这些状态。版本控制是一种记录文件、目录或集合随着时间变化的方式,以便将来可以检索特定版本。对于软件项目来说,版本控制是至关重要的,它不仅允许开发者追踪和管理代码的变化,而且还能帮助团队协作,解决冲突,并回滚到旧版本。 4. 软件管理系统的开发:一个软件管理系统可能是针对特定业务领域而设计的,它可能包括用户界面、数据库管理、业务逻辑处理、报告生成和其他许多功能。软件管理系统的开发通常涉及需求分析、系统设计、编程、测试和维护等多个阶段。 5. Python在软件开发中的应用:Python因为具有丰富的库和框架,被广泛用于开发各种类型的软件。例如,Django和Flask是用于Web开发的流行Python框架;而对于数据分析和数据科学任务,Pandas、NumPy和Matplotlib等库提供了强大的数据处理和可视化工具;对于机器学习和人工智能,TensorFlow、PyTorch等库使得复杂算法的实现变得更为简单。 6. 系统更新与维护:随着软件的使用和发展,需求可能会变化,新的问题可能会出现,所以软件系统需要定期进行更新和维护。软件更新可能包括修复已知问题、改进现有功能、增加新功能或优化性能。开发者需要评估是否需要为修复安全漏洞或提高系统性能而更新系统,以及更新之后对用户体验的影响。 由于文件名中只提到了“基于python的slaee管理系统”,没有提供该系统具体功能的详细描述,我们无法提供更加具体的技术知识点。如果需要分析系统的工作原理或具体的技术实现细节,还需要更多的信息。
recommend-type

【Keil-ARM编程艺术】:如何编写可维护且高效的代码

# 摘要 本文旨在为读者提供ARM微控制器编程和Keil开发环境的全面指南。第一章概览了ARM微控制
recommend-type

应用层协议概述

<think>首先,用户的问题是:“我想了解应用层协议的基本概念和常见类型 请问应用层协议简介 常见应用层协议概述”。这翻译成:“I want to understand the basic concepts of application layer protocols and common types. Please introduce application layer protocols and overview common application layer protocols.” 根据系统级指令: - 所有行内数学表达式必须使用$...$格式。 - 独立公式使用$$...$$格式
recommend-type

Delphi 12 TeeChartVCLFMX控件包下载及功能介绍

标题中提到的"Delphi 12 控件之TeeChartVCLFMX-2024.40.rar"指的是Delphi 12版本中使用的TeeChartVCLFMX图表控件的特定版本(2024.40版本)。Delphi是由Embarcadero Technologies开发的一款流行的集成开发环境(IDE),专门用于使用Object Pascal和C++语言开发软件应用程序。该标题强调了Delphi 12环境下TeeChartVCLFMX控件的使用,这表明Delphi的图形用户界面(GUI)组件库中包含了一个专门用于创建复杂图表和图形的组件。 从描述中仅能得到的关于文件的名称是"TeeChartVCLFMX-2024.40.rar",这意味着文件是一个压缩包,具体包含了一个TeeChartVCLFMX的图表控件,版本号为2024.40。它可能包含了在Delphi 12版本中使用该图表控件所需的所有文件,包括库文件、二进制文件、文档等。 标签"delphi 控件"简单而直接地指出了该文件属于Delphi编程环境中的一个控件类别,表明了目标用户是Delphi开发者,他们通常使用这些控件来丰富他们的应用程序界面或增强应用程序的功能。 文件名称列表提供了关于TeeChartVCLFMX压缩包内包含的具体文件及其用途的详细信息: 1. TeeChartVCLFMX-2024.40.exe:这个文件很可能是一个安装程序或可执行文件,用于安装或运行TeeChartVCLFMX图表控件。 2. Keygen.exe:这个文件名表明它可能是一个密钥生成器(Key Generator),用于生成软件的注册码或激活码,使得控件可以脱离试用限制或进行合法授权。 3. Delphi29Binaries-2024.40-windows.pak:这个文件名暗示它包含了特定于Windows平台的Delphi 29(可能指的是Delphi 12的内部版本号)的二进制文件。pak文件是压缩包的一种格式,可能包含了运行TeeChartVCLFMX图表控件所需的库文件、DLLs、组件文件等。 4. TeeChartVCLFMX-2024.40 - D12.pdf:这是一个PDF格式的文件,很可能是用户手册或帮助文档,提供了对TeeChartVCLFMX图表控件版本2024.40在Delphi 12中的使用说明,安装指南,功能介绍或示例代码等。 综合以上信息,可以推断TeeChartVCLFMX-2024.40压缩包是为Delphi 12的开发人员提供了一个专业的图表解决方案,使得用户能够将图表功能集成到他们用Delphi开发的应用程序中。TeeChartVCLFMX可能包含各种图表类型(如条形图、折线图、饼图等),以及丰富的定制选项,如颜色、样式、图例、数据绑定、交互式功能等。开发者可以利用TeeChartVCLFMX提供的图表控件在应用程序中创建强大的数据可视化功能,这对数据分析、科学计算、商业智能、财务报告等领域特别有用。 开发者需要使用Delphi 12 IDE配合提供的组件安装程序或执行文件安装和配置TeeChartVCLFMX控件,通过阅读PDF文档来了解如何使用控件,并在需要时使用Keygen.exe激活控件。Delphi29Binaries-2024.40-windows.pak文件可能包含了控件运行所需的核心二进制组件。通过合理使用这些资源,开发者可以将TeeChartVCLFMX控件集成到自己的项目中,快速构建功能丰富的图表和数据可视化组件。
recommend-type

【Keil-ARM性能优化全解】:代码监控与优化工具的实用技巧

# 摘要 本文对基于Keil-ARM平台的性能优化进行了系统性探讨,涵盖了性能监控工具的介绍、代码优化策略、系统资源管理以及综合案例分析。首先,介绍了Keil-ARM的性能监控工具及其在性能分析中的重要性,包括工具的配置和性能数据解读方法。其次,文中讨论了代码层面的优化,包括高效循环和条件语句的编写,函数内联与尾调用优化,以及编译器优化选项的应用。然后,深入分析了ARM系统资源分配、
recommend-type

电子邮箱协议

<think>首先,用户的问题是:“我想了解电子邮箱相关的协议 请问电子邮件协议有哪些”。这直接询问电子邮件协议的种类。 从系统级指令中,我需要: - 使用中文回答。 - 结构清晰,帮助用户逐步解决问题。 - 保证回答真实可靠。 - 在行内数学表达式使用 $...$,但这里可能没有数学内容。 - 独立公式使用 $$...$$,但可能不需要。 - 生成相关问题。 - 在引用的段落末尾添加引用标识如 [^1]。 参考提供的引用: - 引用[1]: 讨论了SMTP、POP3和IMAP。 - 引用[2]: 提到电子邮件系统采用客户机/服务器模式,涉及接收和发送邮件。 - 引用[3]: 详细描述了P