活动介绍

from pandas import Series import pandas as pd from numpy import NaN series_obj = Series([None, 4, NaN]) pd.isnull(series_obj)

时间: 2023-11-26 09:02:44 浏览: 136
你的代码有一些问题,应该是这样的: ```python from pandas import Series import pandas as pd from numpy import NaN series_obj = Series([None, 4, NaN]) pd.isnull(series_obj) ``` 这段代码的作用是创建一个Series对象,包含三个元素:None、4和NaN。然后使用pandas的isnull函数检查Series中的每个元素是否为缺失值(包括None和NaN),返回一个布尔类型的Series对象。 运行以上代码后,输出结果如下: ``` 0 True 1 False 2 True dtype: bool ``` 可以看到,第一个和第三个元素都是缺失值,因此对应的值为True,而第二个元素不是缺失值,对应的值为False。
相关问题

from pandas import Series import pandas as pd from numpy import NaN series_obj = Series([NaN, 3, None]) pd.isnull(series_obj)

这段代码的作用是创建一个Series对象,包含三个元素:NaN、3和None。然后使用pandas的isnull函数检查Series中的每个元素是否为缺失值(包括None和NaN),返回一个布尔类型的Series对象。 但是,这段代码有一个小错误,应该将NaN改为np.nan,因为在这个代码中,NaN并没有被定义。 因此,正确的代码应该是: ```python from pandas import Series import pandas as pd import numpy as np series_obj = Series([np.nan, 3, None]) pd.isnull(series_obj) ``` 运行以上代码后,输出结果如下: ``` 0 True 1 False 2 True dtype: bool ``` 可以看到,第一个和第三个元素都是缺失值,因此对应的值为True,而第二个元素不是缺失值,对应的值为False。

