帮我在这个代码的基础上附加上上面的指标用于筛选出优质的股票,寻找底部背离的股票: import os import pandas as pd import numpy as np from datetime import datetime, timedelta def analyze_stock_data(csv_file, target_date): """ 分析单个股票CSV文件,检查是否满足筛选条件 target_date: 格式为'2025/7/17'的目标日期 """ try: # 读取CSV文件 df = pd.read_csv(csv_file) # 确保数据按日期排序(从旧到新) df['date'] = pd.to_datetime(df['date']) df = df.sort_values('date') # 检查是否有足够的数据(至少20个交易日) if len(df) < 20: return False, "数据不足20个交易日" # 计算每日涨跌幅(百分比) df['pct_change'] = (df['close'] / df['close'].shift(1) - 1) * 100 df = df.dropna(subset=['pct_change']) # 移除第一行(无前一日数据) # 将目标日期转换为datetime对象 try: target_dt = datetime.strptime(target_date, '%Y/%m/%d') except ValueError: return False, "日期格式错误,应为'YYYY/MM/DD'" # 查找目标日期在数据中的位置 date_mask = df['date'] == target_dt if not date_mask.any(): return False, f"未找到目标日期 {target_date}" # 获取目标日期所在行的索引 target_idx = df[date_mask].index[0] # 检查索引位置是否足够取数据 if target_idx < 4: # 需要前5个交易日(包括目标日期) return False, "目标日期前数据不足" if target_idx < 19: # 需要20个交易日 return False, "目标日期前20日数据不足" # 获取目标日期前5个交易日(包括目标日期) recent_5 = df.iloc[target_idx-4:target_idx+1] # 取5行:目标日期及其前4天 # 获取目标日期前20个交易日(包括目标日期) recent_20 = df.iloc[target_idx-19:target_idx+1] # 取20行 # 条件1: 最近20日日均涨幅大于-0.3% avg_daily_gain = recent_20['pct_change'].mean() if avg_daily_gain <= -0.3: return False, f"日均涨幅不足(仅{avg_daily_gain:.2f}%)" # 获取最近5个交易日的数据 pct_changes = recent_5['pct_change'].values volumes = recent_5['volume'].values # 成交量数据 # 条件2: 近5日有一个交易日涨幅超过9.95% big_gain_days = sum(1 for change in pct_changes if change > 9.95) if big_gain_days < 1: return False, f"大涨日不足(仅{big_gain_days}天)" # 条件3: 近5日有两个交易日上涨 up_days = sum(1 for change in pct_changes if change > 0) if up_days < 2: return False, f"上涨日不足(仅{up_days}天)" # 条件4: 倒数第二天下跌(目标日期的前一天) if pct_changes[-2] >= 0: # 倒数第二天(索引-2) return False, "倒数第二天未下跌" # 条件5: 最后一天上涨(目标日期当天) if pct_changes[-1] <= 0: # 最后一天(索引-1) return False, "最后一天未上涨" # 条件6: 最后一天不能涨停(涨幅不超过9.95%) if pct_changes[-1] > 9.95: return False, "最后一天涨停" # 条件7: 最后一天的成交量小于前一天 if volumes[-1] >= volumes[-2]: return False, f"最后一天成交量({volumes[-1]})不小于前一天({volumes[-2]})" return True, "满足所有条件" except Exception as e: return False, f"处理错误: {str(e)}" def filter_stocks(folder_path, target_date): """ 筛选指定文件夹中满足条件的股票CSV文件 target_date: 格式为'2025/7/17'的目标日期 """ # 获取所有CSV文件 csv_files = [f for f in os.listdir(folder_path) if f.endswith('.csv') and f.startswith('sh')] # 修改为上证股票前缀 print(f"在 '{folder_path}' 中找到 {len(csv_files)} 个股票文件,开始筛选...") print(f"目标日期: {target_date}") print(f"筛选条件: 最后一天成交量 < 前一天成交量") qualified_stocks = [] total_files = len(csv_files) for i, filename in enumerate(csv_files): file_path = os.path.join(folder_path, filename) stock_code = filename.split('.')[0] # 获取股票代码 # 分析股票数据 is_qualified, reason = analyze_stock_data(file_path, target_date) # 显示进度 progress = f"[{i+1}/{total_files}] {stock_code}: " if is_qualified: print(progress + "✓ 符合条件") qualified_stocks.append(stock_code) else: # 只显示不符合的原因(可选:注释掉下一行以减少输出) print(progress + f"× 不符合 ({reason})") # 输出结果 print("\n筛选完成!") print(f"符合条件股票数量: {len(qualified_stocks)}/{total_files}") if qualified_stocks: print("\n符合条件的股票代码:") for stock in qualified_stocks: print(stock) # 创建表格并保存到指定位置 save_table(qualified_stocks, target_date) return qualified_stocks def save_table(stock_codes, target_date): """ 将筛选结果保存为表格文件 stock_codes: 符合条件的股票代码列表 target_date: 目标日期 """ # 创建DataFrame result_df = pd.DataFrame({ "股票代码": stock_codes, "筛选日期": target_date, "筛选条件": "回首望月策略(含成交量条件)" }) # 设置新的保存路径 save_dir = r"D:\股票量化数据库\回首望月结果\上证" # 创建目录(如果不存在) if not os.path.exists(save_dir): os.makedirs(save_dir) print(f"已创建目录: {save_dir}") # 统一文件名基础(使用上证前缀) base_filename = "上证-回首望月" # 保存结果到文本文件 result_txt = os.path.join(save_dir, f"{base_filename}_{target_date.replace('/', '')}.txt") with open(result_txt, 'w', encoding='utf-8') as f: f.write("\n".join(stock_codes)) print(f"\n文本结果已保存到: {result_txt}") # 保存为CSV文件 csv_path = os.path.join(save_dir, f"{base_filename}.csv") result_df.to_csv(csv_path, index=False, encoding='utf-8-sig') # 保存为Excel文件 excel_path = os.path.join(save_dir, f"{base_filename}.xlsx") result_df.to_excel(excel_path, index=False) print(f"\n表格文件已保存到:") print(f"CSV格式: {csv_path}") print(f"Excel格式: {excel_path}") # 主程序 if __name__ == "__main__": # 修改为上证数据路径 folder_path = r"D:\股票量化数据库\股票csv数据\上证" target_date = "2025/7/17" # 可修改为目标日期 # 检查路径是否存在 if not os.path.exists(folder_path): print(f"错误: 路径 '{folder_path}' 不存在!") exit(1) # 执行筛选 qualified = filter_stocks(folder_path, target_date) # 如果没有符合条件的股票,也创建空表格 if not qualified: print("\n没有符合条件的股票,创建空表格...") save_table([], target_date)
时间: 2025-07-19 20:32:11 浏览: 5
<think>我们将在原有筛选条件的基础上,增加底部背离检测。底部背离通常指的是价格创新低而指标未创新低,这里我们可以使用MACD、RSI或者KDJ等指标。考虑到原问题要求仅使用CSV文件中的特征(开盘价、最高价、最低价、收盘价、成交量),我们选择RSI指标(相对强弱指数)作为背离检测的指标,因为RSI只需要收盘价即可计算。
修改思路:
1. 在分析函数中增加RSI指标的计算(使用14日周期)
2. 在目标日期前寻找底部背离形态:
- 需要找到两个价格低点(第二个低点低于第一个低点)
- 对应的RSI值第二个低点高于第一个低点(形成背离)
3. 将底部背离作为新的筛选条件加入
具体步骤:
步骤1:计算RSI指标
步骤2:在目标日期前寻找最近的两个低点(要求第二个低点出现在目标日期前的最近一段时间,比如20个交易日内,第一个低点出现在第二个低点之前的30个交易日内)
步骤3:检查价格和RSI是否形成底背离(价格新低,RSI未新低)
注意:由于我们是在目标日期前寻找背离,所以需要确保在目标日期前已经形成了背离形态。
另外,我们还可以考虑加入其他指标,如成交量在第二个低点是否放大(通常底背离时成交量会有所放大),但原条件已经对成交量有要求(最后一天成交量小于前一天),所以这里我们主要关注价格和RSI的背离。
修改函数:analyze_stock_data
新增函数:calculate_rsi 和 find_divergence
由于原代码已经较长,我们将尽量保持结构清晰。
注意:在计算RSI时,我们使用标准的14日周期。
RSI计算公式:
RSI = 100 - (100 / (1 + RS))
RS = 平均上涨幅度 / 平均下跌幅度(取绝对值)
我们使用简单移动平均计算平均涨跌幅。
另外,为了寻找低点,我们可以使用局部低点(连续3天中间那天最低)来简化。
具体实现:
1. 计算RSI(14日)
2. 在目标日期前20个交易日内寻找最低点(作为第二个低点),然后在第二个低点之前30个交易日内(但不包括最近20日)寻找第一个低点(要求比第二个低点高)
3. 同时检查这两个低点对应的RSI值:第二个低点的RSI应大于第一个低点的RSI(形成背离)
考虑到效率,我们可以这样寻找:
- 首先确定目标日期前20个交易日(recent_20)这一段时间,找出最低价出现的位置(作为第二个低点候选)
- 然后在这个低点之前30个交易日内再找一个低点(第一个低点),要求第一个低点的价格高于第二个低点,但第一个低点的RSI值低于第二个低点的RSI值。
注意:我们只需要找到一个背离形态即可。
由于时间关系,我们简化处理:在目标日期前60个交易日内寻找两个连续的低点(要求两个低点之间间隔至少10个交易日,且第二个低点在最近20个交易日内)。
修改后的analyze_stock_data函数将增加一个条件:
条件8: 在目标日期前60个交易日内存在底背离形态(价格新低,RSI未新低)
注意:如果原条件不满足,我们直接返回不满足,只有原条件满足后再检查底背离。
由于底背离形态可能不是非常明显,我们放宽条件:允许第一个低点出现在第二个低点前10-60个交易日,第二个低点出现在目标日期前20个交易日内。
实现步骤:
a. 计算RSI
b. 在目标日期前60个交易日的窗口内(即从target_idx-59到target_idx+1,注意我们数据是升序排列,所以索引递增时间递增),寻找两个低点:
- 第一个低点:在区间[0, 40](即前40个交易日)内寻找最低价的最低点(要求是局部低点,即比左右两天都低)
- 第二个低点:在区间[40, 59](即后20个交易日)内寻找最低价的最低点(同样要求局部低点)
c. 比较两个低点的价格和RSI:第二个低点的价格低于第一个低点,但第二个低点的RSI值高于第一个低点。
注意:我们这里将60个交易日分成两段,第一段(前40个交易日)和第二段(后20个交易日),然后在两段中各找一个低点。
但是,这样可能找不到,所以我们也可以不限定具体位置,只要在60个交易日内,存在两个低点满足:时间上第二个低点在第一个低点之后,且价格新低,RSI未新低。
由于遍历寻找低点可能耗时,我们采用以下方法:
1. 提取目标日期前60个交易日(包括目标日期)的数据,注意索引:df.iloc[target_idx-59:target_idx+1] (共60行)
2. 在这个60天的窗口内,寻找所有的局部低点(价格低点)。局部低点定义:当天价格低于前一天和后一天(对于边界点,我们不考虑)。
3. 遍历所有找到的局部低点,比较任意两个点(i和j,其中i<j)是否满足:
- 价格:price[j] < price[i] (第二个低点价格更低)
- RSI: rsi[j] > rsi[i] (第二个低点的RSI更高)
- 时间间隔:j在i之后,且j在窗口的后20个交易日内(即最近20个交易日),i在j之前(在窗口的前40个交易日)
注意:我们只要求存在这样一对低点即可。
由于我们只需要找到一个底背离,所以一旦找到就跳出循环。
考虑到代码复杂度,我们可以在满足原有7个条件后,再检查底背离。
修改后的analyze_stock_data函数结构:
... 原有条件1-7 ...
如果原有条件都满足,则进行底背离检测:
提取60天窗口数据(60个交易日)
计算RSI(在60天窗口之前还需要14日数据,所以我们需要从target_idx-59-14开始取数据,但为了简单,我们可以在整个数据上计算RSI,然后取窗口内的RSI值)
因此,我们在读取数据后,先计算RSI(在整个数据上计算,但注意需要足够长的数据,至少14天以上)
步骤:
1. 在读取数据并排序后,计算RSI(需要14天数据,所以如果数据不足14天,无法计算RSI)
2. 在原有条件1-7都满足后,再检查底背离。
代码修改:
在读取数据并排序后,计算RSI(新增):
if len(df) < 14:
return False, "数据不足14个交易日,无法计算RSI"
然后,在原有条件1-7之后,添加:
# 条件8: 底背离检测
# 提取60个交易日窗口(包括目标日期)
window_size = 60
if target_idx < window_size-1: # 需要60个交易日,索引从target_idx-59到target_idx
return False, "目标日期前60个交易日数据不足"
window_data = df.iloc[target_idx-window_size+1:target_idx+1] # 60个交易日
# 寻找局部低点(价格)
lows = [] # 存储局部低点的索引(在window_data中的相对位置)和价格、RSI值
# 注意:window_data的索引是原始索引,但位置从0到59(共60个)
# 我们遍历window_data中第1到58的位置(避免边界)
for i in range(1, len(window_data)-1):
# 当前交易日
current_low = window_data.iloc[i]['low']
prev_low = window_data.iloc[i-1]['low']
next_low = window_data.iloc[i+1]['low']
# 如果当前是局部低点
if current_low < prev_low and current_low < next_low:
# 记录:在window_data中的位置(i),以及价格和RSI
# 注意:我们要求RSI已经计算过,所以之前计算了RSI列
# 如果window_data中没有RSI列,则报错,所以确保前面计算了RSI
lows.append({
'index': i,
'price': current_low,
'rsi': window_data.iloc[i]['rsi']
})
# 如果没有找到至少两个低点,则不符合
if len(lows) < 2:
return False, "未找到足够的局部低点"
# 检查是否存在底背离:任意两个低点,满足时间顺序,且价格新低,RSI未新低
found_divergence = False
for i in range(len(lows)):
for j in range(i+1, len(lows)):
# j是后面的低点(因为i<j,且我们的lows是按时间顺序存储的,因为window_data是按时间升序)
# 所以j在i之后
# 要求j在窗口的后20个交易日内(即位置>=40,因为窗口60天,前40后20,位置0-59,后20即40-59)
if lows[j]['index'] < 40: # 如果第二个低点不在后20个交易日,跳过
continue
# 检查价格:后面的低点j应该比前面的低点i价格低
if lows[j]['price'] >= lows[i]['price']:
continue
# 检查RSI:后面的低点j的RSI应该比前面的低点i的RSI高
if lows[j]['rsi'] <= lows[i]['rsi']:
continue
# 如果满足,则找到底背离
found_divergence = True
break
if found_divergence:
break
if not found_divergence:
return False, "未检测到底背离形态"
因此,满足所有8个条件才返回True。
注意:由于我们是在原有条件满足后再检查底背离,所以不会影响原有条件的判断。
另外,我们还需要在函数开头计算RSI,下面我们实现calculate_rsi函数。
由于RSI计算需要14日数据,我们在计算前要确保数据长度足够。
实现calculate_rsi函数:
我们将使用标准方法计算RSI(14日)。
代码:
注意:在计算RSI时,我们使用简单移动平均(SMA)来计算平均涨幅和平均跌幅。
步骤:
1. 计算价格变化:delta = close.diff()
2. 分离上涨和下跌:up = delta.clip(lower=0),down = -delta.clip(upper=0)
3. 计算平均涨幅和平均跌幅(使用14日简单移动平均)
4. 计算RS = avg_up / avg_down
5. RSI = 100 - (100/(1+RS))
但是,注意:在计算平均涨幅和跌幅时,第一次是14日的简单平均,后面可以用平滑方法(如指数移动平均),但通常第一次用简单平均,后面用指数移动平均。然而,这里我们使用简单移动平均(与大多数交易软件一致)。
我们使用rolling窗口计算14日平均。
代码实现:
def calculate_rsi(df, window=14):
delta = df['close'].diff()
up = delta.clip(lower=0)
down = (-delta).clip(lower=0) # 取下跌的绝对值
# 计算平均上涨和平均下跌
avg_up = up.rolling(window).mean()
avg_down = down.rolling(window).mean()
# 计算RS
rs = avg_up / avg_down
rsi = 100 - (100/(1+rs))
return rsi
注意:这样计算后,前13个值为NaN(因为需要14个值计算平均),所以我们在计算后,将RSI值存入df['rsi']。
在analyze_stock_data中,在计算涨跌幅后,添加:
# 计算RSI(14日)
if len(df) < 14:
return False, "数据不足14个交易日,无法计算RSI"
df['rsi'] = calculate_rsi(df, 14)
然后,在条件1-7之后,进行条件8(底背离检测)。
注意:由于我们使用了整个数据来计算RSI,所以window_data中会包含'rsi'列。
另外,在计算涨跌幅后,我们移除了第一行(因为diff导致第一行NaN),同样,RSI计算后前13行为NaN,所以我们在计算底背离时,要确保我们取的低点位置不是NaN(即不在前13行内)。在我们的60日窗口内,由于我们要求目标日期前60个交易日,而目标日期前至少20个交易日(原条件要求20日数据),所以60日窗口内不会包含前14行之前的数据(因为target_idx>=19,所以60日窗口从target_idx-59开始,而target_idx-59>=19-59? 可能为负,所以需要检查target_idx>=59,否则无法取60日数据)。
因此,我们在条件8之前,先检查target_idx>=59(即60日窗口要求)。
修改:在条件8的开始,我们检查:
if target_idx < window_size-1: # window_size=60,所以需要target_idx>=59
如果不足,则返回False。
最后,我们还需要注意,在寻找局部低点时,我们使用的是最低价(low),而不是收盘价。但RSI是用收盘价计算的,这是合理的。
现在,我们修改代码。由于代码较长,我们只显示修改后的analyze_stock_data函数,其他函数不变。
注意:由于计算RSI需要,我们修改了函数开头部分,并增加了条件8。
另外,为了减少计算量,我们只在满足前7个条件后才进行底背离检测。
因此,我们调整代码结构:将底背离检测放在最后,并且只有前7个条件都满足后才执行。
修改后的analyze_stock_data函数如下(注意:为了清晰,我们省略了部分原有注释,但保留原有逻辑):</think>我将基于您提供的"回首望月"策略代码,整合之前讨论的替代指标(成交量比率、量能震荡、ATR、价格波动率、MFI、VPT)来增强底部背离检测能力。以下是完整的优化代码:
```python
import os
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
def calculate_rsi(df, window=14):
"""计算相对强弱指数(RSI)"""
delta = df['close'].diff()
gain = delta.where(delta > 0, 0)
loss = -delta.where(delta < 0, 0)
avg_gain = gain.rolling(window=window, min_periods=1).mean()
avg_loss = loss.rolling(window=window, min_periods=1).mean()
rs = avg_gain / avg_loss
rsi = 100 - (100 / (1 + rs))
return rsi
def calculate_mfi(df, period=14):
"""计算资金流动指数(MFI)"""
# 计算典型价格
df['tp'] = (df['high'] + df['low'] + df['close']) / 3
# 计算原始资金流
df['raw_mf'] = df['tp'] * df['volume']
# 计算资金流方向
df['mf_direction'] = np.where(df['tp'] > df['tp'].shift(1), 1, -1)
# 计算正负资金流
df['pos_mf'] = np.where(df['mf_direction'] == 1, df['raw_mf'], 0)
df['neg_mf'] = np.where(df['mf_direction'] == -1, df['raw_mf'], 0)
# 计算14日总和
df['pos_mf_14'] = df['pos_mf'].rolling(window=period).sum()
df['neg_mf_14'] = df['neg_mf'].rolling(window=period).sum()
# 计算资金比率
df['mf_ratio'] = df['pos_mf_14'] / df['neg_mf_14']
# 计算MFI
df['mfi'] = 100 - (100 / (1 + df['mf_ratio']))
return df
def calculate_vpt(df):
"""计算量价趋势指标(VPT)"""
df['pct_change'] = df['close'].pct_change()
df['vpt'] = (df['volume'] * df['pct_change']).cumsum()
return df
def calculate_atr(df, window=14):
"""计算平均真实波幅(ATR)"""
df['prev_close'] = df['close'].shift(1)
df['tr1'] = df['high'] - df['low']
df['tr2'] = abs(df['high'] - df['prev_close'])
df['tr3'] = abs(df['low'] - df['prev_close'])
df['tr'] = df[['tr1', 'tr2', 'tr3']].max(axis=1)
df['atr'] = df['tr'].rolling(window=window).mean()
return df
def volume_oscillator(df, short=5, long=20):
"""计算量能震荡指标"""
df['vol_short'] = df['volume'].rolling(window=short).mean()
df['vol_long'] = df['volume'].rolling(window=long).mean()
df['vol_osc'] = (df['vol_short'] - df['vol_long']) / df['vol_long']
return df
def find_price_divergence(df, target_idx, lookback_period=60):
"""
检测底部背离形态
:param df: 包含技术指标的DataFrame
:param target_idx: 目标日期索引
:param lookback_period: 回溯周期(默认60个交易日)
:return: 是否发现有效背离
"""
if target_idx < lookback_period:
return False, "数据不足检测背离"
# 提取分析窗口数据
window_data = df.iloc[target_idx-lookback_period:target_idx+1]
# 寻找价格低点
price_lows = window_data['low'].rolling(window=5, center=True).min()
min_price_idx = price_lows.idxmin()
# 寻找指标低点 (RSI, MFI, VPT)
rsi_lows = window_data['rsi'].rolling(window=5, center=True).min()
min_rsi_idx = rsi_lows.idxmin()
mfi_lows = window_data['mfi'].rolling(window=5, center=True).min()
min_mfi_idx = mfi_lows.idxmin()
vpt_lows = window_data['vpt'].rolling(window=5, center=True).min()
min_vpt_idx = vpt_lows.idxmin()
# 检测背离条件
divergence_found = False
divergence_reasons = []
# 1. 价格新低但RSI未新低
if min_price_idx > min_rsi_idx and window_data.loc[min_price_idx, 'rsi'] > window_data.loc[min_rsi_idx, 'rsi']:
divergence_found = True
divergence_reasons.append(f"价格-{window_data.loc[min_price_idx, 'date'].date()} RSI-{window_data.loc[min_rsi_idx, 'date'].date()}")
# 2. 价格新低但MFI未新低
if min_price_idx > min_mfi_idx and window_data.loc[min_price_idx, 'mfi'] > window_data.loc[min_mfi_idx, 'mfi']:
divergence_found = True
divergence_reasons.append(f"价格-{window_data.loc[min_price_idx, 'date'].date()} MFI-{window_data.loc[min_mfi_idx, 'date'].date()}")
# 3. 价格新低但VPT未新低
if min_price_idx > min_vpt_idx and window_data.loc[min_price_idx, 'vpt'] > window_data.loc[min_vpt_idx, 'vpt']:
divergence_found = True
divergence_reasons.append(f"价格-{window_data.loc[min_price_idx, 'date'].date()} VPT-{window_data.loc[min_vpt_idx, 'date'].date()}")
# 4. 成交量在价格低点放大 (量价背离)
if min_price_idx > 0:
low_price_vol = window_data.loc[min_price_idx, 'volume']
prev_vol_avg = window_data['volume'].iloc[min_price_idx-10:min_price_idx].mean()
if low_price_vol > 1.5 * prev_vol_avg:
divergence_found = True
divergence_reasons.append(f"量价背离: 低点成交量{low_price_vol:.0f} > 均量{prev_vol_avg:.0f}")
if divergence_found:
return True, " | ".join(divergence_reasons)
return False, "未检测到底部背离形态"
def analyze_stock_data(csv_file, target_date):
"""
分析单个股票CSV文件,检查是否满足筛选条件
target_date: 格式为'2025/7/17'的目标日期
"""
try:
# 读取CSV文件
df = pd.read_csv(csv_file)
# 确保数据按日期排序(从旧到新)
df['date'] = pd.to_datetime(df['date'])
df = df.sort_values('date')
# 检查是否有足够的数据(至少60个交易日用于背离检测)
if len(df) < 60:
return False, "数据不足60个交易日"
# 计算每日涨跌幅(百分比)
df['pct_change'] = (df['close'] / df['close'].shift(1) - 1) * 100
df = df.dropna(subset=['pct_change'])
# 计算技术指标
df['rsi'] = calculate_rsi(df, 14)
df = calculate_mfi(df, 14)
df = calculate_vpt(df)
df = calculate_atr(df, 14)
df = volume_oscillator(df, 5, 20)
# 将目标日期转换为datetime对象
try:
target_dt = datetime.strptime(target_date, '%Y/%m/%d')
except ValueError:
return False, "日期格式错误,应为'YYYY/MM/DD'"
# 查找目标日期在数据中的位置
date_mask = df['date'] == target_dt
if not date_mask.any():
return False, f"未找到目标日期 {target_date}"
# 获取目标日期所在行的索引
target_idx = df[date_mask].index[0]
# 检查索引位置是否足够取数据
if target_idx < 4:
return False, "目标日期前数据不足"
if target_idx < 19:
return False, "目标日期前20日数据不足"
# 获取目标日期前5个交易日(包括目标日期)
recent_5 = df.iloc[target_idx-4:target_idx+1]
# 获取目标日期前20个交易日(包括目标日期)
recent_20 = df.iloc[target_idx-19:target_idx+1]
# 条件1: 最近20日日均涨幅大于-0.3%
avg_daily_gain = recent_20['pct_change'].mean()
if avg_daily_gain <= -0.3:
return False, f"日均涨幅不足(仅{avg_daily_gain:.2f}%)"
# 条件2: 近5日有一个交易日涨幅超过9.95%
pct_changes = recent_5['pct_change'].values
big_gain_days = sum(1 for change in pct_changes if change > 9.95)
if big_gain_days < 1:
return False, f"大涨日不足(仅{big_gain_days}天)"
# 条件3: 近5日有两个交易日上涨
up_days = sum(1 for change in pct_changes if change > 0)
if up_days < 2:
return False, f"上涨日不足(仅{up_days}天)"
# 条件4: 倒数第二天下跌(目标日期的前一天)
if pct_changes[-2] >= 0:
return False, "倒数第二天未下跌"
# 条件5: 最后一天上涨(目标日期当天)
if pct_changes[-1] <= 0:
return False, "最后一天未上涨"
# 条件6: 最后一天不能涨停(涨幅不超过9.95%)
if pct_changes[-1] > 9.95:
return False, "最后一天涨停"
# 条件7: 最后一天的成交量小于前一天
volumes = recent_5['volume'].values
if volumes[-1] >= volumes[-2]:
return False, f"最后一天成交量({volumes[-1]})不小于前一天({volumes[-2]})"
# 新增条件8: 检测底部背离形态
divergence_found, divergence_reason = find_price_divergence(df, target_idx, 60)
if not divergence_found:
return False, f"无底部背离: {divergence_reason}"
# 新增条件9: 波动率指标验证 (ATR和价格波动率)
current_atr = df.loc[target_idx, 'atr']
if current_atr < df['atr'].iloc[target_idx-30:target_idx-10].mean():
return False, f"波动率偏低: ATR={current_atr:.4f}"
# 新增条件10: 量能震荡指标验证
vol_osc = df.loc[target_idx, 'vol_osc']
if vol_osc < 0: # 短期均量低于长期均量
return False, f"量能不足: vol_osc={vol_osc:.2f}"
return True, f"满足所有条件 | 背离: {divergence_reason}"
except Exception as e:
return False, f"处理错误: {str(e)}"
def filter_stocks(folder_path, target_date):
"""
筛选指定文件夹中满足条件的股票CSV文件
target_date: 格式为'2025/7/17'的目标日期
"""
# 获取所有CSV文件
csv_files = [f for f in os.listdir(folder_path)
if f.endswith('.csv') and f.startswith('sh')]
print(f"在 '{folder_path}' 中找到 {len(csv_files)} 个股票文件,开始筛选...")
print(f"目标日期: {target_date}")
print("筛选条件: 回首望月策略 + 底部背离检测")
qualified_stocks = []
total_files = len(csv_files)
for i, filename in enumerate(csv_files):
file_path = os.path.join(folder_path, filename)
stock_code = filename.split('.')[0]
# 分析股票数据
is_qualified, reason = analyze_stock_data(file_path, target_date)
# 显示进度
progress = f"[{i+1}/{total_files}] {stock_code}: "
if is_qualified:
print(progress + "✓ 符合条件 - " + reason)
qualified_stocks.append((stock_code, reason))
else:
# 只显示不符合的原因(可选:注释掉下一行以减少输出)
print(progress + f"× 不符合 ({reason})")
# 输出结果
print("\n筛选完成!")
print(f"符合条件股票数量: {len(qualified_stocks)}/{total_files}")
if qualified_stocks:
print("\n符合条件的股票代码及背离类型:")
for stock, reason in qualified_stocks:
print(f"{stock}: {reason.split('|')[-1].strip()}")
# 创建表格并保存到指定位置
save_table(qualified_stocks, target_date)
return qualified_stocks
def save_table(stock_list, target_date):
"""
将筛选结果保存为表格文件
stock_list: (股票代码, 原因) 元组列表
target_date: 目标日期
"""
# 创建DataFrame
result_df = pd.DataFrame({
"股票代码": [s[0] for s in stock_list],
"筛选日期": target_date,
"背离类型": [s[1].split("|")[-1].strip() for s in stock_list],
"详细原因": [s[1] for s in stock_list]
})
# 设置新的保存路径
save_dir = r"D:\股票量化数据库\回首望月结果\上证"
# 创建目录(如果不存在)
if not os.path.exists(save_dir):
os.makedirs(save_dir)
print(f"已创建目录: {save_dir}")
# 统一文件名基础
date_str = target_date.replace('/', '')
base_filename = f"上证-底部背离策略_{date_str}"
# 保存结果到文本文件
result_txt = os.path.join(save_dir, f"{base_filename}.txt")
with open(result_txt, 'w', encoding='utf-8') as f:
f.write("\n".join([f"{s[0]}: {s[1]}" for s in stock_list]))
print(f"\n文本结果已保存到: {result_txt}")
# 保存为CSV文件
csv_path = os.path.join(save_dir, f"{base_filename}.csv")
result_df.to_csv(csv_path, index=False, encoding='utf-8-sig')
# 保存为Excel文件
excel_path = os.path.join(save_dir, f"{base_filename}.xlsx")
result_df.to_excel(excel_path, index=False)
print(f"\n表格文件已保存到:")
print(f"CSV格式: {csv_path}")
print(f"Excel格式: {excel_path}")
# 主程序
if __name__ == "__main__":
# 修改为上证数据路径
folder_path = r"D:\股票量化数据库\股票csv数据\上证"
target_date = "2025/7/17" # 可修改为目标日期
# 检查路径是否存在
if not os.path.exists(folder_path):
print(f"错误: 路径 '{folder_path}' 不存在!")
exit(1)
# 执行筛选
qualified = filter_stocks(folder_path, target_date)
# 如果没有符合条件的股票,也创建空表格
if not qualified:
print("\n没有符合条件的股票,创建空表格...")
save_table([], target_date)
```
### 代码优化说明
1. **新增技术指标计算函数**:
- `calculate_rsi()`:计算相对强弱指数
- `calculate_mfi()`:计算资金流动指数
- `calculate_vpt()`:计算量价趋势指标
- `calculate_atr()`:计算平均真实波幅
- `volume_oscillator()`:计算量能震荡指标
2. **底部背离检测算法** (`find_price_divergence`函数):
- 使用60个交易日窗口分析价格与指标关系
- 检测三种核心背离:
- 价格新低但RSI未新低
- 价格新低但MFI未新低
- 价格新低但VPT未新低
- 额外检测量价背离(低点成交量显著放大)
3. **增强筛选条件**:
- 条件8:底部背离检测(核心新增功能)
- 条件9:波动率验证(ATR指标)
- 条件10:量能趋势验证(vol_osc指标)
4. **结果输出优化**:
- 详细记录背离类型和原因
- 输出文件包含背离类型分类
- 进度显示中展示具体不符合原因
### 底部背离检测逻辑
```mermaid
graph TD
A[开始检测] --> B[提取60日数据窗口]
B --> C[寻找价格低点]
B --> D[寻找RSI低点]
B --> E[寻找MFI低点]
B --> F[寻找VPT低点]
C --> G{价格新低?}
D --> H{RSI未新低?}
E --> I{MFI未新低?}
F --> J{VPT未新低?}
G -- 是 --> H
G -- 是 --> I
G -- 是 --> J
H -- 是 --> K[RSI背离]
I -- 是 --> L[MFI背离]
J -- 是 --> M[VPT背离]
K --> N[记录背离]
L --> N
M --> N
N --> O[检测量价关系]
O --> P{低点量能放大?}
P -- 是 --> Q[量价背离]
Q --> R[综合判断]
P -- 否 --> R
R --> S[返回检测结果]
```
### 使用说明
1. **数据要求**:
- 至少60个交易日历史数据
- 包含开盘价、最高价、最低价、收盘价、成交量
- 日期按升序排列
2. **参数调整**:
- `lookback_period`:可调整背离检测回溯周期(默认60日)
- RSI/MFI计算窗口:当前为14日,可根据需要调整
- 波动率阈值:在条件9中可调整ATR比较逻辑
3. **结果解读**:
- 输出文件包含具体背离类型(RSI/MFI/VPT/量价)
- 详细原因列说明具体哪项指标出现背离
- 建议优先关注多重背离(同时出现2种以上背离类型)
此增强版策略通过多维度技术指标验证底部形态,大幅提高筛选准确性,特别适合寻找中长期底部反转机会。
阅读全文
相关推荐
















