活动介绍

device = torch.device("cuda" if torch.cuda.is_available() else "cpu") os.makedirs("output", exist_ok=True) os.makedirs("checkpoints", exist_ok=True) # Get data configuration data_config = parse_data_config(opt.data_config) train_path = data_config["train"] valid_path = data_config["valid"] class_names = load_classes(data_config["names"]) # Initiate model model = Darknet(opt.model_def).to(device) model.apply(weights_init_normal) 让我详细的理解每一行

时间: 2025-07-14 07:51:18 浏览: 9
<think>好的,我现在需要详细理解用户提供的这段代码。首先,用户希望逐行解释代码的功能和作用。代码看起来是关于PyTorch深度学习模型训练的初始化部分,可能涉及到设备选择、目录创建、数据配置和模型初始化。 首先,第一行是`device = torch.device("cuda" if torch.cuda.is_available() else "cpu")`。这里的作用是检查是否有可用的CUDA GPU,如果有的话使用GPU进行计算,否则使用CPU。这在深度学习训练中非常常见,因为GPU可以加速计算。但需要确认是否正确处理了不同环境下的设备选择,比如当有多个GPU时是否默认选择第一个,或者是否需要更明确的指定。 接下来是`os.makedirs("output", exist_ok=True)`和`os.makedirs("checkpoints", exist_ok=True)`。这两行创建了两个目录,output和checkpoints,用于保存训练过程中的输出文件和模型检查点。exist_ok参数设置为True可以避免目录已存在时抛出异常。需要确认是否有必要处理创建目录时的其他潜在错误,比如权限问题,但通常exist_ok=True已经足够。 然后是数据配置部分:`data_config = parse_data_config(opt.data_config)`。这里调用了parse_data_config函数,并传入opt.data_config作为参数,可能是解析数据配置文件的路径。用户需要知道parse_data_config的具体实现,但假设它返回一个包含训练、验证数据和类别名称路径的字典。接着从data_config中获取train_path、valid_path和class_names。这里需要确认这些路径是否正确,以及load_classes函数如何加载类别名称,比如从文本文件逐行读取。 接下来是模型初始化部分:`model = Darknet(opt.model_def).to(device)`。Darknet可能是一个自定义的模型类,继承自PyTorch的Module,opt.model_def是模型定义的配置文件路径。.to(device)将模型移动到之前选择的设备(GPU或CPU)。需要确认Darknet类的实现是否正确,特别是是否支持所选设备。然后应用权重初始化`model.apply(weights_init_normal)`,这里假设weights_init_normal是一个函数,用于初始化模型的权重,比如使用正态分布。需要确认该初始化函数是否适用于所有层类型,比如卷积层和批归一化层可能有不同的初始化方式。 用户提供的代码可能是一个训练脚本的初始化部分,可能还有后续的训练循环、数据加载等步骤。需要检查是否存在潜在问题,比如是否在数据加载部分正确使用了train_path和valid_path,模型是否在训练前正确初始化,以及设备是否一致(比如数据是否也移动到了相应设备)。 可能需要注意的地方包括:是否在训练循环中将输入数据也移动到了device上,checkpoints目录是否正确保存和加载模型状态,以及输出目录是否用于保存日志或结果。此外,异常处理是否足够,比如在创建目录时是否有权限问题,或者数据配置文件是否存在等问题。 总结来说,这段代码负责设置训练环境(设备选择、目录创建)、加载数据配置和初始化模型。需要确保各部分正确协同工作,比如模型在GPU上训练时数据也在GPU上,目录存在且可写,数据路径正确等。如果用户能提供更多上下文或相关函数的具体实现(如parse_data_config、Darknet类、weights_init_normal),可以进一步验证这些部分的正确性。</think>以下是逐行详细解析代码功能的说明: --- ### **1. 设备选择** ```python device = torch.device("cuda" if torch.cuda.is_available() else "cpu") ``` - **功能**:自动选择计算设备(GPU或CPU)。 - **解析**: - `torch.cuda.is_available()` 检测当前环境是否支持CUDA(即是否有NVIDIA GPU且驱动、CUDA库已正确安装)。 - 如果支持,`device` 被设置为 `"cuda"`(使用GPU加速);否则设置为 `"cpu"`(仅使用CPU)。 - **注意**: - GPU能显著加速深度学习模型训练。 - 多GPU环境下默认使用第一个GPU,可通过 `"cuda:0"`、`"cuda:1"` 等指定具体设备。 --- ### **2. 创建输出目录** ```python os.makedirs("output", exist_ok=True) os.makedirs("checkpoints", exist_ok=True) ``` - **功能**:创建存储训练输出和模型检查点的目录。 - **参数解析**: - `"output"`:保存训练过程中的日志、可视化结果等。 - `"checkpoints"`:保存训练过程中定期备份的模型权重。 - `exist_ok=True`:若目录已存在则跳过,避免报错。 - **注意**: - 若目录权限不足可能导致创建失败,需确保程序有写入权限。 --- ### **3. 加载数据配置** ```python data_config = parse_data_config(opt.data_config) train_path = data_config["train"] valid_path = data_config["valid"] class_names = load_classes(data_config["names"]) ``` - **功能**:解析数据配置文件,获取训练集/验证集路径及类别名称。 - **逐行解析**: - **`parse_data_config(opt.data_config)`**: - 假设 `opt.data_config` 是数据配置文件的路径(如 `data.yaml`)。 - `parse_data_config` 是自定义函数,解析文件内容并返回字典。示例文件内容可能为: ```yaml train: ./data/train.txt valid: ./data/valid.txt names: ./data/classes.names ``` - **`train_path` 和 `valid_path`**: - 分别指向训练集和验证集的文件列表(如每行包含图像路径和标签)。 - **`load_classes(data_config["names"])`**: - 假设 `data_config["names"]` 是类别名称文件路径(如 `classes.names`)。 - `load_classes` 是自定义函数,从文本文件中逐行读取类别名称。示例文件内容: ``` cat dog person ``` --- ### **4. 初始化模型** ```python model = Darknet(opt.model_def).to(device) model.apply(weights_init_normal) ``` - **功能**:构建Darknet模型并初始化权重。 - **逐行解析**: - **`Darknet(opt.model_def)`**: - `Darknet` 是自定义的PyTorch模型类,通常用于实现YOLO系列网络。 - `opt.model_def` 是模型定义的配置文件路径(如 `yolov3.cfg`),包含网络层结构、参数等。 - **`.to(device)`**: - 将模型移动到指定设备(GPU或CPU),确保计算在选定设备上进行。 - **`model.apply(weights_init_normal)`**: - `apply()` 方法递归地对所有子模块应用函数 `weights_init_normal`。 - `weights_init_normal` 是自定义的权重初始化函数,通常对卷积层使用正态分布初始化。示例实现: ```python def weights_init_normal(m): if isinstance(m, nn.Conv2d): nn.init.normal_(m.weight, mean=0.0, std=0.02) if m.bias is not None: nn.init.constant_(m.bias, 0.0) elif isinstance(m, nn.BatchNorm2d): nn.init.normal_(m.weight, mean=1.0, std=0.02) nn.init.constant_(m.bias, 0.0) ``` --- ### **完整流程总结** 1. **选择计算设备**:优先使用GPU加速训练。 2. **创建存储目录**:确保训练输出和模型检查点有保存位置。 3. **加载数据配置**:获取数据集路径和类别信息,为后续数据加载做准备。 4. **构建并初始化模型**:根据配置文件定义网络结构,初始化权重以提升训练稳定性。 --- ### **潜在问题与改进建议** 1. **多GPU支持**: - 若需使用多GPU训练,可改用 `torch.nn.DataParallel(model)`。 2. **目录创建安全性**: - 添加异常处理,防止因权限不足导致程序终止: ```python try: os.makedirs("output", exist_ok=True) except PermissionError: print("Error: No write permission for 'output' directory.") ``` 3. **配置文件校验**: - 检查 `opt.data_config` 和 `opt.model_def` 文件是否存在,避免路径错误。 4. **权重初始化兼容性**: - 确保 `weights_init_normal` 覆盖所有层类型(如LSTM、Transformer等特殊层)。 --- ### **典型应用场景** ```python # 假设已定义数据加载器 train_loader 和 valid_loader optimizer = torch.optim.Adam(model.parameters()) for epoch in range(opt.epochs): for batch_idx, (inputs, targets) in enumerate(train_loader): inputs = inputs.to(device) # 数据移动到设备 targets = targets.to(device) outputs = model(inputs) loss = compute_loss(outputs, targets) loss.backward() optimizer.step() # 定期保存检查点 if batch_idx % 100 == 0: torch.save(model.state_dict(), f"checkpoints/model_epoch{epoch}_batch{batch_idx}.pth") ``` 这段代码是深度学习训练流程的初始化环节,为后续训练循环、数据加载、损失计算等步骤奠定基础。
阅读全文