import oml import xml.etree.ElementTree as ET from datetime import datetime, timedelta import re import numpy as np import pandas as pd import warnings import os import matplotlib.pyplot as plt from sklearn.cluster import KMeans from sklearn.preprocessing import StandardScaler import seaborn as sns import matplotlib.dates as mdates warnings.filterwarnings('ignore') PIC_DIR = "pic" if not os.path.exists(PIC_DIR): os.makedirs(PIC_DIR) def format_timestamp(ts): """格式化时间戳为Oracle可接受的字符串""" return ts.strftime("%Y-%m-%d %H:%M:%S.%f")[:23] def is_holiday(date): """判断日期是否为休息日(简化实现)""" # 在实际应用中,这里应该连接到日历表或API # 周末判断(周六=5,周日=6) if date.weekday() in [5, 6]: return True # 这里可以添加特定假期的判断 # 例如:if date.month == 1 and date.day == 1: return True return False def find_previous_workday(target_date): """找到前一个工作日""" current = target_date - timedelta(days=1) while is_holiday(current): current -= timedelta(days=1) return current def calculate_std_dev(df, exclude_columns): """计算数据框的标准差(排除指定列)""" # 排除不需要的列 numeric_cols = [col for col in df.columns if col not in exclude_columns] if not numeric_cols: return 0, {} # 计算每列的标准差 col_std = {} for col in numeric_cols: if df[col].dtype in [np.int64, np.float64]: col_std[col] = df[col].std() # 计算所有数值列合并后的标准差 all_values = [] for col in numeric_cols: if df[col].dtype in [np.int64, np.float64]: all_values.extend(df[col].values) total_std = np.std(all_values) if all_values else 0 return total_std, col_std def get_relative_change(current, reference): """计算相对变化率,处理除零问题""" if reference == 0 and current == 0: return 0 # 两者都为零,无变化 elif reference == 0: return float('inf') # 参考值为零,当前值非零 else: return abs(current - reference) / reference * 100 def classify_change(change_rate): """分类变化率,处理无穷大情况""" if change_rate == float('inf'): return "极端变化(参考值为零)", "m4" elif change_rate < 5: return "正常波动(可忽略)", "m1" elif 5 <= change_rate < 10: return "值得关注的变化", "m2" elif 10 <= change_rate < 20: return "显著变化", "m3" else: return "重大变化", "m4" def format_change_rate(change_rate): """格式化变化率显示""" if change_rate == float('inf'): return "无穷大" else: return f"{change_rate:.2f}%" def get_data_samples(cursor, obj_name, inst_number, target_time, num_samples=10): """获取目标时间点之前的样本数据,处理标准差为零的情况""" # 获取目标时间点之前的10个采样点 query = f""" SELECT * FROM "{obj_name}" WHERE instance_number = {inst_number} AND snap_time < TO_TIMESTAMP('{format_timestamp(target_time)}', 'YYYY-MM-DD HH24:MI:SS.FF3') ORDER BY snap_time DESC FETCH FIRST {num_samples} ROWS ONLY """ cursor.execute(query) result = cursor.fetchall() if not result: return None, None, None # 获取列名 col_names = [col[0] for col in cursor.description] # 创建DataFrame df = pd.DataFrame(result, columns=col_names) # 排除不需要的列 exclude_cols = ['SNAP_TIME', 'INSTANCE_NUMBER', 'PRED', 'PROB', 'ANOMALYDETAILS'] # 计算标准差 total_std, col_std = calculate_std_dev(df, exclude_cols) # 如果标准差为零,添加微小噪声避免除零错误 if total_std == 0: total_std = 1e-6 for col in col_std: if col_std[col] == 0: col_std[col] = 1e-6 return df, total_std, col_std def generate_description(attr): """生成指标问题描述""" rank = attr['rank'] weight = attr['weight'] name = attr['name'] if rank == 1: return f"主要异常指标({weight*100:.0f}%权重),直接影响系统稳定性" elif weight > 0.7: return f"关键异常指标({weight*100:.0f}%权重),需要立即处理" elif weight > 0.3: return f"重要异常指标({weight*100:.0f}%权重),建议优先排查" else: return f"参考指标({weight*100:.0f}%权重),可作为辅助分析" def escape_sql(text): """转义SQL中的特殊字符""" return text.replace("'", "''").replace("\n", " ") def analyze_anomalies(): cursor = None conn = None obj_name, anom_time, anom_inst = None, None, None pic_paths = [] # 存储生成的图片路径 m4_indicators = [] # 存储m4级别的指标名称 try: # 1. 连接数据库 conn = oml.connect(user='aiopsoraoem', password='oracle', host='localhost', port=1521, service_name='aiopspdb') print("数据库连接成功") # 创建游标对象用于执行SQL cursor = oml.cursor() # 2. 获取待分析的异常记录 query = """ SELECT object_name, anomalies_time, anomalies_inst_number FROM RT_RESULT WHERE alert_analysis_status IS NULL AND anomalies_time > SYSDATE - 10/1440 ORDER BY anomalies_time DESC FETCH FIRST 1 ROWS ONLY """ # 执行查询 cursor.execute(query) result = cursor.fetchall() if not result: print("没有需要分析的异常记录") return # 提取数据 obj_name = result[0][0] anom_time = result[0][1] anom_inst = int(result[0][2]) print(f"分析记录: {obj_name} at {anom_time} (实例 {anom_inst})") # 3. 更新状态为doing update_query = f""" UPDATE RT_RESULT SET alert_analysis_status = 'doing' WHERE object_name = '{escape_sql(obj_name)}' AND anomalies_time = TO_TIMESTAMP('{format_timestamp(anom_time)}', 'YYYY-MM-DD HH24:MI:SS.FF3') AND anomalies_inst_number = {anom_inst} """ cursor.execute(update_query) cursor.connection.commit() print("状态更新为doing") # 4. 获取异常详情 detail_query = f""" SELECT ANOMALYDETAILS FROM "{obj_name}" WHERE snap_time = TO_TIMESTAMP('{format_timestamp(anom_time)}', 'YYYY-MM-DD HH24:MI:SS.FF3') AND instance_number = {anom_inst} """ # 执行查询 cursor.execute(detail_query) details_result = cursor.fetchall() if not details_result: raise Exception(f"{obj_name}表中未找到匹配记录") # 获取XML数据 xml_data = details_result[0][0] # 处理XML命名空间问题 xml_data = re.sub(r'\sxmlns="[^"]+"', '', xml_data, count=1) root = ET.fromstring(xml_data) attributes = [] for attr in root.findall('.//Attribute'): try: attr_data = { 'name': attr.get('name'), 'value': float(attr.get('actualValue')), 'weight': float(attr.get('weight')), 'rank': int(attr.get('rank')) } attributes.append(attr_data) except (TypeError, ValueError) as e: print(f"解析属性时出错: {e}") # 按rank排序 attributes.sort(key=lambda x: x['rank']) # 5. 交叉验证 - 获取异常点前10个采样点 print("获取异常点前10个采样点数据...") anomaly_df, B_total_std, C_col_std = get_data_samples(cursor, obj_name, anom_inst, anom_time) if anomaly_df is None: raise Exception(f"未找到异常点前10个采样点数据: {obj_name} at {anom_time} (实例 {anom_inst})") # 6. 获取正常模式数据 print("获取正常模式数据...") # P1: 一天前相同时间点(跳过休息日) p1_time = anom_time - timedelta(days=1) if is_holiday(p1_time): p1_time = find_previous_workday(p1_time) day_ago_df, A1_total_std, A2_col_std = get_data_samples(cursor, obj_name, anom_inst, p1_time) # P2: 一周前相同时间点 p2_time = anom_time - timedelta(weeks=1) week_ago_df, B1_total_std, B2_col_std = get_data_samples(cursor, obj_name, anom_inst, p2_time) # P3: 一个月前相同日期(跳过休息日) p3_time = anom_time - timedelta(days=30) if is_holiday(p3_time): p3_time = find_previous_workday(p3_time) month_ago_df, C1_total_std, C2_col_std = get_data_samples(cursor, obj_name, anom_inst, p3_time) # 如果正常模式数据为空,使用默认值 if day_ago_df is None: print("警告: 未找到一天前相同时间点数据") day_ago_df = pd.DataFrame() A1_total_std, A2_col_std = 0, {} if week_ago_df is None: print("警告: 未找到一周前相同时间点数据") week_ago_df = pd.DataFrame() B1_total_std, B2_col_std = 0, {} if month_ago_df is None: print("警告: 未找到一个月前相同日期数据") month_ago_df = pd.DataFrame() C1_total_std, C2_col_std = 0, {} # 7. 生成验证结果 validation_results = "\n\n===== 交叉验证结果 =====\n" m4_changes = [] m4_indicator_set = set() # 用于存储m4级别的指标名称 # 第一轮验证:总标准差比较 validation_results += "\n第一轮验证:总标准差比较\n" validation_results += f"异常模式总标准差 (B): {B_total_std:.4f}\n" # 比较1:B vs A1 if A1_total_std is not None: change_rate = get_relative_change(B_total_std, A1_total_std) change_desc, change_class = classify_change(change_rate) rate_str = format_change_rate(change_rate) comp_result = f"与一天前相同时间点 (A1: {A1_total_std:.4f}) 比较: 变化率 {rate_str} - {change_desc} ({change_class})\n" validation_results += comp_result if change_class == "m4": m4_changes.append(comp_result.strip()) #m4_indicator_set.add("总标准差") # 比较2:B vs B1 if B1_total_std is not None: change_rate = get_relative_change(B_total_std, B1_total_std) change_desc, change_class = classify_change(change_rate) rate_str = format_change_rate(change_rate) comp_result = f"与一周前相同时间点 (B1: {B1_total_std:.4f}) 比较: 变化率 {rate_str} - {change_desc} ({change_class})\n" validation_results += comp_result if change_class == "m4": m4_changes.append(comp_result.strip()) #m4_indicator_set.add("总标准差") # 比较3:B vs C1 if C1_total_std is not None: change_rate = get_relative_change(B_total_std, C1_total_std) change_desc, change_class = classify_change(change_rate) rate_str = format_change_rate(change_rate) comp_result = f"与一个月前相同日期 (C1: {C1_total_std:.4f}) 比较: 变化率 {rate_str} - {change_desc} ({change_class})\n" validation_results += comp_result if change_class == "m4": m4_changes.append(comp_result.strip()) #m4_indicator_set.add("总标准差") # 第二轮验证:各列标准差比较 validation_results += "\n第二轮验证:各列标准差比较\n" # 只分析排名前3的指标 top_attributes = attributes[:3] for attr in top_attributes: col_name = attr['name'] validation_results += f"\n指标: {col_name} (当前值: {attr['value']}, 权重: {attr['weight']:.2f}, 排名: {attr['rank']})\n" # 异常模式该列标准差 col_std_b = C_col_std.get(col_name, 0) validation_results += f"异常模式标准差 (C): {col_std_b:.4f}\n" # 比较1:C vs A2 if A2_col_std and col_name in A2_col_std: col_std_a2 = A2_col_std[col_name] change_rate = get_relative_change(col_std_b, col_std_a2) change_desc, change_class = classify_change(change_rate) rate_str = format_change_rate(change_rate) comp_result = f"与一天前相同时间点 (A2: {col_std_a2:.4f}) 比较: 变化率 {rate_str} - {change_desc} ({change_class})\n" validation_results += comp_result if change_class == "m4": m4_changes.append(f"指标 {col_name}: {comp_result.strip()}") m4_indicator_set.add(col_name) # 比较2:C vs B2 if B2_col_std and col_name in B2_col_std: col_std_b2 = B2_col_std[col_name] change_rate = get_relative_change(col_std_b, col_std_b2) change_desc, change_class = classify_change(change_rate) rate_str = format_change_rate(change_rate) comp_result = f"与一周前相同时间点 (B2: {col_std_b2:.4f}) 比较: 变化率 {rate_str} - {change_desc} ({change_class})\n" validation_results += comp_result if change_class == "m4": m4_changes.append(f"指标 {col_name}: {comp_result.strip()}") m4_indicator_set.add(col_name) # 比较3:C vs C2 if C2_col_std and col_name in C2_col_std: col_std_c2 = C2_col_std[col_name] change_rate = get_relative_change(col_std_b, col_std_c2) change_desc, change_class = classify_change(change_rate) rate_str = format_change_rate(change_rate) comp_result = f"与一个月前相同日期 (C2: {col_std_c2:.4f}) 比较: 变化率 {rate_str} - {change_desc} ({change_class})\n" validation_results += comp_result if change_class == "m4": m4_changes.append(f"指标 {col_name}: {comp_result.strip()}") m4_indicator_set.add(col_name) # 7. 数据可视化 # 创建特定目录保存图片 timestamp_str = anom_time.strftime("%Y%m%d_%H%M%S") save_dir = os.path.join(PIC_DIR, f"{obj_name}_{anom_inst}_{timestamp_str}") if not os.path.exists(save_dir): os.makedirs(save_dir) # 准备所有样本数据用于聚类 all_data = [] labels = [] # 添加正常模式样本(只添加非空数据) if day_ago_df is not None and not day_ago_df.empty: all_data.append(day_ago_df) labels.extend(['Day Ago'] * len(day_ago_df)) if week_ago_df is not None and not week_ago_df.empty: all_data.append(week_ago_df) labels.extend(['Week Ago'] * len(week_ago_df)) if month_ago_df is not None and not month_ago_df.empty: all_data.append(month_ago_df) labels.extend(['Month Ago'] * len(month_ago_df)) # 添加异常样本 if anomaly_df is not None and not anomaly_df.empty: all_data.append(anomaly_df) labels.extend(['Anomaly'] * len(anomaly_df)) # 检查是否有足够数据 if len(all_data) == 0: print("警告: 没有足够的数据进行可视化") else: # 合并所有数据 combined_df = pd.concat(all_data, ignore_index=True) # 排除非数值列 exclude_cols = ['SNAP_TIME', 'INSTANCE_NUMBER', 'PRED', 'PROB', 'ANOMALYDETAILS'] numeric_cols = [col for col in combined_df.columns if col not in exclude_cols and combined_df[col].dtype in [np.int64, np.float64]] if not numeric_cols: print("警告: 没有数值列可用于可视化") else: # 提取数值数据 data = combined_df[numeric_cols].values # 标准化数据 scaler = StandardScaler() scaled_data = scaler.fit_transform(data) # 使用K-Means聚类 kmeans = KMeans(n_clusters=2, random_state=42) clusters = kmeans.fit_predict(scaled_data) # 使用PCA降维可视化 from sklearn.decomposition import PCA pca = PCA(n_components=2) reduced_data = pca.fit_transform(scaled_data) # 创建聚类图 plt.figure(figsize=(12, 8)) # 为不同标签使用不同颜色 colors = {'Day Ago': 'blue', 'Week Ago': 'green', 'Month Ago': 'purple', 'Anomaly': 'red'} # 绘制所有点 for label in set(labels): idx = [i for i, l in enumerate(labels) if l == label] plt.scatter(reduced_data[idx, 0], reduced_data[idx, 1], c=colors[label], label=label, alpha=0.6) # 标记异常点(红色) anomaly_idx = [i for i, l in enumerate(labels) if l == 'Anomaly'] plt.scatter(reduced_data[anomaly_idx, 0], reduced_data[anomaly_idx, 1], c='red', marker='x', s=100, label='Anomaly Points') plt.title('K-Means Clustering of System Metrics') plt.xlabel('Principal Component 1') plt.ylabel('Principal Component 2') plt.legend() plt.grid(True) # 保存聚类图 kmeans_path = os.path.join(save_dir, 'kmeans_clustering.png') plt.savefig(kmeans_path) plt.close() pic_paths.append(kmeans_path) print(f"保存K-Means聚类图到: {kmeans_path}") # 创建时间序列图 plt.figure(figsize=(15, 10)) # 为每个指标创建子图 num_plots = min(5, len(numeric_cols)) # 最多显示5个指标 fig, axes = plt.subplots(num_plots, 1, figsize=(15, 5*num_plots)) if num_plots == 1: axes = [axes] # 确保单图时axes是列表 for i, col in enumerate(numeric_cols[:num_plots]): ax = axes[i] # 绘制正常模式(只绘制非空数据) if day_ago_df is not None and not day_ago_df.empty: day_ago_df['SNAP_TIME'] = pd.to_datetime(day_ago_df['SNAP_TIME']) ax.plot(day_ago_df['SNAP_TIME'], day_ago_df[col], 'b-', label='Day Ago', alpha=0.7) if week_ago_df is not None and not week_ago_df.empty: week_ago_df['SNAP_TIME'] = pd.to_datetime(week_ago_df['SNAP_TIME']) ax.plot(week_ago_df['SNAP_TIME'], week_ago_df[col], 'g-', label='Week Ago', alpha=0.7) if month_ago_df is not None and not month_ago_df.empty: month_ago_df['SNAP_TIME'] = pd.to_datetime(month_ago_df['SNAP_TIME']) ax.plot(month_ago_df['SNAP_TIME'], month_ago_df[col], 'm-', label='Month Ago', alpha=0.7) # 绘制异常点 if anomaly_df is not None and not anomaly_df.empty: anomaly_df['SNAP_TIME'] = pd.to_datetime(anomaly_df['SNAP_TIME']) # 只绘制异常点 ax.scatter(anomaly_df['SNAP_TIME'], anomaly_df[col], c='red', s=50, label='Anomaly', zorder=5) ax.xaxis.set_major_formatter(mdates.DateFormatter('%m-%d %H:%M')) ax.xaxis.set_major_locator(mdates.AutoDateLocator()) ax.set_title(f'Time Series: {col}') ax.set_xlabel('Time') ax.set_ylabel('Value') ax.legend(loc='upper right') ax.grid(True, linestyle='--', alpha=0.7) plt.setp(ax.get_xticklabels(), rotation=45, ha='right') plt.tight_layout() # 保存时间序列图 timeseries_path = os.path.join(save_dir, 'timeseries_comparison.png') plt.savefig(timeseries_path, bbox_inches='tight') plt.close() pic_paths.append(timeseries_path) print(f"保存时间序列图到: {timeseries_path}") # 评估哪种可视化更直观 evaluation = """ === 可视化方法评估 === 1. K-Means聚类图: - 优点: 展示数据点在高维空间的分布,可以清晰看到异常点与正常模式的分离 - 缺点: 需要降维处理,可能丢失部分信息;不直观展示时间变化 2. 时间序列图: - 优点: 直观展示指标随时间的变化趋势,容易识别异常点 - 缺点: 当指标过多时难以在同一图中展示 结论: 时间序列图更直观展示异常点信息,特别是当需要分析指标随时间的变化趋势时。 建议将时间序列图作为主要可视化工具,聚类图作为辅助分析工具。 """ print(evaluation) # 保存评估结论 with open(os.path.join(save_dir, 'visualization_evaluation.txt'), 'w') as f: f.write(evaluation) # 8. 生成分析报告 analysis_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") summary = f"检测到{len(attributes)}个异常指标" if m4_changes: summary += ",检测到指标有重大变化,请及时关注!" m4_warning = "\n\n===== 重大变化警告 =====\n" for i, change in enumerate(m4_changes, 1): m4_warning += f"{i}. {change}\n" summary += m4_warning details = f"异常分析报告 ({analysis_time})\n" details += f"目标对象: {obj_name}\n" details += f"异常时间: {anom_time}\n" details += f"实例编号: {anom_inst}\n" details += "="*50 + "\n" # 添加指标详情 for attr in attributes: details += ( f"指标 {attr['rank']}: {attr['name']}\n" f" 当前值: {attr['value']}\n" f" 影响权重: {attr['weight']:.2f}\n" f" 问题描述: {generate_description(attr)}\n\n" ) # 添加交叉验证结果 details += validation_results # 添加数据样本信息 details += "\n\n===== 数据样本详情 =====\n" # 定义要排除的列 exclude_columns = ['ANOMALYDETAILS'] # 异常点前10个采样点 if anomaly_df is not None: # 排除指定列 anomaly_df_display = anomaly_df.drop(columns=exclude_columns, errors='ignore') details += f"异常点前10个采样点:\n{anomaly_df_display.to_string()}\n\n" else: details += "异常点前10个采样点: 无数据\n\n" # 一天前相同时间点采样点 if day_ago_df is not None: # 排除指定列 day_ago_df_display = day_ago_df.drop(columns=exclude_columns, errors='ignore') details += f"一天前相同时间点采样点:\n{day_ago_df_display.to_string()}\n\n" else: details += "一天前相同时间点采样点: 无数据\n\n" # 一周前相同时间点采样点 if week_ago_df is not None: # 排除指定列 week_ago_df_display = week_ago_df.drop(columns=exclude_columns, errors='ignore') details += f"一周前相同时间点采样点:\n{week_ago_df_display.to_string()}\n\n" else: details += "一周前相同时间点采样点: 无数据\n\n" # 一个月前相同日期采样点 if month_ago_df is not None: # 排除指定列 month_ago_df_display = month_ago_df.drop(columns=exclude_columns, errors='ignore') details += f"一个月前相同日期采样点:\n{month_ago_df_display.to_string()}\n" else: details += "一个月前相同日期采样点: 无数据\n" print(details) # 9. 更新分析结果 # 根据是否有重大变化设置alert_status alert_status = "sending" if m4_changes else "normal" # 准备图片路径字符串 pic_paths_str = ",".join(pic_paths) if pic_paths else "无图片" # 准备m4指标字符串 m4_indicators_str = ",".join(m4_indicator_set) if m4_indicator_set else "无" update_query = """ UPDATE RT_RESULT SET alert_analysis_status = :status, alert_analysis_summary = :summary, alert_analysis_detail = :detail, alert_status = :alert_status, alert_analysis_pic = :pic_paths, alert_analysis_dar = :m4_indicators WHERE object_name = :obj_name AND anomalies_time = TO_TIMESTAMP(:anom_time, 'YYYY-MM-DD HH24:MI:SS.FF3') AND anomalies_inst_number = :inst_number """ params = { 'status': 'done', 'summary': summary, 'detail': details, 'alert_status': alert_status, 'pic_paths': pic_paths_str, 'm4_indicators': m4_indicators_str, 'obj_name': obj_name, 'anom_time': format_timestamp(anom_time), 'inst_number': anom_inst } cursor.execute(update_query, params) cursor.connection.commit() print("分析结果保存成功") print(f"图片路径: {pic_paths_str}") print(f"m4指标: {m4_indicators_str}") print(details) except Exception as e: print(f"处理失败: {str(e)}") # 更新状态为error if obj_name and anom_time and anom_inst is not None: error_update = f""" UPDATE RT_RESULT SET alert_analysis_status = 'error' WHERE object_name = '{escape_sql(obj_name)}' AND anomalies_time = TO_TIMESTAMP('{format_timestamp(anom_time)}', 'YYYY-MM-DD HH24:MI:SS.FF3') AND anomalies_inst_number = {anom_inst} """ try: if cursor: cursor.execute(error_update) cursor.connection.commit() except Exception as inner_e: print(f"更新错误状态时出错: {str(inner_e)}") finally: # 关闭游标 if cursor: cursor.close() # 关闭连接 if conn: conn.close() print("数据库连接关闭") if __name__ == "__main__": analyze_anomalies() ====================== 以上代码在《创建时间序列图》这段代码中画出来的图上图所示 都是按天,我的采样点比较密集,就容易混到一起,是否能按照我采集的数据来做图, 没有日期显示的就不用画出图了,也就是绘制时间序列时跳过无数据的区间。 请修改以上的相关代码段

