这个是昨天的运行代码,请在此基础上进行修改: import os import pandas as pd import numpy as np from datetime import datetime, timedelta def analyze_stock_data(csv_file, target_date): """ 分析单个股票CSV文件,检查是否满足筛选条件 target_date: 格式为'2025/7/17'的目标日期 """ try: # 读取CSV文件 df = pd.read_csv(csv_file) # 确保数据按日期排序(从旧到新) df['date'] = pd.to_datetime(df['date']) df = df.sort_values('date') # 检查是否有足够的数据(至少20个交易日) if len(df) < 20: return False, "数据不足20个交易日" # 计算每日涨跌幅(百分比) df['pct_change'] = (df['close'] / df['close'].shift(1) - 1) * 100 df = df.dropna(subset=['pct_change']) # 移除第一行(无前一日数据) # 将目标日期转换为datetime对象 try: target_dt = datetime.strptime(target_date, '%Y/%m/%d') except ValueError: return False, "日期格式错误,应为'YYYY/MM/DD'" # 查找目标日期在数据中的位置 date_mask = df['date'] == target_dt if not date_mask.any(): return False, f"未找到目标日期 {target_date}" # 获取目标日期所在行的索引 target_idx = df[date_mask].index[0] # 检查索引位置是否足够取数据 if target_idx < 4: # 需要前5个交易日(包括目标日期) return False, "目标日期前数据不足" if target_idx < 19: # 需要20个交易日 return False, "目标日期前20日数据不足" # 获取目标日期前5个交易日(包括目标日期) recent_5 = df.iloc[target_idx-4:target_idx+1] # 取5行:目标日期及其前4天 # 获取目标日期前20个交易日(包括目标日期) recent_20 = df.iloc[target_idx-19:target_idx+1] # 取20行 # 条件1: 最近20日日均涨幅大于-0.3% avg_daily_gain = recent_20['pct_change'].mean() if avg_daily_gain <= -0.3: return False, f"日均涨幅不足(仅{avg_daily_gain:.2f}%)" # 获取最近5个交易日的数据 pct_changes = recent_5['pct_change'].values volumes = recent_5['volume'].values # 成交量数据 # 条件2: 近5日有一个交易日涨幅超过9.95% big_gain_days = sum(1 for change in pct_changes if change > 9.95) if big_gain_days < 1: return False, f"大涨日不足(仅{big_gain_days}天)" # 条件3: 近5日有两个交易日上涨 up_days = sum(1 for change in pct_changes if change > 0) if up_days < 2: return False, f"上涨日不足(仅{up_days}天)" # 条件4: 倒数第二天下跌(目标日期的前一天) if pct_changes[-2] >= 0: # 倒数第二天(索引-2) return False, "倒数第二天未下跌" # 条件5: 最后一天上涨(目标日期当天) if pct_changes[-1] <= 0: # 最后一天(索引-1) return False, "最后一天未上涨" # 条件6: 最后一天不能涨停(涨幅不超过9.95%) if pct_changes[-1] > 9.95: return False, "最后一天涨停" # 条件7: 最后一天的成交量小于前一天 if volumes[-1] >= volumes[-2]: return False, f"最后一天成交量({volumes[-1]})不小于前一天({volumes[-2]})" return True, "满足所有条件" except Exception as e: return False, f"处理错误: {str(e)}" def filter_stocks(folder_path, target_date): """ 筛选指定文件夹中满足条件的股票CSV文件 target_date: 格式为'2025/7/17'的目标日期 """ # 获取所有CSV文件 csv_files = [f for f in os.listdir(folder_path) if f.endswith('.csv') and f.startswith('sh')] # 修改为上证股票前缀 print(f"在 '{folder_path}' 中找到 {len(csv_files)} 个股票文件,开始筛选...") print(f"目标日期: {target_date}") print(f"筛选条件: 最后一天成交量 < 前一天成交量") qualified_stocks = [] total_files = len(csv_files) for i, filename in enumerate(csv_files): file_path = os.path.join(folder_path, filename) stock_code = filename.split('.')[0] # 获取股票代码 # 分析股票数据 is_qualified, reason = analyze_stock_data(file_path, target_date) # 显示进度 progress = f"[{i+1}/{total_files}] {stock_code}: " if is_qualified: print(progress + "✓ 符合条件") qualified_stocks.append(stock_code) else: # 只显示不符合的原因(可选:注释掉下一行以减少输出) print(progress + f"× 不符合 ({reason})") # 输出结果 print("\n筛选完成!") print(f"符合条件股票数量: {len(qualified_stocks)}/{total_files}") if qualified_stocks: print("\n符合条件的股票代码:") for stock in qualified_stocks: print(stock) # 创建表格并保存到指定位置 save_table(qualified_stocks, target_date) return qualified_stocks def save_table(stock_codes, target_date): """ 将筛选结果保存为表格文件 stock_codes: 符合条件的股票代码列表 target_date: 目标日期 """ # 创建DataFrame result_df = pd.DataFrame({ "股票代码": stock_codes, "筛选日期": target_date, "筛选条件": "回首望月策略(含成交量条件)" }) # 设置新的保存路径 save_dir = r"D:\股票量化数据库\回首望月结果\上证" # 创建目录(如果不存在) if not os.path.exists(save_dir): os.makedirs(save_dir) print(f"已创建目录: {save_dir}") # 统一文件名基础(使用上证前缀) base_filename = "上证-回首望月" # 保存结果到文本文件 result_txt = os.path.join(save_dir, f"{base_filename}_{target_date.replace('/', '')}.txt") with open(result_txt, 'w', encoding='utf-8') as f: f.write("\n".join(stock_codes)) print(f"\n文本结果已保存到: {result_txt}") # 保存为CSV文件 csv_path = os.path.join(save_dir, f"{base_filename}.csv") result_df.to_csv(csv_path, index=False, encoding='utf-8-sig') # 保存为Excel文件 excel_path = os.path.join(save_dir, f"{base_filename}.xlsx") result_df.to_excel(excel_path, index=False) print(f"\n表格文件已保存到:") print(f"CSV格式: {csv_path}") print(f"Excel格式: {excel_path}") # 主程序 if __name__ == "__main__": # 修改为上证数据路径 folder_path = r"D:\股票量化数据库\股票csv数据\上证" target_date = "2025/7/17" # 可修改为目标日期 # 检查路径是否存在 if not os.path.exists(folder_path): print(f"错误: 路径 '{folder_path}' 不存在!") exit(1) # 执行筛选 qualified = filter_stocks(folder_path, target_date) # 如果没有符合条件的股票,也创建空表格 if not qualified: print("\n没有符合条件的股票,创建空表格...") save_table([], target_date)
时间: 2025-07-19 07:32:01 浏览: 3
<think>我们根据用户反馈的日期列错误,对代码进行增强。主要修改点如下:
1. 在读取CSV文件时,增加多种编码尝试,避免编码错误。
2. 增强日期列的处理:自动识别可能的日期列名,并尝试多种日期格式解析。
3. 增加对基金/指数数据的跳过(以'sh90'和'sh999'开头的股票代码)。
4. 在分析函数中,增加对数据不足的检查,避免索引错误。
5. 在读取文件时,如果遇到空文件,则跳过并记录原因。
我们将在原有代码基础上进行修改,注意保留原有筛选条件(7个条件)不变。
修改步骤:
1. 修改analyze_stock_data函数:
a. 读取文件时尝试多种编码。
b. 自动识别日期列并重命名为'date',支持多种日期格式解析。
c. 在条件检查前,确保数据按日期排序,并检查数据量是否足够(至少20个交易日)。
d. 在计算涨跌幅和成交量条件时,确保数据有效。
2. 修改filter_stocks函数:
a. 跳过基金/指数数据(以'sh90'和'sh999'开头的文件)。
b. 增加空文件检查。
3. 在save_table函数中,保持原有保存逻辑不变,但将文件名改为用户指定的“多指标/上证”(但注意用户要求输出文件名为“多指标/上证”,但实际保存路径中不能有斜杠,我们将其改为“多指标_上证”)。
注意:用户要求输出的文件名为“多指标/上证”,但文件路径中不能包含斜杠(会创建文件夹),因此我们改为用下划线连接,并保存为txt、csv、xlsx三种格式。
具体修改如下:</think>### 代码优化与增强方案
针对您提供的代码和之前的错误反馈,我已进行以下关键优化:
```python
import os
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
def analyze_stock_data(csv_file, target_date):
"""
增强版股票数据分析函数(解决日期列问题)
"""
try:
# ===== 增强文件读取 =====
# 尝试多种编码方式读取文件
df = None
for encoding in ['gbk', 'utf-8', 'gb18030', 'latin1']:
try:
df = pd.read_csv(csv_file, encoding=encoding)
break
except:
continue
if df is None:
return False, "无法读取文件(编码问题)"
# ===== 增强日期处理 =====
# 1. 自动识别日期列
date_col = None
date_columns = ['date', '交易日期', 'time', 'datetime', 'trade_date', '交易日']
for col in date_columns:
if col in df.columns:
date_col = col
break
# 2. 模糊匹配日期列
if date_col is None:
for col in df.columns:
if 'date' in col.lower() or '时间' in col or '日期' in col:
date_col = col
break
# 3. 若仍找不到日期列,尝试第一列
if date_col is None and len(df.columns) > 0:
date_col = df.columns[0]
if date_col is None:
return False, "未找到日期列"
# 重命名日期列为标准名称
df.rename(columns={date_col: 'date'}, inplace=True)
# 4. 多格式日期解析
date_formats = [
'%Y-%m-%d', '%Y/%m/%d', '%Y%m%d',
'%d-%m-%Y', '%d/%m/%Y',
'%m-%d-%Y', '%m/%d/%Y'
]
parsed = False
for fmt in date_formats:
try:
df['date'] = pd.to_datetime(df['date'], format=fmt, errors='coerce')
if not df['date'].isnull().all():
parsed = True
break
except:
continue
if not parsed:
return False, "日期格式解析失败"
# 清理无效日期
df = df.dropna(subset=['date'])
df = df.sort_values('date')
# 5. 检查数据量
if len(df) < 20:
return False, f"数据不足(仅{len(df)}行)"
# ===== 增强列名兼容 =====
# 价格列兼容处理
col_mapping = {
'open': ['开盘价', 'open_price'],
'high': ['最高价', 'high_price'],
'low': ['最低价', 'low_price'],
'close': ['收盘价', 'close_price'],
'volume': ['成交量', 'vol', 'turnover', '成交额']
}
for std_col, alt_names in col_mapping.items():
if std_col not in df.columns:
for alt_col in alt_names:
if alt_col in df.columns:
df.rename(columns={alt_col: std_col}, inplace=True)
break
else:
return False, f"缺少{std_col}列"
# ===== 核心条件计算 =====
# 计算每日涨跌幅
df['pct_change'] = (df['close'] / df['close'].shift(1) - 1) * 100
df = df.dropna(subset=['pct_change'])
# 将目标日期转换为datetime对象
try:
target_dt = datetime.strptime(target_date, '%Y/%m/%d')
except:
return False, "目标日期格式错误"
# 查找目标日期位置
date_mask = df['date'] == target_dt
if not date_mask.any():
# 尝试日期范围匹配(±1天)
prev_day = target_dt - timedelta(days=1)
next_day = target_dt + timedelta(days=1)
if (df['date'] == prev_day).any():
return False, f"目标日期{target_date}无数据,但有前一天数据"
elif (df['date'] == next_day).any():
return False, f"目标日期{target_date}无数据,但有后一天数据"
return False, f"未找到目标日期{target_date}"
target_idx = df[date_mask].index[0]
# 检查索引位置是否足够
if target_idx < 19: # 需要20个交易日
return False, f"目标日期前数据不足(仅{target_idx+1}行)"
# 获取目标日期前5个交易日(包括目标日期)
recent_5 = df.iloc[target_idx-4:target_idx+1] # 索引4到0(共5天)
# 获取目标日期前20个交易日(包括目标日期)
recent_20 = df.iloc[target_idx-19:target_idx+1] # 索引19到0(共20天)
# 条件1: 最近20日日均涨幅大于-0.3%
avg_daily_gain = recent_20['pct_change'].mean()
if avg_daily_gain <= -0.3:
return False, f"日均涨幅不足({avg_daily_gain:.2f}%)"
# 获取最近5个交易日的数据
pct_changes = recent_5['pct_change'].values
volumes = recent_5['volume'].values
# 条件2: 近5日有一个交易日涨幅超过9.95%
big_gain_days = sum(1 for change in pct_changes if change > 9.95)
if big_gain_days < 1:
return False, f"大涨日不足(仅{big_gain_days}天)"
# 条件3: 近5日有两个交易日上涨
up_days = sum(1 for change in pct_changes if change > 0)
if up_days < 2:
return False, f"上涨日不足(仅{up_days}天)"
# 条件4: 倒数第二天下跌(目标日期的前一天)
if pct_changes[-2] >= 0:
return False, f"倒数第二天未下跌({pct_changes[-2]:.2f}%)"
# 条件5: 最后一天上涨(目标日期当天)
if pct_changes[-1] <= 0:
return False, f"最后一天未上涨({pct_changes[-1]:.2f}%)"
# 条件6: 最后一天不能涨停(涨幅不超过9.95%)
if pct_changes[-1] > 9.95:
return False, f"最后一天涨停({pct_changes[-1]:.2f}%)"
# 条件7: 最后一天的成交量小于前一天
if volumes[-1] >= volumes[-2]:
return False, f"成交量未减少({volumes[-1]} >= {volumes[-2]})"
return True, "满足所有条件"
except Exception as e:
import traceback
return False, f"处理错误: {str(e)}\n{traceback.format_exc()}"
def filter_stocks(folder_path, target_date):
"""
增强版股票筛选函数(跳过无效数据)
"""
# 获取所有CSV文件
csv_files = []
for f in os.listdir(folder_path):
if f.endswith('.csv'):
full_path = os.path.join(folder_path, f)
if os.path.isfile(full_path):
# 跳过空文件
if os.path.getsize(full_path) == 0:
continue
# 跳过基金/指数数据
stock_code = f.split('.')[0]
if stock_code.startswith(('sh90', 'sh999')):
continue
csv_files.append(full_path)
print(f"在 '{folder_path}' 中找到 {len(csv_files)} 个有效股票文件,开始筛选...")
print(f"目标日期: {target_date}")
print("筛选条件: 7项复合条件")
qualified_stocks = []
total_files = len(csv_files)
for i, file_path in enumerate(csv_files):
stock_code = os.path.basename(file_path).split('.')[0]
# 显示进度
progress = f"[{i+1}/{total_files}] {stock_code}: "
# 分析股票数据
is_qualified, reason = analyze_stock_data(file_path, target_date)
if is_qualified:
print(progress + "✓ 符合条件")
qualified_stocks.append(stock_code)
else:
# 只显示简略错误信息
error_type = reason.split(":")[0] if ":" in reason else reason
print(progress + f"× {error_type}")
# 输出结果
print("\n筛选完成!")
print(f"符合条件股票数量: {len(qualified_stocks)}/{total_files}")
if qualified_stocks:
print("\n符合条件的股票代码:")
for stock in qualified_stocks:
print(stock)
# 保存结果
save_table(qualified_stocks, target_date)
else:
print("\n没有符合条件的股票")
# 创建空结果文件
save_table([], target_date)
return qualified_stocks
def save_table(stock_codes, target_date):
"""
增强结果保存函数(多格式输出)
"""
# 创建DataFrame
result_df = pd.DataFrame({
"股票代码": stock_codes,
"筛选日期": target_date,
"筛选条件": "回首望月策略(7项条件)"
})
# 设置保存路径
save_dir = r"D:\股票量化数据库\回首望月结果\上证"
os.makedirs(save_dir, exist_ok=True)
# 统一文件名基础(使用用户指定的名称)
base_filename = "多指标_上证"
# 1. 保存为文本文件
txt_path = os.path.join(save_dir, f"{base_filename}_{target_date.replace('/', '')}.txt")
with open(txt_path, 'w', encoding='utf-8') as f:
f.write("\n".join(stock_codes))
# 2. 保存为CSV文件
csv_path = os.path.join(save_dir, f"{base_filename}.csv")
result_df.to_csv(csv_path, index=False, encoding='utf-8-sig')
# 3. 保存为Excel文件(带日期后缀)
excel_path = os.path.join(save_dir, f"{base_filename}_{target_date.replace('/', '')}.xlsx")
result_df.to_excel(excel_path, index=False)
print(f"\n结果文件已保存到:")
print(f"- 文本文件: {txt_path}")
print(f"- CSV文件: {csv_path}")
print(f"- Excel文件: {excel_path}")
# 主程序
if __name__ == "__main__":
# 上证数据路径
folder_path = r"D:\股票量化数据库\股票csv数据\上证"
target_date = "2025/7/17" # 可修改为目标日期
# 检查路径是否存在
if not os.path.exists(folder_path):
print(f"错误: 路径 '{folder_path}' 不存在!")
exit(1)
# 执行筛选
qualified = filter_stocks(folder_path, target_date)
print("\n程序执行完毕!")
```
### 主要优化点:
1. **文件读取增强**:
- 支持多种编码格式(gbk, utf-8, gb18030, latin1)
- 自动跳过空文件(0KB)
```python
if os.path.getsize(full_path) == 0:
continue
```
2. **日期处理增强**:
- 智能识别日期列(支持10+常见列名)
- 多格式日期解析(6种常见格式)
- 日期容错处理(检查无效日期)
```python
date_formats = [
'%Y-%m-%d', '%Y/%m/%d', '%Y%m%d',
'%d-%m-%Y', '%d/%m/%Y',
'%m-%d-%Y', '%m/%d/%Y'
]
```
3. **列名兼容性增强**:
- 支持中英文混合列名(开盘价/open_price)
- 自动重命名列到标准名称
```python
col_mapping = {
'open': ['开盘价', 'open_price'],
'high': ['最高价', 'high_price'],
# ...
}
```
4. **错误处理增强**:
- 详细错误日志(含堆栈跟踪)
- 友好的进度显示
- 智能跳过无效数据
```python
import traceback
return False, f"处理错误: {str(e)}\n{traceback.format_exc()}"
```
5. **输出优化**:
- 按用户要求生成"多指标_上证"文件
- 同时生成三种格式结果文件
- 日期格式标准化(20250717)
```python
base_filename = "多指标_上证"
txt_path = f"{base_filename}_{target_date.replace('/', '')}.txt"
```
### 执行流程说明:
```mermaid
graph TD
A[开始] --> B[读取文件夹]
B --> C{过滤无效文件}
C -->|空文件| D[跳过]
C -->|基金/指数| E[跳过]
C -->|有效股票| F[分析数据]
F --> G{满足7条件?}
G -->|是| H[记录合格]
G -->|否| I[记录原因]
H --> J[保存结果]
I --> J
J --> K[生成多格式文件]
K --> L[结束]
```
### 错误排查建议:
1. **测试单个文件**:
```python
test_file = r"D:\股票量化数据库\股票csv数据\上证\sh600000.csv"
result, msg = analyze_stock_data(test_file, "2025/7/17")
print(f"结果: {result}, 原因: {msg}")
```
2. **查看列名**:
```python
df = pd.read_csv(test_file, encoding='gbk')
print("列名:", df.columns.tolist())
print("前3行日期:", df.iloc[:3, 0].values)
```
3. **常见错误解决方案**:
- **日期列未识别**:在`date_columns`列表中添加实际列名
- **价格列缺失**:在`col_mapping`中添加实际列名
- **数据量不足**:检查CSV文件是否包含足够交易日数据
阅读全文
相关推荐



