相关推荐

import torch import os import torch.optim as optim from torch.utils.data import DataLoader from tqdm import tqdm import matplotlib.pyplot as plt import numpy as np from UNet_3Plus import UNet_3Plus_DeepSup from dataset import NumpyDataset from torch.amp import GradScaler,autocast # 数据路径 samples_dir = r'D:\桌面\U_Net3\开源数据集训练CODE\训练集\noisy_npy' labels_dir = r'D:\桌面\U_Net3\开源数据集训练CODE\训练集\normalized_npy' # test_samples_dir = r'D:\桌面\U_Net3\开源数据集训练CODE\测试集2\noisy_npy' # test_labels_dir = r'D:\桌面\U_Net3\开源数据集训练CODE\测试集2\normalized_npy' # 创建完整数据集 train_dataset = NumpyDataset(samples_dir, labels_dir) # test_dataset = NumpyDataset(test_samples_dir, test_labels_dir) # 创建 DataLoader train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True) # test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False) test = np.load(r"D:\桌面\U_Net3\开源数据集训练CODE\data\noisy_data.npy") test = torch.tensor(test, dtype=torch.float32) test = test.unsqueeze(0).unsqueeze(0) clean = np.load(r"D:\桌面\U_Net3\开源数据集训练CODE\data\clean_data.npy") clean = torch.tensor(clean, dtype=torch.float32) clean = clean.unsqueeze(0).unsqueeze(0) # 初始化网络 model = UNet_3Plus_DeepSup(in_channels=1, n_classes=1) # 选择优化器 optimizer = optim.Adam(model.parameters(), lr=1e-4) # 设置损失函数 criterion_mse = torch.nn.MSELoss() # 训练参数 num_epochs = 500 # AMP 训练 scaler = GradScaler() # 初始化保存训练和测试损失、SNR的变量 train_losses = [] test_losses = [] snr_values = [] # 初始化模型保存文件夹 model_dir = "U_Net3+_model_3_25" os.makedirs(model_dir, exist_ok=True) # 计算信噪比(SNR)的函数 def calculate_snr(predictions, targets): noise = predictions - targets signal_power = torch.mean(targets**2) noise_power = torch.mean(noise**2) return 10 * torch.log10(signal_power / noise_power) # 将模型和数据迁移到设备(GPU 或 CPU) device_type = "cuda" if torch.cuda.is_available() else "cpu" device = torch.device(device_type) model = model.to(device) # 训练和测试循环 for epoch in range(num_epochs): model.train() train_loss = 0.0 for inputs, targ

# -*- coding: gbk -*- import cv2 import torch import numpy as np import os def enhance_depth_estimation(img_path, mask_path): # 加载模型 midas = torch.hub.load("/home/lichuang/project/Opencv-main/sam_yolo/MiDaS/", "DPT_Large", source='local') device = torch.device("cuda" if torch.cuda.is_available() else "cpu") midas.to(device) midas.eval() # 预处理 img = cv2.imread(img_path) img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB) / 255.0 mask = cv2.imread(mask_path, cv2.IMREAD_GRAYSCALE) # 确保 mask 中的值为 0 和 1,白色部分为 1 mask = (mask > 0).astype(np.uint8) # 应用 mask img_masked = cv2.bitwise_and(img, img, mask=mask) # 转换为模型输入格式,使用图像原始尺寸 input_batch = torch.from_numpy(img_masked).permute(2, 0, 1).unsqueeze(0).float().to(device) # 不再进行插值到 (384, 384) # 深度预测 with torch.no_grad(): prediction = midas(input_batch) prediction = torch.nn.functional.interpolate( prediction.unsqueeze(1), size=img.shape[:2], # 使用图像原始尺寸 mode="bicubic", align_corners=False ).squeeze().cpu().numpy() # 边缘修复 depth_map = (prediction - prediction.min()) / (prediction.max() - prediction.min()) inpainted_depth = cv2.inpaint((depth_map * 255).astype(np.uint8), 255 - mask, 3, cv2.INPAINT_TELEA) return inpainted_depth def process_folders(segmented_folder, mask_folder, output_folder): # 确保输出文件夹存在 os.makedirs(output_folder, exist_ok=True) # 递归遍历分割图文件夹 for root, _, files in os.walk(segmented_folder): for filename in files: if filename.endswith(('.png', '.jpg', '.jpeg')): img_path = os.path.join(root, filename) # 计算相对路径 relative_path = os.path.relpath(root, segmented_folder) mask_path = os.path.join(mask_folder, relative_path, filename) # 检查对应的掩码文件是否存在 if not os.path.exists(mask_path): print(f"未找到对应的掩码文件: {mask_path},跳过 {img_path}。") continue # 生成深度图 depth_map = enhance_depth_estimation(img_path, mask_path) # 构建输出路径,保持子文件夹结构 output_subfolder = os.path.join(output_folder, relative_path) os.makedirs(output_subfolder, exist_ok=True) output_path = os.path.join(output_subfolder, filename) # 保存深度图 cv2.imwrite(output_path, depth_map) print(f"已为 {img_path} 生成深度图并保存到 {output_path}。") if __name__ == "__main__": segmented_folder = "/home/lichuang/project/Opencv-main/sam_yolo/dataset/input_images/" # 替换为实际的分割图文件夹路径 mask_folder = "/home/lichuang/project/Opencv-main/sam_yolo/dataset/masks/" # 替换为实际的掩码文件夹路径 output_folder = "/home/lichuang/project/Opencv-main/sam_yolo/dataset/depth_maps1/" # 替换为实际的输出深度图文件夹路径 process_folders(segmented_folder, mask_folder, output_folder)修改深度图的生成代码,尝试解决此问题

import cv2 import torch import numpy as np from pathlib import Path from models.fgd_net import FGDNet # 确保模型定义可访问 class SimpleDeploy: def __init__(self, weight_path="weights/best_model.pth"): self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu") self.model = FGDNet().to(self.device) self.model.load_state_dict(torch.load(weight_path, map_location=self.device)) self.model.eval() # 切换到推理模式 @staticmethod def preprocess(img_path, target_size=(1024, 1024)): """ 图像预处理:灰度化 -> 归一化 -> 张量转换 """ img = cv2.imread(str(img_path), cv2.IMREAD_GRAYSCALE) img = cv2.resize(img, target_size) # 确保与训练尺寸一致 tensor = torch.from_numpy(img.astype(np.float32) / 255.0) return tensor.unsqueeze(0).unsqueeze(0) # (1,1,H,W) def process_pair(self, ir_path, fused_path, output_dir="output"): """ 处理单对图像 """ # 加载数据 ir_tensor = self.preprocess(ir_path).to(self.device) fused_tensor = self.preprocess(fused_path).to(self.device) # 生成引导特征 with torch.no_grad(): guide_feat = self.model.enc2(self.model.enc1(fused_tensor)) # 执行推理 output = self.model(ir_tensor, guide_feat) # 后处理与保存 output_np = output.squeeze().detach().cpu().numpy() * 255 output_np = np.clip(output_np, 0, 255).astype(np.uint8) # 确保像素值在 [0, 255] 范围内 output_dir = Path(output_dir) output_dir.mkdir(parents=True, exist_ok=True) # 确保输出目录存在 output_path = output_dir / f"result_{Path(ir_path).name}" try: cv2.imwrite(str(output_path), output_np) print(f"处理完成 ➜ {output_path}") except Exception as e: print(f"写入图像时出错: {e}") if __name__ == "__main__": deploy = SimpleDeploy() # 自动处理input目录下所有匹配文件 input_dir = Path("input") for ir_file in input_dir.glob("ir_*.jpg"): fused_file = input_dir / f"fused_{ir_file.stem.split('_')[-1]}.jpg" if