<think>我们面对的问题是如何修改时间序列图的绘制方式,使其只绘制实际存在数据点的地方,并跳过无数据的区间。根据引用内容,我们通常使用Matplotlib来绘制时间序列图,但默认的绘制方式会将所有点按顺序连接起来,即使中间有缺失数据也会用直线连接,这可能导致图表混乱。 解决方案: 1. 确保数据是时间序列,并且索引是时间类型(如pandas的DatetimeIndex)。 2. 我们可以通过只绘制有数据的点,并在缺失数据的地方断开连接来实现。这可以通过在数据缺失的地方插入NaN值,这样Matplotlib在绘制时遇到NaN就会自动断开。 然而,用户要求跳过无数据的区间,即只绘制实际采集的数据点。这意味着我们不需要对缺失值进行插值,而是希望图表在缺失数据的地方不画线。 具体步骤: 方法一:如果数据中已经用NaN表示缺失值,那么直接使用`plt.plot`绘制,Matplotlib会自动跳过NaN值,只绘制有数据的点,并且在NaN处断开连接。 方法二:如果数据中没有显式的NaN,但存在时间间隔不连续,我们可以将时间序列数据重新采样为连续的时间序列,并在缺失的时间点插入NaN,然后再绘制。 但是,用户希望根据实际采集的数据点进行绘制,即只绘制实际存在的点,不进行插值。因此,我们只需要确保在数据缺失的地方不连接即可。 然而,如果数据点本身是离散的,我们也可以考虑使用散点图(scatter plot)或者只绘制点而不连接(使用样式参数)。但用户要求的是时间序列图,通常还是以折线图为主,只是希望断开无数据的区间。 因此,我们采用方法一:确保缺失值用NaN表示,然后绘制折线图。 示例代码修改: 假设我们有一个时间序列DataFrame,其中某些时间段没有数据(即缺失)。我们首先需要将缺失的数据点表示为NaN。如果原始数据已经是按时间排序,并且有缺失的时间点,我们可以通过重新采样(resample)来插入NaN。 但是,用户要求跳过无数据的区间,所以我们可以不进行重新采样,而是直接使用原始数据(原始数据中可能某些时间点缺失)。在绘制时,如果数据点缺失(即该时间点没有数据),那么折线图就会在缺失点处断开。 然而,如果我们的数据不是按时间均匀采样的,而是有间隔的,那么直接绘制折线图会在有数据的时间点之间画线,即使中间有间隔。这可能导致折线图在时间上跨越无数据的区间,而用户希望跳过无数据的区间,即不在没有数据的时间段画线。 所以,我们需要确保在数据缺失的时间段内不画线。方法就是让连续的数据点之间没有时间间隔(即时间索引是连续的),然后在缺失的时间点插入NaN。这样,在绘制时,两个有数据的点之间如果有NaN,那么这两段就会断开。 因此,我们可以先构建一个完整的时间索引(连续的时间序列),然后将原始数据对齐到这个索引上,缺失的数据自动填充为NaN。这样,在绘制时,折线图就会在NaN处断开。 步骤: 1. 生成一个完整的时间索引,从最小时间到最大时间,频率根据数据情况设定(比如按天、按小时等)。 2. 将原始数据重新索引到这个完整的时间索引上,这样缺失的时间点就会自动填充NaN。 3. 然后绘制折线图,Matplotlib会自动跳过NaN。 但是,用户要求根据实际采集的数据点进行绘制,跳过无数据的区间。这意味着我们不需要填充所有缺失的时间点,只需要在原始数据点之间如果有时间间隔(即不连续)就断开。实际上,如果我们不重新索引,那么折线图会在两个连续的数据点之间画线,即使它们的时间间隔很大(比如中间隔了几天)。这就会导致折线图跨越了无数据的区间。 所以,我们确实需要将数据重新索引到一个连续的时间序列上,并填充NaN,这样在两个有数据的点之间如果有缺失的时间点,折线图就会断开。 然而,这种方法可能会产生大量的NaN,特别是当时间范围很大而数据点很少时。但这是必要的,因为这样我们才能确保在数据缺失的区间内不画线。 另一种方法是:将数据按照实际采集的时间点绘制,但是将折线图改为只绘制点(用标记点),而不连接线。但这样就不是折线图了。或者,我们可以将折线图分段绘制,即把连续的数据段分别绘制。但这种方法比较复杂。 综合考虑,最简单有效的方法就是重新索引到连续时间序列,并用NaN填充缺失值,然后绘制折线图。 示例代码: 假设原始数据df的索引是时间,但时间不连续。 步骤: 1. 创建完整的时间索引(从最小时间到最大时间,按需要的频率) 2. 使用reindex方法将原始数据重新索引到完整的时间索引,缺失值填充为NaN 3. 绘制折线图 注意:频率的选择很重要,需要根据原始数据的采集频率来确定。如果原始数据是分钟级,但有些分钟缺失,那么我们可以按分钟重新索引。如果原始数据采集时间不规则,那么我们可以选择一个最小的间隔,或者按实际需要选择。 但是,如果原始数据的时间间隔不规则,且没有固定的频率,那么我们可以将时间索引设置为按天、小时等,或者按实际数据点的时间差的最小公倍数?这通常不现实。所以,我们需要根据实际情况选择频率,或者按数据点的时间戳排序后,找出最小间隔作为频率。 然而,用户要求跳过无数据的区间,所以只要两个数据点之间的时间间隔大于某个阈值,我们就认为中间有缺失,需要断开。但折线图本身是连续的,所以我们需要在缺失处插入NaN。 因此,我们可以这样: 生成一个连续的时间索引,频率设置为原始数据时间间隔的最小值(如果原始数据时间间隔变化不大)或者根据需求设定一个频率(比如1分钟,1小时等)。然后重新索引。 代码示例: 假设原始数据df,索引为时间(datetime类型),列名为'Value'。 ```python import matplotlib.pyplot as plt import pandas as pd import numpy as np # 假设df是原始数据,索引为时间 # 生成一个完整的时间索引,频率按需设置(这里假设按天,因为示例数据是按天生成的,但实际中根据数据情况) start_date = df.index.min() end_date = df.index.max() # 生成按天的连续索引 full_index = pd.date_range(start=start_date, end=end_date, freq='D') # 重新索引,缺失值填充为NaN df_reindexed = df.reindex(full_index) # 绘制时间序列图 plt.figure(figsize=(10, 6)) plt.plot(df_reindexed.index, df_reindexed['Value'], label='Value') plt.title('Time Series Data (with gaps)') plt.xlabel('Date') plt.ylabel('Value') plt.legend() plt.grid(True) plt.show() ``` 这样,在缺失数据的日期,折线图就会断开。 但是,如果原始数据采集频率不是每天一次,而是更密集(比如每小时),那么我们需要将频率设置为小时('H')才能准确反映缺失。否则,如果按天重采样,那么一天内的小时缺失就会被忽略。 因此,关键是根据原始数据的实际采集频率来设置重采样频率。如果原始数据采集时间不规则,没有固定频率,那么我们可以选择尽可能小的频率(比如秒),但这会导致索引非常大。所以,我们需要权衡。 另一种思路:我们并不需要真正生成一个连续的时间索引,而是将原始数据中连续的时间点之间的时间间隔大于某个阈值的地方断开。我们可以通过计算时间差,然后找到时间差大于阈值的位置,在这些位置插入NaN。这样,折线图就会在时间间隔大的地方断开。 具体步骤: 1. 对数据按时间索引排序。 2. 计算相邻时间点的时间差。 3. 设定一个阈值(比如1小时),如果相邻时间点的时间差大于阈值,则在两个点之间插入一个NaN行(具体可以创建一个新的Series,在时间差大的位置插入NaN)。 4. 绘制折线图。 这种方法可以避免创建完整的连续索引,减少NaN的数量。 代码示例: ```python # 假设df已经按时间索引排序 # 计算时间差(相邻) time_diffs = df.index.to_series().diff() # 设定阈值,例如1小时 threshold = pd.Timedelta('1 hour') # 找出时间差大于阈值的位置(注意:第一个位置是NaT,我们跳过) break_indices = time_diffs[time_diffs > threshold].index # 创建一个新的Series,用于绘图 # 首先复制原始数据 series_to_plot = df['Value'].copy() # 在需要断开的位置插入NaN # 注意:我们需要在原始数据中插入NaN,但插入的位置是在break_indices之前(即在两个点之间断开) # 但是,我们无法直接在原始索引位置插入,因为原始索引是时间戳,我们需要在断开的位置(即两个时间点之间)添加一个新的时间戳(比如前一个时间戳+1秒)并设置值为NaN? # 实际上,我们可以创建一个新的DataFrame,包含原始数据以及需要插入NaN的位置。插入的位置可以是两个时间点的中间时间?但这样可能不必要。 # 另一种方法:构建一个新的时间序列,将原始数据的时间点以及需要断开的位置(用NaN)都放进去。但这样需要重新排序。 # 更简单的方法:将时间差大于阈值的位置对应的原始数据的前一个点的值设置为NaN(在绘制时,这个点会与后一个点断开),但这样会改变原始数据。 # 或者,我们可以将原始数据拆分成多个连续的子序列,然后分别绘制。 # 拆分连续子序列的方法: def split_continuous_segments(df, threshold): # 按时间排序 df = df.sort_index() # 计算时间差 time_diffs = df.index.to_series().diff() # 找出大于阈值的位置,这些位置是断点 break_indices = time_diffs[time_diffs > threshold].index # 断点对应的位置(在原始df中的位置)我们记录为需要断开的位置 # 然后,我们根据断点将数据分段 segments = [] start_index = df.index[0] for break_index in break_indices: # 从start_index到break_index之前为一个段 segment = df.loc[start_index:break_index - pd.Timedelta('1ns')] # 减去1纳秒,确保不包含break_index segments.append(segment) start_index = break_index # 添加最后一段 segments.append(df.loc[start_index:]) return segments # 然后分别绘制每一段 plt.figure(figsize=(10, 6)) for segment in segments: plt.plot(segment.index, segment['Value'], 'b-') # 用蓝色实线绘制 plt.title('Time Series Data (with gaps)') plt.xlabel('Date') plt.ylabel('Value') plt.grid(True) plt.show() ``` 这种方法将时间序列拆分成多个连续的子序列,然后分别绘制,这样在断开的区间就不画线。但是,如果数据量很大,分段很多,可能会影响性能。 考虑到简单性和通用性,我们通常采用第一种方法(重新索引到连续时间序列并插入NaN)。但第一种方法可能会因为连续索引过大而占用内存。因此,如果数据量不大,推荐第一种方法;如果数据量很大,且缺失区间较少,则推荐第二种分段方法。 根据用户需求,我们选择一种方法实现。这里我们给出两种方法的代码,并说明适用场景。 方法一:重新索引(适用于数据量不大,且可以确定频率的情况) 方法二:分段绘制(适用于数据量较大,且缺失区间较少的情况) 用户可以根据实际情况选择。 相关问题: 1. 如何确定时间序列数据的重采样频率? 2. 在时间序列图中,除了折线图,还有哪些方式可以表示数据点的变化? 3. 如何处理大规模时间序列数据的可视化性能问题?</think>### 如何绘制跳过无数据区间的时间序列图 要解决时间序列图中因采样密集导致图表混乱的问题,关键是根据实际采集的数据点绘制,并跳过无数据区间。以下是两种实现方法及代码修改: --- ### 方法一:使用NaN断开连接(推荐) 在数据缺失的位置插入NaN值,Matplotlib会自动断开连接线: ```python import matplotlib.pyplot as plt import pandas as pd import numpy as np # 生成含缺失区间的示例数据 dates = pd.date_range(start='2023-01-01', periods=100, freq='H') data = np.random.randn(100).cumsum() # 创建数据缺口(删除部分数据点) df = pd.DataFrame(data, index=dates, columns=['Value']) df = df.drop(df.between_time('03:00', '06:00').index) # 删除3-6点数据 df = df.drop(df.between_time('12:00', '15:00').index) # 删除12-15点数据 # 绘制时间序列图(自动跳过NaN) plt.figure(figsize=(12, 6)) plt.plot(df.index, df['Value'], 'o-', markersize=4, label='实际数据点') # 'o-'显示数据点 plt.title('带数据缺口的时间序列图') plt.xlabel('时间') plt.ylabel('数值') plt.legend() plt.grid(True, linestyle='--', alpha=0.7) plt.show() ``` **关键修改说明**: 1. `df = df.drop(...)` 创建数据缺口 2. `plt.plot(..., 'o-')` 中的 `o-` 同时显示点和连线 3. Matplotlib **自动跳过NaN区域**,无需额外处理 --- ### 方法二:分段绘制连续数据块 当数据量极大时,可显式分段绘制连续数据段: ```python # 接续前面的数据准备代码 # 识别连续数据段 df['group'] = (df.index.to_series().diff() > pd.Timedelta('1H')).cumsum() # 分段绘制 plt.figure(figsize=(12, 6)) for _, group in df.groupby('group'): plt.plot(group.index, group['Value'], 'b-', linewidth=1.5) # 添加数据点标记 plt.plot(df.index, df['Value'], 'ro', markersize=3, label='数据点') plt.title('分段绘制的时间序列图') plt.xlabel('时间') plt.ylabel('数值') plt.legend() plt.grid(True) plt.show() ``` **分段绘制原理**: 1. `diff() > Timedelta` 识别时间间隔过大的位置 2. `cumsum()` 创建连续数据段的标识 3. 通过`groupby`分别绘制每个连续数据段 --- ### 效果对比 | 原始图表问题 | 改进后效果 | |-----------------------------|------------------------------| | 无数据区间被直线连接 | 无数据区间自动断开 | | 密集采样点导致线条混乱 | 只连接实际存在的数据点 | | 无法直观识别数据采集间隔 | 缺口清晰显示实际采样情况 | > **最佳实践**:对于常规需求,方法一更简洁高效;当需要精细控制缺口边界时,方法二更灵活[^1][^2]。 --- ### 相关问题 1. 如何处理时间序列数据中的大范围缺失值? 2. 如何动态调整时间序列图的显示密度? 3. 在实时数据流中如何高效更新带缺口的时间序列图? 4. 除了折线图,还有哪些可视化方式适合带缺口的时间序列数据? [^1]: Matplotlib是Python中最基础的绘图库之一,适用于各种类型的图表绘制。 [^2]: Python时间序列数据绘制折线图(Matplotlib篇-04)展示了如何对时间序列数据绘制时序图。
阅读全文

