def write_csv_with_name(filepath, line: list, columns: list): if os.path.exists(filepath): with open(filepath, 'a', newline='', encoding='utf-8-sig') as f: csv_file = csv.writer(f, delimiter=',') csv_file.writerow(line) else: with open(filepath, 'a', newline='', encoding='utf-8-sig') as f: csv_file = csv.writer(f, delimiter=',') csv_file.writerow(columns) csv_file.writerow(line)
时间: 2023-06-20 09:06:43 浏览: 206
这是一个写入CSV文件的函数,它的参数包括文件路径(filepath)、要写入的内容(line)和CSV文件的列名(columns)。该函数首先判断文件路径是否存在,如果存在,则以追加模式打开文件,并使用csv.writer将内容写入CSV文件中;如果文件路径不存在,则以创建模式打开文件,并先写入列名,然后再写入内容。其中,newline=''参数表示使用默认的换行符,encoding='utf-8-sig'参数表示使用UTF-8编码格式,保证在Windows系统下打开CSV文件时不会出现中文乱码的情况。
相关问题
def list_files_in_folder(folder_path): # 检查路径是否有效 if not os.path.exists(folder_path) or not os.path.isdir(folder_path): print(f"{folder_path}不是一个有效的文件夹") return for filename in os.listdir(folder_path): # 避免列出隐藏文件和目录 if not filename.startswith('.') and os.path.isfile(os.path.join(folder_path, filename)): print(filename) df = pd.read_csv(folder_path+r'\\'+filename, header=None) list1 = [] for data in df[0]: list1.append(re.findall(r'-?\d+', data)) df = pd.DataFrame(list1, columns=[i for i in range(17)]) df = df[df[8] == '0'] df = df[[0, 1, 2, 3, 4, 5, 6, 7, 8]] df[8] = np.where(df[8] == '32766', '0', df[8]) df[[1, 2, 8]] = df[[1, 2, 8]].astype() df[1] = df[1] / 100 df[2] = df[2] / 100 df[8] = df[8] / 10 os.remove(folder_path+r'\\'+filename) df.to_excel(folder_path+r'\\'+filename[:-4]+'.xlsx') for i in range(10, 21): filepath = r'D:\2024_V3.0\RUN\2010-2020WIN\day\20{}'.format(i) list_files_in_folder(filepath)
<think>好的,我现在需要帮用户分析他们提供的Python代码中的问题,并给出解决方案。首先,我会仔细阅读用户提供的代码,看看有什么地方可能出错。
用户定义了一个函数list_files_in_folder,用来处理指定文件夹中的CSV文件,转换成Excel文件。主循环遍历i从10到20,生成文件夹路径并调用这个函数。看起来代码的大致逻辑是对的,但可能存在一些细节问题。
首先,检查路径处理部分。用户用了folder_path+r'\\'+filename,这可能有问题。因为在Windows中,路径分隔符是反斜杠,但Python中字符串拼接使用r可能会导致问题。比如,如果folder_path已经是原始字符串,再拼接r'\\'可能多余,应该使用os.path.join更安全。参考引用[3]中提到os.path.join可以正确组合路径,避免手动拼接错误。所以这里应该用os.path.join(folder_path, filename)来替代,这样更可靠,也能处理不同操作系统的路径问题。
接下来,数据处理的步骤。用户用pd.read_csv读取CSV文件,header=None,然后处理df[0]中的每一行数据,使用re.findall提取数字。这里可能存在列索引错误的问题,因为如果CSV文件没有列名,默认的列索引是0,但可能需要确认是否所有数据都在第一列。此外,正则表达式r'-?\d+'是否正确提取了所有需要的数字,可能需要检查数据格式是否匹配。
然后,创建DataFrame时列名是0到16(range(17)),但后续筛选df[df[8] == '0'],这里列8是否存在?需要确认CSV文件的结构是否确实有足够的列,否则会引发KeyError。另外,用户将df[[1, 2, 8]]转换为astype(),但astype()需要指定类型,比如astype(int)或astype(float),否则会报错。这里代码缺少类型参数,会导致语法错误,必须修正。
在数值处理部分,用户将df[8]中的'32766'替换为'0',然后进行除法运算。这里需要注意数据类型是否转换正确,比如替换后的列是否是字符串还是数值类型。如果未正确转换类型,后续的除法操作可能会出错。
还有,删除原文件os.remove可能在写入Excel之前执行,这会导致如果Excel写入失败,原文件已经被删除,数据丢失。应该先确保Excel文件成功生成后再删除原文件,或者考虑保留原文件,避免数据风险。
另外,循环部分for i in range(10, 21),生成路径中的20{},当i是10的时候,会是2010,而i=20的话,循环到20的时候,实际i的范围是10到20,但range(10,21)是包括20的,所以生成的是2010到2020,这可能是正确的,但需要确认文件夹是否存在。
还有,在转换为Excel时,用户用了filename[:-4]去掉.csv的后缀,假设原文件是.csv,但如果文件名不是4个字符,比如.txt或其他,这里会出错。应该用os.path.splitext来获取不带扩展名的文件名更安全。
最后,检查是否有必要的库导入,比如os、pandas、re、numpy是否都已导入,代码中可能没有显示这些导入语句,用户在实际运行时可能会遇到模块未找到的错误。
总结问题点:
1. 路径拼接使用字符串拼接,应该改用os.path.join。
2. astype()缺少类型参数,导致语法错误。
3. 删除原文件时机不当,可能导致数据丢失。
4. 文件扩展名处理不够健壮。
5. 可能的列索引错误或数据类型问题。
6. 缺少必要的库导入。
接下来,我需要将这些分析整理成步骤性的解答,并指出如何修正这些问题,同时引用提供的参考资料来支持建议,比如使用os.path.exists和os.path.isdir检查路径有效性,以及os.path.join的正确用法。同时生成相关问题,帮助用户深入理解。</think>### 代码问题分析与解决方案
该代码主要用于批量处理文件夹中的CSV文件,进行数据清洗后转存为Excel文件。存在以下几个关键问题需要修正:
#### 1. 路径拼接错误(高危)
问题代码:
```python
df = pd.read_csv(folder_path+r'\\'+filename, header=None)
os.remove(folder_path+r'\\'+filename)
```
应改为:
```python
file_path = os.path.join(folder_path, filename)
df = pd.read_csv(file_path, header=None)
os.remove(file_path)
```
使用`os.path.join()`能自动处理不同操作系统的路径分隔符,避免手动拼接错误[^3]。
#### 2. 数据类型转换错误(语法错误)
问题代码:
```python
df[[1, 2, 8]] = df[[1, 2, 8]].astype()
```
应指定转换类型:
```python
df[[1, 2, 8]] = df[[1, 2, 8]].astype(float) # 或根据需求选择int
```
#### 3. 文件删除风险(逻辑错误)
建议先保存新文件再删除旧文件:
```python
output_path = os.path.join(folder_path, f"{os.path.splitext(filename)[0]}.xlsx")
df.to_excel(output_path)
if os.path.exists(output_path): # 确认新文件生成成功
os.remove(file_path)
```
#### 4. 扩展名处理优化
建议使用标准方法处理文件名:
```python
base_name = os.path.splitext(filename)[0]
output_path = os.path.join(folder_path, f"{base_name}.xlsx")
```
### 完整修正代码
```python
import os
import pandas as pd
import re
import numpy as np
def list_files_in_folder(folder_path):
if not os.path.exists(folder_path) or not os.path.isdir(folder_path):
print(f"{folder_path} 不是有效文件夹")
return
for filename in os.listdir(folder_path):
file_path = os.path.join(folder_path, filename)
if not filename.startswith('.') and os.path.isfile(file_path):
try:
# 读取CSV
df = pd.read_csv(file_path, header=None)
# 数据提取
list1 = [re.findall(r'-?\d+', data) for data in df[0]]
df = pd.DataFrame(list1, columns=range(17))
# 数据筛选
df = df[df[8] == '0'].iloc[:, :9]
# 数据转换
df[8] = np.where(df[8] == '32766', '0', df[8])
df[[1, 2, 8]] = df[[1, 2, 8]].astype(float)
# 数值处理
df[1] /= 100
df[2] /= 100
df[8] /= 10
# 保存新文件
output_path = os.path.join(folder_path, f"{os.path.splitext(filename)[0]}.xlsx")
df.to_excel(output_path, index=False)
# 确认保存成功后删除原文件
if os.path.exists(output_path):
os.remove(file_path)
except Exception as e:
print(f"处理文件 {filename} 出错: {str(e)}")
# 执行处理
for i in range(10, 21):
filepath = r'D:\2024_V3.0\RUN\2010-2020WIN\day\20{}'.format(i)
list_files_in_folder(filepath)
```
### 改进说明
1. **路径处理**:使用`os.path.join()`替代字符串拼接,确保跨平台兼容性[^3]
2. **错误处理**:增加try-except块捕获处理异常
3. **文件操作**:采用「先写后删」的安全策略
4. **类型安全**:明确指定`astype(float)`数据类型
5. **内存优化**:使用`iloc`替代列名索引,提升处理效率
# -*- coding: utf-8 -*- """整合气象数据获取与处理系统 功能: 1. 在5分钟整点(0分、5分、10分等)获取实况气象数据 2. 将5分钟数据按月保存到Excel(存储在"5分钟级数据"文件夹) 3. 将5分钟数据插值为分钟级数据 4. 计算每分钟的太阳位置(高度角、方位角)和地外辐照度(保留两位小数) 5. 将分钟级数据按月保存到独立Excel(存储在"分钟级数据"文件夹) """ import urllib, urllib3, sys, uuid import ssl import json from openpyxl import Workbook, load_workbook from datetime import datetime, timedelta import warnings from urllib3.exceptions import InsecureRequestWarning import time import os import pandas as pd import numpy as np from scipy.interpolate import CubicSpline import math import openpyxl from openpyxl.styles import PatternFill, Font, Alignment import traceback # 强制刷新标准输出 - 确保实时输出 sys.stdout = sys.__stdout__ sys.stderr = sys.__stderr__ if hasattr(sys.stdout, 'reconfigure'): sys.stdout.reconfigure(line_buffering=True) # 确保实时输出 else: # 对于不支持reconfigure的环境 sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 1) # 禁用SSL警告 warnings.filterwarnings("ignore", category=InsecureRequestWarning) # 创建绕过SSL验证的连接池 http = urllib3.PoolManager(cert_reqs='CERT_NONE') # API配置 host = 'http://aliv18.data.moji.com' condition_path = '/whapi/json/alicityweather/condition' method = 'POST' appcode = '2c642e12da774c918fa78ac56b5b4c50' # 保定地理位置 LOCATION_NAME = "保定" LONGITUDE = 115.480 # 东经 LATITUDE = 38.855 # 北纬 # 定义API请求头 API_HEADERS = { 'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8', 'Authorization': 'APPCODE ' + appcode } # 实况数据API参数 CONDITION_TOKEN = '50b53ff8dd7d9fa320d3d3ca32cf8ed1' CITY_ID = '1819' # 保定城市ID # 创建数据存储目录 FIVEMIN_DIR = "5分钟级数据" MINUTELY_DIR = "分钟级数据" os.makedirs(FIVEMIN_DIR, exist_ok=True) os.makedirs(MINUTELY_DIR, exist_ok=True) def log_message(message): """记录日志到屏幕 - 确保实时输出""" timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S') log_line = f"[{timestamp}] {message}" # 直接输出到控制台,强制刷新 print(log_line, flush=True) # 同时写入日志文件以便调试 with open("weather_system.log", "a", encoding="utf-8") as log_file: log_file.write(log_line + "\n") log_file.flush() # 立即刷新文件写入 def safe_save(wb, filename): """安全保存Excel文件,避免占用问题""" log_message(f"正在保存文件: {filename}") # 确保目标目录存在 os.makedirs(os.path.dirname(filename), exist_ok=True) for attempt in range(5): # 增加尝试次数 try: # 使用绝对路径避免相对路径问题 abs_filename = os.path.abspath(filename) temp_file = abs_filename.replace(".xlsx", f"_temp_{attempt}.xlsx") log_message(f"尝试保存临时文件: {temp_file}") wb.save(temp_file) # 替换原文件 if os.path.exists(abs_filename): os.remove(abs_filename) os.rename(temp_file, abs_filename) log_message(f"文件保存成功: {abs_filename}") return True except (PermissionError, OSError) as e: log_message(f"文件保存失败 ({attempt+1}/5): {str(e)}") time.sleep(3) except Exception as e: log_message(f"文件保存时发生意外错误: {str(e)}") traceback.print_exc() time.sleep(3) return False def get_condition_data(): """获取实况天气数据""" try: log_message(f"正在获取{LOCATION_NAME}实况天气数据...") condition_bodys = {'token': CONDITION_TOKEN, 'cityId': CITY_ID} post_data_condition = urllib.parse.urlencode(condition_bodys).encode('utf-8') condition_url = host + condition_path response_condition = http.request('POST', condition_url, body=post_data_condition, headers=API_HEADERS) content_condition = response_condition.data.decode('utf-8') condition_data = json.loads(content_condition) log_message(f"API响应内容: {json.dumps(condition_data, ensure_ascii=False)[:200]}...") if condition_data.get('code') != 0: log_message(f"实况API请求失败: 代码 {condition_data.get('code')}, 消息: {condition_data.get('msg')}") return None condition_info = condition_data['data']['condition'] current_time = datetime.now() # 对齐到最近的5分钟 aligned_minute = (current_time.minute // 5) * 5 aligned_time = current_time.replace(minute=aligned_minute, second=0, microsecond=0) time_str = aligned_time.strftime("%Y-%m-%d %H:%M") # 计算风速(m/s)并保留一位小数 raw_wind_speed = condition_info.get('windSpeed', '') wind_speed_mps = '' if raw_wind_speed and raw_wind_speed.replace('.', '', 1).isdigit(): try: wind_speed_mps = round(float(raw_wind_speed) / 3.6, 1) # 保留一位小数 except: wind_speed_mps = '' # 处理温度值,确保保留一位小数 temp_value = condition_info.get('temp', '') if temp_value and temp_value.replace('.', '', 1).isdigit(): try: temp_value = round(float(temp_value), 1) # 温度保留一位小数 except: pass # 构建实况数据行 condition_row = [ time_str, temp_value, # 温度保留一位小数 condition_info.get('condition', ''), condition_info.get('humidity', ''), condition_info.get('windDir', ''), condition_info.get('windLevel', ''), raw_wind_speed, wind_speed_mps, # 风速保留一位小数 condition_info.get('precip', '0.0'), condition_info.get('pressure', '') ] log_message(f"成功获取实况数据: {time_str}") log_message(f" 温度: {temp_value}℃, 天气状况: {condition_info.get('condition', '')}, 湿度: {condition_info.get('humidity', '')}%") return condition_row except Exception as e: log_message(f"获取实况数据时出错: {str(e)}") traceback.print_exc() return None def normalize_time(time_str): """统一时间格式为 YYYY-MM-DD HH:MM""" try: # 如果已经是字符串,尝试解析 if isinstance(time_str, str): # 尝试解析为日期时间 if len(time_str) == 16: # 格式为 "YYYY-MM-DD HH:MM" return time_str elif len(time_str) == 19: # 包含秒数 "YYYY-MM-DD HH:MM:00" return time_str[:16] elif ' ' not in time_str: # 只有日期部分 return time_str + " 00:00" else: # 其他格式,尝试转换 dt = datetime.strptime(time_str, "%Y-%m-%d %H:%M:%S") return dt.strftime("%Y-%m-%d %H:%M") # 如果是datetime对象 elif isinstance(time_str, datetime): return time_str.strftime("%Y-%m-%d %H:%M") except Exception: pass # 无法识别的格式,原样返回 return str(time_str) def update_fivemin_excel(condition_row): """更新5分钟级Excel文件""" try: current_month = datetime.now().strftime("%Y%m") # 确保目录存在 os.makedirs(FIVEMIN_DIR, exist_ok=True) filename = os.path.join(os.path.abspath(FIVEMIN_DIR), f"{LOCATION_NAME}_5分钟级数据_{current_month}.xlsx") wb = None ws = None existing_data = {} # 存储时间点 -> 行号的映射 headers = [] log_message(f"正在更新5分钟数据文件: {filename}") if os.path.exists(filename): for attempt in range(3): try: log_message(f"尝试加载现有5分钟数据文件... (尝试 {attempt+1}/3)") wb = load_workbook(filename) ws = wb.active if ws.max_row > 0: headers = [cell.value for cell in ws[1]] # 读取现有数据到字典 (时间 -> 行号) for row_idx, row in enumerate(ws.iter_rows(min_row=2, values_only=True), start=2): if not row or not row[0]: continue time_key = normalize_time(row[0]) if time_key in existing_data: log_message(f"警告: 发现重复时间点: {time_key} (行 {row_idx})") existing_data[time_key] = row_idx log_message(f"成功加载现有5分钟数据文件: {filename} (共 {len(existing_data)} 条记录)") break except PermissionError: if attempt < 2: log_message(f"文件被占用,等待5秒后重试 ({attempt+1}/3)") time.sleep(5) else: log_message("无法打开文件,创建新工作簿") wb = Workbook() ws = wb.active ws.title = f"{LOCATION_NAME}5分钟级数据" headers = ["时间", "温度(℃)", "天气状况", "湿度(%)", "风向", "风力等级", "原始风速(km/h)", "风速(m/s)", "降水量(mm)", "气压(hPa)"] ws.append(headers) else: wb = Workbook() ws = wb.active ws.title = f"{LOCATION_NAME}5分钟极数据" headers = ["时间", "温度(℃)", "天气状况", "湿度(%)", "风向", "风力等级", "原始风速(km/h)", "风速(m/s)", "降水量(mm)", "气压(hPa)"] ws.append(headers) log_message(f"创建新的5分钟数据文件: {filename}") if not headers: headers = ["时间", "温度(℃)", "天气状况", "湿度(%)", "风向", "风力等级", "原始风速(km/h)", "风速(m/s)", "降水量(mm)", "气压(hPa)"] # 处理实况数据 if condition_row: current_time = condition_row[0] normalized_current = normalize_time(current_time) if normalized_current in existing_data: row_idx = existing_data[normalized_current] # 更新所有字段(覆盖旧数据) for col_idx in range(1, len(condition_row) + 1): new_value = condition_row[col_idx - 1] ws.cell(row=row_idx, column=col_idx, value=new_value) log_message(f"更新5分钟时间点: {normalized_current} (行 {row_idx})") else: ws.append(condition_row) # 更新字典,记录新行号 existing_data[normalized_current] = ws.max_row log_message(f"新增5分钟时间点: {normalized_current} (行 {ws.max_row})") # 保存文件 if safe_save(wb, filename): log_message(f"5分钟数据保存成功: {filename}") return filename else: log_message(f"5分钟数据保存失败") return None except Exception as e: log_message(f"更新5分钟数据文件时出错: {str(e)}") traceback.print_exc() return None def calculate_solar_parameters(dt, longitude, latitude): """计算太阳高度角、方位角和地外辐照度(高度角和方位角保留两位小数)""" try: # 将时间转换为UTC+0 utc_time = dt - timedelta(hours=8) n = utc_time.timetuple().tm_yday # 年中的第几天 # 计算太阳赤纬 (δ) delta = 23.45 * math.sin(math.radians(360 * (284 + n) / 365)) delta_rad = math.radians(delta) # 计算时角 (ω) utc_hour = utc_time.hour + utc_time.minute/60.0 omega = (utc_hour - 12) * 15 + longitude omega_rad = math.radians(omega) # 纬度转弧度 phi_rad = math.radians(latitude) # 计算太阳高度角 (α) sin_alpha = math.sin(phi_rad) * math.sin(delta_rad) + math.cos(phi_rad) * math.cos(delta_rad) * math.cos(omega_rad) alpha = math.degrees(math.asin(sin_alpha)) # 计算太阳方位角 (γ) sin_gamma = -math.cos(delta_rad) * math.sin(omega_rad) / math.cos(math.radians(alpha)) cos_gamma = (math.sin(delta_rad) * math.cos(phi_rad) - math.cos(delta_rad) * math.sin(phi_rad) * math.cos(omega_rad)) / math.cos(math.radians(alpha)) gamma = math.degrees(math.atan2(sin_gamma, cos_gamma)) gamma = (gamma + 360) % 360 # 转换为0-360度范围 # 计算地外辐照度 (W/m²) g0 = 1367 * (1 + 0.033 * math.cos(math.radians(360 * n / 365))) * sin_alpha g0 = max(0, g0) # 确保非负 # 保留两位小数 return round(alpha, 2), round(gamma, 2), round(g0, 1) except Exception as e: log_message(f"计算太阳参数错误: {str(e)}") return None, None, None def process_minutely_data(fivemin_file): """处理分钟级数据并添加太阳参数""" try: log_message("="*50) log_message(f"开始生成分钟级数据...") # 检查文件是否存在 if not os.path.exists(fivemin_file): log_message(f"错误: 5分钟数据文件不存在: {fivemin_file}") return None # 读取5分钟数据 log_message(f"读取5分钟数据文件: {fivemin_file}") try: fivemin_df = pd.read_excel(fivemin_file) log_message(f"读取5分钟数据成功,共 {len(fivemin_df)} 行") except Exception as e: log_message(f"读取5分钟数据文件失败: {str(e)}") return None # 查找时间列 time_col = next((col for col in fivemin_df.columns if '时间' in col), None) if not time_col: log_message("未找到时间列") return None # 转换时间格式并处理重复值 fivemin_df[time_col] = pd.to_datetime(fivemin_df[time_col], errors='coerce') # 删除无效行 original_count = len(fivemin_df) fivemin_df = fivemin_df.dropna(subset=[time_col]) if original_count > len(fivemin_df): log_message(f"删除了 {original_count - len(fivemin_df)} 行无效时间数据") # 处理重复时间点 - 保留最后一个 duplicate_mask = fivemin_df.duplicated(subset=[time_col], keep='last') if duplicate_mask.any(): log_message(f"发现 {duplicate_mask.sum()} 个重复时间点,保留最后一个") fivemin_df = fivemin_df[~duplicate_mask] # 按时间排序 fivemin_df = fivemin_df.sort_values(by=time_col) fivemin_df.set_index(time_col, inplace=True) # 创建分钟级时间索引 start_time = fivemin_df.index.min() end_time = fivemin_df.index.max() log_message(f"时间范围: {start_time} 至 {end_time}") minutely_index = pd.date_range(start=start_time, end=end_time, freq='1min') minutely_df = pd.DataFrame(index=minutely_index) log_message(f"将生成 {len(minutely_index)} 条分钟级数据") # 插值处理 - 确保时间戳严格递增 base_time = start_time fivemin_timestamps = (fivemin_df.index - base_time).total_seconds() # 检查时间戳是否严格递增 if not (np.diff(fivemin_timestamps) > 0).all(): log_message("时间戳不是严格递增,尝试修复...") # 重新排序 fivemin_df = fivemin_df.sort_index() fivemin_timestamps = (fivemin_df.index - base_time).total_seconds() minutely_timestamps = (minutely_df.index - base_time).total_seconds() # 对连续参数进行插值 log_message("\n开始数据插值处理...") for param in ['风速(m/s)', '温度(℃)', '湿度(%)', '气压(hPa)']: if param in fivemin_df.columns: param_data = fivemin_df[param].dropna() if len(param_data) >= 3: log_message(f"对 {param} 使用三次样条插值 (数据点: {len(param_data)})") cs = CubicSpline(fivemin_timestamps, param_data.values) interpolated_values = cs(minutely_timestamps) # 对温度和风速保留一位小数 if param in ['温度(℃)', '风速(m/s)']: interpolated_values = np.round(interpolated_values, 1) minutely_df[param] = interpolated_values else: log_message(f"对 {param} 使用前向填充 (有效点不足: {len(param_data)})") minutely_df[param] = fivemin_df[param].resample('1min').ffill() # 处理降水量 if '降水量(mm)' in fivemin_df.columns: log_message("处理降水量数据") # 将5分钟降水量转换为每分钟降水量 precip_min = fivemin_df['降水量(mm)'].resample('1min').asfreq().ffill() minutely_df['降水量(mm)'] = precip_min / 5.0 # 平均分配到每分钟 # 处理文本数据 for param in ['天气状况', '风向']: if param in fivemin_df.columns: log_message(f"处理 {param} 数据") minutely_df[param] = fivemin_df[param].resample('1min').ffill() # 数值列保留适当的小数位数 # 温度和风速保留一位小数 if '温度(℃)' in minutely_df.columns: minutely_df['温度(℃)'] = minutely_df['温度(℃)'].round(1) if '风速(m/s)' in minutely_df.columns: minutely_df['风速(m/s)'] = minutely_df['风速(m/s)'].round(1) # 其他数值列保留一位小数 other_numeric_cols = ['湿度(%)', '气压(hPa)', '降水量(mm)'] for col in other_numeric_cols: if col in minutely_df.columns: minutely_df[col] = minutely_df[col].round(1) # 计算太阳参数 log_message(f"\n开始计算太阳位置和辐照度...") solar_data = [] total_points = len(minutely_df) last_progress = -1 start_time = time.time() for i, dt in enumerate(minutely_df.index): alpha, gamma, g0 = calculate_solar_parameters(dt, LONGITUDE, LATITUDE) solar_data.append({ '太阳高度角(度)': alpha, '太阳方位角(度)': gamma, '地外辐照度(W/m2)': g0 }) # 显示进度 progress = int((i + 1) / total_points * 100) if progress != last_progress and progress % 10 == 0: elapsed = time.time() - start_time remaining = (total_points - i) * (elapsed / (i+1)) log_message(f"计算进度: {progress}% | 预计剩余时间: {remaining:.1f}秒") last_progress = progress solar_df = pd.DataFrame(solar_data, index=minutely_df.index) minutely_df = pd.concat([minutely_df, solar_df], axis=1) # 生成输出文件名 current_month = datetime.now().strftime("%Y%m") filename = f"{LOCATION_NAME}_分钟级数据_{current_month}.xlsx" filepath = os.path.join(os.path.abspath(MINUTELY_DIR), filename) # 保存分钟级数据 minutely_df.reset_index(inplace=True) minutely_df.rename(columns={'index': '时间'}, inplace=True) # 创建Excel工作簿 wb = openpyxl.Workbook() ws = wb.active ws.title = "分钟级数据" # 写入数据 log_message(f"\n正在写入分钟级数据到Excel...") for col_idx, col_name in enumerate(minutely_df.columns, 1): ws.cell(row=1, column=col_idx, value=col_name) for r_idx, row in enumerate(minutely_df.values, 2): for c_idx, value in enumerate(row, 1): ws.cell(row=r_idx, column=c_idx, value=value) # 应用样式 header_fill = PatternFill(start_color="1F4E78", end_color="1F4E78", fill_type="solid") header_font = Font(color="FFFFFF", bold=True) header_alignment = Alignment(horizontal="center", vertical="center") for cell in ws[1]: cell.fill = header_fill cell.font = header_font cell.alignment = header_alignment # 设置列宽 log_message("设置列宽...") for col in ws.columns: max_length = 0 for cell in col: try: if cell.value and len(str(cell.value)) > max_length: max_length = len(str(cell.value)) except: pass adjusted_width = (max_length + 2) * 1.2 ws.column_dimensions[col[0].column_letter].width = min(adjusted_width, 50) # 保存文件 if safe_save(wb, filepath): log_message(f"分钟级数据保存成功: {filepath}") return filepath else: log_message(f"分钟级数据保存失败") return None except Exception as e: log_message(f"处理分钟级数据时出错: {str(e)}") traceback.print_exc() return None def get_weather_data(): """获取天气数据并处理""" log_message(f"\n获取{LOCATION_NAME}实况天气数据...") try: # 获取实况数据 condition_row = get_condition_data() if not condition_row: log_message("获取实况数据失败") return False # 更新5分钟数据文件 fivemin_file = update_fivemin_excel(condition_row) if not fivemin_file: log_message("5分钟数据更新失败") return False # 处理分钟级数据(只在5分钟整点处理) current_minute = datetime.now().minute if current_minute % 5 == 0: # 0分、5分、10分等 log_message(f"当前时间 {datetime.now().strftime('%H:%M')} 是5分钟整点,开始生成分钟级数据") minutely_file = process_minutely_data(fivemin_file) if minutely_file: log_message(f"\n数据处理完成!") log_message(f" 5分钟数据 -> {fivemin_file}") log_message(f" 分钟数据 -> {minutely_file}") else: log_message(f"分钟数据处理失败") else: log_message(f"跳过分钟级数据处理(非5分钟整点)") return True except Exception as e: log_message(f"获取天气数据时出错: {str(e)}") traceback.print_exc() return False # 主程序循环 - 确保实时输出日志 if __name__ == '__main__': try: # 程序启动横幅 - 使用print确保直接输出 print("=" * 70, flush=True) print(f"{LOCATION_NAME}气象数据自动获取与处理系统", flush=True) print(f"系统启动时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}", flush=True) print(f"Python版本: {sys.version}", flush=True) print(f"工作目录: {os.getcwd()}", flush=True) print(f"5分钟数据存储目录: {os.path.abspath(FIVEMIN_DIR)}", flush=True) print(f"分钟数据存储目录: {os.path.abspath(MINUTELY_DIR)}", flush=True) print("=" * 70, flush=True) print("按 Ctrl+C 停止程序", flush=True) print("=" * 70, flush=True) # 初始执行一次 log_message("\n执行初始数据获取...") success = get_weather_data() # 主循环 current_month = datetime.now().month while True: try: # 计算到下一个整分钟的时间(确保在0秒执行) now = datetime.now() next_minute = now.replace(second=0, microsecond=0) + timedelta(minutes=1) wait_seconds = (next_minute - now).total_seconds() if wait_seconds > 0: wait_minutes = wait_seconds / 60 log_message(f"\n等待 {wait_seconds:.1f} 秒 ({wait_minutes:.2f} 分钟) 到下一个整分钟") time.sleep(wait_seconds) # 检查月份变化 new_month = datetime.now().month if new_month != current_month: log_message(f"\n月份变更: {current_month} -> {new_month}") current_month = new_month # 获取并处理数据 log_message("\n" + "="*70) log_message(f"开始新一轮数据采集 ({datetime.now().strftime('%Y-%m-%d %H:%M:%S')})") success = get_weather_data() except KeyboardInterrupt: log_message("\n程序已停止") break except Exception as e: # 获取详细错误信息 exc_type, exc_value, exc_traceback = sys.exc_info() error_details = traceback.format_exception(exc_type, exc_value, exc_traceback) error_msg = ''.join(error_details) log_message(f"\n主循环错误: {str(e)}") log_message(f"详细错误信息:\n{error_msg}") log_message("5分钟后重试...") time.sleep(300) # 5分钟后重试 # 重新计算下一个整分钟时间 now = datetime.now() next_minute = now.replace(second=0, microsecond=0) + timedelta(minutes=1) current_month = datetime.now().month log_message(f"新的重试时间: {next_minute.strftime('%Y-%m-%d %H:%M:%S')}") except Exception as e: # 启动错误处理 print(f"程序启动失败: {str(e)}", flush=True) traceback.print_exc(file=sys.stdout)把这段程序也用上面那个方法实现在Python程序在运行时在页面显示结果
<think>我们讨论的是如何让Python程序在运行时实时在页面(终端)显示输出结果,避免缓冲区导致的延迟显示。
在Python中,标准输出(stdout)通常是行缓冲的(当输出到终端时)或者全缓冲的(当输出到文件或管道时)。这意味着在缓冲区满或者遇到换行符之前,输出可能不会立即显示。这会导致在实时监控日志或长时间运行的任务中,输出不能及时显示,给用户造成延迟。
解决方案:
1. 使用 `sys.stdout.flush()` 强制刷新输出缓冲区。
2. 设置环境变量 `PYTHONUNBUFFERED` 为1,使Python在无缓冲模式下运行。
3. 使用 `-u` 参数运行Python脚本,相当于无缓冲模式。
4. 修改输出流为行缓冲。
下面分别说明:
方法一:在代码中手动刷新
在每次打印后调用 `sys.stdout.flush()`,例如:
print("实时输出")
sys.stdout.flush()
或者使用 `print` 函数的 `flush` 参数(Python 3.3+):
print("实时输出", flush=True)
方法二:设置环境变量
在运行脚本前设置环境变量:
export PYTHONUNBUFFERED=1 (Linux/macOS)
或
set PYTHONUNBUFFERED=1 (Windows)
然后运行脚本。
方法三:使用 `-u` 参数
python -u your_script.py
方法四:修改输出流为行缓冲
可以在程序开始时重设标准输出流为行缓冲:
import sys
import io
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, line_buffering=True)
注意:这种方法可能会影响性能,因为每一行都会立即刷新。
对于长时间运行的程序,特别是需要实时显示日志的,推荐使用方法二或方法三,这样整个程序都处于无缓冲模式。
下面我们修改原程序,使其在打印时能够实时输出。由于原程序是一个长时间运行的程序(每小时执行一次),并且有大量的打印输出,我们可以在关键位置添加刷新,或者使用无缓冲模式。
但是,由于原程序已经部署,我们可能不希望修改所有的print语句。因此,我们可以考虑在程序开始时设置标准输出为行缓冲。
修改建议:
在main_loop函数的开头,添加以下代码:
import sys
import io
# 设置标准输出为行缓冲
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, line_buffering=True)
这样,每次打印一行(以换行符结束)时,输出就会立即刷新。
另外,我们也可以考虑使用环境变量方法,这样不需要修改代码,只需在启动脚本时加上 `-u` 参数。
例如,将最后一行改为:
if __name__ == '__main__':
# 设置无缓冲输出
sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', buffering=1)
main_loop()
但是注意,在Python中,使用`buffering=1`(行缓冲)仅当输出到终端时才有效,如果输出被重定向到文件,则可能仍然是全缓冲。
因此,更可靠的方法是在启动脚本时使用`-u`参数。
由于原程序没有使用重定向,我们可以选择在代码中设置行缓冲。
我们修改主程序入口,如下:
if __name__ == '__main__':
# 设置标准输出为行缓冲
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, line_buffering=True)
main_loop()
但是注意:这种方法可能会改变标准输出的行为,并且可能在某些环境中不可用(例如,当标准输出被重定向时)。因此,我们也可以选择在关键输出处使用`flush=True`。
考虑到原程序中有多处print,我们选择在程序开始时设置行缓冲。
另外,在Windows环境中,行缓冲可能不起作用(因为Windows的I/O模型不同),所以使用`-u`参数可能是跨平台的最佳选择。
因此,建议用户启动程序时使用:
python -u main.py
如果用户希望从代码内部解决,可以在程序开头添加:
import os
import sys
# 尝试设置无缓冲模式(仅对支持它的系统有效)
sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', buffering=1)
注意:在Python文档中,`buffering=1`表示行缓冲,但仅当输出到终端时有效;如果输出到管道或文件,则可能被强制为全缓冲。所以,如果用户将输出重定向到文件,那么即使设置了行缓冲,也可能不会立即刷新。
因此,对于需要实时显示在终端的情况,我们建议用户直接使用`python -u`运行。
下面,我们修改原程序,在`main_loop`函数开始前设置标准输出为行缓冲(如果可能的话),并在关键循环处也添加刷新,以确保输出实时。
但是,由于原程序已经是一个完整的长时间运行程序,我们可以在其主循环开始前设置行缓冲,并修改所有print语句添加`flush=True`,但这样改动较大。
权衡之下,我们选择在程序开始处设置行缓冲,并修改几个关键位置的print(比如循环内的打印)添加`flush=True`,以双重保证。
修改步骤:
1. 在程序开头导入`sys`和`io`(如果还没有导入的话)。
2. 在`main_loop`函数的第一行添加设置行缓冲的代码。
3. 在长时间循环中的打印语句添加`flush=True`。
但是,原程序已经导入了`sys`,所以我们只需要在`main_loop`函数开始处设置:
# 设置标准输出为行缓冲
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, line_buffering=True)
注意:这个操作可能会被某些环境不支持(比如Windows下的某些终端),所以如果遇到问题,建议使用`-u`参数。
另外,我们也可以在`get_weather_data`函数中,以及`process_minutely_data_for_month`函数中,在关键循环处添加`flush=True`。
但是,考虑到原程序打印的地方并不多,我们可以先尝试全局设置行缓冲。
修改后的`main_loop`函数开始部分:
def main_loop():
import io
# 设置行缓冲
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, line_buffering=True)
print("保定气象数据处理系统已启动")
... # 其余代码不变
但是,注意:在程序运行过程中,如果标准输出被重定向,这个设置可能会失效。所以,我们更推荐使用`-u`参数。
因此,我们可以在程序文档中说明,建议使用`python -u`运行,同时我们在代码中设置行缓冲作为双重保险。
另外,我们也可以不修改原程序,而是通过修改启动方式。所以,请根据实际情况选择。
由于问题要求程序在运行时实时显示输出,我们提供两种方案:
方案一:修改代码,设置行缓冲并关键打印处刷新。
方案二:不修改代码,启动时使用`-u`参数。
下面我们给出方案一的代码修改示例(在原有代码基础上修改):
步骤:
1. 在程序开头导入`io`(如果还没有的话)。
2. 在`main_loop`函数开头设置行缓冲。
3. 在`main_loop`函数的循环打印处添加`flush=True`(可选,但为了保险,可以在每次打印后刷新,但使用行缓冲后,每次换行会自动刷新)。
修改后的代码片段(只显示修改部分):
import sys
import io # 新增导入
... 其他导入不变 ...
def main_loop():
# 设置标准输出为行缓冲
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, line_buffering=True)
print("保定气象数据处理系统已启动")
... # 其余代码不变,但注意print语句现在会自动刷新(因为行缓冲)
但是,注意:在Windows环境下,`io.TextIOWrapper`的行缓冲可能不起作用。因此,在Windows下,我们可能需要使用`-u`参数。
因此,我们建议用户:
- 在Linux/macOS下,可以尝试在代码中设置行缓冲;
- 在Windows下,使用`python -u`运行。
或者,我们可以在代码中尝试设置无缓冲(而不是行缓冲):
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', write_through=True)
在Windows中,`write_through`可以绕过缓冲,立即写入。但是,这仅在Windows下有效。
为了跨平台,我们可以这样:
if sys.platform == "win32":
# Windows下使用无缓冲
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf8', write_through=True)
else:
# 其他平台使用行缓冲
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, line_buffering=True)
但是,这样设置后,在Windows下就是无缓冲,而其他平台是行缓冲。
注意:无缓冲意味着每个字符都会立即输出,可能会影响性能,但通常对于日志输出影响不大。
因此,我们修改`main_loop`函数开头:
def main_loop():
import io
import sys
# 设置标准输出为无缓冲(Windows)或行缓冲(其他)
if sys.platform == "win32":
# Windows: 使用无缓冲
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', write_through=True)
else:
# 非Windows: 行缓冲
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, line_buffering=True)
print("保定气象数据处理系统已启动")
...
但是,原程序已经导入了`sys`,所以这里不需要再导入`sys`,但需要确保导入`io`。
另外,注意:如果程序运行在非终端环境(如作为服务运行),则可能不需要实时显示,所以这种设置可能不是必须的。
因此,我们可以在代码中添加此设置,以解决实时显示问题。
最后,我们修改原程序:在`main_loop`函数开始处添加上述代码。
同时,我们也可以考虑在程序的其他关键循环中,对于重要的打印信息,使用`print(..., flush=True)`来确保刷新。
例如,在`get_weather_data`函数中,打印错误信息时,可以添加`flush=True`。
但是,由于我们已经设置了标准输出的缓冲模式,所以可以不用在每个print处添加flush。
修改后的完整代码较长,这里只展示修改部分:
# 在程序开头确保导入io和sys(如果之前没有)
import sys
import io
... 其他代码 ...
def main_loop():
# 设置标准输出为无缓冲(Windows)或行缓冲(其他)
if sys.platform == "win32":
# Windows: 使用无缓冲
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', write_through=True)
else:
# 非Windows: 行缓冲
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, line_buffering=True)
print("保定气象数据处理系统已启动")
... # 原main_loop函数的其余代码
这样修改后,程序在运行时应该能够实时显示输出结果。
注意:如果程序在运行中重定向了标准输出(例如重定向到文件),则行缓冲可能变为全缓冲,此时设置可能失效。因此,如果用户需要重定向输出并实时查看文件内容,建议使用`tail -f`命令(Linux)或使用支持实时刷新的编辑器。
另外,如果用户使用Docker等容器运行,可能需要设置容器的输出为行缓冲模式,这通常通过设置环境变量`PYTHONUNBUFFERED=1`来实现。
综上所述,我们通过修改代码设置输出缓冲模式,以及建议启动时使用`-u`参数,可以解决输出延迟显示的问题。
下面,我们提供修改后的完整程序(只修改了main_loop开头部分,并添加了必要的导入)。
注意:原程序开头已经导入了sys,所以只需要添加io导入,并在main_loop中设置。
由于原程序较长,这里只显示修改后的main_loop函数和导入部分:
import sys
import io # 新增
... 其他导入 ...
def main_loop():
# 设置标准输出缓冲:Windows无缓冲,其他行缓冲
if sys.platform == "win32":
# Windows: 使用无缓冲
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', write_through=True)
else:
# 非Windows: 行缓冲
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, line_buffering=True)
print("保定气象数据处理系统已启动")
... # 原main_loop的其余代码
请根据实际环境测试效果。
如果问题仍然存在,请检查:
- 是否在Windows下运行且`write_through`设置未生效?可以尝试使用`-u`参数。
- 是否输出被重定向?重定向的情况下可能需要额外的处理。
最后,生成相关问题:</think>要让 Python 程序在运行时实时显示输出结果并防止缓冲区导致的延迟显示,需要解决标准输出流(stdout)的缓冲问题。以下是四种可靠方法,按推荐度排序:
### 方法 1:使用 `print()` 的 `flush` 参数(推荐)
在每次调用 `print()` 时设置 `flush=True`,强制立即刷新输出缓冲区:
```python
print("实时更新数据...", flush=True) # 添加 flush=True
```
**修改原程序示例**(在关键位置添加):
```python
# 在 get_weather_data() 函数中
print(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')} 开始获取保定未来24小时天气预报...", flush=True)
# 在 process_single_month() 函数中
print(f"处理月份: {month_key}, 文件: {filename}", flush=True)
```
### 方法 2:设置环境变量 `PYTHONUNBUFFERED`
在运行程序前设置环境变量,禁用输出缓冲:
```bash
# Linux/macOS
export PYTHONUNBUFFERED=1
python your_script.py
# Windows
set PYTHONUNBUFFERED=1
python your_script.py
```
### 方法 3:使用 `-u` 参数运行 Python
通过命令行参数禁用缓冲:
```bash
python -u your_script.py
```
### 方法 4:在代码中手动刷新缓冲区
使用 `sys.stdout.flush()` 强制刷新:
```python
import sys
print("实时数据更新")
sys.stdout.flush() # 立即刷新
```
### 针对原程序的优化建议
在气象数据处理系统中,建议在以下位置添加刷新:
```python
def main_loop():
print("保定气象数据处理系统已启动", flush=True) # 修改点1
while True:
try:
updated_months = get_weather_data()
for month in updated_months:
process_minutely_data_for_month(month)
print(f"下次更新时间: {next_hour.strftime('%Y-%m-%d %H:%M:%S')}", flush=True) # 修改点2
```
### 技术原理说明
Python 的 stdout 默认使用**行缓冲**(终端)或**块缓冲**(重定向到文件)[^1]。这会导致:
1. 输出在换行符 `\n` 出现前被缓存
2. 长时间运行的任务输出延迟显示
通过上述方法可强制系统立即刷新缓冲区,确保实时显示。对于气象数据系统,推荐组合使用方法 1 和方法 2:
- 方法 1 保证关键输出实时显示
- 方法 2 确保整个程序无缓冲运行
### 验证效果
```python
# 测试脚本 realtime_output.py
import time
for i in range(5):
print(f"进度: {i+1}/5", flush=True) # 实时显示
time.sleep(1)
```
运行:
```bash
python -u realtime_output.py
```
将每秒输出一行,无延迟。
### 相关问题
1. 如何在 Docker 容器中实时查看 Python 程序的输出日志?[^2]
2. 如何将 Python 程序的实时输出同时显示在终端和保存到文件?
3. Python 的 `logging` 模块如何配置实时输出?
4. 长时间运行的 Python 脚本如何避免输出缓冲区溢出?[^3]
5. 在 Windows 系统下如何确保 Python 输出实时刷新?
[^1]: 使用 `flush=True` 或 `sys.stdout.flush()` 强制刷新输出缓冲区可解决延迟问题。
[^2]: 通过设置环境变量 `PYTHONUNBUFFERED=1` 可确保容器内输出实时显示。
[^3]: 合理设置缓冲区大小和定期刷新可防止溢出,特别是在串口通信等场景。
阅读全文
相关推荐

