import argparse import torch.backends.cudnn as cudnn from models.experimental import * from utils.datasets import * from utils.utils import * def detect(save_img=False): out, source, weights, view_img, save_txt, imgsz = \ opt.output, opt.source, opt.weights, opt.view_img, opt.save_txt, opt.img_size webcam = source == '0' or source.startswith('rtsp') or source.startswith('http') or source.endswith('.txt') # Initialize device = torch_utils.select_device(opt.device) if os.path.exists(out): shutil.rmtree(out) # delete output folder os.makedirs(out) # make new output folder half = device.type != 'cpu' # half precision only supported on CUDA # Load model model = attempt_load(weights, map_location=device) # load FP32 model imgsz = check_img_size(imgsz, s=model.stride.max()) # check img_size if half: model.half() # to FP16 # Second-stage classifier classify = False if classify: modelc = torch_utils.load_classifier(name='resnet101', n=2) # initialize modelc.load_state_dict(torch.load('weights/resnet101.pt', map_location=device)['model']) # load weights modelc.to(device).eval() # Set Dataloader vid_path, vid_writer = None, None if webcam: view_img = True cudnn.benchmark = True # set True to speed up constant image size inference dataset = LoadStreams(source, img_size=imgsz) else: save_img = True dataset = LoadImages(source, img_size=imgsz) # Get names and colors names = model.module.names if hasattr(model, 'module') else model.names colors = [[random.randint(0, 255) for _ in range(3)] for _ in range(len(names))] # Run inference t0 = time.time() img = torch.zeros((1, 3, imgsz, imgsz), device=device) # init img _ = model(img.half() if half else img) if device.type != 'cpu' else None # run once for path, img, im0s, vid_cap in dataset: img = torch.from_numpy(img).to(device) img = img.half() if half els

