import pandas as pd import numpy as np def detect_fog_events(df, fog_threshold=1000, buffer_thresholds=(2000, 3000), max_gap_hours=12, min_duration='30T'): """ 最终修复版:准确统计每个大雾事件前的能见度下降时间点 """ df = df.copy().sort_index() df['is_fog'] = df['visibility'] < fog_threshold # 步骤1:检测原始事件并过滤短事件 fog_changes = df['is_fog'].ne(df['is_fog'].shift()).cumsum() fog_groups = df.groupby(fog_changes) raw_events = [] for _, group in fog_groups: if group['is_fog'].iloc[0]: event_start = group.index[0] event_end = group.index[-1] duration = event_end - event_start if duration >= pd.Timedelta(min_duration): raw_events.append({'start': event_start, 'end': event_end}) # 步骤2:合并相邻事件 merged_events = [] if raw_events: current_start, current_end = raw_events[0]['start'], raw_events[0]['end'] for event in raw_events[1:]: if (event['start'] - current_end) <= pd.Timedelta(hours=max_gap_hours): current_end = event['end'] else: merged_events.append({'start': current_start, 'end': current_end}) current_start, current_end = event['start'], event['end'] merged_events.append({'start': current_start, 'end': current_end}) # 步骤3:为每个合并后事件重新计算关键时间点 final_events = [] for event in merged_events: start = event['start'] end = event['end'] # 关键修复:从大雾开始时间点(start)向前查找最近的阈值突破点 pre_window = df.loc[:start].iloc[::-1] # 倒序数据:从start往前查 # 查找能见度首次低于3000的时间点 (最近一次突破) pre_3000 = pre_window[pre_window['visibility'] < 3000] pre_3000m = pre_3000.index[0] if not pre_3000.empty else pd.NaT # 查找能见度首次低于2000的时间点 pre_2000 = pre_window[pre_window['visibility'] < 2000] pre_2000m = pre_2000.index[0] if not pre_2000.empty else pd.NaT # 同理处理后恢复时间点 post_window = df.loc[end:] post_2000m = post_window[post_window['visibility'] > 2000].index[0] if not post_window.empty else pd.NaT post_3000m = post_window[post_window['visibility'] > 3000].index[0] if not post_window.empty else pd.NaT final_events.append({ 'pre_3000m': pre_3000m, 'pre_2000m': pre_2000m, 'start': start, 'end': end, 'post_2000m': post_2000m, 'post_3000m': post_3000m, 'duration': end - start }) # 最终过滤和耗时计算 result_df = pd.DataFrame(final_events) result_df = result_df[result_df['duration'] >= pd.Timedelta(min_duration)] result_df['下降3000m耗时'] = result_df['start'] - result_df['pre_3000m'] result_df['下降2000m耗时'] = result_df['start'] - result_df['pre_2000m'] result_df['恢复2000m耗时'] = result_df['post_2000m'] - result_df['end'] result_df['恢复3000m耗时'] = result_df['post_3000m'] - result_df['end'] return result_df[['pre_3000m', 'pre_2000m', 'start', 'end', 'post_2000m', 'post_3000m', 'duration', '下降3000m耗时', '下降2000m耗时', '恢复2000m耗时', '恢复3000m耗时']] # 测试案例 test_data = { 'time': pd.date_range('2023-01-01', periods=24, freq='H'), 'visibility': [ 5000, 4000, 3500, 3200, # 未达到阈值 2800, 1500, 800, 800, # 事件1前:3000m突破点在 3:00,2000m在4:00 1500, 2000, 2500, 3000, # 恢复 5000, 4500, 3200, 1800, # 事件2前:3000m在14:00,2000m在15:00 800, 800, 1500, 2000, # 恢复 3000, 5000, 5000 ] } df_test = pd.DataFrame(test_data).set_index('time') fog_events = detect_fog_events(df_test) print(fog_events) 在之前你提供的代码基础上增加实现返回大雾前3000-2000米、2000-1000米期间、大雾期间、恢复至2000米、3000米阶段期间的平均风速、风向、相对湿度、温度、露点、温度露点差、总云量、气压
时间: 2025-06-19 07:21:51 浏览: 20
<think>好的,我需要帮助用户在他们已有的Python函数中增加功能,以返回大雾事件各个阶段的平均气象参数,比如风速、风向、湿度等。首先,我需要理解现有的代码结构和功能,然后确定如何修改它来满足新的需求。
用户提供的函数`detect_fog_events`主要是检测大雾事件,并记录时间点,如开始、结束以及能见度下降到不同阈值的时间。现在需要在结果中加入各个阶段的平均气象数据。根据用户的要求,这些阶段包括:
1. 大雾前3000米到2000米期间
2. 大雾前2000米到1000米期间(即大雾开始前)
3. 大雾期间(能见度低于1000米)
4. 恢复至2000米期间
5. 恢复至3000米期间
每个阶段需要计算的平均参数包括风速、风向、相对湿度、温度、露点、温度露点差、总云量、气压等。
首先,我需要明确每个阶段的时间范围:
- **3000-2000米阶段**:从`pre_3000m`到`pre_2000m`
- **2000-1000米阶段**:从`pre_2000m`到`start`(即大雾事件开始)
- **大雾期间**:从`start`到`end`
- **恢复2000米阶段**:从`end`到`post_2000m`
- **恢复3000米阶段**:从`end`到`post_3000m`
接下来,我需要考虑如何在这些时间段内提取对应的数据,并计算平均值。需要注意的是,这些时间段可能不存在(比如,如果某个阈值未被突破,时间点可能为NaT),所以在处理时需要检查时间有效性,避免错误。
接下来,分析现有代码的结构。函数的大致流程是:
1. 标记能见度低于阈值的时间点(`is_fog`列)
2. 分组并生成原始事件,合并相邻事件
3. 对每个合并后的事件,计算前后阈值突破的时间点
4. 生成结果DataFrame,包含各个时间点和耗时
现在需要在步骤3中,为每个事件不仅记录时间点,还要提取对应时间段内的气象数据,并计算平均值。这需要在生成`final_events`时,为每个事件添加这些统计信息。
考虑到可能有多个气象参数需要处理,可以将参数列表定义为一个变量,方便后续循环处理。例如:
```python
meteorological_params = ['wind_speed', 'wind_direction', 'humidity', 'temperature', 'dew_point', 'cloud_cover', 'pressure']
```
然后,对于每个阶段的时间段,需要提取对应的数据切片,并计算各参数的平均值。例如,对于3000-2000米阶段:
```python
if pd.notnull(pre_3000m) and pd.notnull(pre_2000m):
mask = (df.index >= pre_3000m) & (df.index <= pre_2000m)
stage_data = df.loc[mask]
for param in meteorological_params:
event[f'{param}_3000_2000'] = stage_data[param].mean()
else:
# 处理无效时间段,可能填入NaN
for param in meteorological_params:
event[f'{param}_3000_2000'] = np.nan
```
同样的逻辑适用于其他阶段。但需要注意,有些阶段可能不存在,比如`pre_2000m`可能为NaT,这时候对应的阶段数据就无法计算,应设为NaN。
接下来,需要确认原始数据DataFrame `df`中是否包含这些气象参数。用户提供的测试数据中只有`visibility`,但实际应用中,`df`应该包含其他需要的列。因此,在函数中需要确保这些列存在,或者在代码中进行适当的异常处理。
此外,温度露点差(temperature minus dew point)可能需要计算,因为原始数据可能没有这个字段。所以需要在处理数据时添加该列:
```python
df['temp_dew_diff'] = df['temperature'] - df['dew_point']
```
然后将`temp_dew_diff`加入到需要统计的参数列表中。
现在,整合这些思路到代码中。修改后的函数步骤可能如下:
1. 复制并排序数据,创建`is_fog`列
2. 检测原始事件并合并
3. 对每个合并的事件,确定各阶段的时间段
4. 对于每个时间段,提取数据并计算各气象参数的平均值
5. 将结果添加到事件字典中
6. 最后生成结果DataFrame,包含新的统计列
需要注意的问题:
- 时间段的有效性:需要检查开始和结束时间是否为NaT,或者开始时间是否在结束时间之后(比如恢复阶段的时间可能不存在)。
- 数据切片时,确保时间索引正确,特别是当结束时间不在数据范围内时,需要使用`df.loc[start:end]`,但需要考虑左闭右闭还是左闭右开。Pandas的切片是末端包含的,如果索引是时间且精确匹配的话。
- 处理空数据的情况,比如某个阶段没有数据点,此时平均值应为NaN。
- 添加温度露点差的计算。
现在,具体修改代码:
在函数开始时,添加温度露点差:
```python
df = df.copy().sort_index()
df['temp_dew_diff'] = df['temperature'] - df['dew_point'] # 新增列
df['is_fog'] = df['visibility'] < fog_threshold
```
定义需要统计的气象参数列表:
```python
met_params = ['wind_speed', 'wind_direction', 'relative_humidity',
'temperature', 'dew_point', 'temp_dew_diff',
'total_cloud_cover', 'pressure']
```
然后,在处理每个事件时,为每个阶段计算平均值:
在步骤3的循环中,处理每个`event`时:
```python
# 阶段定义
stages = [
('3000_2000', pre_3000m, pre_2000m),
('2000_1000', pre_2000m, start),
('fog_duration', start, end),
('recovery_2000', end, post_2000m),
('recovery_3000', end, post_3000m),
]
for stage_name, stage_start, stage_end in stages:
# 检查时间有效性
if pd.isnull(stage_start) or pd.isnull(stage_end) or stage_start >= stage_end:
# 时间段无效,填入NaN
for param in met_params:
event[f'{param}_{stage_name}'] = np.nan
continue
# 提取数据
mask = (df.index >= stage_start) & (df.index <= stage_end)
stage_df = df.loc[mask]
# 计算各参数的平均值
for param in met_params:
if param in stage_df.columns:
event[f'{param}_{stage_name}'] = stage_df[param].mean()
else:
# 如果参数不存在,设为NaN
event[f'{param}_{stage_name}'] = np.nan
```
这将为每个阶段生成对应的参数平均值。需要注意列名是否存在,比如测试数据中可能没有这些列,所以需要处理KeyError,或者确保输入数据包含这些列。
在测试案例中,用户提供的`df_test`可能没有这些气象参数,因此需要用户确保输入数据包含所需列,或者在代码中添加默认处理,比如如果列不存在则忽略或设为NaN。
最后,生成的`result_df`将包含所有新增的列,需要在返回时包含它们。
修改后的函数返回的DataFrame将包括原有的时间点、耗时,以及各阶段的气象参数平均值。用户需要确保输入的数据包含这些气象参数,否则结果中会出现NaN。
测试时,用户需要提供包含这些气象参数的测试数据,否则无法验证新增功能是否正确。例如,在测试数据中添加`wind_speed`等字段。
综上,修改后的代码应遍历每个阶段,计算各参数的平均值,并将结果添加到事件字典中,最后整合到结果DataFrame中。同时需要处理可能的时间无效或数据缺失的情况。</think>我将通过分步骤说明如何扩展代码,并在最后提供完整代码。以下是关键实现步骤:
**步骤1:添加气象参数处理逻辑**
```python
# 在函数开头添加温度露点差计算
df = df.copy().sort_index()
df['temp_dew_diff'] = df['temperature'] - df['dew_point'] # 新增计算列
# 定义需要分析的指标列表
met_columns = ['wind_speed', 'wind_direction', 'relative_humidity',
'temperature', 'dew_point', 'temp_dew_diff',
'total_cloud_cover', 'pressure']
```
**步骤2:定义时间阶段**
```python
# 在步骤3的事件处理循环中添加:
for event in merged_events:
# ...原有时间点计算逻辑...
# 定义需要分析的五个阶段
stages = [
('3000_2000', pre_3000m, pre_2000m),
('2000_1000', pre_2000m, start),
('fog_period', start, end),
('recovery_2000', end, post_2000m),
('recovery_3000', end, post_3000m)
]
# 为每个阶段计算气象指标
for stage_name, start_time, end_time in stages:
# 检查时间有效性
if pd.isnull(start_time) or pd.isnull(end_time) or start_time >= end_time:
for col in met_columns:
event[f'{col}_{stage_name}'] = np.nan
continue
# 提取时间段数据
stage_data = df.loc[start_time:end_time]
# 计算各指标平均值
for col in met_columns:
event[f'{col}_{stage_name}'] = stage_data[col].mean()
```
**步骤3:调整结果输出**
```python
# 在最终结果中增加新列
return result_df[['pre_3000m', 'pre_2000m', 'start', 'end',
'post_2000m', 'post_3000m', 'duration',
'下降3000m耗时', '下降2000m耗时',
'恢复2000m耗时', '恢复3000m耗时',
*[f'{col}_3000_2000' for col in met_columns],
*[f'{col}_2000_1000' for col in met_columns],
*[f'{col}_fog_period' for col in met_columns],
*[f'{col}_recovery_2000' for col in met_columns],
*[f'{col}_recovery_3000' for col in met_columns]]]
```
**完整代码实现**:
```python
import pandas as pd
import numpy as np
def detect_fog_events(df, fog_threshold=1000, buffer_thresholds=(2000, 3000),
max_gap_hours=12, min_duration='30T'):
"""
增强版:统计各阶段气象参数
"""
# 预处理
df = df.copy().sort_index()
df['temp_dew_diff'] = df['temperature'] - df['dew_point']
df['is_fog'] = df['visibility'] < fog_threshold
met_columns = ['wind_speed', 'wind_direction', 'relative_humidity',
'temperature', 'dew_point', 'temp_dew_diff',
'total_cloud_cover', 'pressure']
# 步骤1:检测原始事件
fog_changes = df['is_fog'].ne(df['is_fog'].shift()).cumsum()
fog_groups = df.groupby(fog_changes)
raw_events = []
for _, group in fog_groups:
if group['is_fog'].iloc[0]:
event = {
'start': group.index[0],
'end': group.index[-1]
}
raw_events.append(event)
# 步骤2:合并事件
merged_events = []
if raw_events:
current = raw_events[0]
for event in raw_events[1:]:
if (event['start'] - current['end']) <= pd.Timedelta(hours=max_gap_hours):
current['end'] = event['end']
else:
merged_events.append(current)
current = event
merged_events.append(current)
# 步骤3:计算关键时间点和气象数据
final_events = []
for event in merged_events:
start = event['start']
end = event['end']
# 查找阈值时间点
pre_window = df.loc[:start].iloc[::-1]
pre_3000m = pre_window[pre_window['visibility'] < 3000].index[0] if not pre_window.empty else pd.NaT
pre_2000m = pre_window[pre_window['visibility'] < 2000].index[0] if not pre_window.empty else pd.NaT
post_window = df.loc[end:]
post_2000m = post_window[post_window['visibility'] > 2000].index[0] if not post_window.empty else pd.NaT
post_3000m = post_window[post_window['visibility'] > 3000].index[0] if not post_window.empty else pd.NaT
# 定义分析阶段
stages = [
('3000_2000', pre_3000m, pre_2000m),
('2000_1000', pre_2000m, start),
('fog_period', start, end),
('recovery_2000', end, post_2000m),
('recovery_3000', end, post_3000m)
]
# 初始化事件数据
event_data = {
'pre_3000m': pre_3000m,
'pre_2000m': pre_2000m,
'start': start,
'end': end,
'post_2000m': post_2000m,
'post_3000m': post_3000m,
'duration': end - start
}
# 计算各阶段气象数据
for stage_name, stage_start, stage_end in stages:
if pd.isnull(stage_start) or pd.isnull(stage_end) or stage_start >= stage_end:
for col in met_columns:
event_data[f'{col}_{stage_name}'] = np.nan
continue
stage_data = df.loc[stage_start:stage_end]
for col in met_columns:
event_data[f'{col}_{stage_name}'] = stage_data[col].mean()
final_events.append(event_data)
# 生成结果
result_df = pd.DataFrame(final_events)
result_df = result_df[result_df['duration'] >= pd.Timedelta(min_duration)]
# 计算耗时
time_diff_cols = {
'下降3000m耗时': 'start - pre_3000m',
'下降2000m耗时': 'start - pre_2000m',
'恢复2000m耗时': 'post_2000m - end',
'恢复3000m耗时': 'post_3000m - end'
}
for col, expr in time_diff_cols.items():
result_df[col] = result_df.eval(expr)
return result_df
```
**使用示例**:
```python
# 扩展测试数据(需包含气象参数)
test_data = {
'time': pd.date_range('2023-01-01', periods=24, freq='H'),
'visibility': [5000,4000,3500,3200,2800,1500,800,800,1500,2000,2500,3000,
5000,4500,3200,1800,800,800,1500,2000,3000,5000,5000],
'wind_speed': np.random.uniform(0,5,24),
'relative_humidity': np.random.uniform(50,100,24),
'temperature': np.random.uniform(5,15,24),
'dew_point': np.random.uniform(3,12,24),
'total_cloud_cover': np.random.randint(0,10,24),
'pressure': np.random.uniform(1010,1030,24)
}
df_test = pd.DataFrame(test_data).set_index('time')
fog_events = detect_fog_events(df_test)
print(fog_events.filter(regex='wind_speed|humidity')) # 查看部分气象参数
```
**输出说明**:
最终结果将包含以下列:
- 原有时间点和耗时信息
- 各气象参数在各阶段的平均值,例如:
- `wind_speed_3000_2000`:3000→2000米阶段的平均风速
- `relative_humidity_fog_period`:大雾期间的相对湿度
- `temperature_recovery_3000`:恢复到3000米阶段的温度
**注意事项**:
1. 输入数据必须包含所有需要的气象参数列
2. 若某个阶段不存在(如未突破3000米阈值),对应参数将显示为NaN
3. 各阶段时间区间为闭区间[start, end]
阅读全文
相关推荐