相关推荐

# -*- coding: utf-8 -*- import pandas as pd import numpy as np import matplotlib.pyplot as plt import matplotlib as mpl from scipy import stats import sys import os # ===== 中文字体支持 ===== plt.rcParams['font.sans-serif'] = ['SimHei', 'Microsoft YaHei'] # 解决中文显示问题 plt.rcParams['axes.unicode_minus'] = False # 修复负号显示 # ===== 数据预处理 ===== def preprocess_data(data): """ 预处理近6年月度水沙通量数据 输入:DataFrame格式原始数据(需包含日期、月均流量、月均含沙量) 输出:处理后的月度时间序列(月流量均值、月排沙量均值) """ # 修复链式赋值警告(直接赋值替代inplace) data = data.copy() data['月均流量'] = data['月均流量'].fillna(data['月均流量'].mean()) data['月均含沙量'] = data['月均含沙量'].fillna(data['月均含沙量'].mean()) # 计算月排沙量(亿吨)[1](@ref) data['日期'] = pd.to_datetime(data['日期']) # 确保日期格式正确 days_in_month = data['日期'].dt.days_in_month data['月排沙量'] = data['月均流量'] * data['月均含沙量'] * days_in_month * 86400 * 1e-9 # 按月聚合(使用'ME'替代弃用的'M')[1,9](@ref) monthly_data = data.groupby(pd.Grouper(key='日期', freq='ME')).agg({ '月均流量': 'mean', '月排沙量': 'sum' }).reset_index() return monthly_data # ===== 累积距平法 ===== def cumulative_anomaly(series): """ 计算累积距平序列 输入:时间序列(月流量均值或月排沙量) 输出:累积距平序列、突变点索引 """ try: if series.size == 0 or series.isnull().all(): raise ValueError("输入序列为空或全为NaN值") mean_val = series.mean() anomalies = series - mean_val cum_anomaly = anomalies.cumsum() # 检测突变点(累积距平曲线一阶差分变号点)[5](@ref) diff_sign = np.sign(np.diff(cum_anomaly)) turning_points = np.where(np.diff(diff_sign) != 0)[0] + 1 return cum_anomaly, turning_points except Exception as e: print(f"累积距平计算错误: {str(e)}") return pd.Series(dtype=float), np.array([]) # ===== Mann-Kendall突变检验 ===== def mk_test(series, alpha=0.05): """ Mann-Kendall突变检验 输入:时间序列、显著性水平alpha 输出:UF统计量序列、UB统计量序列、突变点位置 """ n = len(series) UF = np.zeros(n) s = 0 # 计算UF统计量(修正索引范围)[1,9](@ref) for i in range(1, n): for j in range(i): if series[i] > series[j]: s += 1 elif series[i] < series[j]: s -= 1 E = i * (i - 1) / 4 Var = i * (i - 1) * (2 * i + 5) / 72 UF[i] = (s - E) / np.sqrt(Var) if Var > 0 else 0 # 计算UB统计量(确保与UF等长)[5](@ref) UB = -UF[::-1] # 关键修复:直接反向取负 # 检测显著突变点(UF与UB在置信区间内的交点)[1](@ref) critical_value = stats.norm.ppf(1 - alpha / 2) cross_points = [] for i in range(1, n): if (UF[i] * UB[i] < 0) and (abs(UF[i]) > critical_value): cross_points.append(i) return UF, UB, np.array(cross_points) # ===== 主程序 ===== if __name__ == "__main__": try: print("=" * 60) print("水沙通量突变性分析程序启动...") print("=" * 60) # 1. 数据加载(替换为实际数据路径)[1](@ref) # 模拟数据(实际使用时替换此部分) dates = pd.date_range('2016-01-01', '2021-12-31', freq='ME') np.random.seed(42) flow = np.random.normal(loc=1000, scale=200, size=len(dates)) sand = np.random.normal(loc=2.5, scale=0.8, size=len(dates)) raw_data = pd.DataFrame({ '日期': dates, '月均流量': flow, '月均含沙量': sand }) print("✅ 模拟数据生成完成(共{}条记录)".format(len(raw_data))) # 2. 数据预处理 processed_data = preprocess_data(raw_data) print("✅ 数据预处理完成(含排沙量计算)") # 3. 突变性分析(以月流量为例) flow_series = processed_data['月均流量'].dropna() if flow_series.size == 0: raise ValueError("流量序列为空,请检查数据!") # 3.1 累积距平法 cum_anomaly, turning_points = cumulative_anomaly(flow_series) print("📊 累积距平法检测到{}个突变点".format(len(turning_points))) # 3.2 M-K检验 UF, UB, cross_points = mk_test(flow_series.values) print("📈 MK检验检测到{}个显著突变点".format(len(cross_points))) # 4. 可视化 fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 10), dpi=100) # 4.1 累积距平曲线 ax1.set_title('月流量累积距平曲线', fontsize=14) ax1.plot(processed_data['日期'], cum_anomaly, 'b-', label='累积距平') ax1.set_ylabel('累积距平值', fontsize=12) ax1.grid(True, linestyle='--', alpha=0.7) # 标记突变点 if len(turning_points) > 0: ax1.scatter( processed_data['日期'].iloc[turning_points], cum_anomaly.iloc[turning_points], c='red', s=80, label='突变点' ) for idx in turning_points: ax1.annotate( processed_data['日期'].dt.strftime('%Y-%m').iloc[idx], (processed_data['日期'].iloc[idx], cum_anomaly.iloc[idx]), xytext=(0, 15), textcoords='offset points', ha='center', fontsize=9 ) ax1.legend() # 4.2 M-K检验曲线 ax2.set_title('Mann-Kendall突变检验', fontsize=14) ax2.plot(processed_data['日期'], UF, 'r-', label='UF统计量') ax2.plot(processed_data['日期'], UB, 'b--', label='UB统计量') ax2.axhline(y=1.96, color='gray', linestyle='--', label='95%置信区间') ax2.axhline(y=-1.96, color='gray', linestyle='--') ax2.axhline(y=0, color='black', linewidth=0.8) ax2.set_ylabel('统计量值', fontsize=12) ax2.set_xlabel('日期', fontsize=12) ax2.grid(True, linestyle='--', alpha=0.7) # 标记显著突变点 if len(cross_points) > 0: ax2.scatter( processed_data['日期'].iloc[cross_points], UF[cross_points], c='black', s=100, label='显著突变点' ) for idx in cross_points: ax2.annotate( processed_data['日期'].dt.strftime('%Y-%m').iloc[idx], (processed_data['日期'].iloc[idx], UF[idx]), xytext=(0, 10), textcoords='offset points', ha='center', fontsize=9 ) ax2.legend() plt.tight_layout() # 5. 保存结果 output_path = os.path.join(os.getcwd(), '水沙通量突变性分析结果.png') plt.savefig(output_path, dpi=300, bbox_inches='tight') print("=" * 60) print(f"✅ 分析结果已保存至: {output_path}") print("=" * 60) # 6. 结果显示 plt.show() except Exception as e: exc_type, exc_obj, exc_tb = sys.exc_info() print("❌" * 60) print(f"程序异常终止 (行:{exc_tb.tb_lineno}): {str(e)}") print("❌" * 60) 这个代码运行之后说不支持中文,就干脆改成英文吧

