活动介绍

可是在比较 Timestamp 类型的变量和 float 类型的变量时需要对其进行转换。怎么该这段代码import pandas as pdfrom mlxtend.preprocessing import TransactionEncoderfrom mlxtend.frequent_patterns import apriori# 读取 Excel 文件数据df = pd.read_excel('D:/shujuji/zhizaoye.xls', sheet_name='制造业')dataset = df.values.tolist()print(dataset)te = TransactionEncoder()te_data = te.fit(dataset).transform(dataset)df_encoded = pd.DataFrame(te_data, columns=te.columns_)# 应用 Apriori 算法检测频繁项集frequent_itemsets = apriori(df_encoded, min_support=0.2, use_colnames=True)# 输出结果print(frequent_itemsets)

时间: 2024-03-17 13:41:03 浏览: 100
如果需要比较 Timestamp 类型的变量和 float 类型的变量,可以使用 Timestamp.to_pydatetime() 方法将 Timestamp 转换为 datetime 对象,再使用 datetime.timestamp() 方法将其转换为 float 类型。修改后的代码如下: ``` import pandas as pd from mlxtend.preprocessing import TransactionEncoder from mlxtend.frequent_patterns import apriori # 读取 Excel 文件数据 df = pd.read_excel('D:/shujuji/zhizaoye.xls', sheet_name='制造业') dataset = df.values.tolist() print(dataset) te = TransactionEncoder() te_data = te.fit(dataset).transform(dataset) df_encoded = pd.DataFrame(te_data, columns=te.columns_) # 将 Timestamp 类型的变量转换为 float 类型 df_encoded['timestamp'] = df_encoded['timestamp'].apply(lambda x: x.to_pydatetime().timestamp()) # 应用 Apriori 算法检测频繁项集 frequent_itemsets = apriori(df_encoded, min_support=0.2, use_colnames=True) # 输出结果 print(frequent_itemsets) ```
阅读全文

相关推荐

from tabnanny import verbose import numpy as np import pandas as pd import matplotlib.pyplot as plt import seaborn as sns import xgboost from sklearn.model_selection import train_test_split, GridSearchCV from sklearn.ensemble import RandomForestRegressor from xgboost import XGBRegressor from sklearn.metrics import mean_squared_error, r2_score, mean_absolute_error, confusion_matrix, accuracy_score from 实验1 import param_grid # 解决中文显示问题 plt.rcParams['font.sans-serif'] = ['SimHei'] plt.rcParams['axes.unicode_minus'] = False # ----------------------------- 1. 数据加载与预处理 ----------------------------- def load_and_preprocess_data(): """加载数据并执行预处理""" df = pd.read_excel('.che22.xlsx') # 处理缺失值(将'暂无'替换为NaN) df.replace('暂无', np.nan, inplace=True) # 字段格式转换 df['售价'] = df['售价'].str.replace('万', '').astype(float) df['新车指导价'] = df['新车指导价'].str.replace('万', '').astype(float) df['里程'] = df['里程'].str.replace('万公里', '').astype(float) df['过户次数'] = df['过户次数'].str.replace('次', '').astype(float) # 提取品牌前4字符(如“奔驰GLC” → “奔驰”) df['品牌'] = df['品牌'].str[:4] # 更合理的品牌提取方式 # 计算车龄(修复空值问题) current_year = pd.Timestamp.now().year df['上牌时间'] = pd.to_numeric(df['上牌时间'].str[:4], errors='coerce') # 转换为年份数字 df['车龄'] = current_year - df['上牌时间'] # 排量处理(正则提取数值 + 填充均值) df['排量'] = df['排量'].astype(str).str.extract(r'(\d+\.?\d*)', expand=False).astype(float) df['排量'].fillna(df['排量'].mean(), inplace=True) # 删除重复项 df.drop_duplicates(inplace=True) df.reset_index(drop=True, inplace=True) return df # 加载数据 df = pd.read_excel('./che22.xlsx') # ----------------------------- 2. 探索性数据分析(EDA) ----------------------------- # 箱线图:检测异常值 fig, ax = plt.subplots(1, 2, figsize=(16, 6)) df.boxplot(column=['里程'], ax=ax[0], flierprops={'marker': 'o', 'markerfacecolor': 'red', 'markersize': 4}) df.boxplot(column=['售价'], ax=ax[1], flierprops={'marker': 'o', 'markerfacecolor': 'red', 'markersize': 4}) plt.suptitle('里程与售价异常值检测') plt.show()

import pandas as pd import numpy as np import os import matplotlib.pyplot as plt from tqdm import tqdm from concurrent.futures import ProcessPoolExecutor # ========== 配置参数 ========== D_TYPE = {0: np.float32, 1: np.float32} CHUNK_SIZE = 10000 # ========== 数据处理函数 ========== def process_sheet(df: pd.DataFrame, target: float) -> float: """处理单个工作表的核心逻辑""" valid_mask = df.iloc[:, [0,1]].notna().all(axis=1) a_col = df.iloc[valid_mask, 0].astype(np.float32) b_col = df.iloc[valid_mask, 1].astype(np.float32) mask = a_col.between( target - 0.003, target + 0.003, inclusive='both' ) return b_col[mask].sum() def process_excel(file_path: str, target: float) -> float: """处理单个Excel文件的并行单元""" try: all_sheets = pd.read_excel( file_path, sheet_name=None, skiprows=5, usecols="A:B", header=None, dtype=D_TYPE, engine='openpyxl' ) total = sum( process_sheet(df, target) for df in all_sheets.values() ) return total / len(all_sheets) except Exception as e: print(f"处理文件 {os.path.basename(file_path)} 时出错: {str(e)}") return 0.0 # ========== 主处理流程 ========== if __name__ == "__main__": # 初始化中文字体配置(核心修复) plt.rcParams['font.sans-serif'] = ['Microsoft YaHei', 'SimHei', 'KaiTi'] # 多字体备选 plt.rcParams['axes.unicode_minus'] = False # 用户输入 target_value = float(input("请输入要筛选的A列目标值:")) input_dir = input("请输入Excel文件所在的文件夹路径:").strip() output_dir = input("请输入结果图表输出路径:").strip() os.makedirs(output_dir, exist_ok=True) # 获取文件列表 files = [ os.path.join(input_dir, f) for f in os.listdir(input_dir) if f.endswith(".xlsx") ] # 并行处理 with ProcessPoolExecutor() as executor: futures = { executor.submit(process_excel, file, target_value): os.path.basename(file) for file in files } results = [] with tqdm(total=len(files), desc="处理进度") as pbar: for future in futures: filename = futures[future] try: avg_sum = future.result() results.append((filename, avg_sum)) except Exception as e: print(f"文件 {filename} 处理异常: {str(e)}") finally: pbar.update(1) # ========== 可视化优化 ========== plt.style.use('seaborn-v0_8-darkgrid') plt.figure(figsize=(12, 6), dpi=100) plt.barh( [x[0] for x in results], [x[1] for x in results], color='#4C72B0' ) plt.xlabel("B列总和平均值", fontsize=12, fontweight='bold') plt.ylabel("文件名", fontsize=12, fontweight='bold') plt.title(f"目标值 {target_value}±0.003 数据统计\n({len(files)}个文件)", fontsize=14, pad=20) plt.grid(axis='x', linestyle='--', alpha=0.7) # 调整布局和保存 plt.tight_layout() output_path = os.path.join(output_dir, "optimized_result.png") plt.savefig(output_path, dpi=300, bbox_inches='tight') print(f"\n处理完成!优化结果图表已保存至:{output_path}") 这段代码的文字路径在哪修改

from bertopic import BERTopic import numpy as np import pandas as pd from umap import UMAP from hdbscan import HDBSCAN from sklearn.feature_extraction.text import CountVectorizer from bertopic.vectorizers import ClassTfidfTransformer import re import nltk from nltk.corpus import stopwords from nltk.stem import WordNetLemmatizer from sklearn.feature_extraction.text import ENGLISH_STOP_WORDS from nltk.tokenize import word_tokenize from wordcloud import WordCloud import matplotlib.pyplot as plt # 加载原始文本数据(仍需用于主题表示) df = pd.read_csv('tokenized_abstract.csv', encoding='utf-8') sentences = df['Tokenized_Abstract'].tolist() print('文本条数: ', len(sentences)) print('预览第一条: ', sentences[0]) # 检查缺失值 print("缺失值数量:", df['Tokenized_Abstract'].isna().sum()) # 检查非字符串类型 non_str_mask = df['Tokenized_Abstract'].apply(lambda x: not isinstance(x, str)) print("非字符串样本:\n", df[non_str_mask]['Tokenized_Abstract'].head()) vectorizer_model = None from sentence_transformers import SentenceTransformer # 加载时间数据 df['Date'] = pd.to_datetime(df['Date']) # 从Date列提取年份 years = df['Date'].dt.year print(years) # Step 1 - Extract embeddings embedding_model = SentenceTransformer("C:\\Users\\18267\\.cache\\huggingface\\hub\\models--sentence-transformers--all-mpnet-base-v2\\snapshots\\9a3225965996d404b775526de6dbfe85d3368642") embeddings = np.load('clean_emb_last.npy') print(f"嵌入的形状: {embeddings.shape}") # Step 2 - Reduce dimensionality umap_model = UMAP(n_neighbors=7, n_components=10, min_dist=0.0, metric='cosine',random_state=42) # Step 3 - Cluster reduced embeddings hdbscan_model = HDBSCAN(min_samples=7, min_cluster_size=60,metric='euclidean', cluster_selection_method='eom', prediction_data=True) # Step 4 - Tokenize topics # Combine custom stop words with scikit-learn's English stop words custom_stop_words = ['h2', 'storing', 'storage', 'include', 'comprise', 'utility', 'model', 'disclosed', 'embodiment', 'invention', 'prior', 'art', 'according', 'present', 'method', 'system', 'device', 'may', 'also', 'use', 'used', 'provide', 'wherein', 'configured', 'predetermined', 'plurality', 'comprising', 'consists', 'following', 'characterized', 'claim', 'claims', 'said', 'first', 'second', 'third', 'fourth', 'fifth', 'one', 'two', 'three','hydrogen'] # Create combined stop words set all_stop_words = set(custom_stop_words).union(ENGLISH_STOP_WORDS) vectorizer_model = CountVectorizer(stop_words=list(all_stop_words)) # Step 5 - Create topic representation ctfidf_model = ClassTfidfTransformer() # All steps together topic_model = BERTopic( embedding_model=embedding_model, # Step 1 - Extract embeddings umap_model=umap_model, # Step 2 - Reduce dimensionality hdbscan_model=hdbscan_model, # Step 3 - Cluster reduced embeddings vectorizer_model=vectorizer_model, # Step 4 - Tokenize topics ctfidf_model=ctfidf_model, # Step 5 - Extract topic words top_n_words=50 ) # 拟合模型 topics, probs = topic_model.fit_transform(documents=sentences, # 仍需提供文档用于主题词生成 embeddings=embeddings # 注入预计算嵌入) ) # 获取主题聚类信息 topic_info = topic_model.get_topic_info() print(topic_info)使用BERTopic的动态主题功能,传入时间戳参数,并在拟合后分析主题随时间的变化。设置时间戳 t1=2000-2010 年,t2=2011-2018 年,t3=2019-2024 年。最终,将当前阶段和前一阶段的 c-TF-IDF 平均值作为当前阶段的权重分数,取权重分数前 15 的单词作为动态主题的关键词,形成动态主题词列表。在动态主题方面,分别选取主题在每个阶段改进 c-TF-IDF 值排名前 15 的单词作为关键词绘制演化图分析主题在不同阶段的动态变化。请你提供方案实现上述操作。

