
**Question:** How can I get the top N items from my prediction results? My current code is:

```python
return predictions[:min(top_n, len(predictions))]
```

### Getting the Top N Prediction Results

In model prediction tasks such as object detection or classification, the raw output usually contains multiple bounding boxes, confidence scores, and class labels. The goal here is to keep only the top N items (for example, the five highest-confidence predictions), which is useful for efficiency or for focusing on the most relevant detections. The core idea: sort the predictions by confidence score in descending order (so the highest-scoring items come first), then slice off the first N elements. If fewer than N predictions are available, return all of them; `min(top_n, len(predictions))` handles this boundary case.

#### Steps
1. **Extract the predictions**: parse the bounding boxes, scores, and labels from the model output, which is typically a dictionary or array as in reference [2].
2. **Optional filtering**: discard predictions whose confidence falls below a threshold (e.g. $\theta = 0.5$) to reduce noise. This step is optional and depends on the use case.
3. **Sort**: order the filtered results by score in descending order so the most confident items come first.
4. **Take the top N**: use Python slicing `[:N]`, where N is the user-specified `top_n`; `min(top_n, len(predictions))` guarantees the index never goes out of range.
5. **Return the result**: package the top N items into a structured object (a dict or list of tuples).

The following code is based on PyTorch and assumes the input is a preprocessed image whose model output follows the format in reference [2]. Detailed comments are included for clarity.

```python
import numpy as np

def get_top_n_predictions(predictions, top_n=5, score_threshold=0.5):
    """
    Return the top N predictions, sorted by confidence score in descending order.

    :param predictions: model output, typically a dict (e.g. Faster R-CNN style)
    :param top_n: number of items to keep, default 5
    :param score_threshold: confidence threshold for optional pre-filtering
    :return: dict with 'boxes', 'scores', 'labels' for the top N predictions
    """
    # Step 1: extract the predictions (model output as in reference [2])
    boxes = predictions[0]['boxes'].detach().numpy()    # bounding box coordinates
    scores = predictions[0]['scores'].detach().numpy()  # confidence scores
    labels = predictions[0]['labels'].detach().numpy()  # class labels

    # Step 2: optional filtering - keep only predictions above the threshold
    mask = scores > score_threshold
    filtered_boxes = boxes[mask]
    filtered_scores = scores[mask]
    filtered_labels = labels[mask]

    # Step 3: sort by score in descending order (argsort indices, then reorder all arrays)
    sorted_indices = np.argsort(filtered_scores)[::-1]
    sorted_boxes = filtered_boxes[sorted_indices]
    sorted_scores = filtered_scores[sorted_indices]
    sorted_labels = filtered_labels[sorted_indices]

    # Step 4: take the top N, guarding against lists shorter than top_n
    n = min(top_n, len(sorted_scores))
    top_boxes = sorted_boxes[:n]
    top_scores = sorted_scores[:n]
    top_labels = sorted_labels[:n]

    # Step 5: return the structured result
    return {
        'boxes': top_boxes,
        'scores': top_scores,
        'labels': top_labels
    }


# Example usage (assumes model and preprocessed_image are already defined,
# following the prediction pipeline from reference [1])
model = load_model("path/to/model.pth")
predictions = predict(model, preprocessed_image)  # returns the raw prediction dict

# Get the top 3 predictions
top_n = 3
top_n_results = get_top_n_predictions(predictions, top_n=top_n, score_threshold=0.5)
print(f"Top {top_n} predictions: {top_n_results}")
```

#### Code Explanation
- **Extraction and filtering**: boxes, scores, and labels are pulled from the model output and filtered by the threshold (score $\geq \theta$). This follows the post-processing flow in reference [2] and helps suppress false detections[^2].
- **Sorting**: `np.argsort(scores)[::-1]` produces a descending order so the highest-confidence items come first. This is the key step, since model outputs are not guaranteed to be sorted.
- **Slicing**: `[:n]` with `n = min(top_n, len(sorted_scores))` safely handles the case where fewer than N predictions exist (e.g. with only 2 predictions and `top_n=5`, both are returned).
- **Return value**: a dictionary, convenient for later use such as visualization or analysis. The layout is similar to the prediction array in reference [3], but structured[^3].

#### Notes
- **Performance**: the sort costs $O(m \log m)$, where $m$ is the number of predictions after filtering; in real-time systems it can be replaced by a partial sort (e.g. `np.argpartition`), as sketched below.
- **Applicability**: this approach suits object detection models such as YOLO or Faster R-CNN, for example quickly extracting the most salient targets in vehicle detection[^4].
- **Threshold choice**: the confidence threshold $\theta$ affects result quality; too low admits noise, too high drops valid predictions. Tune it on a validation set.

If you have more specific requirements (such as per-class handling or speed optimization), share the details and the code can be adjusted further.
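The partial-sort optimization mentioned in the Notes can avoid sorting the full score array when only the top N items are needed. Below is a minimal sketch of that idea using `np.argpartition`; the function name and sample scores are illustrative and not part of the original answer.

```python
import numpy as np

def top_n_indices(scores: np.ndarray, top_n: int) -> np.ndarray:
    """Return the indices of the top_n highest scores, in descending score order,
    using a partial sort: O(m) selection plus O(n log n) sort of only the top slice."""
    n = min(top_n, len(scores))
    if n == 0:
        return np.array([], dtype=int)
    # argpartition moves the n largest scores into the last n positions (unordered)
    part = np.argpartition(scores, -n)[-n:]
    # sort only those n indices by their scores, descending
    return part[np.argsort(scores[part])[::-1]]

# Example with illustrative detection scores
scores = np.array([0.31, 0.92, 0.78, 0.55, 0.99])
idx = top_n_indices(scores, top_n=3)
print(idx, scores[idx])  # -> [4 1 2] [0.99 0.92 0.78]
```

The returned index array can be applied directly to the boxes, scores, and labels arrays in place of the full `np.argsort` step when `top_n` is much smaller than the number of predictions.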


self.status_var.set(completion_msg) messagebox.showinfo("训练完成", f"PINNs模型训练成功完成!\n{completion_msg}") except Exception as e: messagebox.showerror("训练错误", f"模型训练失败:\n{str(e)}") self.status_var.set("训练失败") # 在训练循环结束后,保存训练历史 self.train_history = { 'train_data_loss': train_data_loss_history, 'physics_loss': physics_loss_history, 'valid_data_loss': valid_data_loss_history, 'train_metrics': train_metrics_history, 'valid_metrics': valid_metrics_history } def predict(self): """使用PINNs模型进行预测(优化时间轴刻度与网格线)""" if self.model is None: messagebox.showwarning("警告", "请先训练模型") return if self.test_df is None: messagebox.showwarning("警告", "请先选择测试集文件") return try: self.status_var.set("正在生成预测...") self.root.update() # 预处理测试数据 test_scaled = self.scaler.transform(self.test_df[['水位']]) # 准备时间特征 t_test = self.test_df['days'].values.reshape(-1, 1).astype(np.float32) # 递归预测 predictions = [] for i in range(len(t_test)): h_current = np.array([[test_scaled[i][0]]]).astype(np.float32) h_pred = self.model([t_test[i:i + 1], h_current]) predictions.append(h_pred.numpy()[0][0]) # 反归一化 predictions = np.array(predictions).reshape(-1, 1) predictions = self.scaler.inverse_transform(predictions) actual_values = self.scaler.inverse_transform(test_scaled) # 创建时间索引(确保为DatetimeIndex) test_time = self.test_df.index # 假设为pandas DatetimeIndex类型 # 清除现有图表 self.ax.clear() # 绘制结果 self.ax.plot(test_time, actual_values, 'b-', label='真实值') self.ax.plot(test_time, predictions, 'r--', label='预测值') self.ax.set_title('大坝渗流水位预测结果(PINNs)') self.ax.set_xlabel('时间') self.ax.set_ylabel('测压管水位', rotation=0) self.ax.legend() # 添加网格和样式(优化时间轴) import matplotlib.dates as mdates # 导入日期刻度工具 # 设置x轴刻度:主刻度(年份)和次要刻度(每2个月) # 主刻度:每年1月1日(或数据起始年的第一个时间点) self.ax.xaxis.set_major_locator(mdates.YearLocator()) self.ax.xaxis.set_major_formatter(mdates.DateFormatter('%Y')) # 仅显示年份 # 次要刻度:每2个月(如2月、4月、6月...) self.ax.xaxis.set_minor_locator(mdates.MonthLocator(interval=2)) # 添加次要网格线(每2个月的竖直虚线) self.ax.grid(which='minor', axis='x', linestyle='--', color='gray', alpha=0.3) # 主要网格线(可选,保持原有水平网格) self.ax.grid(which='major', axis='y', linestyle='-', color='lightgray', alpha=0.5) # 优化刻度标签显示(避免重叠) self.ax.tick_params(axis='x', which='major', rotation=0, labelsize=10) self.ax.tick_params(axis='x', which='minor', length=3) # 次要刻度线长度 # 计算并显示评估指标(保持原有逻辑) self.evaluation_metrics = self.calculate_metrics( actual_values.flatten(), predictions.flatten() ) metrics_text = ( f"MSE: {self.evaluation_metrics['MSE']:.4f} | " f"RMSE: {self.evaluation_metrics['RMSE']:.4f} | " f"MAE: {self.evaluation_metrics['MAE']:.4f} | " f"MAPE: {self.evaluation_metrics['MAPE']:.2f}% | " f"R²: {self.evaluation_metrics['R2']:.4f}" ) # 更新文本标签 self.metrics_var.set(metrics_text) # 在图表上添加指标(位置调整,避免覆盖时间刻度) self.ax.text( 0.5, 1.08, metrics_text, # 略微上移避免与网格重叠 transform=self.ax.transAxes, ha='center', fontsize=10, bbox=dict(facecolor='white', alpha=0.8) ) # 调整布局(重点优化时间轴边距) plt.tight_layout(pad=2.0) # 增加底部边距避免刻度标签被截断 self.canvas.draw() # 保存预测结果(保持原有逻辑) self.predictions = predictions self.actual_values = actual_values self.test_time = test_time self.status_var.set("预测完成,结果已显示") except Exception as e: messagebox.showerror("预测错误", f"预测失败:\n{str(e)}") self.status_var.set("预测失败") def save_results(self): """保存预测结果和训练历史数据""" if not hasattr(self, 'predictions') or not hasattr(self, 'train_history'): messagebox.showwarning("警告", "请先生成预测结果并完成训练") return # 选择保存路径 save_path = filedialog.asksaveasfilename( defaultextension=".xlsx", filetypes=[("Excel文件", "*.xlsx"), ("所有文件", "*.*")], title="保存结果" ) if not save_path: return try: # 1. 
创建预测结果DataFrame result_df = pd.DataFrame({ '时间': self.test_time, '实际水位': self.actual_values.flatten(), '预测水位': self.predictions.flatten() }) # 2. 创建评估指标DataFrame metrics_df = pd.DataFrame([self.evaluation_metrics]) # 3. 创建训练历史DataFrame history_data = { '轮次': list(range(1, len(self.train_history['train_data_loss']) + 1)), '训练数据损失': self.train_history['train_data_loss'], '物理损失': self.train_history['physics_loss'], '验证数据损失': self.train_history['valid_data_loss'] } # 添加训练集指标 for metric in ['MSE', 'RMSE', 'MAE', 'MAPE', 'R2']: history_data[f'训练集_{metric}'] = [item[metric] for item in self.train_history['train_metrics']] # 添加验证集指标 for metric in ['MSE', 'RMSE', 'MAE', 'MAPE', 'R2']: history_data[f'验证集_{metric}'] = [item[metric] for item in self.train_history['valid_metrics']] history_df = pd.DataFrame(history_data) # 保存到Excel with pd.ExcelWriter(save_path) as writer: result_df.to_excel(writer, sheet_name='预测结果', index=False) metrics_df.to_excel(writer, sheet_name='评估指标', index=False) history_df.to_excel(writer, sheet_name='训练历史', index=False) # 保存图表 chart_path = os.path.splitext(save_path)[0] + "_chart.png" self.fig.savefig(chart_path, dpi=300) # 保存损失曲线图 loss_path = os.path.splitext(save_path)[0] + "_loss.png" self.loss_fig.savefig(loss_path, dpi=300) self.status_var.set(f"结果已保存至: {os.path.basename(save_path)}") messagebox.showinfo("保存成功", f"预测结果和图表已保存至:\n" f"主文件: {save_path}\n" f"预测图表: {chart_path}\n" f"损失曲线: {loss_path}") except Exception as e: messagebox.showerror("保存错误", f"保存结果失败:\n{str(e)}") def reset(self): """重置程序状态""" self.train_df = None self.test_df = None self.model = None self.train_file_var.set("") self.test_file_var.set("") # 清除训练历史 if hasattr(self, 'train_history'): del self.train_history # 清除图表 if hasattr(self, 'ax'): self.ax.clear() if hasattr(self, 'loss_ax'): self.loss_ax.clear() # 重绘画布 if hasattr(self, 'canvas'): self.canvas.draw() if hasattr(self, 'loss_canvas'): self.loss_canvas.draw() # 清除状态 self.status_var.set("已重置,请选择新数据") # 清除预测结果 if hasattr(self, 'predictions'): del self.predictions # 清除指标文本 if hasattr(self, 'metrics_var'): self.metrics_var.set("") messagebox.showinfo("重置", "程序已重置,可以开始新的分析") if __name__ == "__main__": root = tk.Tk() app = DamSeepageModel(root) root.mainloop() 检查一下有什么错误,只需要给出局部代码即可
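针对上面这份 PINNs 程序,最关键的一处错误是 `PINNModel.__init__` 里 `self.k = tf.math.sigmoid(self.k_raw) * 0.5` 只在构造时计算一次,之后 `self.k` 变成一个固定张量,物理损失对 `k_raw` 没有梯度,训练过程中状态栏显示的 k 也不会变化。下面是按原代码改写的局部修改示例(仅供参考),把 k 改成每次访问时由 `k_raw` 重新计算:

```python
import tensorflow as tf
from tensorflow.keras.layers import Dense

class PINNModel(tf.keras.Model):
    def __init__(self, num_layers=4, hidden_units=32, **kwargs):
        super().__init__(**kwargs)
        self.dense_layers = [Dense(hidden_units, activation='tanh') for _ in range(num_layers)]
        self.final_layer = Dense(1, activation='linear')
        # 只保留可训练的原始参数,k 不再在 __init__ 中固化成常量
        self.k_raw = tf.Variable(0.01, trainable=True, dtype=tf.float32, name='k_raw')

    @property
    def k(self):
        # 每次访问时由 k_raw 重新计算并约束在 0~0.5 之间,梯度才能传回 k_raw
        return tf.math.sigmoid(self.k_raw) * 0.5

    def call(self, inputs):
        t, h = inputs
        x = tf.concat([t, h], axis=1)
        for layer in self.dense_layers:
            x = layer(x)
        return self.final_layer(x)

    def physics_loss(self, t, h_current):
        """离散渗流方程 h_{t+1} = h_t * (1 - k) 的残差"""
        h_next_pred = self([t, h_current])
        residual = h_next_pred - h_current * (1 - self.k)
        return tf.reduce_mean(tf.square(residual))
```

另外两处建议:① `train_model` 末尾的 `self.train_history = {...}` 写在 `except` 之后,一旦训练中途抛异常,`train_data_loss_history` 等局部变量可能尚未定义,会再抛一次 `NameError`;应把这段赋值移进 `try` 块内部、绘制损失曲线之后。② `predict` 的注释写的是"递归预测",但循环里每一步的输入都是测试集的真实水位 `test_scaled[i]`,实际是单步预测;若要真正递归,应把上一步的预测值作为下一步的 `h_current`。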

import pywt import torch import numpy as np from torch.utils.data import Dataset def wavelet_decomposition(signal, wavelet, level): # 动态计算最大可行分解层级 max_level = pywt.dwt_max_level(len(signal), wavelet) valid_level = min(level, max_level) if valid_level <= 0: return [signal] # 返回原始信号作为替代方案 coeffs = pywt.wavedec(signal, wavelet, level=valid_level) return coeffs class CustomDataset(Dataset): def __init__(self, ..., wavelet_level=3): # 添加长度验证逻辑 self.min_length = 2**wavelet_level # 计算最小需求长度 self.valid_indices = [i for i, data in enumerate(raw_data) if len(data) >= self.min_length] def __len__(self): return len(self.valid_indices) def __getitem__(self, idx): real_idx = self.valid_indices[idx] # ... 原始数据处理逻辑 class VGGModel(torch.nn.Module): def forward(self, x): # ... 网络结构 return output.squeeze() # 确保输出维度匹配 [batch_size] def train(model, dataloader, criterion, optimizer): for inputs, targets in dataloader: outputs = model(inputs) # 统一维度 loss = criterion(outputs.view(-1,1), targets.view(-1,1)) # 统一为[N,1]形状 def predict(model, dataset): try: test_x, test_y = dataset[-1] with torch.no_grad(): pred = model(test_x.unsqueeze(0)) return test_y.numpy(), pred.numpy() except ValueError as e: print(f"预测时发生错误:{str(e)}") return None, None def main(): # 初始化配置 wavelet_level = 3 # 可调整但建议不超过3 dataset = CustomDataset(..., wavelet_level=wavelet_level) # 验证数据集是否为空 if len(dataset) == 0: raise ValueError("数据集无有效样本,请检查输入数据长度和小波层级设置") # 模型训练和预测 model = VGGModel() # ... 训练流程 true, pred = predict(model, dataset) if __name__ == "__main__": main() 请联系上下文,这段代码出示完整
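原帖里 `CustomDataset.__init__` 的参数、`raw_data` 的来源和 VGG 网络结构都被省略了,无法推断原作者的完整实现。下面是一份可以独立运行的最小补全示例:其中 `raw_data`、`labels`、`SimpleRegressor` 等名称、随机演示数据和网络结构都是为演示而假设的,小波分解与长度校验的逻辑沿用原帖:

```python
import numpy as np
import pywt
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader

def wavelet_decomposition(signal, wavelet='db4', level=3):
    # 动态计算最大可行分解层级,避免信号过短时 wavedec 报错
    max_level = pywt.dwt_max_level(len(signal), wavelet)
    valid_level = min(level, max_level)
    if valid_level <= 0:
        return [np.asarray(signal)]        # 信号太短时直接返回原始信号
    return pywt.wavedec(signal, wavelet, level=valid_level)

class CustomDataset(Dataset):
    def __init__(self, raw_data, labels, wavelet='db4', wavelet_level=3):
        # raw_data: 一维信号列表;labels: 对应标签(均为假设的入参名)
        self.raw_data = raw_data
        self.labels = labels
        self.wavelet = wavelet
        self.level = wavelet_level
        self.min_length = 2 ** wavelet_level       # 小波分解所需的最小长度
        self.valid_indices = [i for i, d in enumerate(raw_data)
                              if len(d) >= self.min_length]

    def __len__(self):
        return len(self.valid_indices)

    def __getitem__(self, idx):
        real_idx = self.valid_indices[idx]
        signal = np.asarray(self.raw_data[real_idx], dtype=np.float32)
        coeffs = wavelet_decomposition(signal, self.wavelet, self.level)
        # 拼接各层系数并截断/补零到固定长度,保证 batch 内形状一致
        feat = np.concatenate(coeffs)[: self.min_length].astype(np.float32)
        feat = np.pad(feat, (0, self.min_length - len(feat)))
        return torch.from_numpy(feat), torch.tensor(self.labels[real_idx], dtype=torch.float32)

class SimpleRegressor(nn.Module):
    # 用一个小型全连接网络代替原帖省略的 VGG 结构,仅作占位示意
    def __init__(self, in_dim):
        super().__init__()
        self.net = nn.Sequential(nn.Linear(in_dim, 32), nn.ReLU(), nn.Linear(32, 1))

    def forward(self, x):
        return self.net(x).squeeze(-1)             # 输出形状 [batch_size]

def train(model, dataloader, criterion, optimizer, epochs=10):
    model.train()
    for _ in range(epochs):
        for inputs, targets in dataloader:
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs.view(-1, 1), targets.view(-1, 1))  # 统一为 [N,1]
            loss.backward()
            optimizer.step()

def main():
    wavelet_level = 3                              # 可调整,但建议不超过 3
    # 构造随机演示数据(真实使用时替换为自己的信号和标签)
    raw_data = [np.random.randn(np.random.randint(4, 64)) for _ in range(100)]
    labels = [float(np.mean(d)) for d in raw_data]
    dataset = CustomDataset(raw_data, labels, wavelet_level=wavelet_level)
    if len(dataset) == 0:
        raise ValueError("数据集无有效样本,请检查输入数据长度和小波层级设置")
    loader = DataLoader(dataset, batch_size=8, shuffle=True)
    model = SimpleRegressor(in_dim=2 ** wavelet_level)
    train(model, loader, nn.MSELoss(), torch.optim.Adam(model.parameters(), lr=1e-3))

if __name__ == "__main__":
    main()
```

预测部分可沿用原帖 `predict` 的写法:取 `dataset[-1]` 的样本,`unsqueeze(0)` 后在 `torch.no_grad()` 下送入模型即可。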

(代码与上文"大坝渗流预测模型(PINNs)"程序相同,此处不再重复)

在这个模型中添加一个查看历史训练记录的功能,选择了某次记录的时候就自动把当时的训练集和测试集文件放入模型中并且显示好预测结果和损失曲线
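一个可行的思路是:每次训练完成后,把当次用到的训练集/测试集路径和参数追加写入一个 JSON 记录文件;再在控制面板上加一个"历史记录"按钮,弹出下拉框列出各条记录,选中某条后把对应路径写回 `train_file_var`/`test_file_var`、按原有 `select_file` 的逻辑重新加载数据,然后依次调用 `train_model()` 和 `predict()` 重现损失曲线与预测图(若不想重新训练,也可以同时用 `model.save_weights` 保存权重,选中记录时直接加载)。下面是记录读写部分的最小骨架,文件名 `train_history_records.json` 和函数名都只是示例:

```python
import json
import os
import time

HISTORY_FILE = "train_history_records.json"   # 示例文件名

def append_history_record(train_path, test_path, params, record_file=HISTORY_FILE):
    """训练完成后调用:把本次训练用到的文件路径和参数追加进记录文件"""
    records = []
    if os.path.exists(record_file):
        with open(record_file, "r", encoding="utf-8") as f:
            records = json.load(f)
    records.append({
        "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
        "train_file": train_path,
        "test_file": test_path,
        "params": params,
    })
    with open(record_file, "w", encoding="utf-8") as f:
        json.dump(records, f, ensure_ascii=False, indent=2)

def load_history_records(record_file=HISTORY_FILE):
    """点击"历史记录"按钮时调用:读出全部记录,供下拉框按时间戳显示"""
    if not os.path.exists(record_file):
        return []
    with open(record_file, "r", encoding="utf-8") as f:
        return json.load(f)
```

在 `train_model` 成功结束处调用 `append_history_record(训练集完整路径, 测试集完整路径, {...参数...})`,"历史记录"窗口里的 `ttk.Combobox` 绑定选中事件后再触发重新加载和训练/预测即可。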

TypeError: in user code: TypeError: outer_factory.<locals>.inner_factory.<locals>.tf__combine_features() takes 1 positional argument but 2 were given,下面是这个出现这个问题的原代码,你帮我修改一下import os import re import glob import tensorflow as tf import numpy as np from tqdm import tqdm import matplotlib.pyplot as plt import matplotlib as mpl from sklearn.model_selection import train_test_split import imageio import sys from skimage.transform import resize import traceback from tensorflow.keras import layers, models, Input, Model from tensorflow.keras.optimizers import Adam from pathlib import Path from tensorflow.keras.losses import MeanSquaredError from tensorflow.keras.metrics import MeanAbsoluteError from skimage import measure, morphology, filters # =============== 配置参数===================================== BASE_DIR = "F:/2025.7.2wavelengthtiff" # 根目录路径 START_WAVELENGTH = 788.55500 # 起始波长 END_WAVELENGTH = 788.55600 # 结束波长 STEP = 0.00005 # 波长步长 BATCH_SIZE = 8 # 批处理大小 IMAGE_SIZE = (256, 256) # 图像尺寸 TARGET_CHANNELS = 1 # 目标通道数 - 使用灰度图像 TEST_SIZE = 0.2 # 测试集比例 RANDOM_SEED = 42 # 随机种子 MODEL_SAVE_PATH = Path.home() / "Documents" / "wavelength_model.keras" # 修改为.keras格式 # ================================================================ # 设置中文字体支持 try: mpl.rcParams['font.sans-serif'] = ['SimHei'] # 使用黑体 mpl.rcParams['axes.unicode_minus'] = False # 解决负号显示问题 print("已设置中文字体支持") except: print("警告:无法设置中文字体,图表可能无法正确显示中文") def generate_folder_names(start, end, step): """生成波长文件夹名称列表""" num_folders = int((end - start) / step) + 1 folder_names = [] for i in range(num_folders): wavelength = start + i * step folder_name = f"{wavelength:.5f}" folder_names.append(folder_name) return folder_names def find_best_match_file(folder_path, target_wavelength): """在文件夹中找到波长最接近目标值的TIFF文件""" tiff_files = glob.glob(os.path.join(folder_path, "*.tiff")) + glob.glob(os.path.join(folder_path, "*.tif")) if not tiff_files: return None best_match = None min_diff = float('inf') for file_path in tiff_files: filename = os.path.basename(file_path) match = re.search(r'\s*([\d.]+)_', filename) if not match: continue try: file_wavelength = float(match.group(1)) diff = abs(file_wavelength - target_wavelength) if diff < min_diff: min_diff = diff best_match = file_path except ValueError: continue return best_match def extract_shape_features(binary_image): """从二值化图像中提取形状和边界特征""" features = np.zeros(6, dtype=np.float32) # 初始化特征向量 try: contours = measure.find_contours(binary_image, 0.5) if not contours: return features main_contour = max(contours, key=len) contour_length = len(main_contour) label_image = morphology.label(binary_image) region = measure.regionprops(label_image)[0] contour_area = region.area hull = morphology.convex_hull_image(label_image) hull_area = np.sum(hull) solidity = region.solidity eccentricity = region.eccentricity orientation = region.orientation features[0] = contour_length / 1000 # 归一化 features[1] = contour_area / 1000 # 归一化 features[2] = solidity features[3] = eccentricity features[4] = orientation features[5] = hull_area / 1000 # 凸包面积 except Exception as e: print(f"形状特征提取错误: {e}") traceback.print_exc() return features def load_and_preprocess_image(file_path): """加载并预处理TIFF图像""" try: image = imageio.imread(file_path) if image.dtype == np.uint16: image = image.astype(np.float32) / 65535.0 elif image.dtype == np.uint8: image = image.astype(np.float32) / 255.0 else: image = image.astype(np.float32) if np.max(image) > 1.0: image = image / np.max(image) if len(image.shape) == 3 and image.shape[2] > 1: image = 0.299 * image[:, :, 0] + 
0.587 * image[:, :, 1] + 0.114 * image[:, :, 2] image = np.expand_dims(image, axis=-1) image = resize(image, (IMAGE_SIZE[0], IMAGE_SIZE[1]), anti_aliasing=True) blurred = filters.gaussian(image[..., 0], sigma=1.0) thresh = filters.threshold_otsu(blurred) binary = blurred > thresh * 0.8 return image, binary except Exception as e: print(f"图像加载失败: {e}, 使用空白图像代替") return np.zeros((IMAGE_SIZE[0], IMAGE_SIZE[1], 1), dtype=np.float32), np.zeros((IMAGE_SIZE[0], IMAGE_SIZE[1]), dtype=np.bool) def create_tiff_dataset(file_paths): """从文件路径列表创建TensorFlow数据集""" dataset = tf.data.Dataset.from_tensor_slices(file_paths) def load_wrapper(file_path): file_path_str = file_path.numpy().decode('utf-8') image, binary = load_and_preprocess_image(file_path_str) features = extract_shape_features(binary) return image, features def tf_load_wrapper(file_path): result = tf.py_function( func=load_wrapper, inp=[file_path], Tout=(tf.float32, tf.float32) ) image = result[0] features = result[1] image.set_shape((IMAGE_SIZE[0], IMAGE_SIZE[1], 1)) # 单通道 features.set_shape((6,)) # 6个形状特征 return image, features dataset = dataset.map(tf_load_wrapper, num_parallel_calls=tf.data.experimental.AUTOTUNE) dataset = dataset.batch(BATCH_SIZE).prefetch(tf.data.experimental.AUTOTUNE) return dataset def load_and_prepare_data(): """加载所有数据并准备训练/测试集""" folder_names = generate_folder_names(START_WAVELENGTH, END_WAVELENGTH, STEP) print(f"\n生成的文件夹数量: {len(folder_names)}") print(f"起始文件夹: {folder_names[0]}") print(f"结束文件夹: {folder_names[-1]}") valid_files = [] wavelengths = [] print("\n扫描文件夹并匹配文件...") for folder_name in tqdm(folder_names, desc="处理文件夹"): folder_path = os.path.join(BASE_DIR, folder_name) if not os.path.isdir(folder_path): continue try: target_wavelength = float(folder_name) file_path = find_best_match_file(folder_path, target_wavelength) if file_path: valid_files.append(file_path) wavelengths.append(target_wavelength) except ValueError: continue print(f"\n找到的有效文件: {len(valid_files)}/{len(folder_names)}") if not valid_files: raise ValueError("未找到任何有效文件,请检查路径和文件夹名称") wavelengths = np.array(wavelengths) min_wavelength = np.min(wavelengths) max_wavelength = np.max(wavelengths) wavelength_range = max_wavelength - min_wavelength wavelengths_normalized = (wavelengths - min_wavelength) / wavelength_range print(f"波长范围: {min_wavelength:.6f} 到 {max_wavelength:.6f}, 范围大小: {wavelength_range:.6f}") train_files, test_files, train_wavelengths, test_wavelengths = train_test_split( valid_files, wavelengths_normalized, test_size=TEST_SIZE, random_state=RANDOM_SEED ) print(f"训练集大小: {len(train_files)}") print(f"测试集大小: {len(test_files)}") train_dataset = create_tiff_dataset(train_files) test_dataset = create_tiff_dataset(test_files) train_labels = tf.data.Dataset.from_tensor_slices(train_wavelengths) test_labels = tf.data.Dataset.from_tensor_slices(test_wavelengths) # 修复后的 combine_features 函数 def combine_features(data): """将图像特征和标签组合成模型需要的格式""" image_features, label = data image, shape_features = image_features return (image, shape_features), label train_dataset_unet = tf.data.Dataset.zip((train_dataset, train_labels)).map( combine_features, num_parallel_calls=tf.data.experimental.AUTOTUNE ) test_dataset_unet = tf.data.Dataset.zip((test_dataset, test_labels)).map( combine_features, num_parallel_calls=tf.data.experimental.AUTOTUNE ) train_dataset_cnn_dense = tf.data.Dataset.zip((train_dataset.map(lambda x: x[0]), train_labels)) test_dataset_cnn_dense = tf.data.Dataset.zip((test_dataset.map(lambda x: x[0]), test_labels)) return train_dataset_unet, 
test_dataset_unet, train_dataset_cnn_dense, test_dataset_cnn_dense, valid_files, min_wavelength, wavelength_range def build_unet_model(input_shape, shape_feature_size): """构建 U-Net 模型,同时接受图像输入和形状特征输入""" # 图像输入 image_input = Input(shape=input_shape, name='image_input') # Encoder conv1 = layers.Conv2D(32, (3, 3), activation='relu', padding='same')(image_input) conv1 = layers.Conv2D(32, (3, 3), activation='relu', padding='same')(conv1) pool1 = layers.MaxPooling2D(pool_size=(2, 2))(conv1) conv2 = layers.Conv2D(64, (3, 3), activation='relu', padding='same')(pool1) conv2 = layers.Conv2D(64, (3, 3), activation='relu', padding='same')(conv2) pool2 = layers.MaxPooling2D(pool_size=(2, 2))(conv2) conv3 = layers.Conv2D(128, (3, 3), activation='relu', padding='same')(pool2) conv3 = layers.Conv2D(128, (3, 3), activation='relu', padding='same')(conv3) pool3 = layers.MaxPooling2D(pool_size=(2, 2))(conv3) # Bottleneck conv4 = layers.Conv2D(256, (3, 3), activation='relu', padding='same')(pool3) conv4 = layers.Conv2D(256, (3, 3), activation='relu', padding='same')(conv4) drop4 = layers.Dropout(0.5)(conv4) # Decoder up5 = layers.Conv2D(128, (2, 2), activation='relu', padding='same')(layers.UpSampling2D(size=(2, 2))(drop4)) merge5 = layers.Concatenate()([conv3, up5]) conv5 = layers.Conv2D(128, (3, 3), activation='relu', padding='same')(merge5) conv5 = layers.Conv2D(128, (3, 3), activation='relu', padding='same')(conv5) up6 = layers.Conv2D(64, (2, 2), activation='relu', padding='same')(layers.UpSampling2D(size=(2, 2))(conv5)) merge6 = layers.Concatenate()([conv2, up6]) conv6 = layers.Conv2D(64, (3, 3), activation='relu', padding='same')(merge6) conv6 = layers.Conv2D(64, (3, 3), activation='relu', padding='same')(conv6) up7 = layers.Conv2D(32, (2, 2), activation='relu', padding='same')(layers.UpSampling2D(size=(2, 2))(conv6)) merge7 = layers.Concatenate()([conv1, up7]) conv7 = layers.Conv2D(32, (3, 3), activation='relu', padding='same')(merge7) conv7 = layers.Conv2D(32, (3, 3), activation='relu', padding='same')(conv7) # 形状特征输入 shape_input = Input(shape=(shape_feature_size,), name='shape_input') shape_dense = layers.Dense(128, activation='relu')(shape_input) shape_dense = layers.Dense(64, activation='relu')(shape_dense) # 合并图像特征和形状特征 flat = layers.GlobalAveragePooling2D()(conv7) combined = layers.Concatenate()([flat, shape_dense]) # 输出层 outputs = layers.Dense(1, activation='linear')(combined) model = Model(inputs=[image_input, shape_input], outputs=outputs) model.compile(optimizer=Adam(learning_rate=1e-4), loss='mean_squared_error', metrics=['mae']) return model def build_dense_model(input_shape): """构建简单的全连接网络""" inputs = Input(shape=input_shape) x = layers.Flatten()(inputs) x = layers.Dense(128, activation='relu')(x) x = layers.Dense(64, activation='relu')(x) outputs = layers.Dense(1, activation='linear')(x) model = Model(inputs=[inputs], outputs=[outputs]) model.compile(optimizer=Adam(learning_rate=1e-4), loss='mean_squared_error', metrics=['mae']) return model def build_cnn_model(input_shape): """构建传统的 CNN 模型""" inputs = Input(shape=input_shape) x = layers.Conv2D(32, (3, 3), activation='relu', padding='same')(inputs) x = layers.MaxPooling2D((2, 2))(x) x = layers.Conv2D(64, (3, 3), activation='relu', padding='same')(x) x = layers.MaxPooling2D((2, 2))(x) x = layers.Conv2D(128, (3, 3), activation='relu', padding='same')(x) x = layers.MaxPooling2D((2, 2))(x) x = layers.Conv2D(256, (3, 3), activation='relu', padding='same')(x) x = layers.GlobalAveragePooling2D()(x) x = layers.Dense(256, activation='relu')(x) 
outputs = layers.Dense(1, activation='linear')(x) model = Model(inputs=[inputs], outputs=[outputs]) model.compile(optimizer=Adam(learning_rate=1e-4), loss='mean_squared_error', metrics=['mae']) return model def train_models(train_dataset_unet, test_dataset_unet, train_dataset_cnn_dense, test_dataset_cnn_dense, input_shape, shape_feature_size, wavelength_range): """训练多个模型""" unet_model = build_unet_model(input_shape, shape_feature_size) dense_model = build_dense_model(input_shape) cnn_model = build_cnn_model(input_shape) unet_model.summary() dense_model.summary() cnn_model.summary() callbacks = [ tf.keras.callbacks.EarlyStopping(patience=30, restore_best_weights=True, monitor='val_loss', min_delta=1e-6), tf.keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=10, min_lr=1e-7) ] unet_history = unet_model.fit(train_dataset_unet, epochs=300, validation_data=test_dataset_unet, callbacks=callbacks, verbose=2) dense_history = dense_model.fit(train_dataset_cnn_dense, epochs=300, validation_data=test_dataset_cnn_dense, callbacks=callbacks, verbose=2) cnn_history = cnn_model.fit(train_dataset_cnn_dense, epochs=300, validation_data=test_dataset_cnn_dense, callbacks=callbacks, verbose=2) return unet_model, dense_model, cnn_model def predict_with_voting(models, test_image_path, min_wavelength, wavelength_range): """使用多个模型进行预测,并通过投票机制决定最终结果""" image, binary = load_and_preprocess_image(test_image_path) image = np.expand_dims(image, axis=0) shape_features = extract_shape_features(binary) shape_features = np.expand_dims(shape_features, axis=0) predictions = [] for model in models: if model.name == 'unet_model': predicted_normalized = model.predict([image, shape_features], verbose=0)[0][0] else: predicted_normalized = model.predict(image, verbose=0)[0][0] predicted_wavelength = predicted_normalized * wavelength_range + min_wavelength predictions.append(predicted_wavelength) # 使用投票机制(例如取平均值) final_prediction = np.mean(predictions) print(f"最终预测波长: {final_prediction:.8f} 纳米") return final_prediction def main(): """主函数""" print(f"TensorFlow 版本: {tf.__version__}") try: train_dataset_unet, test_dataset_unet, train_dataset_cnn_dense, test_dataset_cnn_dense, all_files, min_wavelength, wavelength_range = load_and_prepare_data() print(f"最小波长: {min_wavelength:.6f}, 波长范围: {wavelength_range:.6f}") except Exception as e: print(f"数据加载失败: {str(e)}") traceback.print_exc() return print("\n开始训练模型...") try: unet_model, dense_model, cnn_model = train_models(train_dataset_unet, test_dataset_unet, train_dataset_cnn_dense, test_dataset_cnn_dense, (IMAGE_SIZE[0], IMAGE_SIZE[1], 1), 6, wavelength_range) except Exception as e: print(f"模型训练失败: {str(e)}") traceback.print_exc() return print("\n从测试集中随机选择一张图片进行预测...") try: for (images, features), labels in test_dataset_unet.take(1): if images.shape[0] > 0: test_image = images[0].numpy() test_features = features[0].numpy() labels_np = labels.numpy() if labels_np.ndim == 0: true_wavelength_normalized = labels_np.item() else: true_wavelength_normalized = labels_np[0] true_wavelength = true_wavelength_normalized * wavelength_range + min_wavelength test_image_path = "f:/phD/代码/test_image.tiff" imageio.imwrite(test_image_path, (test_image[..., 0] * 255).astype(np.uint8)) predicted_wavelength = predict_with_voting([unet_model, dense_model, cnn_model], test_image_path, min_wavelength, wavelength_range) print(f"真实波长: {true_wavelength:.6f} 纳米") print(f"预测波长: {predicted_wavelength:.6f} 纳米") print(f"绝对误差: {abs(predicted_wavelength - true_wavelength):.8f} 纳米") print(f"相对误差: 
{abs(predicted_wavelength - true_wavelength) / wavelength_range * 100:.4f}%") else: print("错误:测试批次中没有样本") except Exception as e: print(f"测试失败: {str(e)}") traceback.print_exc() print("\n您可以使用自己的图片进行测试:") model = tf.keras.models.load_model(MODEL_SAVE_PATH) image_path = input("请输入您要测试的图片路径(例如:'test_image.tiff'): ") predicted = predict_with_voting([unet_model, dense_model, cnn_model], image_path, min_wavelength, wavelength_range) print(f"预测波长: {predicted:.6f} 纳米") print("\n程序执行完成。") if __name__ == "__main__": os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2' try: from skimage import filters, measure, morphology except ImportError: print("安装必要的库...") import subprocess subprocess.run([sys.executable, "-m", "pip", "install", "scikit-image", "imageio"]) from skimage import filters, measure, morphology main()
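这个 TypeError 出在 `combine_features` 的签名上:`tf.data.Dataset.zip((train_dataset, train_labels))` 的元素是 `((image, shape_features), label)` 这样的二元组,`dataset.map` 会把元组的两个分量拆成两个位置参数传给映射函数,而 `combine_features(data)` 只声明了一个参数,因此报 "takes 1 positional argument but 2 were given"。只需把函数改成接收两个参数,其余代码不用动:

```python
def combine_features(image_features, label):
    """Dataset.zip 产生 ((image, shape_features), label),map 会拆成两个位置参数传入"""
    image, shape_features = image_features
    return (image, shape_features), label

train_dataset_unet = tf.data.Dataset.zip((train_dataset, train_labels)).map(
    combine_features, num_parallel_calls=tf.data.experimental.AUTOTUNE
)
test_dataset_unet = tf.data.Dataset.zip((test_dataset, test_labels)).map(
    combine_features, num_parallel_calls=tf.data.experimental.AUTOTUNE
)
```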

修正import numpy as np import importlib from sklearn.datasets import load_iris from sklearn.model_selection import train_test_split from sklearn.preprocessing import StandardScaler # 数据预处理 iris = load_iris() X = iris.data[:100] # Setosa(0)和Versicolor(1) y = np.where(iris.target[:100] == 0, -1, 1) # 转换为{-1,1} scaler = StandardScaler() X_scaled = scaler.fit_transform(X) X_train, X_test, y_train, y_test = train_test_split( X_scaled, y, test_size=0.3, stratify=y, random_state=42 ) # 弱分类器构建 def generate_weak_classifiers(X, num_per_feature=5): classifiers = [] n_features = X.shape[1] for feat_idx in range(n_features): feat_values = X[:, feat_idx] thresholds = np.linspace(np.min(feat_values), np.max(feat_values), num_per_feature + 2)[1:-1] for thresh in thresholds: classifiers.append((feat_idx, thresh)) return classifiers M = 20 weak_classifiers = generate_weak_classifiers(X_train, num_per_feature=5) def predict_weak_classifier(X, classifier): feat_idx, threshold = classifier return np.where(X[:, feat_idx] > threshold, 1, -1) H_train = np.array([predict_weak_classifier(X_train, wc) for wc in weak_classifiers]).T H_test = np.array([predict_weak_classifier(X_test, wc) for wc in weak_classifiers]).T # QUBO模型构建 def build_qubo_matrix(H, y, lambda_reg=0.05): N, M = H.shape HTH = H.T @ H yH = y @ H Q = np.zeros((M, M)) np.fill_diagonal(Q, [HTH[j, j] - 2*yH[j] + lambda_reg for j in range(M)]) for j in range(M): for k in range(j+1, M): Q[j, k] = 2 * HTH[j, k] Q[k, j] = 2 * HTH[j, k] return Q Q = build_qubo_matrix(H_train, y_train, lambda_reg=0.05) # Kaiwu SDK 求解 import importlib def load_module(): kaiwu = importlib.import_module('kaiwu') return kaiwu.do_something() import kaiwu as kw kw.license.init(user_id='106158405202239490', sdk_code='2a0In6Z9D2andYKWTt0wJtUjUKPxDj') # kaiwu.py class SimulatedAnnealingSolver: ... def some_function(): # 在需要时再导入(此时模块已初始化) from kaiwu import SimulatedAnnealingSolver solver = SimulatedAnnealingSolver() solver = SimulatedAnnealingSolver() result = solver.solve(Q) optimal_weights = result['solution'] # 模型评估 def strong_classifier_predict(X, weights, classifiers): predictions = np.zeros(X.shape[0]) for j, (feat_idx, threshold) in enumerate(classifiers): if weights[j] == 1: pred = np.where(X[:, feat_idx] > threshold, 1, -1) predictions += pred return np.sign(predictions) y_pred_test = strong_classifier_predict(X_test, optimal_weights, weak_classifiers) accuracy = np.mean(y_pred_test == y_test) print(f'最终强分类器在测试集上的准确率: {accuracy:.4f}')
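这段代码需要修正的主要是 Kaiwu 求解那一段:`importlib` 和 `load_module` 从未被用到、`# kaiwu.py class SimulatedAnnealingSolver: ...` 与 `some_function` 是误贴进来的示例桩代码、`SimulatedAnnealingSolver` 被实例化了两次,而且 license 的 user_id/sdk_code 不应明文写在脚本里。下面是整理后的写法;`SimulatedAnnealingSolver` 的构造参数与 `solve` 的返回格式请以 Kaiwu SDK 的实际文档为准,这里只是沿用原代码中的调用方式,环境变量名也仅作示例:

```python
import os
import kaiwu as kw

# 建议从环境变量读取授权信息,避免把密钥写进代码(变量名为示例)
kw.license.init(user_id=os.environ["KAIWU_USER_ID"],
                sdk_code=os.environ["KAIWU_SDK_CODE"])

# 原代码通过 from kaiwu import SimulatedAnnealingSolver 引入求解器,
# 等价地使用 kw.SimulatedAnnealingSolver,只实例化一次即可
solver = kw.SimulatedAnnealingSolver()
result = solver.solve(Q)                 # Q 为上文 build_qubo_matrix 得到的 QUBO 矩阵
optimal_weights = result['solution']     # 假设返回字典含 'solution' 键(与原代码一致)
```

其余的数据预处理、弱分类器构建、QUBO 矩阵构建和强分类器评估部分逻辑本身没有问题,可以原样保留。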

What is this model doing?

```python
import pandas as pd
import numpy as np
import lightgbm as lgb
from lightgbm import early_stopping, log_evaluation
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score
import chardet
import os
import gc
import joblib
from tqdm import tqdm
import warnings
warnings.filterwarnings('ignore')

# 内存优化函数
def reduce_mem_usage(df, use_float16=False):
    """迭代降低DataFrame的内存占用"""
    start_mem = df.memory_usage().sum() / 1024**2
    print(f"内存优化前: {start_mem:.2f} MB")
    for col in df.columns:
        col_type = df[col].dtype
        if col_type != object:
            c_min = df[col].min()
            c_max = df[col].max()
            if str(col_type)[:3] == "int":
                if c_min > np.iinfo(np.int8).min and c_max < np.iinfo(np.int8).max:
                    df[col] = df[col].astype(np.int8)
                elif c_min > np.iinfo(np.int16).min and c_max < np.iinfo(np.int16).max:
                    df[col] = df[col].astype(np.int16)
                elif c_min > np.iinfo(np.int32).min and c_max < np.iinfo(np.int32).max:
                    df[col] = df[col].astype(np.int32)
                elif c_min > np.iinfo(np.int64).min and c_max < np.iinfo(np.int64).max:
                    df[col] = df[col].astype(np.int64)
            else:
                if use_float16 and c_min > np.finfo(np.float16).min and c_max < np.finfo(np.float16).max:
                    df[col] = df[col].astype(np.float16)
                elif c_min > np.finfo(np.float32).min and c_max < np.finfo(np.float32).max:
                    df[col] = df[col].astype(np.float32)
                else:
                    df[col] = df[col].astype(np.float64)
    end_mem = df.memory_usage().sum() / 1024**2
    print(f"内存优化后: {end_mem:.2f} MB ({100*(start_mem-end_mem)/start_mem:.1f}% 减少)")
    return df

def detect_encoding(file_path):
    with open(file_path, 'rb') as f:
        result = chardet.detect(f.read(10000))
    return result['encoding'], result['confidence']

def load_data_for_day(day):
    """逐天加载数据并进行基本处理,返回优化后的DataFrame"""
    dtypes = {'did': 'category', 'vid': 'category'}
    day_str = f"{day:02d}"
    try:
        # 加载 see 数据
        see_path = f'see_{day_str}.csv'
        if not os.path.exists(see_path):
            print(f"⚠️ 警告: 文件 {see_path} 不存在,跳过该天数据")
            return None, None, None
        see = pd.read_csv(see_path, encoding='latin1', dtype=dtypes)
        if 'did' not in see.columns or 'vid' not in see.columns:
            print(f"⚠️ 警告: see_{day_str}.csv 缺少必要字段")
            return None, None, None
        see['day'] = day_str
        see = reduce_mem_usage(see)

        # 加载 click 数据
        click_path = f'click_{day_str}.csv'
        if os.path.exists(click_path):
            click = pd.read_csv(
                click_path,
                encoding='ISO-8859-1',
                on_bad_lines='skip',
                dtype=dtypes
            )
            if 'click_time' not in click.columns:
                print(f"⚠️ 警告: click_{day_str}.csv 缺少 click_time 字段")
                click = None
            else:
                click = click[['did', 'vid']]
                click = reduce_mem_usage(click)
        else:
            click = None
            print(f"⚠️ 警告: click_{day_str}.csv 不存在")

        # 加载 play 数据
        play_path = f'playplus_{day_str}.csv'
        if os.path.exists(play_path):
            play = pd.read_csv(
                play_path,
                engine='python',
                encoding_errors='ignore',
                dtype=dtypes
            )
            if 'play_time' not in play.columns:
                print(f"⚠️ 警告: playplus_{day_str}.csv 缺少 play_time 字段")
                play = None
            else:
                play = play[['did', 'vid', 'play_time']]
                play = reduce_mem_usage(play)
        else:
            play = None
            print(f"⚠️ 警告: playplus_{day_str}.csv 不存在")

        return see, click, play
    except Exception as e:
        print(f"⚠️ 加载第 {day_str} 天数据时出错: {str(e)}")
        return None, None, None

def process_data_in_chunks(days, feature_builder=None):
    """分块处理数据,避免内存溢出"""
    # 首先处理视频信息(一次性)
    if not os.path.exists('vid_info_table.csv'):
        raise FileNotFoundError("错误: vid_info_table.csv 文件不存在")
    video_info = pd.read_csv('vid_info_table.csv', encoding='gbk', dtype={'vid': 'category'})
    if 'item_duration' not in video_info.columns:
        raise ValueError("vid_info_table.csv 缺少 item_duration 字段")
    video_info = reduce_mem_usage(video_info)
    video_info['vid'] = video_info['vid'].astype('category')

    # 初始化全局数据结构
    user_stats = {}
    video_stats = {}

    # 逐天处理数据
    for day in tqdm(range(1, days + 1), desc="处理每日数据"):
        see, click, play = load_data_for_day(day)
        if see is None:
            continue

        # 处理曝光数据
        see_grouped = see.groupby('did')['vid'].nunique().reset_index(name='exposure_count')
        see_grouped = reduce_mem_usage(see_grouped)

        # 合并播放数据(如果存在)
        if play is not None:
            see = pd.merge(see, play, on=['did', 'vid'], how='left')
            see['play_time'] = see['play_time'].fillna(0).astype(np.float32)
        else:
            see['play_time'] = 0.0

        # 合并点击数据(如果存在)
        if click is not None:
            click['clicked'] = 1
            see = pd.merge(see, click, on=['did', 'vid'], how='left')
            see['clicked'] = see['clicked'].fillna(0).astype(np.int8)
        else:
            see['clicked'] = 0

        # 合并视频信息
        see = pd.merge(see, video_info[['vid', 'item_duration']], on='vid', how='left')
        see['item_duration'] = see['item_duration'].fillna(1.0)
        see.loc[see['item_duration'] <= 0, 'item_duration'] = 1.0

        # 计算完成率
        see['completion_rate'] = (see['play_time'] / see['item_duration']).clip(0, 1).astype(np.float16)

        # 创建标签
        see['label'] = np.select(
            [(see['completion_rate'] > 0.4), (see['clicked'] == 1)],
            [2, 1],      # 2=完成, 1=点击
            default=0    # 0=曝光未点击
        ).astype(np.int8)
        see['binary_label'] = see['label'].apply(lambda x: 1 if x >= 1 else 0).astype(np.int8)

        # 更新用户统计
        for _, row in see.iterrows():
            did = row['did']
            vid = row['vid']
            # 初始化用户统计
            if did not in user_stats:
                user_stats[did] = {
                    'exposure_count': 0,
                    'click_count': 0,
                    'active_days': set()
                }
            # 更新曝光计数
            user_stats[did]['exposure_count'] += 1
            # 更新点击计数
            if row['clicked'] == 1:
                user_stats[did]['click_count'] += 1
            # 更新活跃天数
            user_stats[did]['active_days'].add(day)
            # 初始化视频统计
            if vid not in video_stats:
                video_stats[vid] = {
                    'click_users': set()
                }
            # 更新视频点击用户
            if row['clicked'] == 1:
                video_stats[vid]['click_users'].add(did)

        # 释放内存
        del see
        gc.collect()

    # 计算全局特征
    print("计算全局特征...")
    user_features = []
    for did, stats in user_stats.items():
        active_days = len(stats['active_days'])
        click_count = stats['click_count']
        exposure_count = stats['exposure_count'] if stats['exposure_count'] > 0 else 1
        user_click_rate = click_count / exposure_count
        user_features.append({
            'did': did,
            'user_click_rate': user_click_rate,
            'user_active_days': active_days
        })

    video_features = []
    for vid, stats in video_stats.items():
        video_popularity = len(stats['click_users'])
        video_features.append({
            'vid': vid,
            'video_popularity': video_popularity
        })

    user_df = pd.DataFrame(user_features)
    video_df = pd.DataFrame(video_features)

    # 释放内存
    del user_stats, video_stats
    gc.collect()

    # 保存特征
    user_df = reduce_mem_usage(user_df)
    video_df = reduce_mem_usage(video_df)
    user_df.to_csv('user_click_rate.csv', index=False)
    video_df.to_csv('video_popularity.csv', index=False)

    return user_df, video_df

def prepare_samples(days=7):
    """准备训练样本(内存优化版本)"""
    # 处理数据并获取全局特征
    user_df, video_df = process_data_in_chunks(days)

    # 读取并处理最近一天的数据作为样本
    see, _, play = load_data_for_day(days)
    if see is None:
        raise ValueError("无法加载样本数据")

    # 合并用户特征
    see = pd.merge(see, user_df, on='did', how='left')
    see['user_click_rate'] = see['user_click_rate'].fillna(0).astype(np.float32)
    see['user_active_days'] = see['user_active_days'].fillna(1).astype(np.int16)

    # 合并视频特征
    see = pd.merge(see, video_df, on='vid', how='left')
    see['video_popularity'] = see['video_popularity'].fillna(0).astype(np.float32)

    # 特征交叉
    see['user_video_interaction'] = (see['user_active_days'] * np.log1p(see['video_popularity'])).astype(np.float32)
    see['user_video_affinity'] = (see['user_click_rate'] * see['video_popularity']).astype(np.float32)

    # 处理视频信息
    video_info = pd.read_csv('vid_info_table.csv', encoding='gbk', dtype={'vid': 'category'})
    see = pd.merge(see, video_info[['vid', 'item_duration']], on='vid', how='left')
    see['item_duration'] = see['item_duration'].fillna(1.0)
    see.loc[see['item_duration'] <= 0, 'item_duration'] = 1.0

    # 计算完成率
    if 'play_time' not in see.columns:
        see['play_time'] = 0.0
    see['completion_rate'] = (see['play_time'] / see['item_duration']).clip(0, 1).astype(np.float16)

    # 创建标签
    see['label'] = np.select(
        [(see['completion_rate'] > 0.4), (see['clicked'] == 1)],
        [2, 1],      # 2=完成, 1=点击
        default=0    # 0=曝光未点击
    ).astype(np.int8)
    see['binary_label'] = see['label'].apply(lambda x: 1 if x >= 1 else 0).astype(np.int8)

    # 优化内存
    see = reduce_mem_usage(see)
    return see, user_df, video_df

def train_model(samples):
    """训练模型(内存优化版本)"""
    print("准备训练数据...")
    features = ['user_click_rate', 'video_popularity', 'user_active_days',
                'user_video_interaction', 'user_video_affinity']

    # 确保特征存在
    available_features = [f for f in features if f in samples.columns]
    print(f"使用的特征: {available_features}")

    X = samples[available_features]
    y = samples['binary_label']

    # 检查标签分布
    if len(y.unique()) < 2:
        raise ValueError("标签数据不平衡,需要正负样本")

    # 划分训练集和测试集
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42, stratify=y
    )

    # 优化内存
    X_train = reduce_mem_usage(X_train)
    X_test = reduce_mem_usage(X_test)

    # 创建数据集
    lgb_train = lgb.Dataset(X_train, y_train, free_raw_data=True)
    lgb_eval = lgb.Dataset(X_test, y_test, reference=lgb_train, free_raw_data=True)

    # 优化模型参数(降低复杂度)
    params = {
        'boosting_type': 'gbdt',
        'objective': 'binary',
        'metric': 'auc',
        'num_leaves': 31,          # 减少叶子节点
        'max_depth': 7,            # 减少深度
        'learning_rate': 0.05,
        'feature_fraction': 0.7,
        'bagging_fraction': 0.8,
        'bagging_freq': 5,
        'min_child_samples': 100,  # 增加以降低内存
        'verbosity': -1,
        'seed': 42
    }

    # 训练模型
    print("训练模型...")
    model = lgb.train(
        params,
        lgb_train,
        num_boost_round=500,  # 减少迭代次数
        valid_sets=[lgb_train, lgb_eval],
        callbacks=[
            early_stopping(stopping_rounds=50, verbose=True),
            log_evaluation(period=100)
        ]
    )

    # 评估模型
    y_pred = model.predict(X_test)
    auc_score = roc_auc_score(y_test, y_pred)
    print(f"✅ 模型训练完成,验证集AUC: {auc_score:.4f}")

    # 保存模型
    joblib.dump(model, 'lightgbm_model.pkl')
    print("💾 模型已保存")

    # 保存特征列表
    with open('feature_columns.txt', 'w') as f:
        f.write('\n'.join(available_features))

    return model, available_features, auc_score

def predict_new_data(model, feature_columns, test_file):
    """预测新数据(内存优化版本)"""
    print("加载测试数据...")
    test_data = pd.read_csv(test_file, dtype={'did': 'category', 'vid': 'category'})
    test_data = reduce_mem_usage(test_data)

    # 加载特征映射
    user_df = pd.read_csv('user_click_rate.csv') if os.path.exists('user_click_rate.csv') else pd.DataFrame()
    video_df = pd.read_csv('video_popularity.csv') if os.path.exists('video_popularity.csv') else pd.DataFrame()

    # 使用全局均值用于填充新用户/新视频
    global_user_rate = user_df['user_click_rate'].mean() if not user_df.empty else 0
    global_video_pop = video_df['video_popularity'].mean() if not video_df.empty else 0
    global_active_days = user_df['user_active_days'].mean() if not user_df.empty else 1

    # 创建映射字典(减少内存)
    user_click_map = user_df.set_index('did')['user_click_rate'].to_dict() if not user_df.empty else {}
    video_pop_map = video_df.set_index('vid')['video_popularity'].to_dict() if not video_df.empty else {}
    user_active_map = user_df.set_index('did')['user_active_days'].to_dict() if not user_df.empty else {}

    # 添加特征
    print("添加特征...")
    test_data['user_click_rate'] = test_data['did'].map(user_click_map).fillna(global_user_rate).astype(np.float32)
    test_data['video_popularity'] = test_data['vid'].map(video_pop_map).fillna(global_video_pop).astype(np.float32)
    test_data['user_active_days'] = test_data['did'].map(user_active_map).fillna(global_active_days).astype(np.int16)

    # 特征交叉
    test_data['user_video_interaction'] = (test_data['user_active_days'] * np.log1p(test_data['video_popularity'])).astype(np.float32)
    test_data['user_video_affinity'] = (test_data['user_click_rate'] * test_data['video_popularity']).astype(np.float32)

    # 确保所有特征都存在
    print("准备预测数据...")
    test_features = test_data[feature_columns].copy()

    # 释放内存
    del test_data
    gc.collect()

    # 分批预测(避免内存溢出)
    print("开始预测...")
    batch_size = 100000
    predictions = []
    for i in tqdm(range(0, len(test_features), batch_size), desc="预测批次"):
        batch = test_features.iloc[i:i+batch_size]
        preds = model.predict(batch)
        predictions.extend(preds.tolist())
        del batch
        gc.collect()

    # 重新加载测试数据以获取did和vid
    test_data = pd.read_csv(test_file, dtype={'did': 'category', 'vid': 'category'}, usecols=['did', 'vid'])
    test_data['click_prob'] = predictions

    # 生成并保存结果
    print("生成最终结果...")
    top_predictions = test_data.sort_values('click_prob', ascending=False).groupby('did').head(1)
    result = top_predictions[['did', 'vid', 'click_prob']].copy()
    result.to_csv('prediction_result.csv', index=False)
    print(f"✅ 预测完成,结果已保存至 prediction_result.csv")
    print(f"预测样本数量: {len(result)}")

    # 释放内存
    del test_features, predictions, top_predictions
    gc.collect()

    return result

if __name__ == '__main__':
    try:
        print("🚀 开始视频推荐模型训练与预测流程 (内存优化版)")

        # 设置较小的天数
        TRAIN_DAYS = 7  # 仅使用7天数据
        print(f"⚙️ 配置: 使用{TRAIN_DAYS}天数据训练")

        # 准备样本
        print("🔧 准备训练样本...")
        samples, _, _ = prepare_samples(days=TRAIN_DAYS)
        if samples is None:
            raise ValueError("样本准备失败")
        print(f"✅ 样本准备完成 - 总样本数: {len(samples)}")

        # 标签分布
        label_dist = samples['binary_label'].value_counts(normalize=True)
        print(f"📊 标签分布 - 正样本: {label_dist[1]:.2%}, 负样本: {label_dist[0]:.2%}")

        # 训练模型
        print("🤖 开始训练LightGBM模型...")
        model, features, auc_score = train_model(samples)
        print(f"🎯 最优模型AUC: {auc_score:.4f}")

        # 释放内存
        del samples
        gc.collect()

        # 预测新数据
        print("🔮 开始预测新数据...")
        test_file = 'testA_did_show.csv'

        # 直接加载保存的模型(避免内存中的模型占用)
        if not os.path.exists('lightgbm_model.pkl'):
            raise FileNotFoundError("模型文件不存在")
        model = joblib.load('lightgbm_model.pkl')

        # 加载特征列表
        if not os.path.exists('feature_columns.txt'):
            raise FileNotFoundError("特征列表文件不存在")
        with open('feature_columns.txt', 'r') as f:
            features = f.read().splitlines()

        result = predict_new_data(model, features, test_file)
        print("✅ 流程成功完成!")
    except Exception as e:
        print(f"❌ 流程出错: {str(e)}")
        import traceback
        traceback.print_exc()
```
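In short, the script builds one training sample per exposure (`did`, `vid`) from daily see/click/play logs, labels each exposure from clicks and play-completion, trains a binary LightGBM classifier on five user/video aggregate features, and finally writes the highest-scoring `vid` per `did` in the test file to `prediction_result.csv`. Below is a minimal, self-contained sketch of just the labeling rule it applies; the `make_labels` helper and the toy numbers are illustrative additions, not part of the original script.

```python
import numpy as np
import pandas as pd

def make_labels(df: pd.DataFrame) -> pd.DataFrame:
    """Same labeling rule as the script: positive if clicked or watched past 40%."""
    df = df.copy()
    df["completion_rate"] = (df["play_time"] / df["item_duration"]).clip(0, 1)
    df["label"] = np.select(
        [df["completion_rate"] > 0.4, df["clicked"] == 1],
        [2, 1],        # 2 = completed, 1 = clicked
        default=0,     # 0 = exposed but not clicked
    )
    df["binary_label"] = (df["label"] >= 1).astype(int)
    return df

# Toy usage with made-up numbers:
demo = pd.DataFrame({
    "play_time":     [0.0, 30.0, 200.0],
    "item_duration": [120.0, 120.0, 300.0],
    "clicked":       [0, 1, 0],
})
print(make_labels(demo)[["completion_rate", "label", "binary_label"]])
```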

```python
import pandas as pd
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.metrics import mean_squared_error
from sklearn.preprocessing import StandardScaler
import numpy as np

def extract_dn_a_values_from_csv(csv_file, target_param):
    # 从CSV中提取DN_A值并返回DataFrame
    df = pd.read_csv(csv_file, header=None)  # 读取CSV文件
    dn_a_values = []  # 存储DN_A值的列表
    for index, row in df.iterrows():
        line = row[0]  # 获取行内容
        parameters = line.split(';')  # 按分号分割参数
        for param in parameters:
            if param.startswith(target_param + ':'):  # 查找以 "DN_Y:" 开头的参数
                dn_a_value = float(param.split(':')[1])  # 取出DN_A值
                dn_a_values.append(dn_a_value)  # 添加到列表
    return pd.DataFrame({target_param: dn_a_values})  # 返回包含 DN_A 值的 DataFrame

def extract_other_params_from_csv(csv_file, target_param):
    # 从CSV中提取其他参数值并返回DataFrame
    df = pd.read_csv(csv_file, header=None)  # 读取CSV文件
    other_params = []  # 存储其他参数值的列表
    dn_a_values = []   # 存储DN_A值的列表
    for index, row in df.iterrows():
        line = row[0]  # 获取行内容
        parameters = line.split(';')  # 按分号分割参数
        other_param = []
        dn_a_value = None
        for param in parameters:
            parts = param.split(':')
            if len(parts) == 2 and not param.startswith(target_param + ':'):
                try:
                    other_param.append(float(parts[1]))
                except ValueError:
                    continue
            elif param.startswith(target_param + ':'):
                dn_a_value = float(param.split(':')[1])
        if dn_a_value is not None:
            other_params.append(other_param)
            dn_a_values.append(dn_a_value)
    return pd.DataFrame(other_params), pd.DataFrame({target_param: dn_a_values})

if __name__ == "__main__":
    target_param = 'DN_A'
    # 假设 PARAMETER.csv 文件已存在并包含所需格式数据
    other_params_df, dn_a_df = extract_other_params_from_csv('PARAMETER.csv', target_param)  # 提取其他参数值和DN_A值

    # 填充缺失值
    max_len = max(len(x) for x in other_params_df.values.tolist())
    for i, x in enumerate(other_params_df.values.tolist()):
        x += [0] * (max_len - len(x))
    other_params_df = pd.DataFrame(other_params_df.values.tolist())

    # 清除NaN和无穷大值
    other_params_df = other_params_df.replace([np.inf, -np.inf], np.nan)
    other_params_df = other_params_df.dropna()

    # 数据归一化
    scaler = StandardScaler()
    other_params_df = pd.DataFrame(scaler.fit_transform(other_params_df), columns=other_params_df.columns)

    # 确保行数一致
    min_len = min(len(other_params_df), len(dn_a_df))
    other_params_df = other_params_df.head(min_len)
    dn_a_df = dn_a_df.head(min_len)

    # 划分训练和测试数据
    X_train, X_test, y_train, y_test = train_test_split(
        other_params_df, dn_a_df[target_param], test_size=0.2, random_state=42
    )

    # 超参数调优
    param_grid = {'n_estimators': [10, 50, 100, 200], 'max_depth': [None, 5, 10, 15]}
    grid_search = GridSearchCV(RandomForestRegressor(random_state=42), param_grid,
                               cv=5, scoring='neg_mean_squared_error')
    grid_search.fit(X_train, y_train)

    # 训练随机森林模型
    rf = grid_search.best_estimator_
    rf.fit(X_train, y_train)

    # 评估模型性能
    y_pred = rf.predict(X_test)
    mse = mean_squared_error(y_test, y_pred)
    print(f'MSE: {mse:.2f}')

    # 使用模型进行DN_A参数推荐
    def recommend_dn_a(other_params):
        other_params_df = pd.DataFrame([other_params + [0] * (len(X_train.columns) - len(other_params))])
        other_params_df = pd.DataFrame(scaler.transform(other_params_df), columns=other_params_df.columns)
        predicted_dn_a = rf.predict(other_params_df)
        return predicted_dn_a

    # 示例使用
    other_params = [800, 80, 11, -63.94775, 0.782, 800, 80, 11, -64.35547, 0.745]
    recommended_dn_a = recommend_dn_a(other_params)
    print(f'推荐的{target_param}值:{recommended_dn_a[0]}')
```

Can this script be optimized further?
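One possible direction for that optimization question (a sketch under my own assumptions, not the original author's code): bundle the scaler and the regressor in a single scikit-learn `Pipeline`, so that scaling is fit only on the training folds during cross-validation and is re-applied automatically when predicting new parameter vectors; this also removes the manual `scaler.transform` call in `recommend_dn_a`. The row-padding and row-alignment between `other_params_df` and `dn_a_df` would still need a separate fix.

```python
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import GridSearchCV
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

def tune_rf(X, y):
    # Scaling happens inside the pipeline, so each CV fold is scaled independently.
    pipe = Pipeline([
        ("scaler", StandardScaler()),
        ("rf", RandomForestRegressor(random_state=42)),
    ])
    param_grid = {
        "rf__n_estimators": [100, 200, 400],
        "rf__max_depth": [None, 5, 10, 15],
    }
    search = GridSearchCV(pipe, param_grid, cv=5,
                          scoring="neg_mean_squared_error", n_jobs=-1)
    search.fit(X, y)
    return search.best_estimator_  # already refit on all of X with the best parameters

# Hypothetical usage, assuming X_train/X_test are built as in the script above:
# model = tune_rf(X_train, y_train)
# print(model.predict(X_test[:5]))
```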

```python
import datasets
import transformers
import modelscope
from itertools import chain
import glob
import torch
import evaluate
from swanlab.integration.transformers import SwanLabCallback
import swanlab
import numpy as np
from sklearn.metrics import accuracy_score
import evaluate
import transformers
import torch
import torch.nn as nn
from transformers import AutoModelForCausalLM, GPT2Config
from transformers import RobertaTokenizerFast

# 实现旋转位置编码
class RotaryPositionalEmbedding(nn.Module):
    def __init__(self, dim, max_seq_len=2048, base=10000):
        super().__init__()
        self.dim = dim
        self.max_seq_len = max_seq_len
        self.base = base
        inv_freq = 1.0 / (base ** (torch.arange(0, dim, 2).float() / dim))
        self.register_buffer('inv_freq', inv_freq)
        self.register_buffer('positions', torch.arange(max_seq_len))

    def forward(self, x):
        batch_size, seq_len, hidden_size = x.size()
        seq_len = min(seq_len, self.max_seq_len)
        positions = self.positions[:seq_len]
        angles = positions.unsqueeze(1) * self.inv_freq.unsqueeze(0)
        angles = angles.unsqueeze(0).repeat(batch_size, 1, 1)
        cos_angles = torch.cos(angles)
        sin_angles = torch.sin(angles)
        x_reshaped = x.view(batch_size, seq_len, -1, 2)
        x1, x2 = x_reshaped[..., 0], x_reshaped[..., 1]
        rotated_x1 = x1 * cos_angles - x2 * sin_angles
        rotated_x2 = x1 * sin_angles + x2 * cos_angles
        rotated = torch.stack([rotated_x1, rotated_x2], dim=-1).view(batch_size, seq_len, hidden_size)
        return rotated

# 自定义模型类,继承自原模型的PreTrainedModel
class CustomModelWithRotary(transformers.PreTrainedModel):
    config_class = GPT2Config

    def __init__(self, config):
        super().__init__(config)
        # 修改属性名避免与父类冲突
        self.gpt2_model = transformers.AutoModelForCausalLM.from_pretrained(
            "/root/workspace/workspace/gpt2-large",
            config=config,
            ignore_mismatched_sizes=True
        )
        self.rotary_emb = RotaryPositionalEmbedding(
            dim=config.hidden_size,
            max_seq_len=config.max_position_embeddings
        )
        self.config.position_encoding_type = "rotary"

    def forward(self, input_ids=None, attention_mask=None, **kwargs):
        # 使用新的属性名调用模型
        outputs = self.gpt2_model(input_ids=input_ids, attention_mask=attention_mask, **kwargs)
        if hasattr(outputs, 'last_hidden_state'):
            outputs.last_hidden_state = self.rotary_emb(outputs.last_hidden_state)
        else:
            outputs = (self.rotary_emb(outputs[0]),) + outputs[1:]
        return outputs

def main():
    swanlab.init("PreTrain-GPT2-SELFIES")
    swanlab_callback = SwanLabCallback(
        project="PreTrain-GPT2-SELFIES",
        experiment_name="PreTrain-GPT2-SELFIES"
    )
    raw_datasets = datasets.load_dataset(
        "json",
        data_files="/root/workspace/selfies1.json"
    )

    # split dataset
    raw_datasets = raw_datasets["train"].train_test_split(test_size=0.05, seed=2333)
    print("dataset info")
    print(raw_datasets)

    saved_path = '/root/workspace/robertatokenizer'
    tokenizer = RobertaTokenizerFast.from_pretrained(saved_path)
    print("Loaded RobertaTokenizerFast from: " + saved_path)

    # 设置分词器的最大长度
    tokenizer.model_max_length = 512
    print(f"Set tokenizer model_max_length to: {512}")
    context_length = 512  # use a small context length
    vocab_size = tokenizer.vocab_size
    print(f"Tokenizer vocab size: {vocab_size}")

    # preprocess dataset
    def tokenize(element):
        # 对数据集进行预处理,将文本转换为模型可以处理的输入格式
        # truncation=True表示如果文本长度超过了context_length,就截断
        # max_length=context_length表示文本的最大长度为context_length
        # return_overflowing_tokens=True表示返回溢出的tokens
        outputs = tokenizer(
            element["text"],
            truncation=True,
            max_length=context_length,
            return_overflowing_tokens=True,
            return_length=True,
        )
        input_batch = []
        # 将溢出的tokens转换为模型可以处理的输入格式
        # input_ids是一个二维数组,每一行表示一个文本的输入格式
        # length是一个一维数组,每一个元素表示一个文本的长度
        # input_batch是一个二维数组,每一行表示一个文本的输入格式
        # context_length是一个整数,表示文本的最大长度
        for length, input_ids in zip(outputs["length"], outputs["input_ids"]):
            if length == context_length:
                input_batch.append(input_ids)
        return {"input_ids": input_batch}

    # map函数的作用是将tokenize函数应用到数据集的每一个元素上
    # batched=True表示将数据集分成batch进行处理
    # remove_columns=raw_datasets["train"].column_names表示删除原始数据集的列名
    tokenized_datasets = raw_datasets.map(
        tokenize, batched=True, num_proc=20, remove_columns=raw_datasets["train"].column_names
    )
    print("tokenize dataset info")
    # print(tokenized_datasets)

    # eos_token的作用是表示文本的结束
    # pad_token的作用是表示填充的token
    tokenizer.pad_token = tokenizer.eos_token
    # DataCollatorForLanguageModeling的作用是将数据集转换为模型可以处理的输入格式
    # mlm=False表示不进行masked language modeling
    data_collator = transformers.DataCollatorForLanguageModeling(tokenizer, mlm=False)

    # 加载配置
    config = GPT2Config.from_pretrained("/root/workspace/workspace/gpt2-large")
    config.vocab_size = vocab_size  # 更新配置中的 vocab_size
    # 检查配置中的 vocab_size
    print(f"Model config vocab size: {config.vocab_size}")

    # 直接实例化自定义模型,无需注册
    model = CustomModelWithRotary(config)
    model_size = sum(t.numel() for t in model.parameters())
    print("Model Config:")
    print(config)
    print(f"Model Size: {model_size/1000**2:.1f}M parameters")

    # 加载各个所需的指标
    accuracy_metric = evaluate.load('./metrics/accuracy')

    def compute_metrics(eval_preds):
        logits, labels = eval_preds
        # 获取预测结果(取logits中概率最大的索引)
        preds = np.argmax(logits, axis=-1)  # 形状: [batch_size, sequence_length]
        labels = labels[:, 1:].reshape(-1)
        preds = preds[:, :-1].reshape(-1)
        # 计算每个标记的准确度
        accuracy = accuracy_metric.compute(predictions=preds, references=labels)
        return accuracy

    import random

    def compute_metrics_partial(eval_preds, subset_ratio=0.5):
        # 这里假设 eval_preds 是一个 list,包含 logits 和 labels
        logits, labels = eval_preds
        # 随机选择部分批次进行计算
        batch_size = logits.shape[0]
        subset_size = int(batch_size * subset_ratio)  # 计算子集的大小
        selected_indices = random.sample(range(batch_size), subset_size)
        # 获取预测结果(只对选定的批次进行计算)
        selected_logits = logits[selected_indices]
        selected_labels = labels[selected_indices]
        preds = np.argmax(selected_logits, axis=-1)  # shape: [subset_size, sequence_length]
        selected_labels = selected_labels[:, 1:].reshape(-1)
        selected_preds = preds[:, :-1].reshape(-1)
        # 计算准确度
        accuracy = accuracy_metric.compute(predictions=selected_preds, references=selected_labels)
        return accuracy

    # train
    args = transformers.TrainingArguments(
        output_dir="./GPT2-SELFIES",
        per_device_train_batch_size=3,   # 每个GPU的训练batch数
        per_device_eval_batch_size=3,    # 每个GPU的测试batch数
        eval_strategy="steps",
        eval_steps=2,
        logging_steps=5,
        gradient_accumulation_steps=8,   # 梯度累计总数
        num_train_epochs=100,            # 训练epoch数
        lr_scheduler_type="cosine",      # 学习率衰减策略
        learning_rate=1e-5,              # 基础学习率
        save_steps=500,
        save_total_limit=10,
        bf16=True,                       # 开启bf16训练, 对于Amper架构以下的显卡建议替换为fp16=True
    )
    print("Train Args:")
    print(args)

    # enjoy training
    trainer = transformers.Trainer(
        model=model,
        tokenizer=tokenizer,
        args=args,
        data_collator=data_collator,
        train_dataset=tokenized_datasets["train"],
        eval_dataset=tokenized_datasets["test"],
        compute_metrics=compute_metrics,
        callbacks=[swanlab_callback],
    )
    trainer.train()

    # save model
    trainer.save_model("./GPT2-SELFIES/Weight")  # 保存模型的路径

    # generate
    pipe = transformers.pipeline("text-generation", model=model, tokenizer=tokenizer)
    print("GENERATE:", pipe("人工智能", num_return_sequences=1)[0]["generated_text"])
    prompts = ["牛顿", "北京市", "亚洲历史"]
    examples = []
    for i in range(3):
        # 根据提示词生成数据
        text = pipe(prompts[i], num_return_sequences=1)[0]["generated_text"]
        text = swanlab.Text(text)
        examples.append(text)
    swanlab.log({"Generate": examples})

if __name__ == "__main__":
    main()
```

Please help resolve this error:

```
File "/root/workspace/gpt2-large.py", line 247, in <module>
    main()
File "/root/workspace/gpt2-large.py", line 228, in main
    trainer.train()
File "/opt/conda/lib/python3.10/site-packages/transformers/trainer.py", line 2241, in train
    return inner_training_loop(
File "/opt/conda/lib/python3.10/site-packages/transformers/trainer.py", line 2548, in _inner_training_loop
    tr_loss_step = self.training_step(model, inputs, num_items_in_batch)
File "/opt/conda/lib/python3.10/site-packages/transformers/trainer.py", line 3698, in training_step
    loss = self.compute_loss(model, inputs, num_items_in_batch=num_items_in_batch)
File "/opt/conda/lib/python3.10/site-packages/transformers/trainer.py", line 3759, in compute_loss
    outputs = model(**inputs)
File "/opt/conda/lib/python3.10/site-packages/torch/nn/modules/module.py", line 1518, in _wrapped_call_impl
    return self._call_impl(*args, **kwargs)
File "/opt/conda/lib/python3.10/site-packages/torch/nn/modules/module.py", line 1527, in _call_impl
    return forward_call(*args, **kwargs)
File "/opt/conda/lib/python3.10/site-packages/accelerate/utils/operations.py", line 819, in forward
    return model_forward(*args, **kwargs)
File "/opt/conda/lib/python3.10/site-packages/accelerate/utils/operations.py", line 807, in __call__
    return convert_to_fp32(self.model_forward(*args, **kwargs))
File "/opt/conda/lib/python3.10/site-packages/torch/amp/autocast_mode.py", line 16, in decorate_autocast
    return func(*args, **kwargs)
File "/root/workspace/gpt2-large.py", line 68, in forward
    outputs = (self.rotary_emb(outputs[0]),) + outputs[1:]
File "/opt/conda/lib/python3.10/site-packages/torch/nn/modules/module.py", line 1518, in _wrapped_call_impl
    return self._call_impl(*args, **kwargs)
File "/opt/conda/lib/python3.10/site-packages/torch/nn/modules/module.py", line 1527, in _call_impl
    return forward_call(*args, **kwargs)
File "/root/workspace/gpt2-large.py", line 31, in forward
    batch_size, seq_len, hidden_size = x.size()
not enough values to unpack (expected 3, got 0)
```