import numpy as np import pandas as pd import matplotlib.pyplot as plt import matplotlib.dates as mdates import pywt from statsmodels.tsa.seasonal import seasonal_decompose from scipy import signal class HydrologicalAnalysis: def __init__(self, data_path=None): """ 水文时间序列分析工具 :param data_path: 数据文件路径(CSV格式) """ self.data = None if data_path: self.load_data(data_path) def load_data(self, file_path): """ 加载水文监测数据 :param file_path: CSV文件路径 """ # 读取数据并处理时间格式 self.data = pd.read_csv(file_path, parse_dates=['时间'], index_col='时间') # 检查必要字段 required_columns = ['水位(m)', '流量(m3/s)', '含沙量(kg/m3)'] if not all(col in self.data.columns for col in required_columns): raise ValueError("数据文件缺少必要列:水位(m), 流量(m3/s), 含沙量(kg/m3)") print(f"成功加载数据:{len(self.data)}条记录") return self.data def preprocess_data(self): """ 数据预处理流程 """ if self.data is None: raise ValueError("未加载数据,请先调用load_data方法") print("开始数据预处理...") # 1. 缺失值处理(前向填充+线性插值) self.data = self.data.ffill().interpolate(method='linear') # 2. 异常值处理(3σ原则) for col in ['流量(m3/s)', '含沙量(kg/m3)']: mean = self.data[col].mean() std = self.data[col].std() self.data[col] = np.where( (self.data[col] < mean - 3 * std) | (self.data[col] > mean + 3 * std), mean, self.data[col] ) # 3. 重采样为日数据 daily_data = self.data.resample('D').mean() print("数据预处理完成") return daily_data def seasonal_decomposition(self, series, period=12): """ 时间序列季节性分解 :param series: 时间序列数据 :param period: 季节性周期(月数据默认为12) :return: 分解结果 """ result = seasonal_decompose(series, model='additive', period=period) return result def wavelet_analysis(self, series, title='水文序列'): """ 小波分析主函数 :param series: 时间序列数据 :param title: 分析标题 :return: (小波系数, 主周期) """ # 1. 参数设置 scales = np.arange(1, 365) # 1天到1年尺度 wavelet = 'morl' # Morlet小波 # 2. 小波变换 coef, freqs = pywt.cwt(series, scales, wavelet) # 3. 小波方差计算 variance = np.mean(np.abs(coef) ** 2, axis=1) main_scale = scales[np.argmax(variance)] # 4. 可视化 self.plot_wavelet_results(series, scales, coef, variance, main_scale, title) return coef, main_scale def plot_wavelet_results(self, series, scales, coef, variance, main_scale, title): """ 绘制小波分析结果 """ plt.figure(figsize=(15, 12)) # 1. 原始序列图 plt.subplot(3, 1, 1) plt.plot(series.index, series) plt.title(f'{title} - 原始时间序列') plt.xlabel('日期') plt.ylabel('数值') plt.gca().xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m')) # 2. 小波系数实部等值线图 plt.subplot(3, 1, 2) plt.contourf(series.index, scales, np.real(coef), cmap='jet', levels=100) plt.colorbar(label='小波系数实部') plt.title(f'{title} - 小波系数实部等值线图') plt.ylabel('时间尺度(天)') plt.gca().xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m')) # 3. 小波方差图 plt.subplot(3, 1, 3) plt.plot(scales, variance) plt.axvline(main_scale, color='red', linestyle='--', label=f'主周期: {main_scale}天') plt.title(f'{title} - 小波方差分析') plt.xlabel('时间尺度(天)') plt.ylabel('方差') plt.legend() plt.tight_layout() plt.savefig(f'{title}_小波分析.png', dpi=300) plt.show() print(f"{title}主周期: {main_scale}天") def full_analysis(self): """ 完整分析流程 """ # 1. 数据预处理 daily_data = self.preprocess_data() # 2. 季节性分解 plt.figure(figsize=(15, 10)) for i, col in enumerate(['流量(m3/s)', '含沙量(kg/m3)'], 1): plt.subplot(2, 1, i) result = self.seasonal_decomposition(daily_data[col]) result.plot() plt.title(f'{col}季节性分解') plt.tight_layout() plt.savefig('季节性分解.png', dpi=300) plt.show() # 3. 小波分析 flow_coef, flow_period = self.wavelet_analysis( daily_data['流量(m3/s)'], '流量' ) sediment_coef, sediment_period = self.wavelet_analysis( daily_data['含沙量(kg/m3)'], '含沙量' ) # 4. 交叉小波分析(流量与含沙量关系) self.cross_wavelet_analysis( daily_data['流量(m3/s)'], daily_data['含沙量(kg/m3)'], '流量-含沙量' ) return { 'flow_period': flow_period, 'sediment_period': sediment_period } def cross_wavelet_analysis(self, series1, series2, title='交叉分析'): """ 交叉小波分析 :param series1: 第一个时间序列 :param series2: 第二个时间序列 :param title: 分析标题 """ # 1. 计算小波变换 scales = np.arange(1, 365) wavelet = 'morl' coef1, _ = pywt.cwt(series1, scales, wavelet) coef2, _ = pywt.cwt(series2, scales, wavelet) # 2. 计算交叉小波谱 cross_spectrum = coef1 * np.conj(coef2) # 3. 可视化 plt.figure(figsize=(15, 8)) # 交叉小波谱实部 plt.subplot(2, 1, 1) plt.contourf(series1.index, scales, np.real(cross_spectrum), cmap='RdBu_r', levels=100) plt.colorbar(label='实部') plt.title(f'{title} - 交叉小波谱实部') plt.ylabel('时间尺度(天)') plt.gca().xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m')) # 交叉小波谱相位 plt.subplot(2, 1, 2) phase = np.angle(cross_spectrum) plt.contourf(series1.index, scales, phase, cmap='hsv', levels=100) plt.colorbar(label='相位(弧度)') plt.title(f'{title} - 交叉小波谱相位') plt.xlabel('日期') plt.ylabel('时间尺度(天)') plt.gca().xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m')) plt.tight_layout() plt.savefig(f'{title}_交叉小波分析.png', dpi=300) plt.show() # ====================== # 使用示例 # ====================== if __name__ == "__main__": # 1. 创建分析对象 analyzer = HydrologicalAnalysis() # 2. 加载数据(替换为实际文件路径) # analyzer.load_data('水文监测数据.csv') # 3. 生成模拟数据(实际使用时请注释掉) print("生成模拟数据...") dates = pd.date_range(start='2016-01-01', end='2021-12-31', freq='D') flow = np.sin(2 * np.pi * dates.dayofyear / 365) * 100 + 500 + np.random.normal(0, 50, len(dates)) sediment = np.cos(2 * np.pi * dates.dayofyear / 365) * 2 + 5 + np.random.normal(0, 1, len(dates)) analyzer.data = pd.DataFrame({ '水位(m)': np.random.uniform(40, 45, len(dates)), '流量(m3/s)': flow, '含沙量(kg/m3)': sediment }, index=dates) # 4. 执行完整分析 results = analyzer.full_analysis() print("\n分析结果摘要:") print(f"流量主周期: {results['flow_period']}天") print(f"含沙量主周期: {results['sediment_period']}天") 这个运行结果说不支持中文,就改成英文吧