条件组合MySQL数据库密码是220603,我提供一个excel文档,我可以将这个文档进行导入到我的HTML页面里面的导入按钮,端口号为5008 导入功能:店铺管理(添加,修改,删除店铺的功能),通过输入店铺码和店铺名称来添加,添加完成之后会有一个进行导入店铺数据功能,这个就通过一个选择文件(如excel和csv),我认为在MySQL数据库里面创建表太麻烦了,只想创建一个数据库名字为shop_order,然后根据导入文件的第一行的所有表名字作为关键字创建所对应的MySQL表放在shop——oeder下面(如将Paid Time自动识别为DATETIME类型),切记这个关键字就是导入文档表的第一行,并且下面都是该关键字所对应的所有数据,由于我所做的系统需要导入三个文档,所以要设置三个导入数据的按钮,分别为All order,in come,creater order这三个表所对应了所有订单数据,收入订单数据和达人订单数据(也就是通过达人带货卖出去的订单)。 查询功能:在导入需要的数据之后,我们需要进行数据查询,目前先通过店铺码和店铺名称查询店铺,查询成功后进入店铺,这个时候我们需要再查询店铺的数据,这个时候就需要通过查询我们目前需要的数据,有最开始得到的关键字:Order ID,Paid Time(付款时间),还要给这个数据排列,也就是在这两个查询数据前面还需要建立一个Shop ID作为编号如01,02,03等等,查询后显示的完整数据的是[Shop ID:Order ID,Paid Time],实现上面功能还不够,我希望还可以增加一个时间查询也就是可以输入某个时间段到某个时间段来进行查询这段时间所有订单的数据,列如Start time[年月日]——End timr[年月日],这个时间查询是通过最开始得到的关键字里面的Paid Time所对应的时间数据("10/07/2025 10:24:31 " )来决定,因为不是每个订单号Order ID都会被进行付款(paid_time),我们只需要提前这个付款时间不需要其他时间,所以只需要查询有paid_time的Order ID并且提取出来即可,若没有paid_time数据就直接跳过放弃,继续查询有paid_time的数据订单即可,根据以上要求给出这个项目的开发教程,要求保姆级教程,再说一遍,在里面必须有可以导入excel和csv文档的功能,并且是点击导入数据就可以在我的电脑里面查找选择文档。要注意下面几个问题,第一:日期时间列识别问题:系统错误地将非日期列(如 Order_Status、Quantity)识别为日期列,日期转换失败率高(如 Created_Time 只有33.4%的转换成功率),错误提示:列 XXX 保持原样: 日期格式识别率低 (0.0%)原因是:仅通过列名关键词(如包含"time")判断是否为日期列,缺乏数据内容验证,未排除明确非日期列的字段(如状态、数量、金额等),未处理多种日期格式(如 10/07/2023 10:24:31 和 2023-07-10T15:45:00),一定要增强列识别逻辑,还有多格式日期解析和分阶段验证。 第二:数据类型推断错误:避免数值列(如 Quantity)被误识别为日期,数据库插入时报错:Incorrect datetime value: '579567909575820736' for column 'Order ID',原因是因为类型推断仅基于列名,未结合数据内容,未处理大整数和特殊格式数字。所以要设置一个优先级调整和安全转换。 第三:数据库交互问题,特别是RuntimeError: The session is unavailable because no secret key was set,事务回滚失败导致表残留,数据类型不匹配(如DATETIME和VARCHAR冲突)原因是Flask会话密钥未配置未正确处理MySQL错误代码(如1292=日期格式错误)和表结构创建与数据插入逻辑分离。所以要做到基础配置和事务管理和原子性操作这三个方面。 第四:前端模板渲染问题: 问题表现: jinja2.exceptions.TemplateNotFound jinja2.exceptions.UndefinedError: 'dict object' has no attribute 'id' JavaScript中 const report = {{ date_report|tojson|safe }} 报错 根本原因: 模板文件未放在正确目录(templates/),变量未定义或为None时直接调用方法,未处理JSON序列化中的None值,解决方案:设置目录结构规范和安全变量访问以及默认值处理。 第五: 文件上传与数据处理问题: 问题表现: 上传的Excel/CSV文件解析失败 临时文件未清理 列名含空格导致SQL错误 根本原因: 未验证文件内容格式 未处理文件编码问题(如UTF-8 vs GBK) 未规范化列名(如 Paid Time → Paid_Time) 解决方案:设置文件预处理和资源清理和编码指定。 第六:分页与性能问题: 问题表现: 大数据量查询时内存溢出 分页逻辑错误(LIMIT/OFFSET计算) 解决方案:设置流式查询和分页优化。 最后做一些优化比如:系统架构改进建议 日志监控: python代码: app.logger.addHandler(logging.FileHandler('import_errors.log')) 配置分离: python代码: class DevelopmentConfig(Config): DEBUG = True UPLOAD_FOLDER = '/tmp/uploads' 单元测试: python代码: def test_datetime_detection(): assert is_potential_datetime_column("Paid_Time", pd.Series(["2023-01-01"])) is True 根据上面所有要求和问题还有解决方法制定一个Python电商数据分析系统教程,要求保姆级教程!!!请仔细分析我给出的所有问题并且一一解决生成一个完整教程,要求准确并且代码质量高可以运行

import sys import io import os import numpy as np import matplotlib as mpl import matplotlib.pyplot as plt import scapy.all as scapy from collections import defaultdict from sklearn.preprocessing import StandardScaler from sklearn.cluster import KMeans from sklearn.metrics import silhouette_score import pandas as pd from scapy.layers.inet import IP, TCP, UDP, ICMP from scapy.layers.dns import DNS, DNSRR from datetime import datetime # 设置Matplotlib中文显示支持 plt.rcParams['font.sans-serif'] = ['SimHei', 'Microsoft YaHei', 'WenQuanYi Micro Hei'] plt.rcParams['axes.unicode_minus'] = False # 解决负号显示问题 # 设置系统输出编码为UTF-8 sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8') if os.name == 'nt': # Windows系统专用设置 os.system('chcp 65001') # 切换到UTF-8代码页 # ====================== # 1. PCAP文件解析与流重组 # ====================== name_pcap = input("请输入想要解析的PCAP文件: ") print("正在解析PCAP文件...") # 新增:DNS解析缓存 dns_cache = {} # 流式解析大PCAP文件,避免内存溢出 def parse_large_pcap(pcap_path): flows = defaultdict(list) # 使用字典存储网络流 packet_count = 0 # 总包计数器 ip_packet_count = 0 # IP包计数器 global dns_cache # 使用全局DNS缓存 # 使用PcapReader流式读取,避免一次性加载大文件 with scapy.PcapReader(pcap_path) as pcap_reader: for pkt in pcap_reader: packet_count += 1 # 进度显示(每10000个包) if packet_count % 10000 == 0: print(f"已处理 {packet_count} 个包...") # 新增:DNS响应解析 - 修复索引越界问题 if DNS in pkt and pkt[DNS].qr == 1: # 只处理DNS响应 # 获取实际可用的回答记录数量 actual_ancount = len(pkt[DNS].an) if hasattr(pkt[DNS], 'an') else 0 # 使用实际数量而不是ancount字段 for i in range(min(pkt[DNS].ancount, actual_ancount)): rr = pkt[DNS].an[i] if isinstance(rr, DNSRR) and rr.type == 1: # A记录 try: ip = rr.rdata # 正确处理域名格式 domain = rr.rrname.decode('utf-8', errors='ignore').rstrip('.') # 添加到DNS缓存 if ip not in dns_cache: dns_cache[ip] = set() dns_cache[ip].add(domain) except Exception as e: # 忽略解析错误 pass # 仅处理IP层数据包 if IP in pkt: ip_packet_count += 1 ip_layer = pkt[IP] proto_num = ip_layer.proto # 获取协议号 # 初始化端口变量 src_port, dst_port = 0, 0 # 根据协议类型提取端口信息 if proto_num == 6 and TCP in pkt: # TCP协议 src_port = pkt[TCP].sport dst_port = pkt[TCP].dport elif proto_num == 17 and UDP in pkt: # UDP协议 src_port = pkt[UDP].sport dst_port = pkt[UDP].dport elif proto_num == 1 and ICMP in pkt: # ICMP协议 # 使用类型和代码代替端口 src_port = pkt[ICMP].type dst_port = pkt[ICMP].code # 创建五元组作为流的唯一标识 flow_key = (ip_layer.src, ip_layer.dst, src_port, dst_port, proto_num) flows[flow_key].append(pkt) # 将包添加到对应流 print(f"共处理 {packet_count} 个包,其中 {ip_packet_count} 个IP包") print(f"DNS缓存大小: {len(dns_cache)} 个IP到域名的映射") return flows # 解析PCAP文件 flows = parse_large_pcap(name_pcap) print(f"共识别出 {len(flows)} 个网络流") # ====================== # 2. 特征工程 # ====================== print("正在提取流量特征...") features = [] # 存储特征向量 valid_flows = [] # 存储有效流的标识 # 初始化时间列表 src_start_times = [] src_end_times = [] dst_start_times = [] dst_end_times = [] # 特征名称列表 feature_names = [ '包数量', '总字节数', '平均包间隔', '包间隔标准差', '平均包长', '包长标准差', '最小包长', '最大包长', '上行比例' ] min_packets = 5 # 流的最小包数阈值 for flow_key, packets in flows.items(): packet_count = len(packets) # 过滤包数不足的流 if packet_count < min_packets: continue # 1. 基础统计特征 total_bytes = sum(len(p) for p in packets) # 总字节数 # 2. 时序特征(包到达间隔) timestamps = np.array([float(p.time) for p in packets]) if packet_count > 1: iat = np.diff(timestamps) # 包间隔 mean_iat = np.mean(iat) # 平均间隔 std_iat = np.std(iat) if len(iat) > 1 else 0 # 间隔标准差 else: mean_iat = std_iat = 0 # 3. 包长特征 pkt_lengths = np.array([len(p) for p in packets]) mean_pkt_len = np.mean(pkt_lengths) # 平均包长 std_pkt_len = np.std(pkt_lengths) if packet_count > 1 else 0 # 包长标准差 min_pkt_len = np.min(pkt_lengths) # 最小包长 max_pkt_len = np.max(pkt_lengths) # 最大包长 # 4. 方向特征(上行比例) src_ip = flow_key[0] # 流源IP dst_ip = flow_key[1] # 流目的IP # 判断数据包方向(1=上行,0=下行) directions = np.array([1 if p[IP].src == src_ip else 0 for p in packets]) up_ratio = np.mean(directions) # 上行包比例 # 源IP发送的包的时间戳 src_pkt_times = [float(p.time) for p in packets if p[IP].src == src_ip] # 目的IP发送的包的时间戳 dst_pkt_times = [float(p.time) for p in packets if p[IP].src == dst_ip] # 计算起始和终止时间 src_start = min(src_pkt_times) if src_pkt_times else np.nan src_end = max(src_pkt_times) if src_pkt_times else np.nan dst_start = min(dst_pkt_times) if dst_pkt_times else np.nan dst_end = max(dst_pkt_times) if dst_pkt_times else np.nan # 添加时间信息到列表 src_start_times.append(src_start) src_end_times.append(src_end) dst_start_times.append(dst_start) dst_end_times.append(dst_end) # 添加特征向量 features.append([ packet_count, total_bytes, mean_iat, std_iat, mean_pkt_len, std_pkt_len, min_pkt_len, max_pkt_len, up_ratio ]) valid_flows.append(flow_key) # 记录有效流 # 转换为特征矩阵 if features: feature_matrix = np.array(features) print(f"提取了 {len(features)} 个有效流的 {len(feature_names)} 维特征") else: # 错误处理:无有效流 print("错误:没有提取到任何有效网络流") print("可能原因:") print("1. 所有流都少于5个包(尝试减小过滤阈值)") print("2. 文件格式不兼容(尝试用Wireshark打开验证)") print("3. 没有IP流量(检查网络捕获配置)") sys.exit(1) # ====================== # 3. 数据标准化 # ====================== print("正在进行数据标准化...") scaler = StandardScaler() scaled_features = scaler.fit_transform(feature_matrix) # Z-score标准化 # ====================== # 4. 肘部法则确定最佳K值 # ====================== print("使用肘部法则确定最佳聚类数...") n_samples = scaled_features.shape[0] # 样本数量 # 动态调整K值范围 if n_samples < 5: print(f"警告:样本数量较少({n_samples}),将使用简化聚类策略") k_range = range(1, min(11, n_samples + 1)) # K上限不超过样本数 else: k_range = range(1, 11) # 测试1-10个聚类 sse = [] # 存储SSE(误差平方和) kmeans_models = {} # 存储不同K值的模型 for k in k_range: # 跳过超过样本数的K值 if k > n_samples: print(f"跳过K={k}(超过样本数{n_samples})") continue # 训练K-means模型 kmeans = KMeans(n_clusters=k, n_init=10, random_state=42) kmeans.fit(scaled_features) sse.append(kmeans.inertia_) # 记录SSE kmeans_models[k] = kmeans # 存储模型 # 样本不足时的处理 if len(sse) < 2: optimal_k = min(2, n_samples) # 至少2类(不超过样本数) print(f"样本过少,直接设置K={optimal_k}") else: # 绘制肘部法则图 plt.figure(figsize=(10, 6)) plt.plot(list(k_range)[:len(sse)], sse, 'bo-') plt.xlabel('聚类数量 $ K$') plt.ylabel('SSE (误差平方和)') plt.title('肘部法则确定最佳聚类数') plt.grid(True) plt.savefig('elbow_method.png', dpi=300) plt.close() print("肘部法则图已保存为 elbow_method.png") # 自动检测拐点(二阶差分最小值) if len(sse) >= 3: knee_point = np.argmin(np.diff(sse, 2)) + 2 # 计算二阶差分 optimal_k = max(2, min(10, knee_point)) # 确保K在2-10之间 else: optimal_k = max(2, min(3, n_samples)) # 样本少时选择2或3 print(f"自动检测的最佳聚类数: K = {optimal_k}") # ====================== # 5. K-means聚类 # ====================== print(f"正在进行K-means聚类 (K={optimal_k})...") # 调整K值不超过样本数 if optimal_k > n_samples: optimal_k = max(1, n_samples) print(f"调整聚类数为样本数: K = {optimal_k}") # 使用已有模型或新建模型 if optimal_k in kmeans_models: kmeans = kmeans_models[optimal_k] cluster_labels = kmeans.labels_ else: kmeans = KMeans(n_clusters=optimal_k, n_init=10, random_state=42) cluster_labels = kmeans.fit_predict(scaled_features) # 计算轮廓系数(K>1时) if optimal_k > 1: silhouette_avg = silhouette_score(scaled_features, cluster_labels) print(f"轮廓系数: {silhouette_avg:.4f} (接近1表示聚类效果良好)") else: print("聚类数为1,无需计算轮廓系数") # ====================== # 6. 结果分析与可视化(新增域名显示) # ====================== # 创建结果DataFrame result_df = pd.DataFrame({ '源IP': [flow[0] for flow in valid_flows], '目的IP': [flow[1] for flow in valid_flows], '协议': [f"TCP(6)" if flow[4] == 6 else f"UDP(17)" for flow in valid_flows], '聚类标签': cluster_labels, '源起始时间': src_start_times, '源终止时间': src_end_times, '目的起始时间': dst_start_times, '目的终止时间': dst_end_times }) # 新增:添加域名信息 def get_domains(ip): """获取IP对应的域名列表""" domains = dns_cache.get(ip, set()) return ", ".join(domains) if domains else "N/A" result_df['目的域名'] = result_df['目的IP'].apply(get_domains) # 添加特征列 for i, name in enumerate(feature_names): result_df[name] = feature_matrix[:, i] # 新增:时间格式化函数 def format_timestamp(ts): """将时间戳格式化为可读字符串""" if np.isnan(ts): return "N/A" return datetime.fromtimestamp(ts).strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] # 保留毫秒 # 应用时间格式化 result_df['源起始时间'] = result_df['源起始时间'].apply(format_timestamp) result_df['源终止时间'] = result_df['源终止时间'].apply(format_timestamp) result_df['目的起始时间'] = result_df['目的起始时间'].apply(format_timestamp) result_df['目的终止时间'] = result_df['目的终止时间'].apply(format_timestamp) # 保存结果到CSV result_df.to_csv('cluster_results.csv', index=False, encoding='utf_8_sig') print("聚类结果已保存为 cluster_results.csv") # 簇统计信息 cluster_stats = result_df.groupby('聚类标签').agg({ '包数量': 'mean', '总字节数': 'mean', '平均包间隔': 'mean', '上行比例': 'mean' }).rename(columns={ '包数量': '平均包数量', '总字节数': '平均总字节数', '平均包间隔': '平均包间隔(s)', '上行比例': '平均上行比例' }) print("\n各用户簇流量特征统计:") print(cluster_stats.round(2)) # 使用PCA降维可视化 from sklearn.decomposition import PCA pca = PCA(n_components=2) # 降维到2D principal_components = pca.fit_transform(scaled_features) plt.figure(figsize=(12, 8)) scatter = plt.scatter( principal_components[:, 0], principal_components[:, 1], c=cluster_labels, # 按簇着色 cmap='viridis', alpha=0.6 ) # 标记簇中心 centers = pca.transform(kmeans.cluster_centers_) plt.scatter( centers[:, 0], centers[:, 1], c='red', s=200, alpha=0.9, marker='X', edgecolor='black' ) plt.colorbar(scatter, label='用户簇') plt.xlabel('主成分 1') plt.ylabel('主成分 2') plt.title(f'校园网用户流量聚类 (K={optimal_k})') plt.grid(alpha=0.3) plt.savefig('cluster_visualization.png', dpi=300) plt.close() print("聚类可视化图已保存为 cluster_visualization.png") # 最终输出 print("\n聚类完成! 结果文件已保存:") print("- elbow_method.png: 肘部法则图") print("- cluster_results.csv: 详细聚类结果(包含域名信息)") print("- cluster_visualization.png: 聚类可视化") 现在我只需要源起始时间和终止时间,不需要输出目的起始时间和终止时间