import os import torch import transformers from transformers import ( AutoModelForCausalLM, AutoTokenizer, TrainingArguments, DataCollatorForLanguageModeling, BitsAndBytesConfig, Trainer ) from peft import LoraConfig, get_peft_model, prepare_model_for_kbit_training from datasets import load_dataset import logging import psutil import gc from datetime import datetime # === 配置区域 === MODEL_NAME = "/home/vipuser/ai_writer_project_final_with_fixed_output_ui/models/Yi-6B" DATASET_PATH = "./data/train_lora_formatted.jsonl" OUTPUT_DIR = "./yi6b-lora-optimized" DEVICE_MAP = "auto" # 使用自动设备映射 # 确保输出目录存在 os.makedirs(OUTPUT_DIR, exist_ok=True) # === 内存优化配置 === os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True" # 减少内存碎片 torch.backends.cuda.cufft_plan_cache.clear() # 清理CUDA缓存 # === 增强的日志系统 === def setup_logging(output_dir): """配置日志系统,支持文件和TensorBoard""" logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) # 文件日志处理器 file_handler = logging.FileHandler(os.path.join(output_dir, "training.log")) file_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')) logger.addHandler(file_handler) # 控制台日志处理器 console_handler = logging.StreamHandler() console_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')) logger.addHandler(console_handler) # TensorBoard日志目录 tensorboard_log_dir = os.path.join(output_dir, "logs", datetime.now().strftime("%Y%m%d-%H%M%S")) os.makedirs(tensorboard_log_dir, exist_ok=True) # 安装TensorBoard回调 tb_writer = None try: from torch.utils.tensorboard import SummaryWriter tb_writer = SummaryWriter(log_dir=tensorboard_log_dir) logger.info(f"TensorBoard日志目录: {tensorboard_log_dir}") except ImportError: logger.warning("TensorBoard未安装,可视化功能不可用") return logger, tb_writer logger, tb_writer = setup_logging(OUTPUT_DIR) # === 量化配置 - 使用更高效的配置 === quant_config = BitsAndBytesConfig( load_in_4bit=True, bnb_4bit_quant_type="nf4", bnb_4bit_compute_dtype=torch.bfloat16, bnb_4bit_use_double_quant=True, ) # === 加载模型 === logger.info("加载预训练模型...") model = AutoModelForCausalLM.from_pretrained( MODEL_NAME, device_map=DEVICE_MAP, quantization_config=quant_config, torch_dtype=torch.bfloat16, trust_remote_code=True, attn_implementation="flash_attention_2" # 使用FlashAttention优化内存 ) # === 分词器处理 === logger.info("加载分词器...") tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME, trust_remote_code=True) tokenizer.padding_side = "right" if tokenizer.pad_token is None: tokenizer.pad_token = tokenizer.eos_token tokenizer.pad_token_id = tokenizer.eos_token_id # === 准备模型训练 === model = prepare_model_for_kbit_training( model, use_gradient_checkpointing=True # 启用梯度检查点以节省内存 ) # === LoRA 配置 - 优化内存使用 === logger.info("配置LoRA...") lora_config = LoraConfig( r=64, # 降低rank以减少内存使用 lora_alpha=32, # 降低alpha值 target_modules=["q_proj", "v_proj"], # 减少目标模块 lora_dropout=0.05, bias="none", task_type="CAUSAL_LM" ) model = get_peft_model(model, lora_config) # 记录可训练参数 trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad) total_params = sum(p.numel() for p in model.parameters()) logger.info(f"可训练参数: {trainable_params:,} / 总参数: {total_params:,} ({trainable_params / total_params:.2%})") # === 加载并预处理数据集 === logger.info("加载和预处理数据集...") dataset = load_dataset("json", data_files=DATASET_PATH, split="train") # 文本过滤函数 def is_valid_text(example): text = example.get("text", "") return text is not None and len(text.strip()) > 200 # 增加最小长度要求 dataset = dataset.filter(is_valid_text) logger.info(f"过滤后数据集大小: {len(dataset)} 条") # 动态填充的分词函数 - 节省内存 def tokenize_function(examples): tokenized = tokenizer( examples["text"], padding=True, # 使用动态填充 truncation=True, max_length=1024, # 降低上下文长度以减少内存使用 ) # 创建 labels - 因果语言建模需要 labels = input_ids tokenized["labels"] = tokenized["input_ids"].copy() return tokenized tokenized_dataset = dataset.map( tokenize_function, batched=True, remove_columns=["text"], batch_size=64, # 降低批处理大小以减少内存峰值 num_proc=4, # 减少进程数以降低内存开销 ) # === 数据整理器 === data_collator = DataCollatorForLanguageModeling( tokenizer=tokenizer, mlm=False # 因果语言建模 ) # === 训练参数 - 优化内存使用 === report_to_list = ["tensorboard"] if tb_writer else [] training_args = TrainingArguments( output_dir=OUTPUT_DIR, per_device_train_batch_size=4, # 大幅降低批次大小 gradient_accumulation_steps=4, # 增加梯度累积步数以保持有效批次大小 learning_rate=2e-5, num_train_epochs=3, logging_steps=50, save_strategy="steps", save_steps=500, bf16=True, optim="paged_adamw_32bit", report_to=report_to_list, warmup_ratio=0.05, gradient_checkpointing=True, # 启用梯度检查点 fp16=False, max_grad_norm=0.3, # 降低梯度裁剪阈值 remove_unused_columns=True, # 移除未使用的列以节省内存 dataloader_num_workers=4, # 减少数据加载工作线程 evaluation_strategy="steps", eval_steps=500, save_total_limit=2, # 减少保存的检查点数量 logging_dir=os.path.join(OUTPUT_DIR, "logs"), load_best_model_at_end=True, ddp_find_unused_parameters=False, logging_first_step=True, group_by_length=True, lr_scheduler_type="cosine", weight_decay=0.01, ) # === GPU监控工具 === def monitor_gpu(): """监控GPU使用情况""" if torch.cuda.is_available(): device = torch.device("cuda") mem_alloc = torch.cuda.memory_allocated(device) / 1024 ** 3 mem_reserved = torch.cuda.memory_reserved(device) / 1024 ** 3 mem_total = torch.cuda.get_device_properties(device).total_memory / 1024 ** 3 return { "allocated": f"{mem_alloc:.2f} GB", "reserved": f"{mem_reserved:.2f} GB", "total": f"{mem_total:.2f} GB", "utilization": f"{mem_alloc / mem_total * 100:.1f}%" } return {} # === 创建训练器 === eval_dataset = None if len(tokenized_dataset) > 100: eval_dataset = tokenized_dataset.select(range(100)) trainer = Trainer( model=model, tokenizer=tokenizer, args=training_args, train_dataset=tokenized_dataset, eval_dataset=eval_dataset, data_collator=data_collator, ) # === 训练前验证 === def validate_data_and_model(): """验证数据和模型是否准备好训练""" logger.info("\n=== 训练前验证 ===") # 检查样本格式 sample = tokenized_dataset[0] logger.info(f"样本键: {list(sample.keys())}") logger.info(f"input_ids 长度: {len(sample['input_ids'])}") # 创建单个样本测试批次 test_batch = data_collator([sample]) # 移动数据到设备 test_batch = {k: v.to(model.device) for k, v in test_batch.items()} # 前向传播测试 model.train() outputs = model(**test_batch) loss_value = outputs.loss.item() logger.info(f"测试批次损失: {loss_value:.4f}") # 记录到TensorBoard if tb_writer: tb_writer.add_scalar("debug/test_loss", loss_value, 0) # 反向传播测试 outputs.loss.backward() logger.info("反向传播成功!") # 重置梯度 model.zero_grad() logger.info("验证完成,准备开始训练\n") # 记录初始GPU使用情况 gpu_status = monitor_gpu() logger.info(f"初始GPU状态: {gpu_status}") # 记录到TensorBoard if tb_writer: tb_writer.add_text("system/initial_gpu", str(gpu_status), 0) validate_data_and_model() # === 自定义回调 - 监控资源使用 === class ResourceMonitorCallback(transformers.TrainerCallback): def __init__(self, tb_writer=None): self.tb_writer = tb_writer self.start_time = datetime.now() self.last_log_time = datetime.now() def on_step_end(self, args, state, control, **kwargs): current_time = datetime.now() time_diff = (current_time - self.last_log_time).total_seconds() # 每分钟记录一次资源使用情况 if time_diff > 60: self.last_log_time = current_time # GPU监控 gpu_status = monitor_gpu() logger.info(f"Step {state.global_step} - GPU状态: {gpu_status}") # CPU和内存监控 cpu_percent = psutil.cpu_percent() mem = psutil.virtual_memory() logger.info( f"CPU使用率: {cpu_percent}%, 内存使用: {mem.used / 1024 ** 3:.2f}GB/{mem.total / 1024 ** 3:.2f}GB") # 记录到TensorBoard if self.tb_writer: # GPU显存使用 if torch.cuda.is_available(): device = torch.device("cuda") mem_alloc = torch.cuda.memory_allocated(device) / 1024 ** 3 self.tb_writer.add_scalar("system/gpu_mem", mem_alloc, state.global_step) # CPU使用率 self.tb_writer.add_scalar("system/cpu_usage", cpu_percent, state.global_step) # 系统内存使用 self.tb_writer.add_scalar("system/ram_usage", mem.used / 1024 ** 3, state.global_step) def on_log(self, args, state, control, logs=None, **kwargs): """记录训练指标到TensorBoard""" if self.tb_writer and logs is not None: for metric_name, metric_value in logs.items(): if "loss" in metric_name or "lr" in metric_name or "grad_norm" in metric_name: self.tb_writer.add_scalar(f"train/{metric_name}", metric_value, state.global_step) def on_train_end(self, args, state, control, **kwargs): """训练结束时记录总时间""" training_time = datetime.now() - self.start_time logger.info(f"训练总时间: {training_time}") if self.tb_writer: self.tb_writer.add_text("system/total_time", str(training_time)) # 添加回调 trainer.add_callback(ResourceMonitorCallback(tb_writer=tb_writer)) # === 内存清理函数 === def clear_memory(): """清理内存和GPU缓存""" gc.collect() if torch.cuda.is_available(): torch.cuda.empty_cache() torch.cuda.ipc_collect() logger.info("内存清理完成") # === 启动训练 === try: logger.info("开始训练...") # 分阶段训练以减少内存峰值 num_samples = len(tokenized_dataset) chunk_size = 1000 # 每次处理1000个样本 for i in range(0, num_samples, chunk_size): end_idx = min(i + chunk_size, num_samples) logger.info(f"训练样本 {i} 到 {end_idx - 1} / {num_samples}") # 创建子数据集 chunk_dataset = tokenized_dataset.select(range(i, end_idx)) # 更新训练器 trainer.train_dataset = chunk_dataset # 训练当前块 trainer.train() # 清理内存 clear_memory() # 保存训练指标 metrics = trainer.evaluate() trainer.log_metrics("train", metrics) trainer.save_metrics("train", metrics) # 保存最佳模型 trainer.save_model(OUTPUT_DIR) tokenizer.save_pretrained(OUTPUT_DIR) logger.info(f"训练完成! 模型保存在: {OUTPUT_DIR}") # 记录最终指标到TensorBoard if tb_writer: for metric_name, metric_value in metrics.items(): tb_writer.add_scalar(f"final/{metric_name}", metric_value) tb_writer.close() except Exception as e: logger.error(f"训练出错: {e}") import traceback logger.error(traceback.format_exc()) # 尝试更小批量训练 logger.info("\n尝试更小批量训练...") small_dataset = tokenized_dataset.select(range(50)) trainer.train_dataset = small_dataset trainer.train() # 保存模型 trainer.save_model(f"{OUTPUT_DIR}_small") tokenizer.save_pretrained(f"{OUTPUT_DIR}_small") logger.info(f"小批量训练完成! 模型保存在: {OUTPUT_DIR}_small") # 记录错误到TensorBoard if tb_writer: tb_writer.add_text("error/exception", traceback.format_exc()) # 清理内存 clear_memory() # === 训练后验证 === def validate_final_model(): """验证训练后的模型""" logger.info("\n=== 训练后验证 ===") # 加载保存的模型 from peft import PeftModel # 仅加载基础模型配置 base_model = AutoModelForCausalLM.from_pretrained( MODEL_NAME, device_map=DEVICE_MAP, quantization_config=quant_config, torch_dtype=torch.bfloat16, trust_remote_code=True, load_in_4bit=True ) # 加载LoRA适配器 peft_model = PeftModel.from_pretrained(base_model, OUTPUT_DIR) # 不再合并LoRA权重,直接使用 peft_model 推理 peft_model.eval() # 测试生成 prompt = "中国的首都是" inputs = tokenizer(prompt, return_tensors="pt").to(peft_model.device) outputs = peft_model.generate( **inputs, max_new_tokens=50, # 减少生成长度 temperature=0.7, top_p=0.9, repetition_penalty=1.2, do_sample=True ) generated = tokenizer.decode(outputs[0], skip_special_tokens=True) logger.info(f"提示: {prompt}") logger.info(f"生成结果: {generated}") # 记录到TensorBoard if tb_writer: tb_writer.add_text("validation/sample", f"提示: {prompt}\n生成: {generated}") # 更全面的测试 test_prompts = [ "人工智能的未来发展趋势是", "如何学习深度学习?", "写一个关于太空探索的短故事:" ] for i, test_prompt in enumerate(test_prompts): inputs = tokenizer(test_prompt, return_tensors="pt").to(peft_model.device) outputs = peft_model.generate( **inputs, max_new_tokens=100, # 减少生成长度 temperature=0.7, top_p=0.9, repetition_penalty=1.2, do_sample=True ) generated_text = tokenizer.decode(outputs[0], skip_special_tokens=True) logger.info(f"\n提示: {test_prompt}\n生成: {generated_text}\n{'=' * 50}") # 记录到TensorBoard if tb_writer: tb_writer.add_text(f"validation/test_{i}", f"提示: {test_prompt}\n生成: {generated_text}") logger.info("验证完成") # 执行验证 validate_final_model() # 关闭TensorBoard写入器 if tb_writer: tb_writer.close() logger.info("TensorBoard日志已关闭")