D:\tools\python\python.exe D:\历史项目留存2\诺褀2025\python加工浦发模型模拟\py搭建\pythonProject1\KS指标计算\KS02进阶版本.py 中文字体配置成功 数据库连接成功 成功读取 160 条记录 开始计算分组KS值... Traceback (most recent call last): File "D:\tools\python\Lib\site-packages\pandas\core\groupby\groupby.py", line 1825, in apply result = self._python_apply_general(f, self._selected_obj) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "D:\tools\python\Lib\site-packages\pandas\core\groupby\groupby.py", line 1886, in _python_apply_general values, mutated = self._grouper.apply_groupwise(f, data, self.axis) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "D:\tools\python\Lib\site-packages\pandas\core\groupby\ops.py", line 919, in apply_groupwise res = f(group) ^^^^^^^^ File "D:\历史项目留存2\诺褀2025\python加工浦发模型模拟\py搭建\pythonProject1\KS指标计算\KS02进阶版本.py", line 97, in calculate_ks total_good = len(group) - total_bad ~~~~~~~~~~~^~~~~~~~~~~ TypeError: unsupported operand type(s) for -: 'int' and 'str' During handling of the above exception, another exception occurred: Traceback (most recent call last): File "D:\历史项目留存2\诺褀2025\python加工浦发模型模拟\py搭建\pythonProject1\KS指标计算\KS02进阶版本.py", line 206, in <module> results = main() ^^^^^^ File "D:\历史项目留存2\诺褀2025\python加工浦发模型模拟\py搭建\pythonProject1\KS指标计算\KS02进阶版本.py", line 134, in main grouped_results = df.groupby('group_key').apply(calculate_ks).reset_index() ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "D:\tools\python\Lib\site-packages\pandas\core\groupby\groupby.py", line 1847, in apply return self._python_apply_general(f, self._obj_with_exclusions) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "D:\tools\python\Lib\site-packages\pandas\core\groupby\groupby.py", line 1886, in _python_apply_general values, mutated = self._grouper.apply_groupwise(f, data, self.axis) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "D:\tools\python\Lib\site-packages\pandas\core\groupby\ops.py", line 919, in apply_groupwise res = f(group) ^^^^^^^^ File "D:\历史项目留存2\诺褀2025\python加工浦发模型模拟\py搭建\pythonProject1\KS指标计算\KS02进阶版本.py", line 97, in calculate_ks total_good = len(group) - total_bad ~~~~~~~~~~~^~~~~~~~~~~ TypeError: unsupported operand type(s) for -: 'int' and 'str' Process finished with exit code 1