import sys import io import os import numpy as np import matplotlib as mpl import matplotlib.pyplot as plt import scapy.all as scapy from collections import defaultdict from sklearn.preprocessing import StandardScaler from sklearn.cluster import KMeans from sklearn.metrics import silhouette_score import pandas as pd import tkinter as tk from tkinter import filedialog, messagebox from scapy.layers.inet import IP, TCP, UDP, ICMP from scapy.layers.dns import DNS, DNSRR from datetime import datetime import time from sklearn.decomposition import PCA # 设置Matplotlib中文显示支持 plt.rcParams['font.sans-serif'] = ['SimHei', 'Microsoft YaHei', 'WenQuanYi Micro Hei'] plt.rcParams['axes.unicode_minus'] = False # 解决负号显示问题 # 设置系统输出编码为UTF-8 sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8') if os.name == 'nt': # Windows系统专用设置 os.system('chcp 65001') # 切换到UTF-8代码页 # ====================== # 文件选择对话框 # ====================== def select_files(): """创建文件选择对话框""" root = tk.Tk() root.withdraw() # 隐藏主窗口 # 选择PCAP文件 pcap_path = filedialog.askopenfilename( title="选择PCAP文件", filetypes=[("PCAP文件", "*.pcap"), ("PCAPNG文件", "*.pcapng"), ("所有文件", "*.*")] ) if not pcap_path: print("未选择PCAP文件,程序退出") sys.exit(0) # 选择输出目录 output_dir = filedialog.askdirectory(title="选择结果保存位置") if not output_dir: print("未选择输出目录,程序退出") sys.exit(0) return pcap_path, output_dir # ====================== # 1. PCAP文件解析与流重组 # ====================== print("请选择PCAP文件和输出位置...") pcap_path, output_dir = select_files() print(f"解析文件: {pcap_path}") print(f"保存位置: {output_dir}") # 新增:DNS解析缓存 dns_cache = {} # 流式解析大PCAP文件,避免内存溢出 def parse_large_pcap(pcap_path): flows = defaultdict(list) # 使用字典存储网络流 packet_count = 0 # 总包计数器 ip_packet_count = 0 # IP包计数器 global dns_cache # 使用全局DNS缓存 # 使用PcapReader流式读取,避免一次性加载大文件 with scapy.PcapReader(pcap_path) as pcap_reader: for pkt in pcap_reader: packet_count += 1 # 进度显示(每10000个包) if packet_count % 10000 == 0: print(f"已处理 {packet_count} 个包...") # 新增:DNS响应解析 - 修复索引越界问题 if DNS in pkt and pkt[DNS].qr == 1: # 只处理DNS响应 # 获取实际可用的回答记录数量 actual_ancount = len(pkt[DNS].an) if hasattr(pkt[DNS], 'an') else 0 # 使用实际数量而不是ancount字段 for i in range(min(pkt[DNS].ancount, actual_ancount)): rr = pkt[DNS].an[i] if isinstance(rr, DNSRR) and rr.type == 1: # A记录 try: ip = rr.rdata # 正确处理域名格式 domain = rr.rrname.decode('utf-8', errors='ignore').rstrip('.') # 添加到DNS缓存 if ip not in dns_cache: dns_cache[ip] = set() dns_cache[ip].add(domain) except Exception as e: # 忽略解析错误 pass # 仅处理IP层数据包 if IP in pkt: ip_packet_count += 1 ip_layer = pkt[IP] proto_num = ip_layer.proto # 获取协议号 # 初始化端口变量 src_port, dst_port = 0, 0 # 根据协议类型提取端口信息 if proto_num == 6 and TCP in pkt: # TCP协议 src_port = pkt[TCP].sport dst_port = pkt[TCP].dport elif proto_num == 17 and UDP in pkt: # UDP协议 src_port = pkt[UDP].sport dst_port = pkt[UDP].dport elif proto_num == 1 and ICMP in pkt: # ICMP协议 # 使用类型和代码代替端口 src_port = pkt[ICMP].type dst_port = pkt[ICMP].code # 创建五元组作为流的唯一标识 flow_key = (ip_layer.src, ip_layer.dst, src_port, dst_port, proto_num) flows[flow_key].append(pkt) # 将包添加到对应流 print(f"共处理 {packet_count} 个包,其中 {ip_packet_count} 个IP包") print(f"DNS缓存大小: {len(dns_cache)} 个IP到域名的映射") return flows # 解析PCAP文件 flows = parse_large_pcap(pcap_path) print(f"共识别出 {len(flows)} 个网络流") # ====================== # 2. 特征工程 # ====================== print("正在提取流量特征...") features = [] # 存储特征向量 valid_flows = [] # 存储有效流的标识 # 只初始化源时间列表 src_start_times = [] src_end_times = [] # 特征名称列表 feature_names = [ '包数量', '总字节数', '平均包间隔', '包间隔标准差', '平均包长', '包长标准差', '最小包长', '最大包长', '上行比例' ] min_packets = 5 # 流的最小包数阈值 for flow_key, packets in flows.items(): packet_count = len(packets) # 过滤包数不足的流 if packet_count < min_packets: continue # 1. 基础统计特征 total_bytes = sum(len(p) for p in packets) # 总字节数 # 2. 时序特征(包到达间隔) timestamps = np.array([float(p.time) for p in packets]) if packet_count > 1: iat = np.diff(timestamps) # 包间隔 mean_iat = np.mean(iat) # 平均间隔 std_iat = np.std(iat) if len(iat) > 1 else 0 # 间隔标准差 else: mean_iat = std_iat = 0 # 3. 包长特征 pkt_lengths = np.array([len(p) for p in packets]) mean_pkt_len = np.mean(pkt_lengths) # 平均包长 std_pkt_len = np.std(pkt_lengths) if packet_count > 1 else 0 # 包长标准差 min_pkt_len = np.min(pkt_lengths) # 最小包长 max_pkt_len = np.max(pkt_lengths) # 最大包长 # 4. 方向特征(上行比例) src_ip = flow_key[0] # 流源IP dst_ip = flow_key[1] # 流目的IP # 判断数据包方向(1=上行,0=下行) directions = np.array([1 if p[IP].src == src_ip else 0 for p in packets]) up_ratio = np.mean(directions) # 上行包比例 # 源IP发送的包的时间戳 src_pkt_times = [float(p.time) for p in packets if p[IP].src == src_ip] # 计算源的起始和终止时间 src_start = min(src_pkt_times) if src_pkt_times else np.nan src_end = max(src_pkt_times) if src_pkt_times else np.nan # 添加时间信息到列表 src_start_times.append(src_start) src_end_times.append(src_end) # 添加特征向量 features.append([ packet_count, total_bytes, mean_iat, std_iat, mean_pkt_len, std_pkt_len, min_pkt_len, max_pkt_len, up_ratio ]) valid_flows.append(flow_key) # 记录有效流 if not features: # 错误处理:无有效流 print("错误:没有提取到任何有效网络流") print("可能原因:") print("1. 所有流都少于5个包(尝试减小过滤阈值)") print("2. 文件格式不兼容(尝试用Wireshark打开验证)") print("3. 没有IP流量(检查网络捕获配置)") sys.exit(1) feature_matrix = np.array(features) print(f"提取了 {len(features)} 个有效流的 {len(feature_names)} 维特征") # ====================== # 3. 数据标准化 # ====================== print("正在进行数据标准化...") scaler = StandardScaler() scaled_features = scaler.fit_transform(feature_matrix) # Z-score标准化 # ====================== # 4. 肘部法则确定最佳K值 # ====================== print("使用肘部法则确定最佳聚类数...") n_samples = scaled_features.shape[0] # 样本数量 # 动态调整K值范围 if n_samples < 5: print(f"警告:样本数量较少({n_samples}),将使用简化聚类策略") k_range = range(1, min(11, n_samples + 1)) # K上限不超过样本数 else: k_range = range(1, 11) # 测试1-10个聚类 sse = [] # 存储SSE(误差平方和) kmeans_models = {} # 存储不同K值的模型 for k in k_range: # 跳过超过样本数的K值 if k > n_samples: print(f"跳过K={k}(超过样本数{n_samples})") continue # 训练K-means模型 kmeans = KMeans(n_clusters=k, n_init=10, random_state=42) kmeans.fit(scaled_features) sse.append(kmeans.inertia_) # 记录SSE kmeans_models[k] = kmeans # 存储模型 # 样本不足时的处理 if len(sse) < 2: optimal_k = min(2, n_samples) # 至少2类(不超过样本数) print(f"样本过少,直接设置K={optimal_k}") else: # 绘制肘部法则图 plt.figure(figsize=(10, 6)) plt.plot(list(k_range)[:len(sse)], sse, 'bo-') plt.xlabel('聚类数量 $ K$') plt.ylabel('SSE (误差平方和)') plt.title('肘部法则确定最佳聚类数') plt.grid(True) elbow_path = os.path.join(output_dir, 'elbow_method.png') plt.savefig(elbow_path, dpi=300) plt.close() print(f"肘部法则图已保存为 {elbow_path}") # 自动检测拐点(二阶差分最小值) if len(sse) >= 3: knee_point = np.argmin(np.diff(sse, 2)) + 2 # 计算二阶差分 optimal_k = max(2, min(10, knee_point)) # 确保K在2-10之间 else: optimal_k = max(2, min(3, n_samples)) # 样本少时选择2或3 print(f"自动检测的最佳聚类数: K = {optimal_k}") # ====================== # 5. K-means聚类 # ====================== print(f"正在进行K-means聚类 (K={optimal_k})...") # 调整K值不超过样本数 if optimal_k > n_samples: optimal_k = max(1, n_samples) print(f"调整聚类数为样本数: K = {optimal_k}") # 使用已有模型或新建模型 if optimal_k in kmeans_models: kmeans = kmeans_models[optimal_k] cluster_labels = kmeans.labels_ else: kmeans = KMeans(n_clusters=optimal_k, n_init=10, random_state=42) cluster_labels = kmeans.fit_predict(scaled_features) # 计算轮廓系数(K>1时) if optimal_k > 1: silhouette_avg = silhouette_score(scaled_features, cluster_labels) print(f"轮廓系数: {silhouette_avg:.4f} (接近1表示聚类效果良好)") else: print("聚类数为1,无需计算轮廓系数") # ====================== # 6. 结果分析与可视化(新增域名显示) # ====================== # 创建结果DataFrame result_df = pd.DataFrame({ '源IP': [flow[0] for flow in valid_flows], '目的IP': [flow[1] for flow in valid_flows], '协议': [f"TCP(6)" if flow[4] == 6 else f"UDP(17)" for flow in valid_flows], '聚类标签': cluster_labels, '源起始时间': src_start_times, '源终止时间': src_end_times, '包数量': feature_matrix[:, 0], '总字节数': feature_matrix[:, 1] }) # 新增:添加域名信息 def get_domains(ip): """获取IP对应的域名列表""" domains = dns_cache.get(ip, set()) return ", ".join(domains) if domains else "N/A" result_df['目的域名'] = result_df['目的IP'].apply(get_domains) # 新增:时间格式化函数 def format_timestamp(ts): """将时间戳格式化为可读字符串""" if np.isnan(ts): return "N/A" return datetime.fromtimestamp(ts).strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] # 保留毫秒 # 应用时间格式化 result_df['源起始时间'] = result_df['源起始时间'].apply(format_timestamp) result_df['源终止时间'] = result_df['源终止时间'].apply(format_timestamp) # 只保留需要的列 required_columns = ['源IP', '目的IP', '协议', '源起始时间', '源终止时间', '目的域名', '包数量', '总字节数'] result_df = result_df[required_columns] # 生成结果文件名(带时间戳) timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") output_filename = f'cluster_result_{timestamp}.csv' output_path = os.path.join(output_dir, output_filename) try: result_df.to_csv(output_path, index=False, encoding='utf_8_sig') print(f"聚类结果已保存为 {output_path}") except Exception as e: print(f"保存结果文件失败: {str(e)}") # 尝试临时目录作为备选 temp_dir = os.path.join(os.environ.get('TEMP', ''), 'netflow_clustering') os.makedirs(temp_dir, exist_ok=True) temp_path = os.path.join(temp_dir, output_filename) result_df.to_csv(temp_path, index=False, encoding='utf_8_sig') print(f"聚类结果已保存为 {temp_path}") # 簇统计信息(使用聚类标签列) if '聚类标签' in result_df.columns: cluster_stats = result_df.groupby('聚类标签').agg({ '包数量': 'mean', '总字节数': 'mean' }).rename(columns={ '包数量': '平均包数量', '总字节数': '平均总字节数' }) print("\n各用户簇流量特征统计:") print(cluster_stats.round(2)) else: print("警告:结果中缺少聚类标签列,无法进行簇统计") # 使用PCA降维可视化 pca = PCA(n_components=2) # 降维到2D principal_components = pca.fit_transform(scaled_features) plt.figure(figsize=(12, 8)) scatter = plt.scatter( principal_components[:, 0], principal_components[:, 1], c=cluster_labels, # 按簇着色 cmap='viridis', alpha=0.6 ) # 标记簇中心 centers = pca.transform(kmeans.cluster_centers_) plt.scatter( centers[:, 0], centers[:, 1], c='red', s=200, alpha=0.9, marker='X', edgecolor='black' ) plt.colorbar(scatter, label='用户簇') plt.xlabel('主成分 1') plt.ylabel('主成分 2') plt.title(f'校园网用户流量聚类 (K={optimal_k})') plt.grid(alpha=0.3) # 可视化文件保存到输出目录 vis_filename = f'cluster_visualization_{timestamp}.png' vis_path = os.path.join(output_dir, vis_filename) plt.savefig(vis_path, dpi=300) plt.close() print(f"聚类可视化图已保存为 {vis_path}") # 最终输出 print("\n聚类完成! 结果文件已保存:") print(f"- {elbow_path}: 肘部法则图") print(f"- {output_path}: 详细聚类结果") print(f"- {vis_path}: 聚类可视化") 根据这段程序画出pcap分包流程