运行import cv2 import torch import numpy as np from basicsr.archs.basicvsr_arch import BasicVSR import os def load_pretrained_model(weight_path): """加载REDS4预训练权重""" if not os.path.exists(weight_path): raise FileNotFoundError(f"权重文件不存在: {weight_path}") model = BasicVSR(num_feat=64, num_block=30) state_dict = torch.load(weight_path)['params'] model.load_state_dict(state_dict) return model.eval() def test_with_pretrained(video_path, weight_path): """使用预训练权重测试6帧视频""" device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') # 1. 加载预训练模型 model = load_pretrained_model(weight_path).to(device) print("✅ 预训练权重加载成功") # 2. 提取6帧(保持原始分辨率) cap = cv2.VideoCapture(video_path) frames = [] for _ in range(6): ret, frame = cap.read() if not ret: raise ValueError("视频不足6帧!") frames.append(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)) cap.release() # 3. 准备输入张量 [1,6,3,H,W] input_tensor = torch.stack([ torch.from_numpy(f).permute(2, 0, 1).float() / 255.0 for f in frames ]).unsqueeze(0).to(device) # 4. 执行推理 with torch.no_grad(): output = model(input_tensor) # 5. 保存结果 os.makedirs("output", exist_ok=True) for i in range(6): sr_frame = output[0, i].cpu().numpy() sr_frame = np.clip(sr_frame * 255, 0, 255).astype('uint8') sr_frame = sr_frame.transpose(1, 2, 0) cv2.imwrite(f"output/sr_frame_{i}.png", cv2.cvtColor(sr_frame, cv2.COLOR_RGB2BGR)) print(f"\n🔍 测试结果:") print(f"输入分辨率: {frames[0].shape[1]}x{frames[0].shape[0]}") print(f"输出分辨率: {output.shape[-1]}x{output.shape[-2]}") print(f"超分结果已保存至 /output 目录") if __name__ == "__main__": # 配置路径(根据实际修改) video_path = r"inputs/test_6frames.mp4" # 相对路径 weight_path = r"models/basicvsr_reds4.pth" # 执行测试 test_with_pretrained(video_path, weight_path) 报错KeyError: 'params'