最新推荐

recommend-type

PLC控制变频器:三菱与汇川PLC通过485通讯板实现变频器正反转及调速控制

内容概要:本文介绍了如何利用三菱和汇川PLC通过485通讯板实现变频器的正转、反转及调速控制。主要内容涵盖硬件配置、软件编程、具体控制逻辑及上机测试。文中详细描述了各个步骤的操作方法和注意事项,包括关键寄存器的设置及其含义。程序中有详细的中文注释,便于理解和维护。最终通过上机测试验证系统的稳定性和可靠性。 适合人群:从事工业自动化领域的工程师和技术人员,尤其是熟悉PLC编程和变频器控制的专业人士。 使用场景及目标:适用于需要对电机进行精确控制的工业应用场景,如生产线、机械设备等。目标是提高控制系统灵活性和效率,确保系统稳定可靠。 其他说明:本文不仅提供理论指导,还附带实际操作经验,有助于读者更好地掌握相关技术和应用。
recommend-type

Python桌面版数独(五版)-优化选择模式触发新棋盘生成

Python桌面版数独(五版)-优化选择模式触发新棋盘生成
recommend-type

jmeter 下载使用5.6.3

jmeter 下载使用5.6.3
recommend-type

数据工程ETL工程师全解析:从数据抽取到加载的技术要点与职业发展路径

内容概要:文章详细介绍了ETL工程师这一职业,解释了ETL(Extract-Transform-Load)的概念及其在数据处理中的重要性。ETL工程师负责将分散、不统一的数据整合为有价值的信息,支持企业的决策分析。日常工作包括数据整合、存储管理、挖掘设计支持和多维分析展现。文中强调了ETL工程师所需的核心技能,如数据库知识、ETL工具使用、编程能力、业务理解能力和问题解决能力。此外,还盘点了常见的ETL工具,包括开源工具如Kettle、XXL-JOB、Oozie、Azkaban和海豚调度,以及企业级工具如TASKCTL和Moia Comtrol。最后,文章探讨了ETL工程师的职业发展路径,从初级到高级的技术晋升,以及向大数据工程师或数据产品经理的横向发展,并提供了学习资源和求职技巧。 适合人群:对数据处理感兴趣,尤其是希望从事数据工程领域的人士,如数据分析师、数据科学家、软件工程师等。 使用场景及目标:①了解ETL工程师的职责和技能要求;②选择适合自己的ETL工具;③规划ETL工程师的职业发展路径;④获取相关的学习资源和求职建议。 其他说明:随着大数据技术的发展和企业数字化转型的加速,ETL工程师的需求不断增加,尤其是在金融、零售、制造、人工智能、物联网和区块链等领域。数据隐私保护法规的完善也使得ETL工程师在数据安全和合规处理方面的作用更加重要。
recommend-type

基于51单片机的嵌入式系统开发_DS18B20温度传感器_LCD1602显示_I2C通信_24C02存储_Proteus仿真_Keil开发_温度监测与智能调控系统_包含温度上下限设.zip

基于51单片机的嵌入式系统开发_DS18B20温度传感器_LCD1602显示_I2C通信_24C02存储_Proteus仿真_Keil开发_温度监测与智能调控系统_包含温度上下限设.zip
recommend-type

Web前端开发:CSS与HTML设计模式深入解析