import os import time import numpy as np import pandas as pd import matplotlib.pyplot as plt import seaborn as sns from sklearn.preprocessing import StandardScaler from sklearn.model_selection import train_test_split from sklearn.metrics import mean_absolute_error, mean_squared_error import torch import torch.nn as nn import torch.optim as optim from torch.utils.data import Dataset, DataLoader, TensorDataset from torch.optim.lr_scheduler import ReduceLROnPlateau # 设置随机种子确保结果可复现 torch.manual_seed(42) np.random.seed(42) sns.set_style('whitegrid') # 设备配置 device = torch.device("cuda" if torch.cuda.is_available() else "cpu") print(f"使用设备: {device}") # 1. 数据加载与预处理函数 # -------------------------------------------------- def load_and_preprocess_data(): """加载并预处理所有数据源""" print("开始数据加载与预处理...") start_time = time.time() # 加载EC气象数据 ec_df = pd.read_csv('阿拉山口风电场_EC_data.csv', parse_dates=['生成日期', '预测日期']) ec_df = ec_df[ec_df['场站名'] == '阿拉山口风电场'] # 计算EC风速和风向 ec_df['EC风速(m/s)'] = np.sqrt(ec_df['U风分量(m/s)']**2 + ec_df['V风分量(m/s)']**2) ec_df['EC风向(度)'] = np.degrees(np.arctan2(ec_df['V风分量(m/s)'], ec_df['U风分量(m/s)'])) % 360 # 添加EC数据可用时间(生成时间+12小时) ec_df['可用时间'] = ec_df['生成日期'] + pd.Timedelta(hours=12) # 选择关键特征 ec_features = [ '可用时间', '预测日期', 'EC风速(m/s)', 'EC风向(度)', '位势高度_850hPa(gpm)', '温度_850hPa(K)', '相对湿度_850hPa(%)', '位势高度_500hPa(gpm)', '温度_500hPa(K)' ] ec_df = ec_df[ec_features] # 加载风机数据 turbine_df = pd.read_csv('阿拉山口风电场风机数据.csv', encoding='gbk', parse_dates=[0]) turbine_df.columns = ['timestamp', 'wind_speed', 'active_power'] # 加载远动数据 scada_df = pd.read_csv('阿拉山口风电场远动数据.csv', encoding='gbk', parse_dates=[0]) scada_df.columns = ['timestamp', 'active_power_total'] # 合并风机和远动数据 power_df = pd.merge(turbine_df[['timestamp', 'wind_speed']], scada_df, on='timestamp', how='outer') # 按时间排序并填充缺失值 power_df.sort_values('timestamp', inplace=True) power_df['active_power_total'].ffill(inplace=True) power_df['wind_speed'].ffill(inplace=True) # 创建完整的时间序列索引(15分钟间隔) full_range = pd.date_range( start=power_df['timestamp'].min(), end=power_df['timestamp'].max(), freq='15T' ) power_df = power_df.set_index('timestamp').reindex(full_range).reset_index() power_df.rename(columns={'index': 'timestamp'}, inplace=True) power_df[['wind_speed', 'active_power_total']] = power_df[['wind_speed', 'active_power_total']].ffill() # 合并EC数据到主数据集 ec_data = [] for idx, row in power_df.iterrows(): ts = row['timestamp'] # 获取可用的EC预测(可用时间 <= 当前时间) available_ec = ec_df[ec_df['可用时间'] <= ts] if not available_ec.empty: # 获取最近发布的EC数据 latest_gen = available_ec['可用时间'].max() latest_ec = available_ec[available_ec['可用时间'] == latest_gen] # 找到最接近当前时间点的预测 time_diff = (latest_ec['预测日期'] - ts).abs() closest_idx = time_diff.idxmin() ec_point = latest_ec.loc[closest_idx].copy() ec_point['timestamp'] = ts ec_data.append(ec_point) # 创建EC数据DataFrame并合并 ec_ts_df = pd.DataFrame(ec_data) merged_df = pd.merge(power_df, ec_ts_df, on='timestamp', how='left') # 填充缺失的EC数据 ec_cols = [col for col in ec_ts_df.columns if col not in ['timestamp', '可用时间', '预测日期']] for col in ec_cols: merged_df[col] = merged_df[col].interpolate(method='time') # 添加时间特征 merged_df['hour'] = merged_df['timestamp'].dt.hour merged_df['day_of_week'] = merged_df['timestamp'].dt.dayofweek merged_df['day_of_year'] = merged_df['timestamp'].dt.dayofyear merged_df['month'] = merged_df['timestamp'].dt.month # 计算实际风向(如果有测风塔数据,这里使用EC风向) merged_df['风向(度)'] = merged_df['EC风向(度)'] # 移除包含NaN的行 merged_df.dropna(inplace=True) # 特征选择 feature_cols = [ 'wind_speed', 'active_power_total', 'EC风速(m/s)', '风向(度)', '位势高度_850hPa(gpm)', '温度_850hPa(K)', '相对湿度_850hPa(%)', '位势高度_500hPa(gpm)', '温度_500hPa(K)', 'hour', 'day_of_week', 'day_of_year', 'month' ] target_col = 'active_power_total' print(f"数据处理完成! 耗时: {time.time()-start_time:.2f}秒") print(f"数据集形状: {merged_df.shape}") print(f"特征数量: {len(feature_cols)}") return merged_df[feature_cols], merged_df[target_col], merged_df['timestamp'] # 2. 数据准备类 (PyTorch Dataset) # -------------------------------------------------- class WindPowerDataset(Dataset): """风功率预测数据集类""" def __init__(self, X, y, look_back, forecast_steps): """ :param X: 特征数据 (n_samples, n_features) :param y: 目标数据 (n_samples,) :param look_back: 回溯时间步长 :param forecast_steps: 预测步长 """ self.X = X self.y = y self.look_back = look_back self.forecast_steps = forecast_steps self.n_samples = len(X) - look_back - forecast_steps + 1 def __len__(self): return self.n_samples def __getitem__(self, idx): # 获取历史序列 x_seq = self.X[idx:idx+self.look_back] # 获取未来目标序列 y_seq = self.y[idx+self.look_back:idx+self.look_back+self.forecast_steps] # 转换为PyTorch张量 x_tensor = torch.tensor(x_seq, dtype=torch.float32) y_tensor = torch.tensor(y_seq, dtype=torch.float32) return x_tensor, y_tensor # 3. LSTM模型 (PyTorch实现) # -------------------------------------------------- class WindPowerLSTM(nn.Module): """风功率预测LSTM模型""" def __init__(self, input_size, hidden_size, num_layers, output_steps): """ :param input_size: 输入特征维度 :param hidden_size: LSTM隐藏层大小 :param num_layers: LSTM层数 :param output_steps: 输出步长 """ super(WindPowerLSTM, self).__init__() self.hidden_size = hidden_size self.num_layers = num_layers self.output_steps = output_steps # LSTM层 self.lstm = nn.LSTM( input_size=input_size, hidden_size=hidden_size, num_layers=num_layers, batch_first=True, dropout=0.2 if num_layers > 1 else 0 ) # 全连接层 self.fc = nn.Sequential( nn.Linear(hidden_size, 128), nn.ReLU(), nn.Dropout(0.3), nn.Linear(128, 64), nn.ReLU(), nn.Dropout(0.2), nn.Linear(64, output_steps) ) def forward(self, x): # 初始化隐藏状态 h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(device) c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(device) # 前向传播LSTM out, _ = self.lstm(x, (h0, c0)) # 只取最后一个时间步的输出 out = out[:, -1, :] # 全连接层 out = self.fc(out) return out # 4. 训练和评估函数 # -------------------------------------------------- def train_model(model, train_loader, val_loader, criterion, optimizer, scheduler, epochs, model_name): """训练模型""" print(f"\n开始训练 {model_name} 模型...") start_time = time.time() best_val_loss = float('inf') history = {'train_loss': [], 'val_loss': []} for epoch in range(epochs): # 训练阶段 model.train() train_loss = 0.0 for inputs, targets in train_loader: inputs, targets = inputs.to(device), targets.to(device) # 前向传播 outputs = model(inputs) loss = criterion(outputs, targets) # 反向传播和优化 optimizer.zero_grad() loss.backward() optimizer.step() train_loss += loss.item() * inputs.size(0) # 验证阶段 model.eval() val_loss = 0.0 with torch.no_grad(): for inputs, targets in val_loader: inputs, targets = inputs.to(device), targets.to(device) outputs = model(inputs) loss = criterion(outputs, targets) val_loss += loss.item() * inputs.size(0) # 计算平均损失 train_loss = train_loss / len(train_loader.dataset) val_loss = val_loss / len(val_loader.dataset) history['train_loss'].append(train_loss) history['val_loss'].append(val_loss) # 更新学习率 if scheduler: scheduler.step(val_loss) # 保存最佳模型 if val_loss < best_val_loss: best_val_loss = val_loss torch.save(model.state_dict(), f'best_{model_name}_model.pth') # 打印进度 if (epoch + 1) % 5 == 0: print(f"Epoch [{epoch+1}/{epochs}] - Train Loss: {train_loss:.6f}, Val Loss: {val_loss:.6f}") print(f"训练完成! 耗时: {time.time()-start_time:.2f}秒") print(f"最佳验证损失: {best_val_loss:.6f}") return history def evaluate_model(model, test_loader, scaler, model_name): """评估模型性能""" model.eval() actuals = [] predictions = [] with torch.no_grad(): for inputs, targets in test_loader: inputs = inputs.to(device) # 预测 outputs = model(inputs) # 反归一化 outputs_np = outputs.cpu().numpy() targets_np = targets.numpy() # 反归一化 outputs_inv = scaler.inverse_transform(outputs_np) targets_inv = scaler.inverse_transform(targets_np.reshape(-1, 1)).flatten() # 收集结果 actuals.extend(targets_inv) predictions.extend(outputs_inv.flatten()) # 转换为numpy数组 actuals = np.array(actuals) predictions = np.array(predictions) # 计算性能指标 mae = mean_absolute_error(actuals, predictions) rmse = np.sqrt(mean_squared_error(actuals, predictions)) print(f"\n{model_name} 模型评估结果:") print(f"MAE: {mae:.2f} kW") print(f"RMSE: {rmse:.2f} kW") return actuals, predictions, mae, rmse # 5. 可视化函数 # -------------------------------------------------- def plot_predictions(actuals, predictions, timestamps, model_name, forecast_steps, mae): """可视化预测结果""" # 创建结果DataFrame results = pd.DataFrame({ 'timestamp': timestamps, 'actual': actuals, 'predicted': predictions }) # 设置时间索引 results.set_index('timestamp', inplace=True) # 选择一段代表性的时间序列展示 sample = results.iloc[1000:1300] plt.figure(figsize=(15, 7)) # 绘制实际值 plt.plot(sample.index, sample['actual'], label='实际功率', color='blue', alpha=0.7, linewidth=2) # 绘制预测值 plt.plot(sample.index, sample['predicted'], label='预测功率', color='red', alpha=0.7, linestyle='--', linewidth=2) plt.title(f'{model_name}风功率预测 (预测步长: {forecast_steps}步, MAE: {mae:.2f} kW)', fontsize=14) plt.xlabel('时间', fontsize=12) plt.ylabel('有功功率 (kW)', fontsize=12) plt.legend(fontsize=12) plt.grid(True, linestyle='--', alpha=0.7) plt.xticks(rotation=45) plt.tight_layout() plt.savefig(f'{model_name}_prediction_plot.png', dpi=300) plt.show() return results def plot_training_history(history, model_name): """绘制训练过程中的损失曲线""" plt.figure(figsize=(12, 6)) # 绘制训练损失 plt.plot(history['train_loss'], label='训练损失') # 绘制验证损失 if 'val_loss' in history: plt.plot(history['val_loss'], label='验证损失') plt.title(f'{model_name} 训练过程', fontsize=14) plt.xlabel('训练轮次', fontsize=12) plt.ylabel('损失 (MSE)', fontsize=12) plt.legend(fontsize=12) plt.grid(True, linestyle='--', alpha=0.7) plt.tight_layout() plt.savefig(f'{model_name}_training_history.png', dpi=300) plt.show() # 6. 主函数 # -------------------------------------------------- def main(): # 加载数据 X, y, timestamps = load_and_preprocess_data() # 定义预测配置 ULTRA_SHORT_CONFIG = { 'name': '超短期', 'look_back': 24, # 6小时历史 (24*15min) 'forecast_steps': 16, # 4小时预测 (16*15min) 'batch_size': 64, 'hidden_size': 128, 'num_layers': 2, 'epochs': 100, 'lr': 0.001 } SHORT_TERM_CONFIG = { 'name': '短期', 'look_back': 96, # 24小时历史 (96*15min) 'forecast_steps': 288, # 72小时预测 (288*15min) 'batch_size': 32, 'hidden_size': 256, 'num_layers': 3, 'epochs': 150, 'lr': 0.0005 } # 准备超短期预测数据集 print("\n准备超短期预测数据集...") # 数据标准化 X_scaler = StandardScaler() y_scaler = StandardScaler() X_scaled = X_scaler.fit_transform(X) y_scaled = y_scaler.fit_transform(y.values.reshape(-1, 1)).flatten() # 创建数据集 dataset = WindPowerDataset(X_scaled, y_scaled, ULTRA_SHORT_CONFIG['look_back'], ULTRA_SHORT_CONFIG['forecast_steps']) # 划分数据集 train_size = int(0.8 * len(dataset)) val_size = int(0.1 * len(dataset)) test_size = len(dataset) - train_size - val_size train_dataset, val_dataset, test_dataset = torch.utils.data.random_split( dataset, [train_size, val_size, test_size] ) # 创建数据加载器 train_loader = DataLoader(train_dataset, batch_size=ULTRA_SHORT_CONFIG['batch_size'], shuffle=True) val_loader = DataLoader(val_dataset, batch_size=ULTRA_SHORT_CONFIG['batch_size']) test_loader = DataLoader(test_dataset, batch_size=ULTRA_SHORT_CONFIG['batch_size']) # 创建模型 model_ultra = WindPowerLSTM( input_size=X.shape[1], hidden_size=ULTRA_SHORT_CONFIG['hidden_size'], num_layers=ULTRA_SHORT_CONFIG['num_layers'], output_steps=ULTRA_SHORT_CONFIG['forecast_steps'] ).to(device) # 损失函数和优化器 criterion = nn.MSELoss() optimizer = optim.Adam(model_ultra.parameters(), lr=ULTRA_SHORT_CONFIG['lr']) scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=5, verbose=True) # 训练模型 history_ultra = train_model( model_ultra, train_loader, val_loader, criterion, optimizer, scheduler, ULTRA_SHORT_CONFIG['epochs'], 'ultra_short' ) # 评估模型 actuals_ultra, preds_ultra, mae_ultra, rmse_ultra = evaluate_model( model_ultra, test_loader, y_scaler, '超短期' ) # 可视化结果 plot_training_history(history_ultra, '超短期模型') # 获取对应的时间戳 ultra_timestamps = timestamps[ULTRA_SHORT_CONFIG['look_back']:][:len(actuals_ultra)] results_ultra = plot_predictions( actuals_ultra, preds_ultra, ultra_timestamps, '超短期', ULTRA_SHORT_CONFIG['forecast_steps'], mae_ultra ) # 准备短期预测数据集 print("\n准备短期预测数据集...") # 创建数据集 dataset_short = WindPowerDataset(X_scaled, y_scaled, SHORT_TERM_CONFIG['look_back'], SHORT_TERM_CONFIG['forecast_steps']) # 划分数据集 train_size_short = int(0.8 * len(dataset_short)) val_size_short = int(0.1 * len(dataset_short)) test_size_short = len(dataset_short) - train_size_short - val_size_short train_dataset_short, val_dataset_short, test_dataset_short = torch.utils.data.random_split( dataset_short, [train_size_short, val_size_short, test_size_short] ) # 创建数据加载器 train_loader_short = DataLoader(train_dataset_short, batch_size=SHORT_TERM_CONFIG['batch_size'], shuffle=True) val_loader_short = DataLoader(val_dataset_short, batch_size=SHORT_TERM_CONFIG['batch_size']) test_loader_short = DataLoader(test_dataset_short, batch_size=SHORT_TERM_CONFIG['batch_size']) # 创建模型 model_short = WindPowerLSTM( input_size=X.shape[1], hidden_size=SHORT_TERM_CONFIG['hidden_size'], num_layers=SHORT_TERM_CONFIG['num_layers'], output_steps=SHORT_TERM_CONFIG['forecast_steps'] ).to(device) # 损失函数和优化器 optimizer_short = optim.Adam(model_short.parameters(), lr=SHORT_TERM_CONFIG['lr']) scheduler_short = ReduceLROnPlateau(optimizer_short, mode='min', factor=0.5, patience=10, verbose=True) # 训练模型 history_short = train_model( model_short, train_loader_short, val_loader_short, criterion, optimizer_short, scheduler_short, SHORT_TERM_CONFIG['epochs'], 'short_term' ) # 评估模型 actuals_short, preds_short, mae_short, rmse_short = evaluate_model( model_short, test_loader_short, y_scaler, '短期' ) # 可视化结果 plot_training_history(history_short, '短期模型') # 获取对应的时间戳 short_timestamps = timestamps[SHORT_TERM_CONFIG['look_back']:][:len(actuals_short)] results_short = plot_predictions( actuals_short, preds_short, short_timestamps, '短期', SHORT_TERM_CONFIG['forecast_steps'], mae_short ) # 最终报告 print("\n" + "="*50) print("风功率预测模型训练完成!") print("="*50) print(f"超短期模型 (4小时预测):") print(f" - 回溯步长: {ULTRA_SHORT_CONFIG['look_back']} (6小时)") print(f" - 预测步长: {ULTRA_SHORT_CONFIG['forecast_steps']} (4小时)") print(f" - 测试集MAE: {mae_ultra:.2f} kW") print(f" - 测试集RMSE: {rmse_ultra:.2f} kW") print(f"\n短期模型 (72小时预测):") print(f" - 回溯步长: {SHORT_TERM_CONFIG['look_back']} (24小时)") print(f" - 预测步长: {SHORT_TERM_CONFIG['forecast_steps']} (72小时)") print(f" - 测试集MAE: {mae_short:.2f} kW") print(f" - 测试集RMSE: {rmse_short:.2f} kW") print("="*50) # 保存预测结果 results_df = pd.DataFrame({ 'timestamp': short_timestamps, '实际功率': actuals_short, '超短期预测': results_ultra['predicted'].values[:len(actuals_short)], '短期预测': preds_short }) results_df.to_csv('风功率预测结果.csv', index=False) print("预测结果已保存到 '风功率预测结果.csv'") if __name__ == "__main__": main() 请在上述的代码基础上修改,本次修改要求只使用"阿拉山口风电场_EC_data"和"阿拉山口风电场风机数据"这两个数据集进行训练和预测。