import os import time import torch import torch.nn as nn import torch.optim as optim import pandas as pd import numpy as np from torch.utils.data import Dataset, DataLoader, random_split from torchvision import transforms from PIL import Image from tqdm import tqdm import matplotlib.pyplot as plt os.system("") # 激活ANSI支持 torch.manual_seed(42) np.random.seed(42) # -------------------------- # 1. 数据集定义(保持不变) # -------------------------- class FluidImageDataset(Dataset): def __init__(self, blade_params, pressure_dir, velocity_u_dir, velocity_v_dir, velocity_w_dir, transform=None): self.blade_params = blade_params self.pressure_dir = pressure_dir self.velocity_u_dir = velocity_u_dir self.velocity_v_dir = velocity_v_dir self.velocity_w_dir = velocity_w_dir self.transform = transform or transforms.Compose([ transforms.Resize((256, 256)), transforms.ToTensor(), ]) def __len__(self): return len(self.blade_params) def __getitem__(self, idx): params = torch.tensor(self.blade_params.iloc[idx].values, dtype=torch.float32) img_id = idx + 1 # 简化图像加载 def load_img(path): try: return self.transform(Image.open(path).convert('L')) except: return torch.zeros(1, 256, 256) return {'params': params, 'outputs': { 'pressure': load_img(os.path.join(self.pressure_dir, f"Pressure_{img_id}.jpg")), 'velocity_u': load_img(os.path.join(self.velocity_u_dir, f"Velocity_u_{img_id}.jpg")), 'velocity_v': load_img(os.path.join(self.velocity_v_dir, f"Velocity_v_{img_id}.jpg")), 'velocity_w': load_img(os.path.join(self.velocity_w_dir, f"Velocity_w_{img_id}.jpg")), }} # -------------------------- # 2. 傅里叶KAN层(核心修改:使用傅里叶基函数替代原有非线性函数) # -------------------------- class FourierKANLayer(nn.Module): def __init__(self, input_dim, output_dim, grid_size=16, add_bias=True, smooth_initialization=True): super().__init__() self.input_dim = input_dim self.output_dim = output_dim self.grid_size = grid_size # 傅里叶频率数量 self.add_bias = add_bias # 傅里叶系数初始化(正弦和余弦) # 平滑初始化:高频系数衰减,保证初始函数平滑 grid_norm = (torch.arange(grid_size) + 1) **2 if smooth_initialization else np.sqrt(grid_size) self.fourier_coeffs = nn.Parameter( torch.randn(2, output_dim, input_dim, grid_size) / (np.sqrt(input_dim) * grid_norm) ) # 偏置项 if self.add_bias: self.bias = nn.Parameter(torch.zeros(1, output_dim)) def forward(self, x): # 处理向量数据 (B, input_dim) 或图像数据 (B, C, H, W) original_shape = x.shape if x.dim() == 4: # 图像数据展平为 (B*H*W, C) 便于处理 batch_size, channels, height, width = x.shape x = x.permute(0, 2, 3, 1).reshape(-1, channels) # (B*H*W, C) # 傅里叶基函数计算:cos(kx) 和 sin(kx) k = torch.arange(1, self.grid_size + 1, device=x.device).reshape(1, 1, 1, self.grid_size) # 频率 x_expanded = x.unsqueeze(1).unsqueeze(3) # (B, 1, input_dim, 1) cos_terms = torch.cos(k * x_expanded) # (B, 1, input_dim, grid_size) sin_terms = torch.sin(k * x_expanded) # (B, 1, input_dim, grid_size) # 傅里叶系数加权求和 y = torch.sum(cos_terms * self.fourier_coeffs[0:1], dim=(-2, -1)) # 余弦项贡献 y += torch.sum(sin_terms * self.fourier_coeffs[1:2], dim=(-2, -1)) # 正弦项贡献 # 添加偏置 if self.add_bias: y += self.bias # 恢复图像数据形状 if len(original_shape) == 4: y = y.reshape(batch_size, height, width, self.output_dim).permute(0, 3, 1, 2) # (B, output_dim, H, W) else: y = y.reshape(original_shape[:-1] + (self.output_dim,)) # 向量数据恢复形状 return y class KANFeatureExtractor(nn.Module): def __init__(self, input_dim): super().__init__() self.layers = nn.Sequential( FourierKANLayer(input_dim, 128, grid_size=16), # 傅里叶KAN层 nn.LayerNorm(128), FourierKANLayer(128, 128, grid_size=16), nn.LayerNorm(128), FourierKANLayer(128, 256, grid_size=16), nn.LayerNorm(256), FourierKANLayer(256, 256, grid_size=16) ) def forward(self, x): return self.layers(x) # -------------------------- # 3. U-Net结构(使用傅里叶KAN层替代原有激活函数) # -------------------------- class MiniUNetEncoder(nn.Module): def __init__(self, in_channels=8): super().__init__() self.conv1 = nn.Sequential( nn.Conv2d(in_channels, 16, kernel_size=3, padding=1), nn.GroupNorm(4, 16), FourierKANLayer(16, 16, grid_size=16), # 傅里叶KAN替代ReLU nn.Conv2d(16, 16, kernel_size=3, padding=1), nn.GroupNorm(4, 16), FourierKANLayer(16, 16, grid_size=16) ) self.pool1 = nn.MaxPool2d(2, 2) self.conv2 = nn.Sequential( nn.Conv2d(16, 32, kernel_size=3, padding=1), nn.GroupNorm(4, 32), FourierKANLayer(32, 32, grid_size=16), nn.Conv2d(32, 32, kernel_size=3, padding=1), nn.GroupNorm(4, 32), FourierKANLayer(32, 32, grid_size=16) ) self.pool2 = nn.MaxPool2d(2, 2) def forward(self, x): c1 = self.conv1(x) p1 = self.pool1(c1) c2 = self.conv2(p1) p2 = self.pool2(c2) return c1, c2, p2 class MiniUNetDecoder(nn.Module): def __init__(self, out_channels=1): super().__init__() self.up1 = nn.ConvTranspose2d(32, 16, kernel_size=2, stride=2) self.conv3 = nn.Sequential( nn.Conv2d(48, 16, kernel_size=3, padding=1), nn.GroupNorm(4, 16), FourierKANLayer(16, 16, grid_size=16) # 傅里叶KAN替代ReLU ) self.up2 = nn.ConvTranspose2d(16, 8, kernel_size=2, stride=2) self.final_conv = nn.Conv2d(24, out_channels, kernel_size=1) self.sigmoid = nn.Sigmoid() def forward(self, c1, c2, p2): u1 = self.up1(p2) cat1 = torch.cat([u1, c2], dim=1) d1 = self.conv3(cat1) u2 = self.up2(d1) cat2 = torch.cat([u2, c1], dim=1) out = self.final_conv(cat2) return self.sigmoid(out) class MiniUNetPredictor(nn.Module): def __init__(self): super().__init__() self.encoder = MiniUNetEncoder() self.decoder = MiniUNetDecoder() def forward(self, x): c1, c2, p2 = self.encoder(x) return self.decoder(c1, c2, p2) # -------------------------- # 4. 模型组装(保持结构不变,使用傅里叶KAN层) # -------------------------- class LightKANUNetModel(nn.Module): def __init__(self, input_dim): super().__init__() self.kan_feature = KANFeatureExtractor(input_dim) # 特征投影模块(使用傅里叶KAN) self.feature_proj = nn.Sequential( nn.Linear(256, 128), nn.LayerNorm(128), FourierKANLayer(128, 128, grid_size=16), # 傅里叶KAN替代ReLU # 生成初始特征图 nn.Linear(128, 8 * 16 * 16), nn.Unflatten(1, (8, 16, 16)), # 上采样卷积块 nn.Conv2d(8, 16, kernel_size=3, padding=1), nn.GroupNorm(4, 16), FourierKANLayer(16, 16, grid_size=16), nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True), nn.Conv2d(16, 32, kernel_size=3, padding=1), nn.GroupNorm(4, 32), FourierKANLayer(32, 32, grid_size=16), nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True), nn.Conv2d(32, 64, kernel_size=3, padding=1), nn.GroupNorm(4, 64), FourierKANLayer(64, 64, grid_size=16), nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True), nn.Conv2d(64, 32, kernel_size=3, padding=1), nn.GroupNorm(4, 32), FourierKANLayer(32, 32, grid_size=16), nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True), nn.Conv2d(32, 8, kernel_size=3, padding=1), nn.GroupNorm(4, 8), FourierKANLayer(8, 8, grid_size=16) ) self.predictors = nn.ModuleDict({ 'pressure': MiniUNetPredictor(), 'velocity_u': MiniUNetPredictor(), 'velocity_v': MiniUNetPredictor(), 'velocity_w': MiniUNetPredictor() }) def forward(self, x): features = self.kan_feature(x) feat_map = self.feature_proj(features) return {k: self.predictors[k](feat_map) for k in self.predictors} # -------------------------- # 5. 训练与工具函数(保持不变) # -------------------------- def load_data(blade_params_path, data_dir): blade_params = pd.read_excel(blade_params_path).iloc[:, 1:] print(f"叶片参数形状:{blade_params.shape}(样本数×参数数)") dataset = FluidImageDataset( blade_params, os.path.join(data_dir, "Pressure"), os.path.join(data_dir, "Velocity_u"), os.path.join(data_dir, "Velocity_v"), os.path.join(data_dir, "Velocity_w") ) total_size = len(dataset) train_size = int(0.8 * total_size) val_size = int(0.1 * total_size) test_size = total_size - train_size - val_size generator = torch.Generator().manual_seed(42) train_dataset, val_dataset, test_dataset = random_split( dataset, [train_size, val_size, test_size], generator=generator ) return ( DataLoader(train_dataset, batch_size=1, shuffle=True, num_workers=0), DataLoader(val_dataset, batch_size=1, shuffle=False, num_workers=0), DataLoader(test_dataset, batch_size=1, shuffle=False, num_workers=0), blade_params.shape[1] ) def train_model(model, train_loader, criterion, optimizer, device, epochs=3): model.train() if torch.cuda.is_available(): torch.cuda.reset_peak_memory_stats() print(f"初始GPU内存:{torch.cuda.memory_allocated() / 1e6:.1f} MB") for epoch in range(epochs): running_loss = 0.0 pbar = tqdm(train_loader, total=len(train_loader), desc=f"Epoch {epoch + 1}/{epochs}") for batch_idx, data in enumerate(pbar): start_time = time.time() params = data['params'].to(device) true_outputs = {k: v.to(device) for k, v in data['outputs'].items()} optimizer.zero_grad() with torch.amp.autocast('cuda', enabled=torch.cuda.is_available()): pred_outputs = model(params) loss = sum(criterion(pred_outputs[k], true_outputs[k]) for k in pred_outputs) loss.backward() optimizer.step() running_loss += loss.item() batch_time = time.time() - start_time if (batch_idx + 1) % 10 == 0 or batch_idx == len(train_loader) - 1: avg_loss = running_loss / (batch_idx + 1) gpu_mem = torch.cuda.memory_allocated() / 1e6 if torch.cuda.is_available() else 0 pbar.set_postfix({ "批损失": f"{loss.item():.4f}", "平均损失": f"{avg_loss:.4f}", "耗时": f"{batch_time:.2f}s", "GPU内存": f"{gpu_mem:.1f} MB" }) print(f"Epoch {epoch + 1} 完成,平均损失:{running_loss / len(train_loader):.4f}") return model def evaluate_model(model, test_loader, criterion, device): model.eval() metrics = {k: [] for k in ['pressure', 'velocity_u', 'velocity_v', 'velocity_w']} with torch.no_grad(): for data in test_loader: params = data['params'].to(device) true_outputs = {k: v.to(device) for k, v in data['outputs'].items()} pred_outputs = model(params) for k in metrics: metrics[k].append(criterion(pred_outputs[k], true_outputs[k]).item()) avg = {k: sum(v) / len(v) for k, v in metrics.items()} print("\n测试集结果:") print(f"总平均MSE:{sum(avg.values()) / 4:.4f}") for k, v in avg.items(): print(f" {k}:{v:.6f}") return avg def visualize_predictions(model, test_loader, device, num_samples=3): model.eval() save_dir = "KAN_UNet_Results" os.makedirs(save_dir, exist_ok=True) with torch.no_grad(): for i, data in enumerate(test_loader): if i >= num_samples: break params = data['params'].to(device) true = data['outputs'] pred = {k: v.cpu() for k, v in model(params).items()} fig, axes = plt.subplots(1, 9, figsize=(24, 5)) axes[0].text(0.1, 0.5, "参数:\n" + "\n".join( [f"p{j}: {v:.2f}" for j, v in enumerate(params[0].cpu().numpy().round(2))]), fontsize=9, va='center') axes[0].axis('off') for col, k in enumerate(['pressure', 'velocity_u', 'velocity_v', 'velocity_w']): axes[2 * col + 1].imshow(true[k][0].squeeze(), cmap='viridis') axes[2 * col + 1].set_title(f'真实{k}') axes[2 * col + 1].axis('off') axes[2 * col + 2].imshow(pred[k][0].squeeze(), cmap='viridis') axes[2 * col + 2].set_title(f'预测{k}') axes[2 * col + 2].axis('off') plt.tight_layout() plt.savefig(f"{save_dir}/样本_{i + 1}.png", dpi=200) plt.close() print(f"已保存样本 {i + 1} 可视化结果") def main(): save_dir = "KAN_UNet_Results" blade_params_path = r"C:\Users\ZYQ\PycharmProjects\2025.07.09\Data\Blade parameters.xlsx" data_dir = r"C:\Users\ZYQ\PycharmProjects\2025.07.09\Data" device = torch.device("cuda" if torch.cuda.is_available() else "cpu") print(f"使用设备:{device}") train_loader, val_loader, test_loader, input_dim = load_data(blade_params_path, data_dir) print(f"输入维度:{input_dim},训练批次:{len(train_loader)}") model = LightKANUNetModel(input_dim).to(device) total_params = sum(p.numel() for p in model.parameters()) print(f"模型参数量:{total_params / 1e6:.2f} M") criterion = nn.MSELoss() optimizer = optim.Adam(model.parameters(), lr=1e-4, weight_decay=1e-5) print("\n开始训练...") model = train_model(model, train_loader, criterion, optimizer, device, epochs=3) evaluate_model(model, test_loader, criterion, device) visualize_predictions(model, test_loader, device) torch.save(model.state_dict(), f"{save_dir}/fourier_kan_unet.pth") print(f"模型保存至:{save_dir}/fourier_kan_unet.pth") if __name__ == "__main__": main()

