runfile('D:/股票量化数据库/股票量化数据库/大涨预测训练程序.py', wdir='D:/股票量化数据库/股票量化数据库')
Starting to load stock data...
Loading Shanghai data: 100%|██████████| 5690/5690 [01:27<00:00, 65.34it/s]
Loading Shenzhen data: 100%|██████████| 5556/5556 [00:32<00:00, 170.32it/s]
Successfully loaded 11246 stocks
Starting stock clustering analysis...
Extracting clustering features: 100%|██████████| 11246/11246 [00:00<00:00, 80161.96it/s]
Clustering analysis failed: no valid clustering data
Time: 2025-07-19 11:25:31 Views: 6
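<think>According to the error message, the clustering analysis failed because there was no valid clustering data. The problem most likely lies in the clustering feature extraction step, so let's check that code:
In the `StockCluster.fit` method we iterate over every stock and try to compute the mean and standard deviation of each feature in `Config.CLUSTER_FEATURES`. This assumes those features already exist in the DataFrame. At the clustering stage, however, feature engineering has not run yet (it runs when the training dataset is prepared), so features such as 'volatility' and 'volume_change' do not exist in the raw stock data.
We therefore need to run feature engineering on each stock before clustering, to generate the features that clustering requires.
Solution:
1. In `StockCluster.fit`, run feature engineering on each stock's data (via `FeatureEngineer.transform`) before computing the clustering features.
2. Because feature engineering takes time, we could also consider computing the features up front, before clustering feature extraction.
Modify the `StockCluster.fit` method: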
```python
def fit(self, stock_data):
    """Cluster stocks based on their features."""
    print("Starting stock clustering analysis...")
    cluster_data = []
    # Feature engineering instance
    feature_engineer = FeatureEngineer(self.config)
    # Extract clustering features
    for stock_code, df in tqdm(stock_data.items(), desc="Extracting clustering features"):
        if len(df) < 30:  # skip stocks with too little data
            continue
        try:
            # Run feature engineering first to produce the required features
            df = feature_engineer.transform(df.copy())
            # Compute feature statistics
            features = {}
            for feat in self.config.CLUSTER_FEATURES:
                if feat in df.columns:
                    features[f"{feat}_mean"] = df[feat].mean()
                    features[f"{feat}_std"] = df[feat].std()
            if features:
                features['stock_code'] = stock_code
                cluster_data.append(features)
        except Exception as e:
            print(f"Clustering feature extraction failed for stock {stock_code}: {e}")
    if not cluster_data:
        raise ValueError("No valid clustering data")
    cluster_df = pd.DataFrame(cluster_data).set_index('stock_code')
    # Data cleaning
    cluster_df = cluster_df.replace([np.inf, -np.inf], np.nan)
    cluster_df = cluster_df.fillna(cluster_df.mean())
    # Standardize and cluster
    scaled_features = self.scaler.fit_transform(cluster_df)
    self.kmeans.fit(scaled_features)
    cluster_df['cluster'] = self.kmeans.labels_
    # Keep the clustering result on the instance for later use
    self.cluster_df = cluster_df
    print(f"Stock clustering finished, {self.config.CLUSTER_NUM} clusters in total")
    return cluster_df
```
Note: we use `df.copy()` to avoid modifying the original data, because it is used again later during training. We also catch and print exceptions so that we know which stock caused a problem.
In addition, `StockCluster.transform` used `self.cluster_df`, but `fit` never saved it as an attribute, so `fit` now sets `self.cluster_df = cluster_df`.
That said, `transform` does not actually need `self.cluster_df`: it only needs the clustering model (kmeans and scaler) to predict the cluster of new data. The attribute saved in `fit` can stay, but the check on `self.cluster_df` in `transform` can be dropped or replaced with a check that the clustering model has been fitted.
Modify the check in `StockCluster.transform`:
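```python
def transform(self, df):
    """Add cluster features to a single stock's data."""
    # `kmeans` always exists after __init__; it only gains
    # `cluster_centers_` once `fit` has run, so test for that instead.
    if not hasattr(self.kmeans, 'cluster_centers_'):
        raise ValueError("Clustering model has not been trained yet")
    # ... rest of the code unchanged
```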
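In `StockPredictor.prepare_dataset`, each stock first goes through feature engineering and then through the clustering transform, which adds the cluster features. The clustering transform calls `self.cluster_model.transform(df)`, which relies on the `kmeans` and `scaler` of `self.cluster_model`, both trained in `fit`. So `cluster_model.fit` must be called before `prepare_dataset`, which the main function already does.
Therefore the only required change is in `StockCluster.fit`: run feature engineering before extracting the clustering features.
Also note that feature engineering computes many technical indicators, some of which cannot be computed when the series is too short (a 20-day moving average needs at least 20 data points). That is why stocks with fewer than 30 rows are skipped. After feature engineering we check the data again to make sure no NaN values remain.
Finally, because the clustering feature extraction has changed, the program needs to be rerun.
The complete modified `StockCluster` class: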
```python
class StockCluster:
    def __init__(self, config):
        self.config = config
        self.scaler = StandardScaler()
        self.kmeans = KMeans(
            n_clusters=config.CLUSTER_NUM,
            random_state=42,
            n_init=10
        )

    def fit(self, stock_data):
        """Cluster stocks based on their features."""
        print("Starting stock clustering analysis...")
        cluster_data = []
        # Feature engineering instance
        feature_engineer = FeatureEngineer(self.config)
        # Extract clustering features
        for stock_code, df in tqdm(stock_data.items(), desc="Extracting clustering features"):
            if len(df) < 30:  # skip stocks with too little data
                continue
            try:
                # Run feature engineering first to produce the required features
                df_engineered = feature_engineer.transform(df.copy())
                # Compute feature statistics
                features = {}
                for feat in self.config.CLUSTER_FEATURES:
                    if feat in df_engineered.columns:
                        features[f"{feat}_mean"] = df_engineered[feat].mean()
                        features[f"{feat}_std"] = df_engineered[feat].std()
                if features:
                    features['stock_code'] = stock_code
                    cluster_data.append(features)
            except Exception as e:
                print(f"Clustering feature extraction failed for stock {stock_code}: {e}")
        if not cluster_data:
            raise ValueError("No valid clustering data")
        cluster_df = pd.DataFrame(cluster_data).set_index('stock_code')
        # Data cleaning
        cluster_df = cluster_df.replace([np.inf, -np.inf], np.nan)
        cluster_df = cluster_df.fillna(cluster_df.mean())
        # Standardize and cluster
        scaled_features = self.scaler.fit_transform(cluster_df)
        self.kmeans.fit(scaled_features)
        cluster_df['cluster'] = self.kmeans.labels_
        # Keep the clustering result on the instance for later use
        self.cluster_df = cluster_df
        print(f"Stock clustering finished, {self.config.CLUSTER_NUM} clusters in total")
        return cluster_df

    def transform(self, df):
        """Add cluster features to a single stock's data."""
        # `kmeans` only gains `cluster_centers_` once `fit` has run
        if not hasattr(self.kmeans, 'cluster_centers_'):
            raise ValueError("Clustering model has not been trained yet")
        # Compute the per-stock statistics used as clustering input
        cluster_features = {}
        for feat in self.config.CLUSTER_FEATURES:
            if feat in df.columns:
                cluster_features[f"{feat}_mean"] = df[feat].mean()
                cluster_features[f"{feat}_std"] = df[feat].std()
        # Predict the cluster label
        cluster_features_df = pd.DataFrame([cluster_features])
        scaled_features = self.scaler.transform(cluster_features_df)
        cluster_label = self.kmeans.predict(scaled_features)[0]
        # Attach the cluster features
        df['cluster'] = cluster_label
        for feat in self.config.CLUSTER_FEATURES:
            df[f'cluster_{feat}_mean'] = cluster_features.get(f"{feat}_mean", 0)
            df[f'cluster_{feat}_std'] = cluster_features.get(f"{feat}_std", 0)
        return df
```
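Now, during clustering analysis, each stock's data goes through feature engineering before the clustering features (mean and standard deviation) are computed. This ensures the required features are available at clustering time.
Note: feature engineering takes time and we are processing more than ten thousand stocks, so clustering may become slower. Since it runs only once, before model training, this is acceptable.
Please try the modified code; it should resolve the "no valid clustering data" error.
If the problem persists, check whether the engineered data contains the features defined in `Config.CLUSTER_FEATURES` and whether NaN values are filled correctly.</think>The error "Clustering analysis failed: no valid clustering data" points to the clustering feature extraction stage. Let's analyze and fix the problem: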
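## Problem Diagnosis
1. **Clustering feature extraction failed**: `StockCluster.fit()` tries to compute feature statistics (mean and standard deviation) for each stock, but ends up collecting no valid data
2. **Root cause**:
- The clustering features (such as 'volatility' and 'volume_change') do not exist in the raw data
- Feature engineering has not yet run at the clustering stage
- The raw data may be missing required columns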
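The diagnosis above can be confirmed before touching the pipeline. The sketch below is a hypothetical helper (not part of the original program) that lists which entries of `Config.CLUSTER_FEATURES` are absent from a frame's columns; the raw column layout is an assumption about the CSV files.

```python
# Feature names copied from Config.CLUSTER_FEATURES in the full script.
CLUSTER_FEATURES = ["volatility", "volume_change", "price_change",
                    "RSI14", "MACD_hist", "BB_width"]


def missing_cluster_features(columns, required=CLUSTER_FEATURES):
    """Return the required feature names that are absent from `columns`."""
    present = set(columns)
    return [name for name in required if name not in present]


# Assumed raw layout: plain OHLCV columns, no engineered indicators yet.
raw_columns = ["date", "open", "high", "low", "close", "volume"]
missing = missing_cluster_features(raw_columns)
print(f"{len(missing)} of {len(CLUSTER_FEATURES)} clustering features missing: {missing}")
```

If every feature is reported missing for the raw frames, the loop in `fit` has nothing to aggregate, which matches the near-instant progress bar in the log.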
## Solution
We need to run feature engineering before clustering so that the clustering features are available:
### Modified StockCluster class
```python
class StockCluster:
    def __init__(self, config):
        self.config = config
        self.scaler = StandardScaler()
        self.kmeans = KMeans(
            n_clusters=config.CLUSTER_NUM,
            random_state=42,
            n_init=10
        )
        # Feature engineering instance, reused for every stock
        self.feature_engineer = FeatureEngineer(config)

    def fit(self, stock_data):
        """Cluster stocks based on their features."""
        print("Starting stock clustering analysis...")
        cluster_data = []
        # Extract clustering features
        for stock_code, df in tqdm(stock_data.items(), desc="Extracting clustering features"):
            if len(df) < 30:  # skip stocks with too little data
                continue
            try:
                # Run feature engineering first
                df_engineered = self.feature_engineer.transform(df.copy())
                # Compute feature statistics
                features = {'stock_code': stock_code}
                for feat in self.config.CLUSTER_FEATURES:
                    if feat in df_engineered.columns:
                        features[f"{feat}_mean"] = df_engineered[feat].mean()
                        features[f"{feat}_std"] = df_engineered[feat].std()
                    else:
                        print(f"Warning: stock {stock_code} is missing feature {feat}")
                # Only keep rows that carry at least one statistic besides the code
                if len(features) > 1:
                    cluster_data.append(features)
            except Exception as e:
                print(f"Clustering feature extraction failed for stock {stock_code}: {e}")
        if not cluster_data:
            raise ValueError("No valid clustering data")
        cluster_df = pd.DataFrame(cluster_data).set_index('stock_code')
        # Data cleaning
        cluster_df = cluster_df.replace([np.inf, -np.inf], np.nan)
        cluster_df = cluster_df.fillna(cluster_df.mean())
        # Standardize and cluster
        scaled_features = self.scaler.fit_transform(cluster_df)
        self.kmeans.fit(scaled_features)
        cluster_df['cluster'] = self.kmeans.labels_
        # Keep the clustering result
        self.cluster_df = cluster_df
        # Print the cluster distribution
        print("Cluster distribution:")
        print(cluster_df['cluster'].value_counts())
        print(f"Stock clustering finished, {self.config.CLUSTER_NUM} clusters in total")
        return cluster_df
```
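The aggregation step inside `fit` reduces each stock's daily feature columns to one row of `<feature>_mean` and `<feature>_std` values. A dependency-free sketch of that reduction, using the standard library instead of pandas, might look like the following; `summarize_features` and the toy values are illustrative assumptions.

```python
from statistics import mean, stdev


def summarize_features(engineered, feature_names):
    """Collapse per-day feature values into one {name_mean, name_std} row."""
    row = {}
    for name in feature_names:
        values = engineered.get(name)
        if values is None or len(values) < 2:
            continue  # feature absent, or too short for a sample std
        row[f"{name}_mean"] = mean(values)
        # Sample standard deviation, the same convention as pandas' default ddof=1
        row[f"{name}_std"] = stdev(values)
    return row


# Toy per-day values for one stock; RSI14 is deliberately missing.
toy = {"volatility": [1.0, 2.0, 3.0], "price_change": [0.0, 0.5, -0.5]}
row = summarize_features(toy, ["volatility", "price_change", "RSI14"])
print(row)
```

A stock whose row stays empty contributes nothing to clustering, which is why the loop should only append rows that contain at least one statistic.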
### Enhanced data loading
```python
def _load_market(data_dir, prefix, label, stock_data):
    """Load every CSV in `data_dir` into `stock_data`, keyed by prefix + code."""
    required_columns = ['open', 'high', 'low', 'close', 'volume']
    files = [f for f in os.listdir(data_dir) if f.endswith('.csv')]
    for file in tqdm(files, desc=f"Loading {label} data"):
        stock_code = file.split('.')[0]
        try:
            df = pd.read_csv(os.path.join(data_dir, file))
            # Check that the required columns exist
            if not all(col in df.columns for col in required_columns):
                print(f"Skipping {label} stock {file}: missing required columns")
                continue
            # Derive a `date` column when the file uses another name
            if 'date' not in df.columns:
                if 'trade_date' in df.columns:
                    df['date'] = pd.to_datetime(df['trade_date'].astype(str), format='%Y%m%d')
                elif 'datetime' in df.columns:
                    df['date'] = pd.to_datetime(df['datetime'])
                else:
                    print(f"Skipping {label} stock {file}: missing date column")
                    continue
            # Add the market prefix
            stock_data[f"{prefix}{stock_code}"] = df
        except Exception as e:
            print(f"Failed to load {label} stock {file}: {e}")


def load_stock_data(sh_dir, sz_dir):
    """Load all stock data from the Shanghai and Shenzhen directories."""
    print("Starting to load stock data...")
    stock_data = {}
    # Check that both directories exist
    if not os.path.exists(sh_dir):
        print(f"Error: Shanghai data directory does not exist - {sh_dir}")
        return stock_data
    if not os.path.exists(sz_dir):
        print(f"Error: Shenzhen data directory does not exist - {sz_dir}")
        return stock_data
    # Same loading logic for both markets, only the prefix differs
    _load_market(sh_dir, "sh", "Shanghai", stock_data)
    _load_market(sz_dir, "sz", "Shenzhen", stock_data)
    print(f"Successfully loaded {len(stock_data)} stocks")
    # Data quality check on one sample
    if stock_data:
        sample_key = next(iter(stock_data))
        sample_df = stock_data[sample_key]
        print("\nSample data:")
        print(f"Stock code: {sample_key}")
        print(f"Date range: {sample_df['date'].min()} to {sample_df['date'].max()}")
        print(f"Columns: {list(sample_df.columns)}")
        print(f"Rows: {len(sample_df)}")
    return stock_data
```
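The column checks in the loader can be tried on a CSV header without reading a whole file. This is a minimal sketch using only the standard library; `check_header` is a hypothetical helper, and the accepted date column names (`date`, `trade_date`, `datetime`) follow the ones named in this answer.

```python
import csv
import io

REQUIRED = ("open", "high", "low", "close", "volume")
DATE_CANDIDATES = ("date", "trade_date", "datetime")


def check_header(csv_text):
    """Return (ok, reason) for the header row of a CSV given as text."""
    header = next(csv.reader(io.StringIO(csv_text)), [])
    missing = [col for col in REQUIRED if col not in header]
    if missing:
        return False, f"missing required columns: {missing}"
    if not any(col in header for col in DATE_CANDIDATES):
        return False, "missing date column"
    return True, "ok"


good = check_header("trade_date,open,high,low,close,volume\n20250101,1,2,0.5,1.5,100\n")
no_date = check_header("open,high,low,close,volume\n1,2,0.5,1.5,100\n")
print(good, no_date)
```

The sketch matches column names exactly, so headers with different casing or surrounding whitespace would be rejected, just as in the loader.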
### Enhanced feature engineering class
```python
class FeatureEngineer:
    def __init__(self, config):
        self.config = config
        # Order in which the feature groups are computed
        self.feature_steps = [
            self._add_basic_features,
            self._add_moving_averages,
            self._add_oscillators,
            self._add_volatility_indicators,
            self._add_volume_indicators
        ]

    def transform(self, df):
        """Compute technical indicators with TA-Lib."""
        # Make sure rows are in chronological order
        if 'date' in df.columns:
            df = df.sort_values('date')
        # Cast OHLCV to float64: TA-Lib works on doubles and may reject integer arrays
        ohlcv = ['open', 'high', 'low', 'close', 'volume']
        df = df.astype({col: 'float64' for col in ohlcv})
        # Run the feature steps in order
        for step in self.feature_steps:
            df = step(df)
        # Handle NaN and infinite values
        df = df.replace([np.inf, -np.inf], np.nan)
        # Forward fill first, then backward fill
        # (fillna(method=...) is deprecated in recent pandas)
        df = df.ffill().bfill()
        # Finally fill whatever is left with 0
        df = df.fillna(0)
        return df

    def _add_basic_features(self, df):
        """Add basic features."""
        # Price change
        df['price_change'] = df['close'].pct_change()
        # Volume change
        df['volume_change'] = df['volume'].pct_change().replace([np.inf, -np.inf], np.nan)
        # Daily high-low range, used as a simple volatility proxy
        df['volatility'] = df['high'] - df['low']
        return df

    def _add_moving_averages(self, df):
        """Add moving averages."""
        # Simple moving averages
        df['MA5'] = ta.SMA(df['close'], timeperiod=5)
        df['MA10'] = ta.SMA(df['close'], timeperiod=10)
        df['MA20'] = ta.SMA(df['close'], timeperiod=20)
        # Exponential moving averages
        df['EMA12'] = ta.EMA(df['close'], timeperiod=12)
        df['EMA26'] = ta.EMA(df['close'], timeperiod=26)
        return df

    def _add_oscillators(self, df):
        """Add oscillators."""
        # RSI
        df['RSI14'] = ta.RSI(df['close'], timeperiod=14)
        # MACD
        macd, macd_signal, macd_hist = ta.MACD(df['close'], fastperiod=12, slowperiod=26, signalperiod=9)
        df['MACD'] = macd
        df['MACD_signal'] = macd_signal
        df['MACD_hist'] = macd_hist
        # Stochastic oscillator
        slowk, slowd = ta.STOCH(df['high'], df['low'], df['close'], fastk_period=5, slowk_period=3, slowd_period=3)
        df['STOCH_k'] = slowk
        df['STOCH_d'] = slowd
        # ROC and WILLR are listed in Config.PREDICT_FEATURES, so they must exist;
        # the periods below are assumed defaults, not values from the original
        df['ROC'] = ta.ROC(df['close'], timeperiod=10)
        df['WILLR'] = ta.WILLR(df['high'], df['low'], df['close'], timeperiod=14)
        return df

    def _add_volatility_indicators(self, df):
        """Add volatility indicators."""
        # Bollinger Bands
        upper, middle, lower = ta.BBANDS(df['close'], timeperiod=20)
        df['BB_upper'] = upper
        df['BB_middle'] = middle
        df['BB_lower'] = lower
        df['BB_width'] = (upper - lower) / middle
        # ATR
        df['ATR14'] = ta.ATR(df['high'], df['low'], df['close'], timeperiod=14)
        return df

    def _add_volume_indicators(self, df):
        """Add volume and trend indicators."""
        # OBV
        df['OBV'] = ta.OBV(df['close'], df['volume'])
        # ADX
        df['ADX14'] = ta.ADX(df['high'], df['low'], df['close'], timeperiod=14)
        # CCI
        df['CCI14'] = ta.CCI(df['high'], df['low'], df['close'], timeperiod=14)
        # MFI
        df['MFI'] = ta.MFI(df['high'], df['low'], df['close'], df['volume'], timeperiod=14)
        return df
```
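The fill order used in `transform` (forward fill, then backward fill, then zero) is easy to get wrong, so here is a dependency-free sketch of the same order on a plain list, with `None` standing in for NaN. The real code operates on pandas columns; `fill_gaps` is only an illustration.

```python
def fill_gaps(values):
    """Forward-fill, then backward-fill, then zero-fill a list with None gaps."""
    out = list(values)
    # Forward fill: carry the last seen value into later gaps.
    last = None
    for i, value in enumerate(out):
        if value is None:
            out[i] = last
        else:
            last = value
    # Backward fill: leading gaps take the first value that follows them.
    following = None
    for i in range(len(out) - 1, -1, -1):
        if out[i] is None:
            out[i] = following
        else:
            following = out[i]
    # Zero fill: only reached when the whole column was empty.
    return [0.0 if value is None else value for value in out]


warmup = fill_gaps([None, None, 3.0, None, 5.0, None])
empty = fill_gaps([None, None])
print(warmup, empty)
```

Note that the backward fill copies later values into the indicator warm-up rows at the start of each series, so those rows contain information from the future. If that matters for training, dropping the warm-up rows instead may be the safer choice.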
## Complete Fix
```python
import os
import time
import joblib
import numpy as np
import pandas as pd
import talib as ta
import matplotlib.pyplot as plt
import seaborn as sns
from tqdm import tqdm
from sklearn.preprocessing import StandardScaler
from sklearn.cluster import KMeans
from sklearn.ensemble import HistGradientBoostingClassifier
from sklearn.model_selection import TimeSeriesSplit, RandomizedSearchCV
from sklearn.utils import resample
import psutil
import warnings
warnings.filterwarnings('ignore')

# Configuration
class Config:
    MODEL_NAME = "stock_prediction_model"
    PREDICT_FEATURES = [
        'open', 'high', 'low', 'close', 'volume',
        'price_change', 'volume_change', 'volatility',
        'MA5', 'MA10', 'MA20', 'EMA12', 'EMA26',
        'RSI14', 'MACD', 'MACD_signal', 'MACD_hist',
        'BB_upper', 'BB_middle', 'BB_lower', 'BB_width',
        'ATR14', 'OBV', 'ADX14', 'CCI14',
        'ROC', 'WILLR', 'MFI', 'STOCH_k', 'STOCH_d'
    ]
    CLUSTER_FEATURES = [
        'volatility', 'volume_change', 'price_change',
        'RSI14', 'MACD_hist', 'BB_width'
    ]
    CLUSTER_NUM = 8
    TARGET_DAYS = 5
    # Data paths
    SH_DATA_DIR = "D:/股票量化数据库/股票csv数据/上证/"
    SZ_DATA_DIR = "D:/股票量化数据库/股票csv数据/深证/"
    MAX_MEMORY_MB = 4000  # memory warning threshold, about 4 GB


# Memory monitoring helper
def print_memory_usage():
    process = psutil.Process(os.getpid())
    mem = process.memory_info().rss / (1024 ** 2)  # MB
    print(f"Current memory usage: {mem:.2f} MB")
    if mem > Config.MAX_MEMORY_MB:
        print(f"Warning: memory usage exceeds {Config.MAX_MEMORY_MB} MB!")
    return mem

# Feature engineering class - enhanced version
class FeatureEngineer:
    def __init__(self, config):
        self.config = config
        # Order in which the feature groups are computed
        self.feature_steps = [
            self._add_basic_features,
            self._add_moving_averages,
            self._add_oscillators,
            self._add_volatility_indicators,
            self._add_volume_indicators
        ]

    def transform(self, df):
        """Compute technical indicators with TA-Lib."""
        # Make sure rows are in chronological order
        if 'date' in df.columns:
            df = df.sort_values('date')
        # Cast OHLCV to float64: TA-Lib works on doubles and may reject integer arrays
        ohlcv = ['open', 'high', 'low', 'close', 'volume']
        df = df.astype({col: 'float64' for col in ohlcv})
        # Run the feature steps in order
        for step in self.feature_steps:
            df = step(df)
        # Handle NaN and infinite values
        df = df.replace([np.inf, -np.inf], np.nan)
        # Forward fill first, then backward fill
        # (fillna(method=...) is deprecated in recent pandas)
        df = df.ffill().bfill()
        # Finally fill whatever is left with 0
        df = df.fillna(0)
        return df

    def _add_basic_features(self, df):
        """Add basic features."""
        # Price change
        df['price_change'] = df['close'].pct_change()
        # Volume change
        df['volume_change'] = df['volume'].pct_change().replace([np.inf, -np.inf], np.nan)
        # Daily high-low range, used as a simple volatility proxy
        df['volatility'] = df['high'] - df['low']
        return df

    def _add_moving_averages(self, df):
        """Add moving averages."""
        # Simple moving averages
        df['MA5'] = ta.SMA(df['close'], timeperiod=5)
        df['MA10'] = ta.SMA(df['close'], timeperiod=10)
        df['MA20'] = ta.SMA(df['close'], timeperiod=20)
        # Exponential moving averages
        df['EMA12'] = ta.EMA(df['close'], timeperiod=12)
        df['EMA26'] = ta.EMA(df['close'], timeperiod=26)
        return df

    def _add_oscillators(self, df):
        """Add oscillators."""
        # RSI
        df['RSI14'] = ta.RSI(df['close'], timeperiod=14)
        # MACD
        macd, macd_signal, macd_hist = ta.MACD(df['close'], fastperiod=12, slowperiod=26, signalperiod=9)
        df['MACD'] = macd
        df['MACD_signal'] = macd_signal
        df['MACD_hist'] = macd_hist
        # Stochastic oscillator
        slowk, slowd = ta.STOCH(df['high'], df['low'], df['close'], fastk_period=5, slowk_period=3, slowd_period=3)
        df['STOCH_k'] = slowk
        df['STOCH_d'] = slowd
        # ROC and WILLR are listed in Config.PREDICT_FEATURES, so they must exist;
        # the periods below are assumed defaults, not values from the original
        df['ROC'] = ta.ROC(df['close'], timeperiod=10)
        df['WILLR'] = ta.WILLR(df['high'], df['low'], df['close'], timeperiod=14)
        return df

    def _add_volatility_indicators(self, df):
        """Add volatility indicators."""
        # Bollinger Bands
        upper, middle, lower = ta.BBANDS(df['close'], timeperiod=20)
        df['BB_upper'] = upper
        df['BB_middle'] = middle
        df['BB_lower'] = lower
        df['BB_width'] = (upper - lower) / middle
        # ATR
        df['ATR14'] = ta.ATR(df['high'], df['low'], df['close'], timeperiod=14)
        return df

    def _add_volume_indicators(self, df):
        """Add volume and trend indicators."""
        # OBV
        df['OBV'] = ta.OBV(df['close'], df['volume'])
        # ADX
        df['ADX14'] = ta.ADX(df['high'], df['low'], df['close'], timeperiod=14)
        # CCI
        df['CCI14'] = ta.CCI(df['high'], df['low'], df['close'], timeperiod=14)
        # MFI
        df['MFI'] = ta.MFI(df['high'], df['low'], df['close'], df['volume'], timeperiod=14)
        return df

# Clustering model class - fixed version
class StockCluster:
    def __init__(self, config):
        self.config = config
        self.scaler = StandardScaler()
        self.kmeans = KMeans(
            n_clusters=config.CLUSTER_NUM,
            random_state=42,
            n_init=10
        )
        # Feature engineering instance, reused for every stock
        self.feature_engineer = FeatureEngineer(config)

    def fit(self, stock_data):
        """Cluster stocks based on their features."""
        print("Starting stock clustering analysis...")
        cluster_data = []
        # Extract clustering features
        for stock_code, df in tqdm(stock_data.items(), desc="Extracting clustering features"):
            if len(df) < 30:  # skip stocks with too little data
                continue
            try:
                # Run feature engineering first
                df_engineered = self.feature_engineer.transform(df.copy())
                # Compute feature statistics
                features = {'stock_code': stock_code}
                for feat in self.config.CLUSTER_FEATURES:
                    if feat in df_engineered.columns:
                        features[f"{feat}_mean"] = df_engineered[feat].mean()
                        features[f"{feat}_std"] = df_engineered[feat].std()
                    else:
                        print(f"Warning: stock {stock_code} is missing feature {feat}")
                # Only keep rows that carry at least one statistic besides the code
                if len(features) > 1:
                    cluster_data.append(features)
            except Exception as e:
                print(f"Clustering feature extraction failed for stock {stock_code}: {e}")
        if not cluster_data:
            raise ValueError("No valid clustering data")
        cluster_df = pd.DataFrame(cluster_data).set_index('stock_code')
        # Data cleaning
        cluster_df = cluster_df.replace([np.inf, -np.inf], np.nan)
        cluster_df = cluster_df.fillna(cluster_df.mean())
        # Standardize and cluster
        scaled_features = self.scaler.fit_transform(cluster_df)
        self.kmeans.fit(scaled_features)
        cluster_df['cluster'] = self.kmeans.labels_
        # Keep the clustering result
        self.cluster_df = cluster_df
        # Print the cluster distribution
        print("Cluster distribution:")
        print(cluster_df['cluster'].value_counts())
        print(f"Stock clustering finished, {self.config.CLUSTER_NUM} clusters in total")
        return cluster_df

    def transform(self, df):
        """Add cluster features to a single stock's data."""
        # `kmeans` only gains `cluster_centers_` once `fit` has run
        if not hasattr(self.kmeans, 'cluster_centers_'):
            raise ValueError("Clustering model has not been trained yet")
        # Compute the per-stock statistics used as clustering input
        cluster_features = {}
        for feat in self.config.CLUSTER_FEATURES:
            if feat in df.columns:
                cluster_features[f"{feat}_mean"] = df[feat].mean()
                cluster_features[f"{feat}_std"] = df[feat].std()
        # Predict the cluster label
        cluster_features_df = pd.DataFrame([cluster_features])
        scaled_features = self.scaler.transform(cluster_features_df)
        cluster_label = self.kmeans.predict(scaled_features)[0]
        # Attach the cluster features
        df['cluster'] = cluster_label
        for feat in self.config.CLUSTER_FEATURES:
            df[f'cluster_{feat}_mean'] = cluster_features.get(f"{feat}_mean", 0)
            df[f'cluster_{feat}_std'] = cluster_features.get(f"{feat}_std", 0)
        return df

# Target creation class
class TargetCreator:
    def __init__(self, config):
        self.config = config

    def create_targets(self, df):
        """Create the prediction target: a class label for the return over the next TARGET_DAYS days."""
        # Make sure rows are in chronological order
        if 'date' in df.columns:
            df = df.sort_values('date')
        # Compute the future return
        df['future_return'] = df['close'].shift(-self.config.TARGET_DAYS) / df['close'] - 1
        # Build the class labels; np.select uses the first condition that matches
        conditions = [
            (df['future_return'] < -0.05),
            (df['future_return'] < -0.02),
            (df['future_return'] < 0.02),
            (df['future_return'] < 0.05),
            (df['future_return'] >= 0.05)
        ]
        # 0: large drop, 1: small drop, 2: sideways, 3: small rise, 4: large rise
        choices = [0, 1, 2, 3, 4]
        df['target'] = np.select(conditions, choices, default=2)
        # Drop the trailing rows whose future return cannot be computed
        df = df.dropna(subset=['future_return'])
        return df

# Stock prediction model class
class StockPredictor:
    def __init__(self, config):
        self.config = config
        self.feature_engineer = FeatureEngineer(config)
        self.cluster_model = StockCluster(config)
        self.target_creator = TargetCreator(config)
        self.model = None

    def prepare_dataset(self, stock_data):
        """Prepare the training dataset."""
        print("Preparing training dataset...")
        all_data = []
        stock_codes = list(stock_data.keys())
        batch_size = 500
        feature_cols = self.config.PREDICT_FEATURES + ['cluster']
        for i in tqdm(range(0, len(stock_codes), batch_size), desc="Processing stock data"):
            batch_codes = stock_codes[i:i+batch_size]
            batch_data = []
            for stock_code in batch_codes:
                try:
                    df = stock_data[stock_code].copy()
                    # Feature engineering
                    df = self.feature_engineer.transform(df)
                    # Cluster assignment
                    df = self.cluster_model.transform(df)
                    # Target creation
                    df = self.target_creator.create_targets(df)
                    # Keep the required features, plus the date for ordering
                    batch_data.append(df[['date'] + feature_cols + ['target']])
                except Exception as e:
                    print(f"Failed to process stock {stock_code}: {e}")
            if batch_data:
                batch_df = pd.concat(batch_data, axis=0)
                all_data.append(batch_df)
                del batch_data, batch_df
            print_memory_usage()
        if not all_data:
            raise ValueError("No valid data for training")
        full_df = pd.concat(all_data, axis=0)
        full_df = full_df.dropna()
        # Balanced sampling: downsample every class to the smallest class size
        min_samples = full_df['target'].value_counts().min()
        balanced_df = pd.concat([
            resample(group, replace=False, n_samples=min_samples, random_state=42)
            for _, group in full_df.groupby('target')
        ])
        # Sort chronologically so TimeSeriesSplit validates on later rows than it
        # trains on (assumes `date` is consistently typed across stocks)
        balanced_df = balanced_df.sort_values('date').reset_index(drop=True)
        X = balanced_df[feature_cols].astype(np.float32)
        y = balanced_df['target'].astype(np.int8)
        print(f"Dataset ready, samples: {len(X)}, features: {len(X.columns)}")
        print(f"Target class distribution: \n{y.value_counts()}")
        return X, y

    def train_model(self, X, y):
        """Train the prediction model."""
        print("Starting model training...")
        print_memory_usage()
        # Use a memory-efficient model
        model = HistGradientBoostingClassifier(
            random_state=42,
            early_stopping=True,
            max_iter=100,
            learning_rate=0.1,
            max_depth=5,
            min_samples_leaf=10
        )
        # Simplified parameter search
        param_dist = {
            'learning_rate': [0.05, 0.1],
            'max_depth': [3, 5],
            'min_samples_leaf': [5, 10]
        }
        # Fewer cross-validation folds
        tscv = TimeSeriesSplit(n_splits=3)
        search = RandomizedSearchCV(
            estimator=model,
            param_distributions=param_dist,
            n_iter=3,  # fewer parameter combinations
            cv=tscv,
            scoring='roc_auc_ovo',
            n_jobs=-1,
            verbose=1,
            random_state=42
        )
        try:
            search.fit(X, y)
            self.model = search.best_estimator_
            print(f"Model training finished, best parameters: {search.best_params_}")
            print(f"Best model score: {search.best_score_:.4f}")
            return self.model
        except Exception as e:
            print(f"Error while training the model: {e}")
            # Fall back to the default-parameter model
            try:
                model.fit(X, y)
                self.model = model
                print("Training finished with the default-parameter model")
                return self.model
            except Exception as e2:
                print(f"Fallback model training failed: {e2}")
                return None

    def save_model(self):
        """Save the model and related objects."""
        if self.model is None:
            raise ValueError("Model has not been trained yet")
        model_path = f"{self.config.MODEL_NAME}.pkl"
        joblib.dump({
            'model': self.model,
            'feature_engineer': self.feature_engineer,
            'cluster_model': self.cluster_model,
            'config': self.config
        }, model_path)
        # Save feature importances, only if the model exposes them
        if hasattr(self.model, 'feature_importances_'):
            features = self.config.PREDICT_FEATURES + ['cluster']
            importances = self.model.feature_importances_
            feat_imp = pd.DataFrame({'Feature': features, 'Importance': importances})
            feat_imp = feat_imp.sort_values('Importance', ascending=False)
            feat_imp.to_csv(f"{self.config.MODEL_NAME}_feature_importances.csv", index=False)
            # Visualization
            plt.figure(figsize=(12, 8))
            sns.barplot(x='Importance', y='Feature', data=feat_imp.head(20))
            plt.title('Feature Importances')
            plt.tight_layout()
            plt.savefig(f"{self.config.MODEL_NAME}_feature_importances.png")
            plt.close()
        print(f"Model saved to: {model_path}")
        return model_path

# Data loading functions - enhanced version
def _load_market(data_dir, prefix, label, stock_data):
    """Load every CSV in `data_dir` into `stock_data`, keyed by prefix + code."""
    required_columns = ['open', 'high', 'low', 'close', 'volume']
    files = [f for f in os.listdir(data_dir) if f.endswith('.csv')]
    for file in tqdm(files, desc=f"Loading {label} data"):
        stock_code = file.split('.')[0]
        try:
            df = pd.read_csv(os.path.join(data_dir, file))
            # Check that the required columns exist
            if not all(col in df.columns for col in required_columns):
                print(f"Skipping {label} stock {file}: missing required columns")
                continue
            # Derive a `date` column when the file uses another name
            if 'date' not in df.columns:
                if 'trade_date' in df.columns:
                    df['date'] = pd.to_datetime(df['trade_date'].astype(str), format='%Y%m%d')
                elif 'datetime' in df.columns:
                    df['date'] = pd.to_datetime(df['datetime'])
                else:
                    print(f"Skipping {label} stock {file}: missing date column")
                    continue
            # Add the market prefix
            stock_data[f"{prefix}{stock_code}"] = df
        except Exception as e:
            print(f"Failed to load {label} stock {file}: {e}")


def load_stock_data(sh_dir, sz_dir):
    """Load all stock data from the Shanghai and Shenzhen directories."""
    print("Starting to load stock data...")
    stock_data = {}
    # Check that both directories exist
    if not os.path.exists(sh_dir):
        print(f"Error: Shanghai data directory does not exist - {sh_dir}")
        return stock_data
    if not os.path.exists(sz_dir):
        print(f"Error: Shenzhen data directory does not exist - {sz_dir}")
        return stock_data
    # Same loading logic for both markets, only the prefix differs
    _load_market(sh_dir, "sh", "Shanghai", stock_data)
    _load_market(sz_dir, "sz", "Shenzhen", stock_data)
    print(f"Successfully loaded {len(stock_data)} stocks")
    # Data quality check on one sample
    if stock_data:
        sample_key = next(iter(stock_data))
        sample_df = stock_data[sample_key]
        print("\nSample data:")
        print(f"Stock code: {sample_key}")
        print(f"Date range: {sample_df['date'].min()} to {sample_df['date'].max()}")
        print(f"Columns: {list(sample_df.columns)}")
        print(f"Rows: {len(sample_df)}")
    return stock_data

# Main function
def main():
    start_time = time.time()
    config = Config()
    # Load data
    stock_data = load_stock_data(config.SH_DATA_DIR, config.SZ_DATA_DIR)
    if not stock_data:
        print("Error: no stock data was loaded, aborting")
        return
    # Initialize the predictor
    predictor = StockPredictor(config)
    # Clustering analysis
    try:
        cluster_df = predictor.cluster_model.fit(stock_data)
        # Save the clustering result
        cluster_df.to_csv(f"{config.MODEL_NAME}_clusters.csv")
        print(f"Clustering result saved to: {config.MODEL_NAME}_clusters.csv")
    except Exception as e:
        print(f"Clustering analysis failed: {e}")
        return
    # Prepare the dataset
    try:
        X, y = predictor.prepare_dataset(stock_data)
    except Exception as e:
        print(f"Dataset preparation failed: {e}")
        return
    # Train the model
    model = predictor.train_model(X, y)
    if model is None:
        print("Model training failed")
        return
    # Save the model
    try:
        model_path = predictor.save_model()
    except Exception as e:
        print(f"Saving the model failed: {e}")
        return
    # Report results
    print(f"Model training finished, elapsed: {time.time()-start_time:.2f} s")
    print(f"Model file: {model_path}")
    # The importance files exist only if the model exposes feature_importances_
    for suffix in ("_feature_importances.csv", "_feature_importances.png"):
        path = f"{config.MODEL_NAME}{suffix}"
        if os.path.exists(path):
            print(f"Feature importance output: {path}")
    print(f"Clustering result file: {config.MODEL_NAME}_clusters.csv")


if __name__ == "__main__":
    main()
```
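The labeling rule in `TargetCreator.create_targets` can be checked in isolation. The sketch below reproduces the same five classes with the standard library's `bisect` instead of `np.select`; `label_return` and `future_returns` are hypothetical helpers, and the toy series uses a 2-day horizon rather than the script's `TARGET_DAYS = 5` to keep it short.

```python
from bisect import bisect_right

# Class boundaries taken from the conditions in create_targets.
THRESHOLDS = [-0.05, -0.02, 0.02, 0.05]


def label_return(future_return):
    """Map a return to 0 (large drop) .. 4 (large rise)."""
    # Counting thresholds <= value gives the same class as the first
    # matching condition in the np.select chain.
    return bisect_right(THRESHOLDS, future_return)


def future_returns(closes, horizon):
    """Return close[t + horizon] / close[t] - 1 for every t that has a future close."""
    return [closes[i + horizon] / closes[i] - 1 for i in range(len(closes) - horizon)]


closes = [10.0, 10.0, 10.0, 11.0, 9.0]
labels = [label_return(r) for r in future_returns(closes, horizon=2)]
print(labels)
```

The last `horizon` rows have no future close and therefore get no label, which corresponds to the `dropna(subset=['future_return'])` step in the script.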
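## Main Fixes
1. **Feature engineering before clustering**:
- Run feature engineering before clustering so that the features are available
- Add detailed error logs and warnings
- Print the cluster distribution
2. **Enhanced data loading**:
- Check for the required columns (open, high, low, close, volume)
- Support several date column names (trade_date, datetime)
- Add a data quality check that prints sample data information
3. **Refactored feature engineering**:
- Compute features in separate steps for maintainability
- Streamline NaN handling (forward fill -> backward fill -> zero fill)
- Sort the data by date so the time series is in order
4. **Enhanced error handling**:
- Log detailed errors at every key step
- Add a fallback (use default parameters when the parameter search fails)
- Save the clustering result to a CSV file
5. **Data quality checks**:
- Print the target class distribution
- Print the cluster distribution
- Print sample data information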
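## Usage
1. **Data requirements**:
- CSV files must contain the columns open, high, low, close, volume
- A date column is required (date, trade_date or datetime)
2. **Run the program**:
```bash
python 大涨预测训练程序.py
```
3. **Output files**:
- `stock_prediction_model.pkl` - the complete model
- `stock_prediction_model_feature_importances.csv` - feature importances (written only if the trained model exposes `feature_importances_`)
- `stock_prediction_model_feature_importances.png` - feature importance chart (same condition)
- `stock_prediction_model_clusters.csv` - clustering result
4. **Monitoring output**:
- The program shows detailed progress and memory usage
- Key steps print summary statistics
- Errors are reported with their cause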
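After a run it can be useful to confirm which of the output files listed above were actually produced. `save_model` writes the two feature importance files only when the model has a `feature_importances_` attribute, and as far as I know `HistGradientBoostingClassifier` does not provide one, so those two files may be absent. A small sketch, where `expected_outputs` is a hypothetical helper and the temporary directory stands in for the working directory:

```python
import tempfile
from pathlib import Path


def expected_outputs(model_name, directory="."):
    """Map each expected output file name to whether it exists in `directory`."""
    base = Path(directory)
    names = [
        f"{model_name}.pkl",
        f"{model_name}_feature_importances.csv",
        f"{model_name}_feature_importances.png",
        f"{model_name}_clusters.csv",
    ]
    return {name: (base / name).exists() for name in names}


# Simulate a run that produced only the model and the clustering result.
with tempfile.TemporaryDirectory() as tmp:
    (Path(tmp) / "stock_prediction_model.pkl").touch()
    (Path(tmp) / "stock_prediction_model_clusters.csv").touch()
    status = expected_outputs("stock_prediction_model", tmp)

for name, present in status.items():
    print(f"{'found  ' if present else 'missing'} {name}")
```

In a real run, call `expected_outputs("stock_prediction_model")` from the directory where the script was executed.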
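This fix resolves the clustering feature extraction failure directly: by moving feature engineering ahead of clustering, all required technical indicators are computed before the clustering analysis runs. It also strengthens data quality checks and error handling, making the program more robust.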