帮我检查优化代码,尤其是减少内存占用:import pandas as pd import numpy as np import lightgbm as lgb from lightgbm import early_stopping, log_evaluation import gc import os import chardet from sklearn.model_selection import train_test_split from tqdm import tqdm import joblib from datetime import datetime from scipy.sparse import hstack, csr_matrix, save_npz, load_npz import sys import psutil from sklearn.metrics import log_loss, mean_absolute_error # 内存优化函数 def optimize_dtypes(df): """优化DataFrame的数据类型以减少内存占用""" if df.empty: return df # 转换整数列为最小可用类型 int_cols = df.select_dtypes(include=['int']).columns if not int_cols.empty: df[int_cols] = df[int_cols].apply(pd.to_numeric, downcast='integer') # 转换浮点列为最小可用类型 float_cols = df.select_dtypes(include=['float']).columns if not float_cols.empty: df[float_cols] = df[float_cols].apply(pd.to_numeric, downcast='float') # 转换对象列为分类类型 obj_cols = df.select_dtypes(include=['object']).columns for col in obj_cols: num_unique = df[col].nunique() num_total = len(df) if num_unique / num_total < 0.5: # 如果唯一值比例小于50% df[col] = df[col].astype('category') return df # 内存监控函数 def memory_monitor(step_name=""): """监控内存使用情况""" process = psutil.Process(os.getpid()) mem_info = process.memory_info() print(f"{step_name} 内存使用: {mem_info.rss / (1024 ** 2):.2f} MB") return mem_info.rss / (1024 ** 2) # 返回MB # 增强数据加载函数 def load_data_safely(file_path, usecols=None, dtype=None, chunksize=100000): """安全加载大型CSV文件,优化内存使用""" try: if not os.path.exists(file_path): print(f"⚠️ 文件不存在: {file_path}") return pd.DataFrame() # 自动检测编码 with open(file_path, 'rb') as f: result = chardet.detect(f.read(100000)) encoding = result['encoding'] if result['confidence'] > 0.7 else 'latin1' # 分批读取并优化内存 chunks = [] reader = pd.read_csv( file_path, encoding=encoding, usecols=usecols, dtype=dtype, chunksize=chunksize, low_memory=False ) for chunk in tqdm(reader, desc=f"加载 {os.path.basename(file_path)}"): # 优化分类列内存 for col in chunk.columns: if dtype and col in dtype and dtype[col] == 'category': chunk[col] = chunk[col].astype('category').cat.as_ordered() # 优化数据类型 chunk = optimize_dtypes(chunk) chunks.append(chunk) if chunks: result = pd.concat(chunks, ignore_index=True) # 再次整体优化 result = optimize_dtypes(result) return result return pd.DataFrame() except Exception as e: print(f"⚠️ 加载 {file_path} 失败: {str(e)}") return pd.DataFrame() # 稀疏矩阵转换函数 def to_sparse_matrix(df, columns): """将分类特征转换为稀疏矩阵表示""" sparse_matrices = [] for col in columns: if col in df.columns: # 处理NaN值 df[col] = df[col].fillna('MISSING') # 创建稀疏矩阵 sparse_mat = csr_matrix(pd.get_dummies(df[col], sparse=True).values) sparse_matrices.append(sparse_mat) # 水平堆叠所有稀疏矩阵 if sparse_matrices: return hstack(sparse_matrices) return None # 增量训练函数 def train_incremental(X, y, categorical_features, params, num_rounds=1000, chunk_size=100000): """分块增量训练模型以减少内存占用""" model = None for i in tqdm(range(0, len(X), chunk_size), desc="增量训练"): chunk_end = min(i + chunk_size, len(X)) X_chunk = X.iloc[i:chunk_end] y_chunk = y.iloc[i:chunk_end] train_data = lgb.Dataset( X_chunk, label=y_chunk, categorical_feature=categorical_features ) if model is None: model = lgb.train( params, train_data, num_boost_round=num_rounds, keep_training_booster=True ) else: model = lgb.train( params, train_data, num_boost_round=num_rounds, init_model=model, keep_training_booster=True ) return model # 主处理流程 def main(): """主处理流程,包含完整的内存优化策略""" # 初始内存监控 start_mem = memory_monitor("初始内存") # 定义内存优化的数据类型 dtypes = { 'did': 'category', 'vid': 'category', 'play_time': 'float32' } # 可选特征 optional_features = { 'item_cid': 'category', 'item_type': 'category', 'item_assetSource': 'category', 'item_classify': 'category', 'item_isIntact': 'category', 'sid': 'category', 'stype': 'category' } # 添加特征字段 for i in range(88): dtypes[f'f{i}'] = 'float32' # 加载核心数据 print("开始加载核心数据...") did_features = load_data_safely('did_features_table.csv', dtype=dtypes) vid_info = load_data_safely('vid_info_table.csv', dtype=dtypes) memory_monitor("加载核心数据后") # 添加可选特征到dtypes for feature, dtype in optional_features.items(): if not vid_info.empty and feature in vid_info.columns: dtypes[feature] = dtype # 重新加载数据以确保所有列使用正确的数据类型 if os.path.exists('did_features_table.csv'): did_features = load_data_safely('did_features_table.csv', dtype=dtypes) else: print("⚠️ did_features_table.csv 不存在") did_features = pd.DataFrame() if os.path.exists('vid_info_table.csv'): vid_info = load_data_safely('vid_info_table.csv', dtype=dtypes) else: print("⚠️ vid_info_table.csv 不存在") vid_info = pd.DataFrame() memory_monitor("重新加载数据后") # 加载历史数据 print("开始加载历史数据...") hist_exposure, hist_click, hist_play = load_historical_data(days=32) memory_monitor("加载历史数据后") # 构建点击数据集 if not hist_exposure.empty and not hist_click.empty: print("构建点击数据集...") click_train_data = build_click_dataset(hist_exposure, hist_click, sample_ratio=0.1) else: print("⚠️ 无法构建点击数据集") click_train_data = pd.DataFrame() memory_monitor("构建点击数据集后") # 添加特征 if not click_train_data.empty: print("开始构建点击特征...") click_train_data = add_click_features( click_train_data, did_features, vid_info, hist_click, hist_play ) else: print("⚠️ 点击数据集为空,跳过特征构建") memory_monitor("添加特征后") # 准备训练数据 if not click_train_data.empty: if 'date' in click_train_data.columns: X = click_train_data.drop(columns=['did', 'vid', 'label', 'date'], errors='ignore') else: X = click_train_data.drop(columns=['did', 'vid', 'label'], errors='ignore') y = click_train_data['label'] else: X, y = pd.DataFrame(), pd.Series() print("⚠️ 点击训练数据为空") # 划分数据集 if not X.empty and not y.empty: X_train, X_val, y_train, y_val = train_test_split( X, y, test_size=0.2, random_state=42, stratify=y ) else: print("⚠️ 训练数据为空,无法进行模型训练") X_train, X_val, y_train, y_val = pd.DataFrame(), pd.DataFrame(), pd.Series(), pd.Series() memory_monitor("划分数据集后") # 训练模型参数 params = { 'objective': 'binary', 'metric': 'binary_logloss', 'boosting_type': 'gbdt', 'num_leaves': 63, 'learning_rate': 0.05, 'feature_fraction': 0.8, 'bagging_fraction': 0.8, 'bagging_freq': 5, 'min_child_samples': 100, 'verbosity': -1 } # 增量训练点击模型 if not X_train.empty: print("开始训练点击预测模型...") model_click = train_incremental(X_train, y_train, categorical_features, params, num_rounds=1500, chunk_size=100000) # 在验证集上评估 val_preds = model_click.predict(X_val) val_logloss = log_loss(y_val, val_preds) print(f"验证集LogLoss: {val_logloss:.4f}") else: model_click = None print("⚠️ 训练数据为空,跳过点击预测模型训练") memory_monitor("训练点击模型后") # 构建完播率数据集 print("开始构建完播率数据集...") play_train_data = build_play_dataset(hist_play, vid_info, did_features, hist_click) memory_monitor("构建完播率数据集后") # 训练完播率模型 if not play_train_data.empty: X_play = play_train_data.drop(columns=['did', 'vid', 'play_time', 'item_duration', 'completion_rate'], errors='ignore') y_play = play_train_data['completion_rate'] else: X_play, y_play = pd.DataFrame(), pd.Series() print("⚠️ 完播率训练数据为空") if not X_play.empty and not y_play.empty: X_train_play, X_val_play, y_train_play, y_val_play = train_test_split( X_play, y_play, test_size=0.2, random_state=42 ) else: print("⚠️ 完播率训练数据为空,无法进行模型训练") X_train_play, X_val_play, y_train_play, y_val_play = pd.DataFrame(), pd.DataFrame(), pd.Series(), pd.Series() # 训练参数 params_reg = { 'objective': 'regression', 'metric': 'mae', 'boosting_type': 'gbdt', 'num_leaves': 63, 'learning_rate': 0.03, 'feature_fraction': 0.8, 'bagging_fraction': 0.8, 'bagging_freq': 5, 'lambda_l1': 0.1, 'lambda_l2': 0.1, 'min_data_in_leaf': 50, 'verbosity': -1 } # 增量训练完播率模型 if not X_train_play.empty: print("开始训练完播率模型...") model_play = train_incremental(X_train_play, y_train_play, play_categorical_features, params_reg, num_rounds=2000, chunk_size=100000) # 在验证集上评估 val_preds = model_play.predict(X_val_play) val_mae = mean_absolute_error(y_val_play, val_preds) print(f"验证集MAE: {val_mae:.4f}") else: model_play = None print("⚠️ 训练数据为空,跳过完播率模型训练") memory_monitor("训练完播率模型后") # 保存模型 if model_click: model_click.save_model('click_model.txt') print("点击预测模型已保存") if model_play: model_play.save_model('play_model.txt') print("完播率预测模型已保存") # 预测流程 print("开始加载预测数据...") to_predict_users = load_data_safely('testA_pred_did.csv', dtype={'did': 'category'}) to_predict_exposure = load_data_safely('testA_did_show.csv', dtype={'did': 'category', 'vid': 'category'}) # 执行预测 if not to_predict_users.empty and not to_predict_exposure.empty: print("开始生成预测结果...") submission = predict_for_test_data(to_predict_users, to_predict_exposure, did_features, vid_info) # 保存结果 if not submission.empty: timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") output_file = f'submission_{timestamp}.csv' submission.to_csv(output_file, index=False) print(f"预测结果已保存至: {output_file}") else: print("⚠️ 预测结果为空,未保存文件") else: print("⚠️ 预测数据加载失败,无法生成结果") # 最终内存报告 end_mem = memory_monitor("处理完成") print(f"总内存消耗: {end_mem - start_mem:.2f} MB") # 历史数据加载函数 def load_historical_data(days=32): """高效加载历史数据,支持分批处理""" see_list, click_list, play_list = [], [], [] for day in tqdm(range(1, days + 1), desc="加载历史数据"): day_str = f"{day:02d}" # 加载曝光数据 see_path = f'see_{day_str}.csv' if os.path.exists(see_path): see = load_data_safely(see_path, usecols=['did', 'vid'], dtype={'did': 'category', 'vid': 'category'}) if not see.empty and 'did' in see.columns and 'vid' in see.columns: see_list.append(see) del see gc.collect() # 加载点击数据 click_path = f'click_{day_str}.csv' if os.path.exists(click_path): click = load_data_safely(click_path, usecols=['did', 'vid', 'click_time'], dtype={'did': 'category', 'vid': 'category'}) if not click.empty and 'click_time' in click.columns and 'did' in click.columns and 'vid' in click.columns: # 优化日期处理 click['date'] = pd.to_datetime(click['click_time'], errors='coerce').dt.date click = click.drop(columns=['click_time'], errors='ignore') click_list.append(click[['did', 'vid', 'date']]) del click gc.collect() # 加载播放数据 play_path = f'playplus_{day_str}.csv' if os.path.exists(play_path): play = load_data_safely(play_path, usecols=['did', 'vid', 'play_time'], dtype={'did': 'category', 'vid': 'category'}) if not play.empty and 'play_time' in play.columns and 'did' in play.columns and 'vid' in play.columns: play_list.append(play) del play gc.collect() gc.collect() # 确保返回三个DataFrame return ( pd.concat(see_list).drop_duplicates(['did', 'vid']) if see_list else pd.DataFrame(), pd.concat(click_list).drop_duplicates(['did', 'vid']) if click_list else pd.DataFrame(), pd.concat(play_list).drop_duplicates(['did', 'vid']) if play_list else pd.DataFrame() ) # 点击数据集构建 def build_click_dataset(hist_exposure, hist_click, sample_ratio=0.1): """构建点击数据集,包含负样本采样""" if hist_exposure.empty or hist_click.empty: print("⚠️ 历史曝光或点击数据为空,无法构建数据集") return pd.DataFrame() # 标记正样本 hist_click = hist_click.copy() hist_click['label'] = 1 # 高效标记负样本 exposure_set = set(zip(hist_exposure['did'], hist_exposure['vid'])) click_set = set(zip(hist_click['did'], hist_click['vid'])) # 找出未点击的曝光 negative_set = exposure_set - click_set # 创建负样本DataFrame if negative_set: negative_dids, negative_vids = zip(*negative_set) negative_samples = pd.DataFrame({ 'did': list(negative_dids), 'vid': list(negative_vids), 'label': 0 }) # 采样负样本 if sample_ratio < 1.0: negative_samples = negative_samples.sample(frac=sample_ratio, random_state=42) else: negative_samples = pd.DataFrame(columns=['did', 'vid', 'label']) # 合并数据集 click_data = pd.concat([ hist_click[['did', 'vid', 'label']], negative_samples ], ignore_index=True) # 释放内存 del exposure_set, click_set, negative_set, negative_samples gc.collect() return click_data # 特征工程函数 def add_click_features(df, did_features, vid_info, hist_click, hist_play): """添加关键特征,避免内存溢出""" if df.empty: return df # 基础特征 if not did_features.empty and 'did' in did_features.columns: # 只取需要的列 did_cols = [col for col in did_features.columns if col not in ['did'] or col == 'did'] df = df.merge(did_features[did_cols], on='did', how='left') if not vid_info.empty and 'vid' in vid_info.columns: vid_cols = [col for col in vid_info.columns if col not in ['vid'] or col == 'vid'] df = df.merge(vid_info[vid_cols], on='vid', how='left') # 用户行为统计 if not hist_click.empty and 'did' in hist_click.columns: user_click_count = hist_click.groupby('did').size().rename('user_click_count') df = df.merge(user_click_count, on='did', how='left') else: df['user_click_count'] = 0 if not hist_play.empty and 'did' in hist_play.columns and 'play_time' in hist_play.columns: user_total_play = hist_play.groupby('did')['play_time'].sum().rename('user_total_play') df = df.merge(user_total_play, on='did', how='left') else: df['user_total_play'] = 0 if not hist_click.empty and 'vid' in hist_click.columns: video_click_count = hist_click.groupby('vid').size().rename('video_click_count') df = df.merge(video_click_count, on='vid', how='left') else: df['video_click_count'] = 0 if not hist_play.empty and 'vid' in hist_play.columns and 'play_time' in hist_play.columns: avg_play_time = hist_play.groupby('vid')['play_time'].mean().rename('avg_play_time') df = df.merge(avg_play_time, on='vid', how='left') else: df['avg_play_time'] = 0 # 填充缺失值 fill_values = { 'user_click_count': 0, 'user_total_play': 0, 'video_click_count': df['video_click_count'].median() if 'video_click_count' in df else 0, 'avg_play_time': df['avg_play_time'].median() if 'avg_play_time' in df else 0 } for col, value in fill_values.items(): if col in df: df[col] = df[col].fillna(value) # 添加时间相关特征 if 'date' in df: df['day_of_week'] = pd.to_datetime(df['date']).dt.dayofweek.astype('int8') df['hour'] = pd.to_datetime(df['date']).dt.hour.astype('int8') return df # 预测函数 def predict_for_test_data(test_users, test_exposure, did_features, vid_info): """为测试数据生成预测结果""" if test_users.empty or test_exposure.empty: print("⚠️ 测试数据为空,无法进行预测") return pd.DataFrame() # 合并测试数据 test_data = test_exposure.merge(test_users, on='did', how='left') # 添加特征 test_data = add_click_features( test_data, did_features, vid_info, pd.DataFrame(), # 无历史点击 pd.DataFrame() # 无历史播放 ) # 预测点击率 X_test = test_data.drop(columns=['did', 'vid', 'date'], errors='ignore') click_probs = [] if model_click and not X_test.empty: # 分块预测避免内存问题 click_probs = [] chunk_size = 100000 for i in range(0, len(X_test), chunk_size): chunk = X_test.iloc[i:i+chunk_size] click_probs.extend(model_click.predict(chunk)) else: click_probs = [0.5] * len(test_data) # 默认值 # 预测完播率 completion_rates = [] if model_play and not X_test.empty: # 添加视频时长信息 if not vid_info.empty and 'vid' in vid_info.columns and 'item_duration' in vid_info.columns: test_data = test_data.merge(vid_info[['vid', 'item_duration']], on='vid', how='left') else: test_data['item_duration'] = 1.0 # 分块预测 completion_rates = [] for i in range(0, len(X_test), chunk_size): chunk = X_test.iloc[i:i+chunk_size] completion_rates.extend(model_play.predict(chunk)) else: completion_rates = [0.7] * len(test_data) # 默认值 # 计算综合得分 test_data['click_prob'] = click_probs test_data['completion_rate'] = completion_rates test_data['score'] = test_data['click_prob'] * test_data['completion_rate'] # 为每个用户选择得分最高的视频 submission = test_data.sort_values('score', ascending=False).groupby('did').head(1) # 选择需要的列 submission = submission[['did', 'vid', 'completion_rate']].copy() # 重命名列 submission.columns = ['did', 'vid', 'completion_rate'] # 确保数据格式正确 submission['did'] = submission['did'].astype(str) submission['vid'] = submission['vid'].astype(str) submission['completion_rate'] = submission['completion_rate'].round(4) return submission # 主程序入口 if __name__ == "__main__": main()