import os import numpy as np import torch import torch.nn as nn import torch.nn.functional as F from sklearn.preprocessing import MinMaxScaler, StandardScaler import matplotlib import matplotlib.pyplot as plt if torch.cuda.is_available(): torch.backends.cudnn.deterministic = True from scipy.io import loadmat from CNN import * from plot_utils import * from thop import profile from sklearn.metrics import precision_score, recall_score, f1_score import warnings warnings.filterwarnings("ignore") matplotlib.use('TkAgg') device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu") import os from scipy.io import loadmat import torch # 定义本地路径 model_base_path = r'E:\Anaconda3\CNN\save_fig_model\model' data_base_path = r'E:\Anaconda3\CNN\datasets\CWRU_Noised_0.mat' # 确保文件夹存在 os.makedirs(model_base_path, exist_ok=True) # 定义具体文件路径 model_path = os.path.join(model_base_path, 'CNN_BJTU20(1).pt') data_path = os.path.join(data_base_path, r'E:\Anaconda3\CNN\datasets\CWRU_Noised_0.mat') # 检查文件是否存在 if not os.path.exists(model_path): raise FileNotFoundError(f'模型文件 {model_path} 不存在') if not os.path.exists(data_path): raise FileNotFoundError(f'数据文件 {data_path} 不存在') model = torch.load(model_path) data = loadmat(data_path) num_classes = 10 # 类别 learning_rate = 0.01 # 学习率 batch_size = 100 # 利用训练好的模型 对测试集进行分类 # 提取测试集 x_train1 = data['train_X'] print(x_train1.shape) ss1 = StandardScaler().fit(x_train1) # MinMaxScaler StandardScaler x_test1 = data['test_X'] print(x_test1.shape) y_test = data['test_Y'].argmax(axis=1) y_train = data['train_Y'].argmax(axis=1) x_test1 = ss1.transform(x_test1) x_test1 = x_test1.reshape(-1, 1, 1024) test_features1 = torch.tensor(x_test1).type(torch.FloatTensor) test_labels = torch.tensor(y_test).type(torch.LongTensor) N = test_features1.size(0) # 获取特征数据的样本数量 total_batch = int(np.ceil(N / batch_size)) # 计算总的批次数量,采用向上取整的方式 indices = np.arange(N) # 创建一个包含0到N-1的索引数组 # np.random.shuffle(indices) # 将索引数组随机打乱 for i in range(total_batch): # 循环遍历每个批次 rand_index = indices[batch_size * i: batch_size * (i + 1)] # 从打乱后的索引数组中选择当前批次的索引 features1 = test_features1[rand_index, :] # 根据批次的索引从特征数据中获取对应的批次数据 targets = test_labels[rand_index] # 根据批次的索引从标签数据中获取对应的批次数据 features1 = features1.to(device) # 将特征数据移动到指定的设备上 # 提取特征 y = model(features1) y_fea = model.get_data() # 输出是提取到的特征 y_fea = y_fea.detach().cpu().numpy() # 检查形状 print('y_fea shape before reshape:', y_fea.shape) # 动态计算重塑形状 num_samples = 100 features_per_sample = y_fea.size // num_samples # 检查是否能够整除 if y_fea.size % num_samples != 0: raise ValueError(f"Cannot reshape array of size {y_fea.size} into shape ({num_samples}, -1)") y_fea = y_fea.reshape(num_samples, features_per_sample) print('y_fea shape after reshape:', y_fea.shape) # 获取标签 label = targets.detach().cpu().numpy() # t-sne可视化 t_sne(input_data=y_fea, input_label=label, classes=num_classes) Traceback (most recent call last): File "E:\Anaconda3\CNN\t-sne_CNN.py", line 88, in <module> raise ValueError(f"Cannot reshape array of size {y_fea.size} into shape ({num_samples}, -1)") ValueError: Cannot reshape array of size 3840 into shape (100, -1)

import os import argparse import yaml import torch import torch.nn.functional as F import torch.nn as nn import numpy as np from tqdm import tqdm from natsort import natsorted from glob import glob from skimage import img_as_ubyte import utils from basicsr.models.archs.kbnet_l_arch import KBNet_l try: from yaml import CLoader as Loader except ImportError: from yaml import Loader parser = argparse.ArgumentParser(description='Image Deraining using Restormer') parser.add_argument('--input_dir', default='./Datasets/', type=str, help='Directory of validation images') parser.add_argument('--result_dir', default='./results/', type=str, help='Directory for results') parser.add_argument('--yml', default='Deraining/Options/kbnet_l.yml', type=str) args = parser.parse_args() ####### Load yaml ####### yaml_file = args.yml name = os.path.basename(yaml_file).split('.')[0] x = yaml.load(open(yaml_file, mode='r'), Loader=Loader) s = x['network_g'].pop('type') pth_path = x['path']['pretrain_network_g'] print('**', yaml_file, pth_path) ########################## model_restoration = eval(s)(**x['network_g']) checkpoint = torch.load(pth_path) model_restoration.load_state_dict(checkpoint['params']) print("===>Testing using weights: ", pth_path) model_restoration.cuda() model_restoration = nn.DataParallel(model_restoration) model_restoration.eval() factor = 8 datasets = ['Test1200', 'Test2800'] for dataset in datasets: result_dir = os.path.join(args.result_dir, dataset) os.makedirs(result_dir, exist_ok=True) inp_dir = os.path.join(args.input_dir, 'test', dataset, 'input') files = natsorted(glob(os.path.join(inp_dir, '*.png')) + glob(os.path.join(inp_dir, '*.jpg'))) with torch.no_grad(): for file_ in tqdm(files): torch.cuda.ipc_collect() torch.cuda.empty_cache() img = np.float32(utils.load_img(file_)) / 255. img = torch.from_numpy(img).permute(2, 0, 1) input_ = img.unsqueeze(0).c

最新推荐

recommend-type

微软解决方案面向服务的架构.doc

微软解决方案面向服务的架构.doc
recommend-type

Huawei S6780-H-V600R024SPH120

Huawei S6780-H_V600R024SPH120,里面包含补丁说明书和补丁安装指导书,该补丁支持哪些型号,支持哪些版本可以安装当前补丁,请参考补丁说明书和补丁安装指导书。
recommend-type

网络营销案例分析概述.pptx

网络营销案例分析概述.pptx
recommend-type

2025广西省道路路网矢量数据图层Shp数据最新版下载

2025广西省道路路网矢量数据图层,shp格式,包含多级道路分类属性,路名等属性,包含全省几十万条道路,坐标系为WGS1984坐标系统
recommend-type

最新中国移动通信年度报告样本 (1)(1).doc

最新中国移动通信年度报告样本 (1)(1).doc
recommend-type

VC图像编程全面资料及程序汇总