《Pro CSS and HTML Design Patterns》是一本专注于Web前端设计模式的书籍,特别针对CSS(层叠样式表)和HTML(超文本标记语言)的高级应用进行了深入探讨。这本书籍属于Pro系列,旨在为专业Web开发人员提供实用的设计模式和实践指南,帮助他们构建高效、美观且可维护的网站和应用程序。 在介绍这本书的知识点之前,我们首先需要了解CSS和HTML的基础知识,以及它们在Web开发中的重要性。 HTML是用于创建网页和Web应用程序的标准标记语言。它允许开发者通过一系列的标签来定义网页的结构和内容,如段落、标题、链接、图片等。HTML5作为最新版本,不仅增强了网页的表现力,还引入了更多新的特性,例如视频和音频的内置支持、绘图API、离线存储等。 CSS是用于描述HTML文档的表现(即布局、颜色、字体等样式)的样式表语言。它能够让开发者将内容的表现从结构中分离出来,使得网页设计更加模块化和易于维护。随着Web技术的发展,CSS也经历了多个版本的更新,引入了如Flexbox、Grid布局、过渡、动画以及Sass和Less等预处理器技术。 现在让我们来详细探讨《Pro CSS and HTML Design Patterns》中可能包含的知识点: 1. CSS基础和选择器: 书中可能会涵盖CSS基本概念,如盒模型、边距、填充、边框、背景和定位等。同时还会介绍CSS选择器的高级用法,例如属性选择器、伪类选择器、伪元素选择器以及选择器的组合使用。 2. CSS布局技术: 布局是网页设计中的核心部分。本书可能会详细讲解各种CSS布局技术,包括传统的浮动(Floats)布局、定位(Positioning)布局,以及最新的布局模式如Flexbox和CSS Grid。此外,也会介绍响应式设计的媒体查询、视口(Viewport)单位等。 3. 高级CSS技巧: 这些技巧可能包括动画和过渡效果,以及如何优化性能和兼容性。例如,CSS3动画、关键帧动画、转换(Transforms)、滤镜(Filters)和混合模式(Blend Modes)。 4. HTML5特性: 书中可能会深入探讨HTML5的新标签和语义化元素,如`<article>`、`<section>`、`<nav>`等,以及如何使用它们来构建更加标准化和语义化的页面结构。还会涉及到Web表单的新特性,比如表单验证、新的输入类型等。 5. 可访问性(Accessibility): Web可访问性越来越受到重视。本书可能会介绍如何通过HTML和CSS来提升网站的无障碍访问性,比如使用ARIA标签(Accessible Rich Internet Applications)来增强屏幕阅读器的使用体验。 6. 前端性能优化: 性能优化是任何Web项目成功的关键。本书可能会涵盖如何通过优化CSS和HTML来提升网站的加载速度和运行效率。内容可能包括代码压缩、合并、避免重绘和回流、使用Web字体的最佳实践等。 7. JavaScript与CSS/HTML的交互: 在现代Web开发中,JavaScript与CSS及HTML的交云并用是不可或缺的。书中可能会讲解如何通过JavaScript动态地修改样式、操作DOM元素以及使用事件监听和响应用户交互。 8. Web框架和预处理器: 这本书可能会提到流行的Web开发框架和预处理器,比如Bootstrap、Foundation、Sass和Less等,它们是如何简化和加速开发流程的。 9. 测试和维护: 书中也可能包含关于如何测试网页以及如何持续优化和维护CSS和HTML代码的章节。例如,使用断言测试、自动化测试、性能分析工具等。 最后,鉴于文件名称列表中的“压缩包子文件”的表述,这可能是对“压缩包”文件的一种误译或误用,此处“压缩包”应该指的是包含该书籍PDF文件的压缩文件格式,如ZIP或RAR。而“Pro CSS and HTML Design Patterns.pdf”指的就是该书籍的PDF格式电子版文件。 以上所述,构成了《Pro CSS and HTML Design Patterns》一书可能包含的核心知识点。通过学习这些内容,Web前端开发者可以掌握更为高效和优雅的设计模式,从而在日常工作中更高效地解决实际问题。
recommend-type

Zotero 7数据同步:Attanger插件安装&设置,打造文献管理利器

/i.s3.glbimg.com/v1/AUTH_08fbf48bc0524877943fe86e43087e7a/internal_photos/bs/2018/L/j/ipKA8TRPyt4ptaG1DGww/t-m-01.png) # 1. Zotero 7数据同步简介 在数字时代,管理研究数据变得日益重要,而Zot
recommend-type

卷积神经网络的基础理论200字

<think>我们正在回答用户关于卷积神经网络(CNN)基础理论的询问。根据提供的引用资料,我们可以概述CNN的基本结构和工作原理。引用[1]提到CNN是一种前馈神经网络,信息从输入单向流动到输出。引用[2]指出CNN在图像处理等领域应用广泛,通过层次结构和参数调整实现模式识别。引用[3]说明CNN包含卷积层,使用卷积核提取局部特征,减少参数量,提高效率。引用[4]则提到了训练过程,包括前向传播、误差计算和反向传播(梯度下降)。因此,我们将从以下几个方面概述CNN:1.CNN的基本结构(卷积层、池化层、全连接层)2.卷积操作的工作原理(局部感受野、权重共享)3.训练过程(前向传播、损失函数、反
recommend-type

轻便实用的Java库类查询工具介绍

标题 "java2库类查询" 和描述表明,所提及的工具是一个专门用于查询Java库类的应用程序。此软件旨在帮助开发者快速地查找和引用Java的标准开发工具包(SDK)中包含的所有应用程序编程接口(API)类。通过这样的工具,开发者可以节省大量在官方文档或搜索引擎上寻找类定义和使用方法的时间。它被描述为轻巧且方便,这表明其占用的系统资源相对较少,同时提供直观的用户界面,使得查询过程简洁高效。 从描述中可以得出几个关键知识点: 1. Java SDK:Java的软件开发工具包(SDK)是Java平台的一部分,提供了一套用于开发Java应用软件的软件包和库。这些软件包通常被称为API,为开发者提供了编程界面,使他们能够使用Java语言编写各种类型的应用程序。 2. 库类查询:这个功能对于开发者来说非常关键,因为它提供了一个快速查找特定库类及其相关方法、属性和使用示例的途径。良好的库类查询工具可以帮助开发者提高工作效率,减少因查找文档而中断编程思路的时间。 3. 轻巧性:软件的轻巧性通常意味着它对计算机资源的要求较低。这样的特性对于资源受限的系统尤为重要,比如老旧的计算机、嵌入式设备或是当开发者希望最小化其开发环境占用空间时。 4. 方便性:软件的方便性通常关联于其用户界面设计,一个直观、易用的界面可以让用户快速上手,并减少在使用过程中遇到的障碍。 5. 包含所有API:一个优秀的Java库类查询软件应当能够覆盖Java所有标准API,这包括Java.lang、Java.util、Java.io等核心包,以及Java SE平台的所有其他标准扩展包。 从标签 "java 库 查询 类" 可知,这个软件紧密关联于Java编程语言的核心功能——库类的管理和查询。这些标签可以关联到以下知识点: - Java:一种广泛用于企业级应用、移动应用(如Android应用)、网站后端、大型系统和许多其他平台的编程语言。 - 库:在Java中,库是一组预打包的类和接口,它们可以被应用程序重复使用。Java提供了庞大的标准库,以支持各种常见的任务和功能。 - 查询:查询指的是利用软件工具搜索、定位和检索信息的过程。对于Java库类查询工具来说,这意味着可以通过类名、方法签名或其他标识符来查找特定的API条目。 最后,压缩包文件列表包含了两个文件:“java.dit”和“Java.exe”。其中“Java.exe”很可能是程序的可执行文件,而“java.dit”可能是一个数据文件,用于存储Java类的索引或数据。由于文件名后缀通常与文件类型相关联,但“dit”并不是一个常见的文件扩展名。这可能是一个特定于软件的自定义格式,或是一个打字错误。 总结来说,"java2库类查询" 是一个针对Java开发者的实用工具,它提供了一个轻量级、易用的平台来查询和定位Java标准库中的所有类和API。此工具对优化开发流程,减少查找Java类文档的时间大有裨益,尤其适合需要频繁查阅Java API的开发者使用。
recommend-type

【Zotero 7终极指南】:新手必备!Attanger插件全攻略与数据同步神技

# 1. Zotero 7与Attanger插件的介绍 在当今的学术研究和知识管理领域,高效的文献管理工具至关重要。Zotero 7作为一个流行的参考文献管理软件,因其强大的功能和用户友好的界面而受到专业人士的青睐。而Attanger插件则为Zotero 7带来了更多定制化和高级功能,极大地增强