最新推荐

recommend-type

Webdiy.net新闻系统v1.0企业版发布:功能强大、易操作

标题中提到的"Webdiy.net新闻系统 v1.0 企业版"是一个针对企业级应用开发的新闻内容管理系统,是基于.NET框架构建的。从描述中我们可以提炼出以下知识点: 1. **系统特性**: - **易用性**:系统设计简单,方便企业用户快速上手和操作。 - **可定制性**:用户可以轻松修改网站的外观和基本信息,例如网页标题、页面颜色、页眉和页脚等,以符合企业的品牌形象。 2. **数据库支持**: - **Access数据库**:作为轻量级数据库,Access对于小型项目和需要快速部署的场景非常合适。 - **Sql Server数据库**:适用于需要强大数据处理能力和高并发支持的企业级应用。 3. **性能优化**: - 系统针对Access和Sql Server数据库进行了特定的性能优化,意味着它能够提供更为流畅的用户体验和更快的数据响应速度。 4. **编辑器功能**: - **所见即所得编辑器**:类似于Microsoft Word,允许用户进行图文混排编辑,这样的功能对于非技术人员来说非常友好,因为他们可以直观地编辑内容而无需深入了解HTML或CSS代码。 5. **图片管理**: - 新闻系统中包含在线图片上传、浏览和删除的功能,这对于新闻编辑来说是非常必要的,可以快速地为新闻内容添加相关图片,并且方便地进行管理和更新。 6. **内容发布流程**: - **审核机制**:后台发布新闻后,需经过审核才能显示到网站上,这样可以保证发布的内容质量,减少错误和不当信息的传播。 7. **内容排序与类别管理**: - 用户可以按照不同的显示字段对新闻内容进行排序,这样可以突出显示最新或最受欢迎的内容。 - 新闻类别的动态管理及自定义显示顺序,可以灵活地对新闻内容进行分类,方便用户浏览和查找。 8. **前端展示**: - 系统支持Javascript前端页面调用,这允许开发者将系统内容嵌入到其他网页或系统中。 - 支持iframe调用,通过这种HTML元素可以将系统内容嵌入到网页中,实现了内容的跨域展示。 9. **安全性**: - 提供了默认的管理账号和密码(webdiy / webdiy.net),对于企业应用来说,这些默认的凭证需要被替换,以保证系统的安全性。 10. **文件结构**: - 压缩包文件名称为"webdiynetnews",这可能是系统的根目录名称或主要安装文件。 11. **技术栈**: - 系统基于ASP.NET技术构建,这表明它使用.NET框架开发,并且可以利用.NET生态中的各种库和工具来实现功能的扩展和维护。 在实施和部署这样的系统时,企业可能还需要考虑以下方面: - **可扩展性**:随着业务的增长,系统应该能容易地扩展,以支持更多的用户和内容。 - **安全性**:除了更改为安全的管理员凭证外,还需考虑防止SQL注入、XSS攻击等网络安全问题。 - **备份和恢复**:定期备份数据库和内容是很重要的,以防数据丢失或需要恢复到先前的状态。 - **多用户和权限管理**:企业环境可能需要多级用户权限设置,以适应不同的团队和角色。 - **SEO优化**:系统应支持良好的搜索引擎优化实践,以便于内容在搜索引擎中的排名和发现。 - **移动适应性**:由于移动设备的广泛使用,系统应具备良好的响应式设计,以提供良好的移动用户体验。 根据描述,该系统似乎是一个成熟的解决方案,适用于需要快速搭建新闻网站的企业用户,但同时也应该注意定期的维护和更新,以确保系统的稳定性和安全性。
recommend-type