【标题】:"精通VC图像编程资料全览" 【知识点】: VC即Visual C++,是微软公司推出的一个集成开发环境(IDE),专门用于C++语言的开发。VC图像编程涉及到如何在VC++开发环境中处理和操作图像。在VC图像编程中,开发者通常会使用到Windows API中的GDI(图形设备接口)或GDI+来进行图形绘制,以及DirectX中的Direct2D或DirectDraw进行更高级的图形处理。 1. GDI(图形设备接口): - GDI是Windows操作系统提供的一套应用程序接口,它允许应用程序通过设备无关的方式绘制图形。 - 在VC图像编程中,主要使用CDC类(设备上下文类)来调用GDI函数进行绘制,比如绘制线条、填充颜色、显示文本等。 - CDC类提供了很多函数,比如`MoveTo`、`LineTo`、`Rectangle`、`Ellipse`、`Polygon`等,用于绘制基本的图形。 - 对于图像处理,可以使用`StretchBlt`、`BitBlt`、`TransparentBlt`等函数进行图像的位块传输。 2. GDI+: - GDI+是GDI的后继技术,提供了更丰富的图形处理功能。 - GDI+通过使用`Graphics`类来提供图像的绘制、文本的渲染、图像的处理和颜色管理等功能。 - GDI+引入了对矢量图形、渐变色、复杂的文本格式和坐标空间等更高级的图形处理功能。 - `Image`类是GDI+中用于图像操作的基础类,通过它可以进行图像的加载、保存、旋转、缩放等操作。 3. DirectX: - DirectX是微软推出的一系列API集合,用于在Windows平台上进行高性能多媒体编程。 - DirectX中的Direct2D是用于硬件加速的二维图形API,专门用于UI元素和简单的图形渲染。 - DirectDraw主要用于硬件加速的位图操作,比如全屏游戏开发中的画面渲染。 4. 位图操作: - 在VC图像编程中,位图操作是一个重要的部分。需要了解如何加载、保存和处理位图(BMP)文件。 - 可以使用位图文件格式的解析,来访问位图的像素数据,进行像素级别的图像处理和修改。 5. 高级图像处理技术: - 包括图像滤镜、图像转换、图像压缩和解压缩技术。 - 需要掌握一些图像处理算法,比如卷积、FFT(快速傅里叶变换)、DCT(离散余弦变换)等。 - 了解图像的色彩空间转换,比如RGB到YUV的转换,这在视频处理中非常重要。 6. 图像库的使用: - 除了直接使用API进行图像处理之外,还可以使用开源的图像处理库,如OpenCV。 - OpenCV是一个跨平台的计算机视觉和机器学习软件库,它提供了很多高级的图像处理功能。 【压缩包子文件的文件名称列表】: VC++ 文件名称列表显示为"VC++",这表明文件包中可能包含了Visual C++的项目、源代码文件、动态链接库(DLLs)、编译器设置、链接器设置等。如果要使用这些文件进行VC图像编程,需要确保Visual C++开发环境已经安装并且配置好,同时需要安装好所有依赖的库和工具。 要开始VC图像编程,开发者需要具备C++编程基础,熟悉Windows编程概念,并且对图形学有一定的了解。掌握VC图像编程技巧,对于进行桌面应用程序开发、图像处理软件开发以及游戏开发等都是至关重要的。
recommend-type

Pokemmo响应速度翻倍:多线程处理的高级技巧

# 摘要 多线程处理是提高软件并发性能和响应速度的关键技术,本文从基础概念与优势出发,深入探讨了多线程的实现机制,包括线程模型、调度算法、编程关键技术、线程安全性最佳实践、死锁预防与处理等方面。通
recommend-type

人名列表滚动抽奖

<think>嗯,用户想实现一个滚动抽奖功能,需要前端动画效果和随机选择逻辑。首先,我得考虑前端怎么实现滚动效果。可能用CSS动画或者JavaScript的requestAnimationFrame?比如,使用CSS的@keyframes来控制位移,或者用JS动态更新样式。然后,随机选择算法,可能需要确保公平性,比如用Fisher-Yates洗牌算法,或者用Math.random()来生成随机索引。然后,用户可能需要平滑的滚动动画,比如先快速滚动,然后逐渐减速,最后停在选中的人名上。这可能需要设置定时器,逐步改变位置,或者使用CSS过渡效果。另外,还要考虑性能,避免页面卡顿,可能需要使用硬件加
recommend-type

一站式JSF开发环境:即解压即用JAR包

标题:“jsf开发完整JAR包”所指的知识点: 1. JSF全称JavaServer Faces,是Java EE(现EE4J)规范之一,用于简化Java Web应用中基于组件的用户界面构建。JSF提供了一种模型-视图-控制器(MVC)架构的实现,使得开发者可以将业务逻辑与页面表示分离。 2. “开发完整包”意味着这个JAR包包含了JSF开发所需的所有类库和资源文件。通常来说,一个完整的JSF包会包含核心的JSF库,以及一些可选的扩展库,例如PrimeFaces、RichFaces等,这些扩展库提供了额外的用户界面组件。 3. 在一个项目中使用JSF,开发者无需单独添加每个必要的JAR文件到项目的构建路径中。因为打包成一个完整的JAR包后,所有这些依赖都被整合在一起,极大地方便了开发者的部署工作。 4. “解压之后就可以直接导入工程中使用”表明这个JAR包是一个可执行的归档文件,可能是一个EAR包或者一个可直接部署的Java应用包。解压后,开发者只需将其内容导入到他们的IDE(如Eclipse或IntelliJ IDEA)中,或者将其放置在Web应用服务器的正确目录下,就可以立即进行开发。 描述中所指的知识点: 1. “解压之后就可以直接导入工程中使用”说明这个JAR包是预先配置好的,它可能包含了所有必要的配置文件,例如web.xml、faces-config.xml等,这些文件是JSF项目运行所必需的。 2. 直接使用意味着减少了开发者配置环境和处理依赖的时间,有助于提高开发效率。 标签“jsf jar包”所指的知识点: 1. 标签指明了JAR包的内容是专门针对JSF框架的。因此,这个JAR包包含了JSF规范所定义的API以及可能包含的具体实现,比如Mojarra或MyFaces。 2. “jar包”是一种Java平台的归档文件格式,用于聚合多个文件到一个文件中。在JSF开发中,JAR文件经常被用来打包和分发库或应用程序。 文件名称列表“jsf”所指的知识点: 1. “jsf”文件名可能意味着这是JSF开发的核心库,它应该包含了所有核心的JavaServer Faces类文件以及资源文件。 2. 如果是使用特定版本的JSF,例如“jsf-2.2.jar”,则表明文件内包含了对应版本的JSF实现。这种情况下,开发者必须确认他们所使用的Web服务器或应用程序服务器支持该版本的JSF。 3. 文件名称也可能是“jsf-components.jar”、“jsf-impl.jar”等,表明这个JAR包是JSF的一个子模块或特定功能组件。例如,“jsf-components.jar”可能包含了一系列用于在JSF应用中使用的自定义组件。 4. 对于开发者而言,了解文件名称中所蕴含的信息非常重要,因为这将决定他们需要下载哪些JAR包来满足特定项目的需求。 综合以上信息,开发者在使用JSF进行Java Web应用开发时,会通过一个预先配置好的JAR包来快速地搭建和启动项目。这样做不仅简化了项目初始化的过程,也使得开发者能够更加聚焦于业务逻辑的实现和界面设计,而不必深究底层框架配置的细节。
recommend-type

Pokemmo内存优化揭秘:专家教你如何降低50%资源消耗

# 摘要 本文综述了Pokemmo游戏的内存优化方法,从内存管理基础出发,探讨内存使用效率的影响因素,并介绍了性能监控与分析工具。在内存优化实践技巧章节中,详细讨论了代码层面的优化、数据结构和算法选择对内存效率的影响,并通过案例分析展示了实际的优化过程。针对Pokemmo游戏特点,分析了内存消耗特性并提出了特定优化技术。最后,本文展望了未来内存管理技术的发展方向,以及游戏开发中面临的新挑战,为Pokemmo及类似游戏提供了优化建议。 # 关键字 内存优化;内存管理;性能监控;数据结构;算法效率;游戏开发 参考资源链接:[Pokemmo必备资源包:四种ROM与汉化补丁](https://we