【Windows 11用户的福音】:一步到位解决GX Works2安装问题,让兼容性不再是问题!

# 1. GX Works2简介与Windows 11兼容性问题 ## 简介GX Works2 GX Works2是由三菱电机开发的一款针对其PLC(可编程逻辑控制器)产品的编程和调试软件。该软件以其直观的用户界面和强大的编程功能闻名于工业自动化领域,支持从简单的逻辑控制到复杂的系统集成项目。随着Windows操作系统的迭代升级,尤其是新推
recommend-type

电磁加热双边谐振和单边谐振的区别

<think>我们正在处理关于电磁加热技术中双边谐振与单边谐振的工作原理及区别的问题。 首先,需要明确用户意图是了解这两种谐振方式的区别以及它们的工作原理。 我们将通过以下步骤进行: 1. 调用网络搜索工具,检索关于电磁加热中双边谐振和单边谐振的信息。 2. 整合检索到的内容,按照要求生成回答,包括工作原理和区别,并组织成多个方法或步骤(但避免使用步骤词汇,用换行分隔)。 3. 在回答中,如果有公式或代码,按照指定格式处理。 4. 最后,添加相关问题部分。 注意:避免使用第一人称,避免步骤词汇,引用内容不集中末尾,而是融入回答中。 根据搜索,电磁加热中的谐振通常指的是感应加
recommend-type

EnvMan源代码压缩包内容及功能解析

根据给定文件信息,我们需要生成关于“EnvMan-source.zip”这一压缩包的知识点。首先,由于提供的信息有限,我们无法直接得知EnvMan-source.zip的具体内容和功能,但可以通过标题、描述和标签中的信息进行推断。文件名称列表只有一个“EnvMan”,这暗示了压缩包可能包含一个名为EnvMan的软件或项目源代码。以下是一些可能的知识点: ### EnvMan软件/项目概览 EnvMan可能是一个用于环境管理的工具或框架,其源代码被打包并以“EnvMan-source.zip”的形式进行分发。通常,环境管理相关的软件用于构建、配置、管理和维护应用程序的运行时环境,这可能包括各种操作系统、服务器、中间件、数据库等组件的安装、配置和版本控制。 ### 源代码文件说明 由于只有一个名称“EnvMan”出现在文件列表中,我们可以推测这个压缩包可能只包含一个与EnvMan相关的源代码文件夹。源代码文件夹可能包含以下几个部分: - **项目结构**:展示EnvMan项目的基本目录结构,通常包括源代码文件(.c, .cpp, .java等)、头文件(.h, .hpp等)、资源文件(图片、配置文件等)、文档(说明文件、开发者指南等)、构建脚本(Makefile, build.gradle等)。 - **开发文档**:可能包含README文件、开发者指南或者项目wiki,用于说明EnvMan的功能、安装、配置、使用方法以及可能的API说明或开发者贡献指南。 - **版本信息**:在描述中提到了版本号“-1101”,这表明我们所见的源代码包是EnvMan的1101版本。通常版本信息会详细记录在版本控制文件(如ChangeLog或RELEASE_NOTES)中,说明了本次更新包含的新特性、修复的问题、已知的问题等。 ### 压缩包的特点 - **命名规范**:标题、描述和标签中的一致性表明这是一个正式发布的软件包。通常,源代码包的命名会遵循一定的规范,如“项目名称-版本号-类型”,在这里类型是“source”。 - **分发形式**:以.zip格式的压缩包进行分发,是一种常见的软件源代码分发方式。虽然较现代的版本控制系统(如Git、Mercurial)通常支持直接从仓库克隆源代码,但打包成zip文件依然是一种便于存储和传输的手段。 ### 可能的应用场景 - **开发环境配置**:EnvMan可能是用于创建、配置和管理开发环境的工具,这种工具在开发人员设置新的开发机或新的项目环境时非常有用。 - **自动化部署**:EnvMan可能包含自动化部署环境的脚本或命令,使得部署流程变得快捷且高效。 - **监控与维护**:作为环境管理工具,EnvMan可能还支持对环境的监控功能,包括系统资源监控、服务状态检查等,以保证生产环境的稳定性。 ### 总结 尽管以上知识点是基于有限的信息进行的假设性推论,但EnvMan-source.zip包可能是一个用于环境管理的软件或项目的源代码包。该软件或项目可能包含构建和部署自动化环境的能力,以及对运行时环境的监控和维护。文件命名的一致性暗示这是一个正式的版本发布。如果要深入了解EnvMan的功能与用法,建议直接查看压缩包中的文档或源代码注释。同时,考虑到源代码的开发,我们还应该探究该项目所使用的技术栈、编程语言以及版本控制工具等,这将有助于进一步了解EnvMan的技术细节。
recommend-type

【Windows 11终极解决方案】:彻底攻克GX Works2安装中难缠的.Net Framework 3.5障碍!

# 1. Windows 11与GX Works2简介 ## 1.1 Windows 11操作系统概览 Windows 11,作为微软最新的操作系统,不仅仅提供了一种现代的用户体验,而且加强了在企业环境中的安全性与生产力工具。其引入了全新的界面设计、改进的多任务处理以及对Android应用的支持,使它成为IT专业人
recommend-type

贝叶斯gmm

<think>首先,用户的问题是关于贝叶斯高斯混合模型(Bayesian Gaussian Mixture Model, BGMM)的原理和应用。我需要根据系统指令来构建回答。 系统指令包括: - 所有行内数学表达式使用 `$...$` 格式。 - 独立公式使用 `$$...$$` 格式并单独成段。 - LaTeX语法正确。 - 使用中文回答。 - 生成相关问题。 - 在回答中引用的段落末尾自然地添加引用标识,如 `[^1]`。 - 回答结构清晰,帮助用户逐步解决问题。 - 保证回答真实可靠。 参考提供的引用: - 引用[1]:讨论了高斯混合模型在机器学习中的应用,包括其优点和挑战。 -
recommend-type

威海卫国旅游网美化版网站建设意向表下载

根据提供的文件信息,我们可以提取以下知识点: 1. 网站建设意向表概念: 网站建设意向表是指在网站开发过程中,客户或项目负责人填写的一份表单,用以明确表达网站建设的需求、目标、功能、风格偏好等关键信息。它是项目开发前期沟通的载体,确保开发团队能够准确理解客户需求并据此进行网站设计和功能实现。 2. 美化版的含义: 美化版通常指的是对原有产品、设计或界面进行视觉上的改进,使之更加吸引人和用户体验更佳。在网站建设的上下文中,美化版可能指对网站的设计元素、布局、色彩搭配等进行更新和优化,从而提高网站的美观度和用户交互体验。 3. 代码和CSS的优化: 代码优化:指的是对网站的源代码进行改进,包括但不限于提高代码的执行效率、减少冗余、提升可读性和可维护性。这可能涉及代码重构、使用更高效的算法、减少HTTP请求次数等技术手段。 CSS优化:层叠样式表(Cascading Style Sheets, CSS)是一种用于描述网页呈现样式的语言。CSS优化可能包括对样式的简化、合并、压缩,使用CSS预处理器、应用媒体查询以实现响应式设计,以及采用更高效的选择器减少重绘和重排等。 4. 网站建设实践: 网站建设涉及诸多实践,包括需求收集、网站规划、设计、编程、测试和部署。其中,前端开发是网站建设中的重要环节,涉及HTML、CSS和JavaScript等技术。此外,还需要考虑到网站的安全性、SEO优化、用户体验设计(UX)、交互设计(UI)等多方面因素。 5. 文件描述中提到的威海卫国旅游网: 威海卫国旅游网可能是一个以威海地区旅游信息为主题的网站。网站可能提供旅游景点介绍、旅游服务预订、旅游攻略分享等相关内容。该网站的这一项目表明,他们关注用户体验并致力于提供高质量的在线服务。 6. 文件标签的含义: 文件标签包括“下载”、“源代码”、“源码”、“资料”和“邮件管理类”。这些标签说明该压缩文件中包含了可以下载的资源,具体内容是网站相关源代码以及相关的开发资料。另外,提到“邮件管理类”可能意味着在网站项目中包含了用于处理用户邮件订阅、通知、回复等功能的代码或模块。 7. 压缩文件的文件名称列表: 该文件的名称为“网站建设意向表 美化版”。从文件名称可以推断出该文件是一个表单,用于收集网站建设相关需求,且经过了视觉和界面的改进。 综合上述内容,可以得出结论,本表单文件是一个为特定网站建设项目设计的需求收集工具,经过技术优化并美化了用户界面,旨在提升用户体验,并且可能包含了邮件管理功能,方便网站运营者与用户进行沟通。该文件是一份宝贵资源,尤其是对于需要进行网站建设或优化的开发者来说,可以作为参考模板或直接使用。
recommend-type

【FPGA设计高手必读】:高效除法的实现与基2 SRT算法优化

# 1. FPGA设计中的高效除法基础 ## 为何高效除法在FPGA设计中至关重要 在数字电路设计领域,尤其是在现场可编程门阵列(FPGA)中,高效的除法器设计对于实现高性能运算至关重要。由于除法运算相对复杂,其硬件实现往往涉及大量的逻辑门和触发器,消耗的资源和执行时间较多。因此,开发者必须设计出既高效又节省资源的除法器,以适应FPGA设计的性能和资源限制。此外,随着应用领域对计算速度和精度要求的不断提升,传统算法无法满足新需求,这就推动了高效除法算法的研究与发展。 ## 高效除法实现的挑战 实现FPGA设计中的高效除法,面临着诸多挑战。首先,除法操作的固有延迟限制了整体电路的性能;其
recommend-type

单片机的NOP指令时间看单片机的什么速率

<think>嗯,用户问的是单片机中NOP指令的执行时间与什么速率有关。这个问题涉及到单片机最底层的时序特性,需要从时钟周期和机器周期的关系来解释。 用户可能正在学习单片机底层原理或者在做精确时序控制相关的开发。从问题表述来看,用户应该已经了解NOP指令的基本作用(空操作),现在想深入理解其时间特性。这类问题通常出现在需要精确延时或同步的场景中。 参考之前的引用内容,虽然引用[1]提到51单片机的RAM结构,引用[2]展示串口通信,引用[3]涉及定时器中断,但这些都与当前问题没有直接关联。需要完全从单片机时钟体系的角度回答。 核心要点在于: - 所有指令执行时间都取决于时钟源 - NOP
recommend-type

Delphi7视频教学:如何制作多页窗口

Delphi7是Borland公司推出的一个集成开发环境(IDE),用于开发多种类型的应用程序,特别是本地Windows应用程序。Delphi使用一种名为Object Pascal的编程语言,并提供丰富的组件库,使得开发工作更加高效和方便。在Delphi7时代,Delphi是许多开发者的首选工具,特别是在数据库和桌面应用程序开发方面。 在Delphi7视频教学第十九讲中,重点是教授如何制作多页窗口。多页窗口是一种用户界面元素,允许用户在多个页面之间切换,每个页面可以展示不同的信息或功能,类似于一个标签页式布局。这种界面设计在很多应用中都有应用,如设置面板、用户配置文件编辑器、电子商务网站的商品展示等。 在本讲中,教师可能会讲解以下几个关键知识点: 1. 使用TPageControl组件:TPageControl是Delphi提供的一个组件,专门用于实现多页窗口功能。它允许用户添加、删除和管理多个页面,每个页面是一个TTabSheet对象。 2. 页面的添加和管理:如何在TPageControl中添加新的页面,修改每个页面的属性(如标题、图标等),以及如何通过编程方式管理页面的切换。 3. 事件处理:在多页窗口中,每个页面可能需要不同的事件处理逻辑,比如按钮点击事件、输入框数据修改事件等。如何针对不同的页面编写合适的事件处理代码是本讲的一个重要部分。 4. 用户界面设计:如何设计用户友好的多页界面,如何利用Delphi的可视化设计器来拖放组件、布局和设计页面。 5. 切换和访问页面:实现页面间的切换可以有多种方法,例如通过按钮点击、菜单选择等。此外,如何通过代码访问和操作页面对象,例如获取当前活动页面或选择特定页面。 6. 数据管理:如果多页窗口是用于展示或输入数据,如何在各个页面间共享和管理数据,以及如何确保数据的一致性和同步更新。 7. 性能优化:多页窗口可能会包含许多组件和资源,需要考虑性能优化的问题,如减少页面切换时的闪烁、提高加载速度等。 8. 兼容性和国际化:制作的应用程序可能需要在不同的操作系统和语言环境中运行,如何确保多页窗口在不同环境下都能正确显示和工作,以及支持多语言界面。 通过这些内容的讲解和示例演示,学员可以掌握在Delphi7中创建和管理多页窗口的方法,进一步提升他们的应用程序开发能力。这不仅限于桌面应用程序,甚至对于理解Web应用中的多标签页面布局也有帮助。 教学视频中可能会包含示例项目“制作多页窗口”,通过实例操作,学员可以更直观地理解如何使用TPageControl组件来创建多页窗口,并在实际项目中应用这些技术。这样的实践是巩固学习成果的重要方式,也有利于提高学员解决实际开发问题的能力。 总结来看,Delphi7视频教学第十九讲——制作多页窗口是帮助学员深入理解Delphi IDE在用户界面设计方面的一个具体应用场景,通过本课程的学习,学员不仅能够掌握基本的多页窗口设计技巧,还能增强处理复杂用户界面和应用程序逻辑的能力。这对于提高个人在Delphi开发方面的专业水平以及面向未来的软件开发实践都是大有裨益的。